irpas技术客

ES - 滚动查询(scroll)_莫莫绵_es scroll查询

网络投稿 775

Elasticsearch - 滚动查询scroll 简介实践中我使用到滚动的场景from-size分页的缺点json处理步骤案例如下 java 处理步骤代码逻辑简化版java代码如下:


简介

? ?Elasticsearch滚动查询也叫游标查询

? ?适合那种需要一次性或分批拉出大量数据做离线处理、迁移等。可以提升点效率。

实践中我使用到滚动的场景 需求需要从几个不同的es数据源拉取、截取数据,合到一个新的业务数据源中。每天夜里有定时任务需要拉取某天的索引数据,根据某个字段去重后拿去做离线业务处理。

注意:scroll不适合支持那种实时的和用户交互的前端分页工作,实时分页查询可以使用from-size方式。但同时from-size也不适用上述离线大数据量处理业务场景。

from-size分页的缺点 GET /{index_name}/_search { "from":0, "size":10 }

? ?es客户端实时分页一般使用from-size。如果有100条数据,按size=10共分10页,那么当用户查询第n页的时候,实际上es是把前n页的数据全部找出来,再去除前n-1页最后得到需要的数据返回,查最后一页就相当于全扫描。其中利弊大家自行思考。所以离线大批量数据的处理业务或迁移不适合使用from-size方式查询。

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); searchSourceBuilder.from((page.getPageNum() - 1) * page.getPageSize()); searchSourceBuilder.size(page.getPageSize()); json处理步骤

? ?我们可以给初始化查询传递参数scroll=5m ,es会返回一个_scroll_id,这是一个base64编码的长字符串,用于下次查询时传入。5m表示_scroll_id缓存5分钟,之后自动过期,可以根据需要配置。size可以指定每次滚动拉取多少数据。不过如果你做了分片,查询结果可能超过指定的 size 大小。

案例如下

例如:第一次查询

GET /sms/_search?scroll=5m { "size": 20, "query": { "bool": { "must": [ { "match": { "userId": "9d995c0b90fe4128896a1a84eca213bf" } } ] } } }

返回:

{ "_scroll_id": "DnF1ZXJ5VGhlbkZldGNoBgAAAAAATJH1FlFTYzlSZ0VNVGdlM2o0T0dTX2tVUncAAAAAAE0-zBZQUVp6Sy04X1J1NjJCaVZfQUhHWjFnAAAAAABMkfYWUVNjOVJnRU1UZ2UzajRPR1Nfa1VSdwAAAAAATXVxFk83UWRhNGg3UmxTQnpXTEUzd0dreXcAAAAAAEyR9xZRU2M5UmdFTVRnZTNqNE9HU19rVVJ3AAAAAABNPs0WUFFaekstOF9SdTYyQmlWX0FIR1oxZw==", "took": 6, ...... }

之后我们把上一次得到的_scroll_id拿到按以下查询即可得到下一轮的数据。

GET /_search/scroll/ { "scroll":"1m", "scroll_id":"DnF1ZXJ5VGhlbkZldGNoBgAAAAAATJH1FlFTYzlSZ0VNVGdlM2o0T0dTX2tVUncAAAAAAE0-zBZQUVp6Sy04X1J1NjJCaVZfQUhHWjFnAAAAAABMkfYWUVNjOVJnRU1UZ2UzajRPR1Nfa1VSdwAAAAAATXVxFk83UWRhNGg3UmxTQnpXTEUzd0dreXcAAAAAAEyR9xZRU2M5UmdFTVRnZTNqNE9HU19rVVJ3AAAAAABNPs0WUFFaekstOF9SdTYyQmlWX0FIR1oxZw==" }

除了等待scroll_id过期时间之外,我们也可以手动删除scroll_id:

// 手动删除scroll_id的方法 DELETE /_search/scroll { "scroll_id" : "DnF1ZXJ5VGhlbkZldGNoBgAAAAAATJH1FlFTYzlSZ0VNVGdlM2o0T0dTX2tVUncAAAAAAE0-zBZQUVp6Sy04X1J1NjJCaVZfQUhHWjFnAAAAAABMkfYWUVNjOVJnRU1UZ2UzajRPR1Nfa1VSdwAAAAAATXVxFk83UWRhNGg3UmxTQnpXTEUzd0dreXcAAAAAAEyR9xZRU2M5UmdFTVRnZTNqNE9HU19rVVJ3AAAAAABNPs0WUFFaekstOF9SdTYyQmlWX0FIR1oxZw==" } java 处理步骤 代码逻辑 我们可以使用一个循环去查询,第一次查询的时候按需要的查询条件处理,加上参数scroll即可,之后的查询均使用GET /_search/scroll/ 传递_scroll_id查询,如果返回数据为空则终止循环。

es version: 6.3.2

<dependency> <groupId>io.searchbox</groupId> <artifactId>jest</artifactId> </dependency> 简化版java代码如下: String rollId = null; //滚动查询 while (true) { // 第一次查询 if (rollId == null) { // 按业务需求查询 JestResult jestResult = jestSmsClient.execute(getSmsShortSearchBuilder(createDate)); if (jestResult.isSucceeded()) { // 获取scroll_id以备下次查询 rollId = jestResult.getJsonObject().get("_scroll_id").getAsString(); //获取查询结果业务处理 result = ....... } } else { // 后续查询使用rollId JestResult jestResult = jestSmsClient.execute(new SearchScroll.Builder(rollId, "5m").build()); if (jestResult.isSucceeded()) { // 获取scroll_id以备下次查询 rollId = jestResult.getJsonObject().get("_scroll_id").getAsString(); //获取查询结果业务处理 result = ....... } } // 没有数据返回,你也可以认为:如果返回的数据长度小于size时可以跳出循环。没毛病可能会少调用一次es。不过可能会由于多个分片导致并没有少调用一次 if(CollectionUtils.isEmpty(result)) { break; } }

稍微写详细点凑下字数:

public final static String ES_INDEX = "es索引名"; public final static String ES_TYPE = "_doc"; String rollId = null; //滚动查询 while (true) { List<Xxxx> result = Lists.newArrayList(); //首次访问 if (rollId == null) { try { SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder(); BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); searchSourceBuilder.query(boolQueryBuilder); searchSourceBuilder.size(10000); searchSourceBuilder.fetchSource(new String[]{"sendUrl"}, new String[]{}); Search search = new Search.Builder(searchSourceBuilder.toString()).addIndex( Constants.ES_INDEX).addType(Constants.ES_TYPE) .setParameter("preference", "_primary_first") .setParameter(Parameters.SCROLL, "5m") .build(); JestResult jestResult = jestSmsClient.execute(search); if (jestResult.isSucceeded()) { rollId = jestResult.getJsonObject().get("_scroll_id").getAsString(); result = jestResult.getSourceAsObjectList(Xxxx.class, false); } } catch (IOException e) { log.error("异常信息:", e); return null; } } else { try { JestResult jestResult = jestSmsClient.execute(new SearchScroll.Builder(rollId, "5m").build()); if (jestResult.isSucceeded()) { rollId = jestResult.getJsonObject().get("_scroll_id").getAsString(); result = jestResult.getSourceAsObjectList(Xxxx.class, false); } } catch (IOException e) { log.error("异常信息:", e); return null; } } if(CollectionUtils.isEmpty(result)) { break; } //对返回数据做处理result ...省略 }

ok,结束了。莫莫绵溜了溜了。。


水平有限,如果你觉得上述有任何疑问、不足、错误的地方,欢迎在评论区指正。


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #es #scroll查询