Elasticsearch分布式
1. Elasticsearch分布式特性
1.1 复杂分布式机制
Elasticsearch是一套分布式的系统,分布式是为了应对大数据量
隐藏了复杂的分布式机制
分片机制(我们之前随随便便就将一些document插入到es集群中去了,我们有没有care过数据怎么进行分片的,数据到哪个shard中去)
cluster discovery(集群发现机制,我们之前在做那个集群status从yellow转green的实验里,直接启动了第二个es进程,那个进程作为一个node自动就发现了集群,并且加入了进去,还接受了部分数据,replica shard)
shard负载均衡(举例,假设现在有3个节点,总共有25个shard要分配到3个节点上去,es会自动进行均匀分配,以保持每个节点的均衡的读写负载请求)
shard副本,请求路由,集群扩容,shard重分配
1.2 垂直扩容与水平扩容
垂直扩容:采购更强大的服务器,成本非常高昂,而且会有瓶颈,假设世界上最强大的服务器容量就是10T,但是当你的总数据量达到5000T的时候,你要采购多少台最强大的服务器啊
水平扩容:业界经常采用的方案,采购越来越多的普通服务器,性能比较一般,但是很多普通服务器组织在一起,就能构成强大的计算和存储能力
普通服务器:1T,1万,100万
强大服务器:10T,50万,500万
扩容对应用程序的透明性
1.3 rebalance
增减或减少节点时的数据rebalance
保持负载均衡
1.4 master节点
(1)创建或删除索引
(2)增加或删除节点
1.5 节点平等的分布式架构
(1)节点对等,每个节点都能接收所有的请求
(2)自动请求路由
(3)响应收集
2. replica&shard机制
2.1 shard分片
- index包含多个shard
- 每个shard都是一个最小工作单元,承载部分数据,lucene实例,完整的建立索引和处理请求的能力
- 增减节点时,shard会自动在nodes中负载均衡
- primary shard和replica shard,每个document肯定只存在于某一个primary shard以及其对应的replica shard中,不可能存在于多个primary shard
- replica shard是primary shard的副本,负责容错,以及承担读请求负载
- primary shard的数量在创建索引的时候就固定了,replica shard的数量可以随时修改
- primary shard的默认数量是5,replica默认是1,默认有10个shard,5个primary shard,5个replica shard
- primary shard不能和自己的replica shard放在同一个节点上(否则节点宕机,primary shard和副本都丢失,起不到容错的作用),但是可以和其他primary shard的replica shard放在同一个节点上

2.2 单Node平台的Index
- 单node环境下,创建一个index,有3个primary shard,3个replica shard
- 集群status是yellow
- 这个时候,只会将3个primary shard分配到仅有的一个node上去,另外3个replica shard是无法分配的
- 集群可以正常工作,但是一旦出现节点宕机,数据全部丢失,而且集群不可用,无法承接任何请求
PUT /test_index
{
"settings" : {
"number_of_shards" : 3,
"number_of_replicas" : 1
}
}

2.3 双Node平台的Index
(1)replica shard分配:3个primary shard,3个replica shard,1 node
(2)primary ---> replica同步
(3)读请求:primary/replica

2.4 扩容过程
- primary&replica自动负载均衡,6个shard,3 primary,3 replica
- 每个node有更少的shard,IO/CPU/Memory资源给每个shard分配更多,每个shard性能更好
- 扩容的极限,6个shard(3 primary,3 replica),最多扩容到6台机器,每个shard可以占用单台服务器的所有资源,性能最好
- 超出扩容极限,动态修改replica数量,9个shard(3primary,6 replica),扩容到9台机器,比3台机器时,拥有3倍的读吞吐量
- 3台机器下,9个shard(3 primary,6 replica),资源更少,但是容错性更好,最多容纳2台机器宕机,6个shard只能容纳1台机器宕机
- 这里的这些知识点,你综合起来看,就是说,一方面告诉你扩容的原理,怎么扩容,怎么提升系统整体吞吐量;另一方面要考虑到系统的容错性,怎么保证提高容错性,让尽可能多的服务器宕机,保证数据不丢失

2.5 容错过程
- 9 shard,3 node
- master node宕机,自动master选举,red
- replica容错:新master将replica提升为primary shard,yellow
- 重启宕机node,master copy replica到该node,使用原有的shard并同步宕机后的修改,green

3. Elasticsearch元数据
查询 document 数据
{
"_index": "test_index",
"_type": "test_type",
"_id": "1",
"_version": 1,
"found": true,
"_source": {
"test_content": "test test"
}
}
3.1 _index元数据
- 代表一个document存放在哪个index中
- 类似的数据放在一个索引,非类似的数据放不同索引:product index(包含了所有的商品),sales index(包含了所有的商品销售数据),inventory index(包含了所有库存相关的数据)。如果你把比如product,sales,human resource(employee),全都放在一个大的index里面,比如说company index,不合适的。
- index中包含了很多类似的document:类似是什么意思,其实指的就是说,这些document的fields很大一部分是相同的,你说你放了3个document,每个document的fields都完全不一样,这就不是类似了,就不太适合放到一个index里面去了。
- 索引名称必须是小写的,不能用下划线开头,不能包含逗号:product,website,blog

3.2 _type元数据
- 代表document属于index中的哪个类别(type)
- 一个索引通常会划分为多个type,逻辑上对index中有些许不同的几类数据进行分类:因为一批相同的数据,可能有很多相同的fields,但是还是可能会有一些轻微的不同,可能会有少数fields是不一样的,举个例子,就比如说,商品,可能划分为电子商品,生鲜商品,日化商品,等等。
- type名称可以是大写或者小写,但是同时不能用下划线开头,不能包含逗号
3.3 _id元数据
- 代表document的唯一标识,与index和type一起,可以唯一标识和定位一个document
- 我们可以手动指定document的id(put /index/type/id),也可以不指定,由es自动为我们创建一个id
3.4 _id生成
3.4.1 手动指定document id
(1)根据应用情况来说,是否满足手动指定document id的前提:
一般来说,是从某些其他的系统中,导入一些数据到es时,会采取这种方式,就是使用系统中已有数据的唯一标识,作为es中document的id。举个例子,比如说,我们现在在开发一个电商网站,做搜索功能,或者是OA系统,做员工检索功能。这个时候,数据首先会在网站系统或者IT系统内部的数据库中,会先有一份,此时就肯定会有一个数据库的primary key(自增长,UUID,或者是业务编号)。如果将数据导入到es中,此时就比较适合采用数据在数据库中已有的primary key。
如果说,我们是在做一个系统,这个系统主要的数据存储就是es一种,也就是说,数据产生出来以后,可能就没有id,直接就放es一个存储,那么这个时候,可能就不太适合说手动指定document id的形式了,因为你也不知道id应该是什么,此时可以采取下面要讲解的让es自动生成id的方式。
(2)put /index/type/id
PUT /test_index/test_type/2
{
"test_content": "my test"
}
3.4.2 自动生成document id
(1)post /index/type
POST /test_index/test_type
{
"test_content": "my test"
}
{
"_index": "test_index",
"_type": "test_type",
"_id": "AVp4RN0bhjxldOOnBxaE",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": true
}
(2)自动生成的id,长度为20个字符,URL安全,base64编码,GUID,分布式系统并行生成时不可能会发生冲突

3.5 _source元数据
3.5.1 _source元数据
put /test_index/test_type/1
{
"test_field1": "test field1",
"test_field2": "test field2"
}
get /test_index/test_type/1
{
"_index": "test_index",
"_type": "test_type",
"_id": "1",
"_version": 2,
"found": true,
"_source": {
"test_field1": "test field1",
"test_field2": "test field2"
}
}
_source元数据:就是说,我们在创建一个document的时候,使用的那个放在request body中的json串,默认情况下,在get的时候,会原封不动的给我们返回回来。
3.5.2 定制返回结果
定制返回的结果,指定_source中,返回哪些field
GET /test_index/test_type/1?_source=test_field1,test_field2
{
"_index": "test_index",
"_type": "test_type",
"_id": "1",
"_version": 2,
"found": true,
"_source": {
"test_field2": "test field2"
}
}
3.6 document修改
3.6.1 document的全量替换
- 语法与创建文档是一样的,如果document id不存在,那么就是创建;如果document id已经存在,那么就是全量替换操作,替换document的json串内容
- document是不可变的,如果要修改document的内容,第一种方式就是全量替换,直接对document重新建立索引,替换里面所有的内容
- es会将老的document标记为deleted,然后新增我们给定的一个document,当我们创建越来越多的document的时候,es会在适当的时机在后台自动删除标记为deleted的document

3.6.2 document的强制创建
- 创建文档与全量替换的语法是一样的,有时我们只是想新建文档,不想替换文档,如果强制进行创建呢?
- PUT /index/type/id?op_type=create,PUT /index/type/id/_create
3.6.3 document的删除
- DELETE /index/type/id
- 不会理解物理删除,只会将其标记为deleted,当数据越来越多的时候,在后台自动删除
4. Elasticsearch并发控制
4.1 Elasticsearch并发问题

4.2 悲观锁和乐观锁

4.3 Elasticsearch中乐观锁
(1)_version元数据
PUT /test_index/test_type/6
{
"test_field": "test test"
}
{
"_index": "test_index",
"_type": "test_type",
"_id": "6",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": true
}
第一次创建一个document的时候,它的 _version 内部版本号就是1;以后,每次对这个document执行修改或者删除操作,都会对这个 _version 版本号自动加1;哪怕是删除,也会对这条数据的版本号加1
{
"found": true,
"_index": "test_index",
"_type": "test_type",
"_id": "6",
"_version": 4,
"result": "deleted",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
}
}
我们会发现,在删除一个document之后,可以从一个侧面证明,它不是立即物理删除掉的,因为它的一些版本号等信息还是保留着的。先删除一条document,再重新创建这条document,其实会在delete version基础之上,再把version号加1

4.4 ES内部version乐观锁
4.4.1 构造一条数据
PUT /test_index/test_type/7
{
"test_field": "test test"
}
4.4.2 两个客户端获取该数据
这两个客户端分别为 ClientA 和 ClientB
GET test_index/test_type/7
{
"_index": "test_index",
"_type": "test_type",
"_id": "7",
"_version": 1,
"found": true,
"_source": {
"test_field": "test test"
}
}
4.4.3 ClientA修改该数据
同时带上数据的版本号,确保es中的数据的版本号,跟客户端中的数据的版本号是相同的,才能修改
PUT /test_index/test_type/7?version=1
{
"test_field": "test client 1"
}
{
"_index": "test_index",
"_type": "test_type",
"_id": "7",
"_version": 2,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": false
}
4.4.4 ClientB修改数据
ClientB尝试基于version=1的数据去进行修改,同样带上version版本号,进行乐观锁的并发控制
PUT /test_index/test_type/7?version=1
{
"test_field": "test client 2"
}
{
"error": {
"root_cause": [
{
"type": "version_conflict_engine_exception",
"reason": "[test_type][7]: version conflict, current version [2] is different than the one provided [1]",
"index_uuid": "6m0G7yx7R1KECWWGnfH1sw",
"shard": "3",
"index": "test_index"
}
],
"type": "version_conflict_engine_exception",
"reason": "[test_type][7]: version conflict, current version [2] is different than the one provided [1]",
"index_uuid": "6m0G7yx7R1KECWWGnfH1sw",
"shard": "3",
"index": "test_index"
},
"status": 409
}
4.4.5 使用正确的方式更新
在乐观锁成功阻止并发问题之后,尝试正确的完成更新
GET /test_index/test_type/7
{
"_index": "test_index",
"_type": "test_type",
"_id": "7",
"_version": 2,
"found": true,
"_source": {
"test_field": "test client 1"
}
}
基于最新的数据和版本号,去进行修改,修改时带上最新的版本号,可能这个步骤会需要反复执行好几次,才能成功,特别是在多线程并发更新同一条数据很频繁的情况下
PUT /test_index/test_type/7?version=2
{
"test_field": "test client 2"
}
{
"_index": "test_index",
"_type": "test_type",
"_id": "7",
"_version": 3,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": false
}
4.5 ES外部version乐观锁
es提供了一个feature,就是说,你可以不用它提供的内部 _version版本号来进行并发控制,可以基于你自己维护的一个版本号来进行并发控制。举个列子,假如你的数据在mysql里也有一份,然后你的应用系统本身就维护了一个版本号,无论是什么自己生成的,程序控制的。这个时候,你进行乐观锁并发控制的时候,可能并不是想要用es内部的_version来进行控制,而是用你自己维护的那个version来进行控制。
// 基于内部version
?version=1
// 基于外部version
?version=1&version_type=external
内部version和外部version的区别
内部version:
只有当你提供的 version 与es中的 version 一模一样的时候,才可以进行修改,只要不一样,就报错;
外部version:
当version_type=external的时候,只有当你提供的version比es中的version大的时候,才能完成修改
es,_version=1,?version=1,才能更新成功
es,_version=1,?version>1&version_type=external,才能成功,比如说?version=2&version_type=external
4.5.1 构造一条数据
PUT /test_index/test_type/8
{
"test_field": "test"
}
{
"_index": "test_index",
"_type": "test_type",
"_id": "8",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": true
}
4.5.2 两个客户端获取该数据
GET /test_index/test_type/8
{
"_index": "test_index",
"_type": "test_type",
"_id": "8",
"_version": 1,
"found": true,
"_source": {
"test_field": "test"
}
}
4.5.3 ClientA修改数据
ClientA先进行修改,此时客户端程序是在自己的数据库中获取到了这条数据的最新版本号,比如说是2
PUT /test_index/test_type/8?version=2&version_type=external
{
"test_field": "test client 1"
}
{
"_index": "test_index",
"_type": "test_type",
"_id": "8",
"_version": 2,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": false
}
4.5.4 ClientB修改数据
ClientB同时拿到了自己数据库中维护的那个版本号,也是2,同时基于version=2发起了修改
PUT /test_index/test_type/8?version=2&version_type=external
{
"test_field": "test client 2"
}
{
"error": {
"root_cause": [
{
"type": "version_conflict_engine_exception",
"reason": "[test_type][8]: version conflict, current version [2] is higher or equal to the one provided [2]",
"index_uuid": "6m0G7yx7R1KECWWGnfH1sw",
"shard": "1",
"index": "test_index"
}
],
"type": "version_conflict_engine_exception",
"reason": "[test_type][8]: version conflict, current version [2] is higher or equal to the one provided [2]",
"index_uuid": "6m0G7yx7R1KECWWGnfH1sw",
"shard": "1",
"index": "test_index"
},
"status": 409
}
4.5.5 使用正确方式更新
在并发控制成功后,重新基于最新的版本号发起更新
GET /test_index/test_type/8
{
"_index": "test_index",
"_type": "test_type",
"_id": "8",
"_version": 2,
"found": true,
"_source": {
"test_field": "test client 1"
}
}
或者
PUT /test_index/test_type/8?version=3&version_type=external
{
"test_field": "test client 2"
}
{
"_index": "test_index",
"_type": "test_type",
"_id": "8",
"_version": 3,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": false
}
5. partial update
5.1 什么是partial update
正常使用全量替换更新操作
PUT /index/type/id,创建文档&替换文档,就是一样的语法
一般对应到应用程序中,每次的执行流程基本是这样的:
- 应用程序先发起一个get请求,获取到document,展示到前台界面,供用户查看和修改
- 用户在前台界面修改数据,发送到后台
- 后台代码,会将用户修改的数据在内存中进行执行,然后封装好修改后的全量数据
- 然后发送PUT请求,到es中,进行全量替换
- es将老的document标记为deleted,然后重新创建一个新的document
partial update(部分更新)
post /index/type/id/_update { "doc": { "要修改的少数几个field即可,不需要全量的数据" } }
看起来,好像就比较方便了,每次就传递少数几个发生修改的field即可,不需要将全量的document数据发送过去
5.2 partial update原理及优点
partial update,看起来很方便的操作,实际内部的原理是什么样子的,然后它的优点是什么

全量update时,先从es中查出document数据,然后合并要修改内容之后,再写会es。整个过程发生在java内部;
partial update时,整个过程发生在es内部一个shard上。
5.3 验证partial update
新增一条数据
PUT /test_index/test_type/10
{
"test_field1": "test1",
"test_field2": "test2"
}
partial update 更新
POST /test_index/test_type/10/_update
{
"doc": {
"test_field2": "updated test2"
}
}
查询更新后的结果
{
"_index" : "test_index",
"_type" : "test_type",
"_id" : "10",
"_version" : 2,
"_primary_term" : 2,
"found" : true,
"_source" : {
"test_field1" : "test1",
"test_field2" : "test update 2"
}
}
5.4 groovy脚本partial update
es,其实是有个内置的脚本支持的,可以基于groovy脚本实现各种各样的复杂操作
基于groovy脚本,如何执行partial update
es scripting module,我们会在高手进阶篇去讲解,这里就只是初步讲解一下
新增测试数据
PUT /test_index/test_type/11
{
"num": 0,
"tags": []
}
5.4.1 内置脚本
partial update更新
POST /test_index/test_type/11/_update
{
"script" : "ctx._source.num+=1"
}
查询更新后结果
{
"_index": "test_index",
"_type": "test_type",
"_id": "11",
"_version": 2,
"found": true,
"_source": {
"num": 1,
"tags": []
}
}
5.4.2 外部脚本
在安装目录 config/scripts 下新增脚本 test-add-tags.groovy,内容如下
E:\Develop\OpenSource\elasticsearch-5.2.0\config\scripts\test-add-tags.groovy
ctx._source.tags+=new_tag
POST /test_index/test_type/11/_update
{
"script": {
"lang": "groovy",
"file": "test-add-tags",
"params": {
"new_tag": "tag1"
}
}
}
5.4.3 用脚本删除文档
在 config/scripts目录下新增 test-delete-document.groovy
ctx.op = ctx._source.num == count ? 'delete' : 'none'
POST /test_index/test_type/11/_update
{
"script": {
"lang": "groovy",
"file": "test-delete-document",
"params": {
"count": 1
}
}
}
5.4.4 upsert操作
POST /test_index/test_type/11/_update
{
"doc": {
"num": 1
}
}
文档 11 已被 5.4.3 步骤删除,partial update 出现如下错误
{
"error": {
"root_cause": [
{
"type": "document_missing_exception",
"reason": "[test_type][11]: document missing",
"index_uuid": "6m0G7yx7R1KECWWGnfH1sw",
"shard": "4",
"index": "test_index"
}
],
"type": "document_missing_exception",
"reason": "[test_type][11]: document missing",
"index_uuid": "6m0G7yx7R1KECWWGnfH1sw",
"shard": "4",
"index": "test_index"
},
"status": 404
}
如果指定的document不存在,就执行upsert中的初始化操作;如果指定的document存在,就执行doc或者script指定的partial update操作
POST /test_index/test_type/11/_update
{
"script" : "ctx._source.num+=1",
"upsert": {
"num": 0,
"tags": []
}
}
5.5 partial update乐观锁

partial update fail 之后策略
// 重试
post /index/type/id/_update?retry_on_conflict=5
// 版本号更新
post /index/type/id/_update?version=6
6. mget批量查询
6.1 批量查询的好处
就是一条一条的查询,比如说要查询100条数据,那么就要发送100次网络请求,这个开销还是很大的
如果进行批量查询的话,查询100条数据,就只要发送1次网络请求,网络请求的性能开销缩减100倍
6.2 mget的语法
6.2.1 一条一条的查询
GET /test_index/test_type/1
GET /test_index/test_type/2
6.2.2 mget批量查询
要查询的数据在不同index和type下
GET /_mget
{
"docs" : [
{
"_index" : "test_index",
"_type" : "test_type",
"_id" : 1
},
{
"_index" : "test_index",
"_type" : "test_type",
"_id" : 2
}
]
}
{
"docs": [
{
"_index": "test_index",
"_type": "test_type",
"_id": "1",
"_version": 2,
"found": true,
"_source": {
"test_field1": "test field1",
"test_field2": "test field2"
}
},
{
"_index": "test_index",
"_type": "test_type",
"_id": "2",
"_version": 1,
"found": true,
"_source": {
"test_content": "my test"
}
}
]
}
6.2.3 mget查询同个index
如果查询的document是一个index下的不同type种的话
GET /test_index/_mget
{
"docs" : [
{
"_type" : "test_type",
"_id" : 1
},
{
"_type" : "test_type",
"_id" : 2
}
]
}
6.2.4 mget查询同个ype
如果查询的数据都在同一个index下的同一个type下,最简单了
GET /test_index/test_type/_mget
{
"ids": [1, 2]
}
6.3 mget的重要性
可以说mget是很重要的,一般来说,在进行查询的时候,如果一次性要查询多条数据的话,那么一定要用batch批量操作的api
尽可能减少网络开销次数,可能可以将性能提升数倍,甚至数十倍,非常非常之重要
7. bulk批量操作
7.1 bulk语法
语法说明
每一个操作要两个json串,语法如下:
{"action": {"metadata"}}
{"data"}
有哪些类型的操作可以执行呢?
- delete:删除一个文档,只要1个json串就可以了
POST /_bulk { "delete": { "_index": "test_index", "_type": "test_type", "_id": "3" }} - create:PUT /index/type/id/_create,强制创建
POST /_bulk { "create": { "_index": "test_index", "_type": "test_type", "_id": "12" }} { "test_field": "test12" } - index:普通的put操作,可以是创建文档,也可以是全量替换文档
POST /_bulk { "index": { "_index": "test_index", "_type": "test_type", "_id": "2" }} { "test_field": "replaced test2" } - update:执行的partial update操作
POST /_bulk { "update": { "_index": "test_index", "_type": "test_type", "_id": "1", "_retry_on_conflict" : 3} } { "doc" : {"test_field2" : "bulk test1"} }
举例,比如你现在要创建一个文档,放bulk里面,看起来会是这样子的:
{"index": {"_index": "test_index", "_type", "test_type", "_id": "1"}}
{"test_field1": "test1", "test_field2": "test2"}
bulk api对json的语法,有严格的要求,每个json串不能换行,只能放一行,同时一个json串和一个json串之间,必须有一个换行
{
"took": 41,
"errors": true,
"items": [
{
"delete": {
"found": true,
"_index": "test_index",
"_type": "test_type",
"_id": "10",
"_version": 3,
"result": "deleted",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"status": 200
}
},
{
"create": {
"_index": "test_index",
"_type": "test_type",
"_id": "3",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": true,
"status": 201
}
},
{
"create": {
"_index": "test_index",
"_type": "test_type",
"_id": "2",
"status": 409,
"error": {
"type": "version_conflict_engine_exception",
"reason": "[test_type][2]: version conflict, document already exists (current version [1])",
"index_uuid": "6m0G7yx7R1KECWWGnfH1sw",
"shard": "2",
"index": "test_index"
}
}
},
{
"index": {
"_index": "test_index",
"_type": "test_type",
"_id": "4",
"_version": 1,
"result": "created",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": true,
"status": 201
}
},
{
"index": {
"_index": "test_index",
"_type": "test_type",
"_id": "2",
"_version": 2,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"created": false,
"status": 200
}
},
{
"update": {
"_index": "test_index",
"_type": "test_type",
"_id": "1",
"_version": 3,
"result": "updated",
"_shards": {
"total": 2,
"successful": 1,
"failed": 0
},
"status": 200
}
}
]
}
bulk操作中,任意一个操作失败,是不会影响其他的操作的,但是在返回结果里,会告诉你异常日志
bulk操作同个index
POST /test_index/_bulk
{ "delete": { "_type": "test_type", "_id": "3" }}
{ "create": { "_type": "test_type", "_id": "12" }}
{ "test_field": "test12" }
{ "index": { "_type": "test_type" }}
{ "test_field": "auto-generate id test" }
{ "index": { "_type": "test_type", "_id": "2" }}
{ "test_field": "replaced test2" }
{ "update": { "_type": "test_type", "_id": "1", "_retry_on_conflict" : 3} }
{ "doc" : {"test_field2" : "bulk test1"} }
bulk操作同个index同个type
POST /test_index/test_type/_bulk
{ "delete": { "_id": "3" }}
{ "create": { "_id": "12" }}
{ "test_field": "test12" }
{ "index": { }}
{ "test_field": "auto-generate id test" }
{ "index": { "_id": "2" }}
{ "test_field": "replaced test2" }
{ "update": { "_id": "1", "_retry_on_conflict" : 3} }
{ "doc" : {"test_field2" : "bulk test1"} }
7.2 bulk size最佳大小
bulk request会加载到内存里,如果太大的话,性能反而会下降,因此需要反复尝试一个最佳的bulk size。一般从1000~5000条数据开始,尝试逐渐增加。另外,如果看大小的话,最好是在5~15MB之间。
7.3 bulk格式分析
bulk中的每个操作都可能要转发到不同的node的shard去执行
如果采用比较良好的json数组格式
允许任意的换行,整个可读性非常棒,读起来很爽,es拿到那种标准格式的json串以后,要按照下述流程去进行处理
- 将json数组解析为JSONArray对象,这个时候,整个数据,就会在内存中出现一份一模一样的拷贝,一份数据是json文本,一份数据是JSONArray对象
- 解析json数组里的每个json,对每个请求中的document进行路由
- 为路由到同一个shard上的多个请求,创建一个请求数组
- 将这个请求数组序列化
- 将序列化后的请求数组发送到对应的节点上去
耗费更多内存,更多的jvm gc开销
我们之前提到过bulk size最佳大小的那个问题,一般建议说在几千条那样,然后大小在10MB左右,所以说,可怕的事情来了。假设说现在100个bulk请求发送到了一个节点上去,然后每个请求是10MB,100个请求,就是1000MB = 1GB,然后每个请求的json都copy一份为jsonarray对象,此时内存中的占用就会翻倍,就会占用2GB的内存,甚至还不止。因为弄成jsonarray之后,还可能会多搞一些其他的数据结构,2GB+的内存占用。
占用更多的内存可能就会积压其他请求的内存使用量,比如说最重要的搜索请求,分析请求,等等,此时就可能会导致其他请求的性能急速下降
另外的话,占用内存更多,就会导致java虚拟机的垃圾回收次数更多,跟频繁,每次要回收的垃圾对象更多,耗费的时间更多,导致es的java虚拟机停止工作线程的时间更多现在的奇特格式
{"action": {"meta"}}\n {"data"}\n {"action": {"meta"}}\n {"data"}\n- 不用将其转换为json对象,不会出现内存中的相同数据的拷贝,直接按照换行符切割json
- 对每两个一组的json,读取meta,进行document路由
- 直接将对应的json发送到node上去
最大的优势在于,不需要将json数组解析为一个JSONArray对象,形成一份大数据的拷贝,浪费内存空间,尽可能地保证性能
8. Document原理分析
8.1 document路由原理
document路由到shard上是什么意思?

路由算法:shard = hash(routing) % number_of_primary_shards
举个例子,一个index有3个primary shard,P0,P1,P2
每次增删改查一个document的时候,都会带过来一个routing number,默认就是这个document的_id(可能是手动指定,也可能是自动生成)
假设 routing = _id,当前_id=1会将这个routing值,传入一个hash函数中,产出一个routing值的hash值,hash(routing) = 21
然后将hash函数产出的值对这个index的primary shard的数量求余数,21 % 3 = 0
name,这个document就放在P0上。决定一个document在哪个shard上,最重要的一个值就是routing值,默认是_id,也可以手动指定,相同的routing值,每次过来,从hash函数中,产出的hash值一定是相同的
无论hash值是几,无论是什么数字,对number_of_primary_shards求余数,结果一定是在0~number_of_primary_shards-1之间这个范围内的。0,1,2。
_id or custom routing value
默认的routing就是_id
也可以在发送请求的时候,手动指定一个routing value,比如说put /index/type/id?routing=user_id手动指定routing value是很有用的,可以保证说,某一类document一定被路由到一个shard上去,那么在后续进行应用级别的负载均衡,以及提升批量读取的性能的时候,是很有帮助的
primary shard数量不可变
修改 primary shard数量之后,根据hash公式判断数据在哪个shard时,修改前后判断的位置就不一样,导致数据丢失
8.2 document增删改原理
- 客户端选择一个node发送请求过去,这个node变为coordinating node(协调节点)
- coordinating node,对document进行路由,将请求转发给对应的node(有primary shard)
- 实际的node上的primary shard处理请求,然后将数据同步到replica node
- coordinating node,如果发现primary node和所有replica node都搞定之后,就返回响应结果给客户端

8.3 document写一致性
consistency,one(primary shard),all(all shard),quorum(default)
我们在发送任何一个增删改操作的时候,比如说 put /index/type/id,都可以带上一个consistency参数,指明我们想要的写一致性是什么?
put /index/type/id?consistency=quorum- one:要求我们这个写操作,只要有一个primary shard是active活跃可用的,就可以执行
- all:要求我们这个写操作,必须所有的primary shard和replica shard都是活跃的,才可以执行这个写操作
= quorum:默认的值,要求所有的shard中,必须是大部分的shard都是活跃的,可用的,才可以执行这个写操作
quorum机制,写之前必须确保大多数shard都可用,int( (primary + number_of_replicas) / 2 ) + 1,当number_of_replicas>1时才生效
quroum = int( (primary + number_of_replicas) / 2 ) + 1
举个例子,3个primary shard,number_of_replicas=1,总共有3 + 3 * 1 = 6个shard
quorum = int( (3 + 1) / 2 ) + 1 = 3
所以,要求6个shard中至少有3个shard是active状态的,才可以执行这个写操作如果节点数少于quorum数量,可能导致quorum不齐全,导致无法执行任何写操作
3个primary shard,replica=1,要求至少3个shard是active,3个shard按照之前学习的shard&replica机制,必须在不同的节点上,如果说只有1台机器的话,是不是有可能出现说,3个shard都没法分配齐全,此时就可能会出现写操作无法执行的情况
1个primary shard,replica=3,quorum=((1 + 3) / 2) + 1 = 3,要求1个primary shard + 3个replica shard = 4个shard,其中必须有3个shard是要处于active状态的。如果这个时候只有2台机器的话,会出现什么情况呢?
es提供了一种特殊的处理场景:当number_of_replicas>1时quorum机制才生效。因为假如说,你就一个primary shard,replica=1,此时就2个shard
(1 + 1 / 2) + 1 = 2,要求必须有2个shard是活跃的,但是可能就1个node,此时就1个shard是活跃的,如果你不特殊处理的话,导致我们的单节点集群就无法工作quorum不齐全时,wait默认1分钟,timeout,100,30s
等待期间,期望活跃的shard数量可以增加,最后实在不行,就会timeout
我们其实可以在写操作的时候,加一个timeout参数,比如说put /index/type/id?timeout=30,这个就是说自己去设定quorum不齐全的时候,es的timeout时长,可以缩短,也可以增长
8.4 document查询原理
- 客户端发送请求到任意一个node,成为coordinate node
- coordinate node对document进行路由,将请求转发到对应的node,此时会使用round-robin随机轮询算法,在primary shard以及其所有replica中随机选择一个,让读请求负载均衡
- 接收请求的node返回document给coordinate node
- coordinate node返回document给客户端
- 特殊情况:document如果还在建立索引过程中,可能只有primary shard有,任何一个replica shard都没有,此时可能会导致无法读取到document,但是document完成索引建立之后,primary shard和replica shard就都有
