irpas技术客

flink sql 使用streamx平台 从kafka读取数据写入es_fyl005_flink sql 写入es

未知 4190

版本说明:

flink 1.12 es 6.3

1、查询 flink官网。发现有 sql 可以直接写入 es的 ? ? ? ? flink官网链接???????Apache Flink 1.12 Documentation: Elasticsearch SQL Connector

创建sink到es中的表

?上图中有一个参数需要注意:

document-type :在es-7版本中,不需要写。但是在es-6版本中就需要写了。

原因(我感觉是):

????????6.0版本之前每个索引里都可以有多个type;

????????6.0版本之后每个索引里面只能有一个Type,一般使用_doc代替了。

2、根据自己的配置书写demo CREATE TABLE `rsl_bas_road_base_kafka` ( `id` STRING COMMENT 'id', `road_name` STRING COMMENT '路线名称', `road_dir` BIGINT COMMENT '路线方向', `create_time` STRING COMMENT 'yyyyMMdd 时间' ) WITH ( 'connector' = 'kafka', 'format' = 'json', 'topic' = 'rsl_bas_road_base_kafka_3', 'scan.startup.mode' = 'earliest-offset', 'properties.bootstrap.servers' = '10.39.235.24:6667,10.39.235.142:6667', 'properties.group.id' = 'rsl_bas_road_base_kafka_2' ); CREATE TABLE myUserTable ( id BIGINT COMMENT 'id', road_name STRING COMMENT '路线名称', road_dir BIGINT COMMENT '路线方向', create_time STRING COMMENT 'yyyyMMdd 时间' ) WITH ( 'connector' = 'elasticsearch-6', 'hosts' = 'http://127.0.0.1:8200', 'index' = 'index_test', 'document-type' = 'rsl_bas_road_base' ); insert into myUserTable select cast(id as bigint) as id, road_name as road_name, road_dir as road_dir, create_time as create_time from rsl_bas_road_base_kafka ;

代码写好后就是test了

3、创建kafka

bin/kafka-topics.sh --zookeeper 10.39.235.24:2181,10.39.235.142:2181 --create -replication-factor 1 --partitions 1 --topic rsl_bas_road_base_kafka_3

4、创建 es中的index

由于test阶段es的可视化工具暂时选用Elastic HD,不能直接在es HD中直接创建index。test后可以考虑换用?kibana,这个可视化工具还不错。

curl -XPUT 'http://127.0.0.1:8200/index_test' -H 'content-Type:application/json' -d '{ "mappings": { "rsl_bas_road_base":{ "properties":{ "id":{ "type":"long" }, "road_name":{ "type": "text" }, "road_dir":{ "type":"long" }, "create_time":{ "type":"date" } } } } }' 5、往 kafka中mock数据 public class MockSanSi { private static final String JING_XIOING_TEST_BOOT_STRAP_SERVER="10.39.235.24:6667"; public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", JING_XIOING_TEST_BOOT_STRAP_SERVER); props.put("acks", "-1"); props.put("retries", 0); // props.put("batch.size", 16384); props.put("linger.ms", 1); // props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); for (int i = 0;i < 100; i++){ String j; j = Integer.toString(i); String meesage = "{\"id\":\"300"+j+"\",\"road_name\":\"京雄test\",\"road_dir\":2,\"create_time\"" + ":\"2022-03-28\"}"; int num = 0; while (true) { System.out.println(">>>>>>"); System.out.println(meesage); // 超过10000条停止 if (num > 10000) { break; } num++; // 发送kafka小心 json producer.send(new ProducerRecord<>("rsl_bas_road_base_kafka_3", String.valueOf(num), meesage)); // try { // Thread.sleep(1); // } catch (InterruptedException e) { // e.printStackTrace(); // } } } producer.close(); } } 6、查看es中的数据?

如下是查询特定的,加了filter过滤条件

curl -XGET 'http://127.0.0.1:8200/index_test/_search?pretty' -H 'Content-Type:application/json' -d '{ "query": { "match":{ "id":33 } } }'

结果展示:

由于之前mock了少量数据,方便查询。后面才是写亿级别的数据,进行 kafka 和 es 的压测的。下图是有?3000万以上的数据时,查询还是很快的。


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #Flink #SQL #写入es #版本说明flink #112 #es #361查询 #flink官网