恕我直言,我也是才知道ElasticSearch条件更新是这么玩的
背景
ElasticSearch 的使用度越来越普及了,很多公司都在使用。有做日志搜索的,有做商品搜索的,有做订单搜索的。
大部分使用场景都是通过程序定期去导入数据到 ElasticSearch 中,或者通过 CDC 的方式来构建索引。在这种场景下,更新数据都是单条更新,比如 ID=1 的数据发生了修改操作,那么就会把 ElasticSearch 中 ID=1 的这条数据更新下。
但有些场景下需要根据条件同时更新多条数据,就像 Mysql 中我们使用 Update Table Set Name=XXX where Age=18 去更新一批数据一样。
正好有同学微信问我怎么批量更新,接下来就看看在 ElasticSearch 中是如何去进行按条件更新的操作。
单条更新
ElasticSearch 的客户端官方推荐使用 elasticsearch-rest-high-level-client。所以本文也是基于 elasticsearch-rest-high-level-client 来构建代码。
首先来回顾下单条数据的更新是怎么做的,代码如下:
UpdateRequest updateRequest = new UpdateRequest(index, type, id);
updateRequest.doc(documentJson, XContentType.JSON);
restHighLevelClient.update(updateRequest, options);
构建 UpdateRequest 的时候就指定了索引,类型,ID 三个字段,也就精确到了某一条数据,所以更新的自然也是这一条数据。
条件更新
首先我们准备几条测试数据,如下:
{
id: 1,
title: "Java怎么学",
type: 1,
userId: 1,
tags: [
"java"
],
textContent: "我要学Java",
status: 1,
heat: 100
}
{
id: 2,
title: "Java怎么学",
type: 1,
userId: 1,
tags: [
"java"
],
textContent: "我要学Java",
status: 1,
heat: 100
}
假如我们的需求是将 userId=1 的所有文档数据改成无效,也就是 status=0。如果不用按条件更新,你就得查询出 userId=1 的所有数据,然后一条条更新,这就太慢了。
下面看看按条件更新是如何使用的,如下:
POST http://47.105.66.210:9200/article_v1/doc/_update_by_query
{
"script": {
"source":"ctx._source['status']=0;"
},
"query": {
"term": {
"userId": 1
}
}
}
按条件更新需要使用_update_by_query 来进行,query 用于指定更新数据的匹配条件,script 用于更新的逻辑。
详细使用文档:
https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html[1]
https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-scripting-using.html[2]
在 Java 代码中如何实现条件更新呢?
UpdateByQueryRequest request = new UpdateByQueryRequest("article_v1");
request.setQuery(new TermQueryBuilder("userId", 1));
request.setScript(new Script("ctx._source['status']=0;"));
restHighLevelClient.updateByQuery(request, RequestOptions.DEFAULT);
是不是也很简单,跟单条数据更新差不多,使用 UpdateByQueryRequest 构建更新对象,然后设置 Query 和 Script 就可以了。
条件更新数组
比如我们的需求是要移除 tags 中的 java,如下:
POST http://47.105.66.210:9200/article_v1/doc/_update_by_query
{
"script": {
"source":"ctx._source['tags'].removeIf(item -> item == 'java');"
},
"query": {
"term": {
"userId": 1
}
}
}
新增的话只需要将 removeIf 改成 add 就可以了。
ctx._source['tags'].add('java');
如果有特殊的业务逻辑,Script 中还可以写判断来判断是否需要修改。
POST http://47.105.66.210:9200/article_v1/doc/_update_by_query
{
"script": {
"source":"if(ctx._source.type == 11) {ctx._source['tags'].add('java');}"
},
"query": {
"term": {
"userId": 1
}
}
}
封装通用的条件更新
大部分场景下的更新都比较简单,根据某个字段去更新某个值,或者去更新多个值。在 Java 中如果每个地方都去写脚本,就重复了,最好是抽一个比较通用的方法来更新。
下面是简单的示列,其中还有很多需要考虑的点,像数据类型我只处理了数字,字符串,和 List,其他的大家需要自己去扩展。
public BulkByScrollResponse updateByQuery(String index, QueryBuilder query, Map<String, Object> document) {
UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest(index);
updateByQueryRequest.setQuery(query);
StringBuilder script = new StringBuilder();
Set<String> keys = document.keySet();
for (String key : keys) {
String appendValue = "";
Object value = document.get(key);
if (value instanceof Number) {
appendValue = value.toString();
} else if (value instanceof String) {
appendValue = "'" + value.toString() + "'";
} else if (value instanceof List){
appendValue = JsonUtils.toJson(value);
} else {
appendValue = value.toString();
}
script.append("ctx._source.").append(key).append("=").append(appendValue).append(";");
}
updateByQueryRequest.setScript(new Script(script.toString()));
return updateByQuery(updateByQueryRequest, RequestOptions.DEFAULT);
}
public BulkByScrollResponse updateByQuery(UpdateByQueryRequest updateByQueryRequest, RequestOptions options) {
Map<String, Object> catData = new HashMap<>(1);
catData.put(ElasticSearchConstant.UPDATE_BY_QUERY_REQUEST, updateByQueryRequest.toString());
return CatTransactionManager.newTransaction(() -> {
try {
return restHighLevelClient.updateByQuery(updateByQueryRequest, options);
}catch (IOException e) {
throw new RuntimeException(e);
}
}, ElasticSearchConstant.ES_CAT_TYPE, ElasticSearchConstant.UPDATE, catData);
}
如果有了这么一个方法,那么使用方式如下:
@Test
public void testUpdate5() {
Map<String, Object> document = new HashMap<>();
document.put("title", "Java");
document.put("status", 0);
document.put("tags", Lists.newArrayList("JS", "CSS"));
kittyRestHighLevelClient.updateByQuery(elasticSearchIndexConfig.getArticleSaveIndexName(), new TermQueryBuilder("userId", 1), document);
}
关于作者:尹吉欢,简单的技术爱好者,《Spring Cloud 微服务-全栈技术与案例解析》, 《Spring Cloud 微服务 入门 实战与进阶》作者, 公众号 猿天地 发起人。
参考资料
[1]docs-update-by-query.html: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update-by-query.html
[2]modules-scripting-using.html: https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-scripting-using.html
- 得到真实外网IP、IP所在国家、省份、地区
- 机器学习在智能制造中的应用!
- sql2008 附加数据库时 错误5123
- Logistic Regression Models分析交互式问答译
- 照虎画猫写自己的Spring——依赖注入
- Logistic Regression Models分析交互式问答译
- Asp.Net开发等级星使用(Jquery Rating)
- Enterprise Library Policy Injection Application Block 之四:如何控制CallHandler的执行顺序
- 人工智能时代的艺术
- asp.net生成静态页
- Enterprise Library深入解析与灵活应用(1):通过Unity Extension实现和Policy Injection Application Block的集成
- 照虎画猫写自己的Spring
- 照虎画猫写自己的Spring
- mybatis 框架实战,实现数据库的增删改查
- JavaScript 教程
- JavaScript 编辑工具
- JavaScript 与HTML
- JavaScript 与Java
- JavaScript 数据结构
- JavaScript 基本数据类型
- JavaScript 特殊数据类型
- JavaScript 运算符
- JavaScript typeof 运算符
- JavaScript 表达式
- JavaScript 类型转换
- JavaScript 基本语法
- JavaScript 注释
- Javascript 基本处理流程
- Javascript 选择结构
- Javascript if 语句
- Javascript if 语句的嵌套
- Javascript switch 语句
- Javascript 循环结构
- Javascript 循环结构实例
- Javascript 跳转语句
- Javascript 控制语句总结
- Javascript 函数介绍
- Javascript 函数的定义
- Javascript 函数调用
- Javascript 几种特殊的函数
- JavaScript 内置函数简介
- Javascript eval() 函数
- Javascript isFinite() 函数
- Javascript isNaN() 函数
- parseInt() 与 parseFloat()
- escape() 与 unescape()
- Javascript 字符串介绍
- Javascript length属性
- javascript 字符串函数
- Javascript 日期对象简介
- Javascript 日期对象用途
- Date 对象属性和方法
- Javascript 数组是什么
- Javascript 创建数组
- Javascript 数组赋值与取值
- Javascript 数组属性和方法
- 非易失性WAL buffer实现解析(三)
- android实现切换日期左右无限滑动效果
- 疯子的算法总结(七) 字符串算法之 manacher 算法 O(N)解决回文串
- PostgreSQL WAL解析:构建WAL记录准备
- CodeForces - 225C. Barcode(DP)
- android studio 3.0 service项目背景音乐实现
- 疯子的算法总结(六) 复杂排序算法 ① 归并排序 merge_sort()
- PostgreSQL扫描方法综述
- CodeForces - 224C. Bracket Sequence (栈模拟)简单做法
- XLOG段文件跳号现象分析
- codeforce 227E 矩阵快速幂求斐波那契+N个连续数求最大公约数+斐波那契数列的性质
- 疯子的算法总结(五) 矩阵乘法 (矩阵快速幂)
- codeforce 227D Naughty Stone Piles (贪心+递归+递推)
- POJ3614防晒霜 这个贪心有点东西(贪心+优先队列)
- 环形均分纸牌问题(中位数)