irpas技术客

Elasticsearch:在 Java 应用中创建 mappings,批量写入及更新 - Java client 8.x_Elastic 中国社区官方_ela

网络投稿 1879

在我之前的文章 “Elasticsearch:使用最新的 Elasticsearch Java client 8.0 来创建索引并搜索”,我详细地描述了如何在 Java 客户端应用中创建一个索引并对它进行搜索。在那个例子里,我们并没有描述如何创建 mappings。最近,我看到有开发者在评论区里留言想知道如何创建?mappings 并使用 _bulk 来进行批量写入及更新。今天的文章,我是继先前的文章 “Elasticsearch:使用 Elasticsearch Java client 8.0 来连接带有 HTTPS 的集群” 来进行的。在 Elastic Stack 8.x 平台,开始引入 HTTPS 的访问,所以前面的那篇文章是最好的开始。

我将使用 Elastic Stack 8.2 老进行展示。为了方便大家的学习,你可以在地址?GitHub - liu-xiao-guo/ElasticsearchJava-mapping-bulk8?下载下面所讲内容的源码。关于 Java client API 的查找,你可以参考链接?Overview (java-client 8.2.2 API)?

如何在 Java 应用中创建 mappings 及进行批量写入

如何在 Java 应用中创建 mappings 及进行批量写入_哔哩哔哩_bilibili

创建 mappings

其实在最新的 Java 客户端 API 中,它的使用非常像极了我们常见的在 console 中的命令。比如,我们可以使用如下的命令来创建一个索引:

PUT test { "mappings": { "properties": { "id": { "type": "keyword" }, "name": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "price": { "type": "long" } } } }

显然在上面,我们可以看见有一个叫做 mappings 的字段,当然它是操作与一个索引 test 上的。很自然的,在使用请求的时候,我们需要创建 mappings 这个定义。

首先,我们来把代码直接贴出来:

ElasticsearchIndicesClient indices = client.indices(); // Firstly remove "products" if it exists try { DeleteIndexRequest delete_request = new DeleteIndexRequest.Builder() .index("products") .build(); DeleteIndexResponse delete_response = indices.delete(delete_request); System.out.println(delete_response.acknowledged()); } catch (Exception e) { // e.printStackTrace(); } // Secondly remove "test" if it exists try { DeleteIndexRequest delete_request = new DeleteIndexRequest.Builder() .index("test") .build(); DeleteIndexResponse delete_response = indices.delete(delete_request); System.out.println(delete_response.acknowledged()); } catch (Exception e) { // e.printStackTrace(); }

在上面,它们相当于如下的指令:

DELETE products DELETE test

为了确保下面的命令成功,我们首先删除已经创建过的 products 及 test 索引,如果它们之前已经被创建过。这是一个很简单的操作,就是发送一个 DELETE 指令。

接下来,我们在一个文件中定义一个如下的 mappings:

mappings.json

{ "properties" : { "id" : { "type" : "keyword" }, "name" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } }, "price" : { "type" : "long" } } }

这是我们想要的一个 mapping。它定义了索引中想要的字段的类型。

我们接着使用如下的命令来创建一个叫做 test 的索引:

String mappingPath = System.getProperty("user.dir") + "/mappings.json"; JsonpMapper mapper = client._transport().jsonpMapper(); String mappings_str = new String(Files.readAllBytes(Paths.get(mappingPath))); System.out.println("mappings are: " + mappings_str); JsonParser parser = mapper.jsonProvider() .createParser(new StringReader( mappings_str )); client.indices() .create(createIndexRequest -> createIndexRequest.index("test") .mappings(TypeMapping._DESERIALIZER.deserialize(parser, mapper)));

首先,我们读入 mappings.json 文件,并使用 client.indices 来创建 test 索引。显然和我们的 console 中的命令很相似。从一个叫做 test 的索引中创建一个 mappings。

当我们成功运行上面的命令后,我们可以在 Kibana 中进行查看:

GET test/_mapping

它的响应为:

{ "test" : { "mappings" : { "properties" : { "id" : { "type" : "keyword" }, "name" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } }, "price" : { "type" : "long" } } } } }

接下来,我们使用另外一种方法来创建 mappings:

String mappings = "{\n" + " \"properties\" : {\n" + " \"id\" : {\n" + " \"type\" : \"keyword\" \n" + " },\n"+ " \"name\" : {\n" + " \"type\" : \"text\",\n" + " \"fields\" : {\n" + " \"keyword\" : {\n" + " \"type\" : \"keyword\",\n" + " \"ignore_above\" : 256 \n" + " }\n" + " } \n" + " }, \n" + " \"price\" : { \n" + " \"type\" : \"long\" \n" + " } \n" + " }\n" + "}\n"; System.out.println( "mappings are: " + mappings ); JsonpMapper mapper1 = client._transport().jsonpMapper(); JsonParser parser1 = Json.createParser(new StringReader(mappings)); CreateIndexRequest request_create = new CreateIndexRequest.Builder() .index("products") .mappings(TypeMapping._DESERIALIZER.deserialize(parser1, mapper1)) .build(); CreateIndexResponse response_create = indices.create(request_create); System.out.println(response_create.acknowledged());

在上面,我使用了一个字符串 mappings 来定义索引 products 的 mappings。我们可以通过打印的方法来检查字符串的输出是否正确。我们必须确保这个字符串输出和我们在 console 里的一致性,否则会造成错误。接下来的代码和之前的一样。

当我们成功运行上面的代码后,我们可以在 Kibana 中进行查看:

GET products/_mapping

它的响应是:

{ "products" : { "mappings" : { "properties" : { "id" : { "type" : "keyword" }, "name" : { "type" : "text", "fields" : { "keyword" : { "type" : "keyword", "ignore_above" : 256 } } }, "price" : { "type" : "long" } } } } }

显然它是按照我们的需求来创建的 mappings。

使用 _bulk 来进行批量写入

在我们写入的时候,_bulk 指令可以一次性大量写入我们的文档,从而提高效率。那么我们该如何在 Java 里进行实现呢?

Product prod1 = new Product("prod1", "washing machine", 42); Product prod2 = new Product("prod2", "TV", 42); List<Product> products = new ArrayList<Product>(); products.add( prod1 ); products.add( prod2 ); BulkRequest.Builder br = new BulkRequest.Builder(); for (Product product : products) { br.operations(op -> op .index(idx -> idx .index("products") .id(product.getId()) .document(product) ) ); } BulkResponse result = client.bulk(br.refresh(Refresh.WaitFor).build()); if (result.errors()) { System.out.println("Bulk had errors"); for (BulkResponseItem item: result.items()) { if (item.error() != null) { System.out.println(item.error().reason()); } } }

在上面,当我写入的时候,我们强制使用 refresh 以使得写入的文档在紧接下来的操作中变为可以搜索的。关于 refresh 的操作,你可以阅读我的另外一篇文章 “Elasticsearch:Elasticsearch 中的 refresh 和 flush 操作指南”。实际上它的操作也非常简单。如上面的代码所示,我们首先创建两个文档的列表,然后使用 BulkRequest 来创建请求。在请求中,我们使用了 index 的操作。上面的这个操作非常像如下的这个命令:

POST _bulk { "index" : { "_index" : "produtcs", "_id" : "prod1" } } { "id" : "prod1", "name": "washing machine", "price": 42 } { "index" : { "_index" : "produtcs", "_id" : "prod2" } } { "id" : "prod2", "name": "TV", "price": 42 }

运行上面的命令后,我们执行如下的命令来进行查看:

GET products/_search?filter_path=**.hits

上面命令显示的结果为:

{ "hits" : { "hits" : [ { "_index" : "products", "_id" : "prod1", "_score" : 1.0, "_source" : { "id" : "prod1", "name" : "washing machine", "price" : 42 } }, { "_index" : "products", "_id" : "prod2", "_score" : 1.0, "_source" : { "id" : "prod2", "name" : "TV", "price" : 42 } } ] } }

显然我们的写入是成功的,并且是一次请求,同时写入了两个文档。在实际的使用中,我们可以写入成百上千的文档。

使用 update by query 进行批量更新

我仔细看了以之前的那个开发者在我的文章 “?“Elasticsearch:使用最新的 Elasticsearch Java client 8.0 来创建索引并搜索”,他的需求是如何使用 update by queryAPI 来进行批量修改文档。在我们的 console 中,我们可以打入如下的命令来进行修改一个文档:

POST products/_update_by_query { "query": { "match": { "id": "prod1" } }, "script": { "source": """ ctx._source['price'] = 50 """, "lang": "painless" } }

当我们运行上面的命令后,我们重新查看之前写入的文档:

GET products/_search?filter_path=**.hits { "hits" : { "hits" : [ { "_index" : "products", "_id" : "prod2", "_score" : 1.0, "_source" : { "id" : "prod2", "name" : "TV", "price" : 42 } }, { "_index" : "products", "_id" : "prod1", "_score" : 1.0, "_source" : { "price" : 50, "name" : "washing machine", "id" : "prod1" } } ] } }

从上面的命令中,我们可以看出来 prod1 的 price 被更新为 50。

接下来,我们来通过代码的形式来更新这个文档:

UpdateByQueryRequest updateByQueryRequest = new UpdateByQueryRequest.Builder() .index("products") .query(q -> q .match( m -> m .field("id") .query("prod1") ) ) .script(s -> s.inline( src -> src .lang("painless") .source("ctx._source['price'] = 50") )) .build(); UpdateByQueryResponse response_update = client.updateByQuery(updateByQueryRequest); System.out.println("updated : " + response_update.updated());

在上面,我们的代码其实和之前的请求非常相似。它操作于一个索引 products 之上,并对它进行 query。我们使用 script 来对它进行修改。运行上面的代码,我们可以看到如下的输出:

updated : 1

它表明的我们的更新是成功的。当然我们也可以在 Kibana 中进行查看。我们会看到文档确实被更新了。

好了,进行的文章就分享到这里。希望大家学到知识!


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #ElasticSearch #JAVA #mapping #在我之前的文章 #client