方案:

- 使用 HAproxy:当其中一台 ElasticSearch Master 宕掉时,ElasticSearch 集群会自动将运行正常的节点提升为 Master,但 HAproxy 不会将失败的请求重新分发到新的 Master Node。
- 使用 ElasticSearch 原生集群:单 search load balancer(外层负载均衡节点)、双 coordinator(调度节点)、若干 workhorse(数据节点)。先后在 200 并发 Index、200 并发 Update 测试下(跑在虚拟机中,线程太多会卡死),并分别测试了 Down 掉一台主 coordinator、Down 掉一台 workhorse,都没有引起数据异常,集群工作正常。
先贴一下使用 HAproxy 搭建集群失败的配置吧:
# Global settings
global
    daemon
    nbproc 4
    pidfile /tmp/haproxy.pid

# Defaults applied to every proxy section below
defaults
    mode http                     # mode { tcp|http|health }: tcp = layer 4, http = layer 7, health only returns OK
    retries 2                     # after two failed connections the server is considered unavailable
    option redispatch             # if the server bound to a serverId goes down, redirect to another healthy server
    option httpclose              # HAProxy adds a cookie to the first response and the client sends it back on later requests
    #option abortonclose          # under heavy load, drop requests that have been queued for too long
    maxconn 4096                  # default maximum number of connections
    timeout connect 5000ms        # connect timeout
    timeout client 30000ms        # client timeout
    timeout server 30000ms        # server timeout
    timeout check 2000            # health-check timeout
    log 127.0.0.1 local0 err      # [err warning info debug]

# Statistics page
listen admin_stats
    bind 0.0.0.0:8888             # listen port
    mode http                     # layer-7 http mode
    option httplog                # use the http log format
    #log 127.0.0.1 local0 err
    maxconn 10
    stats refresh 30s             # stats page auto-refresh interval
    stats uri /                   # stats page url
    stats realm XingCloud\ Haproxy  # prompt text shown on the stats login box
    stats auth admin:admin        # stats page username and password
    #stats hide-version           # hide the HAProxy version on the stats page

# ElasticSearch frontend
frontend eshttp
    bind 0.0.0.0:9200
    mode tcp
    use_backend eshttp_server

# ElasticSearch backend
backend eshttp_server
    server eshttp1 vm12:9200 cookie 1 check inter 2000 rise 3 fall 3 weight 2
    server eshttp2 vm13:9200 cookie 2 check inter 2000 rise 3 fall 3 weight 1
    server eshttp3_bk vm14:9200 cookie 3 check inter 1000 rise 3 fall 3 backup
采用 ElasticSearch 搭建集群的关键几个配置:

search load balancer(LB 负载均衡节点):
# Cluster and node identity
cluster.name: harold
node.name: "harold_lb"

# 3. You want this node to be neither master nor data node, but
#    to act as a "search load balancer" (fetching data from nodes,
#    aggregating results, etc.)
#
node.master: false
node.data: false

# Unicast discovery host list
discovery.zen.ping.unicast.hosts: ["vm11", "vm12", "vm13", "vm14", "vm15", "vm16"]
coordinator(Master调度节点):
# Cluster and node identity
cluster.name: harold
node.name: "harold_coordinator_1"

# 2. You want this node to only serve as a master: to not store any data and
#    to have free resources. This will be the "coordinator" of your cluster.
#
node.master: true
node.data: false

# Unicast discovery host list
discovery.zen.ping.unicast.hosts: ["vm11", "vm12", "vm13", "vm14", "vm15", "vm16"]
workhorse(Data数据节点):
# Cluster and node identity
cluster.name: harold
node.name: "harold_data_1"

# 1. You want this node to never become a master node, only to hold data.
#    This will be the "workhorse" of your cluster.
#
node.master: false
node.data: true

# Unicast discovery host list
discovery.zen.ping.unicast.hosts: ["vm11", "vm12", "vm13", "vm14", "vm15", "vm16"]
配置完,启动后,/_plugin/head/页面应该是这个样子:
这样配置完的集群应该就是类似这样的:
可以使用 curl 初始化 Index 的主分片与复制分片:
curl -XPUT -d'{"settings":{"number_of_shards":6, "number_of_replicas":1}}' http://vm11:9200/app1
Tip:

- number_of_shards:主分片在集群中的总数量
- number_of_replicas:每个主分片的复制分片数量

# 复制分片在今后的分布式集群变化过程中,随时都可以根据业务进行新增或减少:
curl -XPUT -d'{"number_of_replicas":2}' http://vm11:9200/app1/_settings
# 另外,ElasticSearch 在没有任何索引的情况下新增一个文档,便会自动创建索引。为避免发生这种情况,可以在配置文件中添加:
action.auto_create_index: false
删除Index:
curl -XDELETE http://vm11:9200/app1
在当前版本中,这样组建集群会有一个小问题:当单独把 Master Coordinator Down 掉后,/_plugin/head/ 插件页面会是这个样子:
贴下PHP操作ElasticSearch的多进程并发测试代码吧,做下记录:
// NOTE(review): this snippet is truncated by extraction — the `<?php` header, the
// class declaration, its properties ($sum, $process, $index, $type, $hosts,
// $num_per_proc) and the opening of the constructor are missing; the first visible
// token "sum" is presumably the tail of "$this->sum" — TODO recover from source.
// The trailing "Execute the console command" docblock on handle() suggests a
// Laravel Artisan command class — confirm against the original repository.
//
// Visible behavior (grounded in the code below):
//  - (constructor tail) die()s with "invalid num." unless $this->sum is evenly
//    divisible by $this->process, then sets num_per_proc = sum / process.
//  - insert(): builds an elasticsearch-php client from $this->hosts, then forks
//    $this->process swoole_process workers; worker p indexes documents with ids
//    in [num_per_proc*p, num_per_proc*(p+1)), each with random birth_year
//    (1961-2010), type ('1'..'4'), a 4-letter random name, height, weight,
//    test=1, and userid=id.
//  - update(): forks the same number of workers; worker i searches its own page
//    (size num_per_proc, from num_per_proc*i, sorted by userid asc), increments
//    each hit's "test" field, appends the new value to /tmp/s, and writes it
//    back with a partial doc update.
//  - gets(): single-process check — fetches 5000 docs from offset 500 sorted by
//    userid and var_dump()s each doc's "test" value.
//  - handle(): console entry point; currently runs insert() only.
//
// NOTE(review): $client is constructed BEFORE the swoole_process forks, so every
// child shares the parent's client/connection state; verify elasticsearch-php
// re-establishes connections per forked process, otherwise traffic may interleave.
sum % $this->process !== 0 && die("invalid num. \n"); $this->num_per_proc = $this->sum / $this->process; } private function insert() { $es = new ClientBuilder(); $es->setHosts($this->hosts); $client = $es->build(); $words = str_split("abcdefghijklmnopqrstuvwxyz"); $birth_year = []; for ($i = 1; $i <= 50; $i++) { $birth_year[] = 1960 + $i; } $type = ['1', '2', '3', '4']; $process = []; for ($p = 0; $p < $this->process; $p++) { $process[] = new \swoole_process(function () use ($client, $birth_year, $type, $words, $p) { for ($i = $this->num_per_proc * $p; $i < $this->num_per_proc * ($p + 1); $i++) { $client->index([ 'index' => $this->index, 'type' => $this->type, 'id' => $i, 'body' => [ 'birth_year' => $birth_year[array_rand($birth_year)], 'type' => $type[array_rand($type)], 'name' => $words[mt_rand(0, 25)] . $words[mt_rand(0, 25)] . $words[mt_rand(0, 25)] . $words[mt_rand(0, 25)], 'height' => mt_rand(150, 200), 'weight' => mt_rand(40, 200), 'test' => 1, 'userid' => $i ] ]); } }); } foreach ($process as $p) { $pid = $p->start(); echo $pid . "\n"; } } private function update() { $es = new ClientBuilder(); $es->setHosts($this->hosts); $client = $es->build(); $process = []; for ($i = 0; $i < $this->process; $i++) { $process[] = new \swoole_process(function () use ($client, $i) { $response = $client->search([ 'index' => $this->index, 'type' => $this->type, 'size' => $this->num_per_proc, 'from' => $this->num_per_proc * $i, 'sort' => "userid:asc" ]); foreach ($response['hits']['hits'] as $v) { $id = $v['_id']; $test = $v['_source']['test']; $test++; file_put_contents("/tmp/s", $test . "\n", FILE_APPEND); $client->update([ 'index' => $this->index, 'type' => $this->type, 'id' => $id, 'body' => [ 'doc' => [ 'test' => $test ] ] ]); } }); } foreach ($process as $p) { $pid = $p->start(); echo $pid . 
"\n"; } } private function gets() { $es = new ClientBuilder(); $es->setHosts($this->hosts); $client = $es->build(); $response = $client->search([ 'index' => $this->index, 'type' => $this->type, 'size' => 5000, 'from' => 500, 'sort' => "userid:asc" ]); foreach ($response['hits']['hits'] as $v) { $id = $v['_id']; $test = $v['_source']['test']; // file_put_contents("/tmp/s", $test . "\n", FILE_APPEND); var_dump($test); } } /** * Execute the console command. * * @return mixed */ public function handle() { $this->insert(); }}