
Flink Consumes Kafka Data and Loads It into the Apache Doris Data Warehouse in Real Time (KFD)


1. Overview

Apache Doris (formerly Baidu Palo) is a distributed SQL data warehouse based on massively parallel processing (MPP) technology. It was open-sourced by Baidu in 2017 and entered the Apache Incubator in August 2018.

Apache Doris is a modern MPP analytical database product. It returns query results with sub-second latency and effectively supports real-time data analysis. Its distributed architecture is simple and easy to operate, and it can scale to very large datasets of more than 10 PB.

Apache Doris can serve a wide range of analytical needs, such as fixed historical reports, real-time data analysis, interactive analysis, and exploratory analysis, making data analysis work simpler and more efficient.

2. Scenario

This article shows how to combine Doris's Stream Load with the Flink compute engine to load data into Doris quickly and in real time.

The environment used is as follows:

- MySQL 5.x/8.x (the business database)
- Kafka 2.11 (message queue)
- Flink 1.10.1 (stream compute engine)
- Doris 0.14.7 (the core data warehouse)
- Canal (MySQL binlog collection tool)

3. Implementation

The architecture used here is offline processing of historical data plus real-time processing of incremental data.

3.1 Offline processing of historical data

For the historical data, we use Doris ODBC external tables: the MySQL tables are mapped into Doris, and then we run

insert into <doris_table_name> select * from <mysql_odbc_external_table>

3.1.1 How to create the external table

First, for Apache Doris 0.13.x and later, install the ODBC driver for the corresponding database on every BE node, then create the external table. The details are covered in another article of mine and are not repeated here:

[How to use Apache Doris ODBC external tables]

https://mp.weixin.qq.com/s/J0suRGPNkxD6oHSRFK6KTA

3.2 Real-time processing of incremental data

For incremental data, Canal monitors the MySQL binlog, parses it, and pushes the changes to a designated Kafka queue. Flink then consumes that Kafka queue in real time; you can process the data however you need (transformations, algorithms, and so on), and finally write the detail records, or the intermediate results of real-time computation, into the corresponding Doris table. Stream Load is used here, but you could also use the Flink Doris Connector.

3.2.1 Implementing the Doris sink

First we implement a Flink Doris sink:

import com.alibaba.fastjson.JSON;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Custom Flink Doris sink
 */
public class DorisSink extends RichSinkFunction<String> {

    private static final Logger log = LoggerFactory.getLogger(DorisSink.class);

    private final static List<String> DORIS_SUCCESS_STATUS = new ArrayList<>(Arrays.asList("Success", "Publish Timeout"));

    private DorisStreamLoad dorisStreamLoad;

    private String columns;

    private String jsonFormat;

    public DorisSink(DorisStreamLoad dorisStreamLoad, String columns, String jsonFormat) {
        this.dorisStreamLoad = dorisStreamLoad;
        this.columns = columns;
        this.jsonFormat = jsonFormat;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
    }

    /**
     * Check whether the Stream Load succeeded
     *
     * @param respContent the Stream Load response (JSON format)
     * @return true if the load succeeded
     */
    public static Boolean checkStreamLoadStatus(RespContent respContent) {
        if (DORIS_SUCCESS_STATUS.contains(respContent.getStatus())
                && respContent.getNumberTotalRows() == respContent.getNumberLoadedRows()) {
            return true;
        } else {
            return false;
        }
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        DorisStreamLoad.LoadResponse loadResponse = dorisStreamLoad.loadBatch(value, columns, jsonFormat);
        if (loadResponse != null && loadResponse.status == 200) {
            RespContent respContent = JSON.parseObject(loadResponse.respContent, RespContent.class);
            if (!checkStreamLoadStatus(respContent)) {
                log.error("Stream Load failed: {}", loadResponse);
            }
        } else {
            log.error("Stream Load request failed: {}", loadResponse);
        }
    }
}
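The sink parses the Stream Load response body into a RespContent object, but that class is not included in the article. Below is a minimal sketch of what it might look like, assuming the standard Stream Load JSON response fields (Status, NumberTotalRows, NumberLoadedRows) and fastjson's @JSONField annotation for the name mapping; adapt it to your actual response handling.

import com.alibaba.fastjson.annotation.JSONField;
import java.io.Serializable;

/**
 * Hypothetical POJO for the Stream Load JSON response; only the fields
 * used by DorisSink are shown here.
 */
public class RespContent implements Serializable {

    @JSONField(name = "Status")
    private String status;

    @JSONField(name = "NumberTotalRows")
    private long numberTotalRows;

    @JSONField(name = "NumberLoadedRows")
    private long numberLoadedRows;

    public String getStatus() {
        return status;
    }

    public void setStatus(String status) {
        this.status = status;
    }

    public long getNumberTotalRows() {
        return numberTotalRows;
    }

    public void setNumberTotalRows(long numberTotalRows) {
        this.numberTotalRows = numberTotalRows;
    }

    public long getNumberLoadedRows() {
        return numberLoadedRows;
    }

    public void setNumberLoadedRows(long numberLoadedRows) {
        this.numberLoadedRows = numberLoadedRows;
    }
}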
3.2.2 The Stream Load utility class

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Serializable;
import java.io.IOException;
import java.io.BufferedOutputStream;
import java.io.InputStream;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Calendar;
import java.util.UUID;

/**
 * Doris Stream Load
 */
public class DorisStreamLoad implements Serializable {

    private static final Logger log = LoggerFactory.getLogger(DorisStreamLoad.class);
    // load URL; the connection here goes to the FE
    private static String loadUrlPattern = "http://%s/api/%s/%s/_stream_load?";
    // FE ip:port
    private String hostPort;
    // database name
    private String db;
    // target table name
    private String tbl;
    // user name
    private String user;
    // password
    private String passwd;
    private String loadUrlStr;
    private String authEncoding;

    public DorisStreamLoad(String hostPort, String db, String tbl, String user, String passwd) {
        this.hostPort = hostPort;
        this.db = db;
        this.tbl = tbl;
        this.user = user;
        this.passwd = passwd;
        this.loadUrlStr = String.format(loadUrlPattern, hostPort, db, tbl);
        this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", user, passwd).getBytes(StandardCharsets.UTF_8));
    }

    // build the HTTP connection
    private HttpURLConnection getConnection(String urlStr, String label, String columns, String jsonformat) throws IOException {
        URL url = new URL(urlStr);
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setInstanceFollowRedirects(false);
        conn.setRequestMethod("PUT");
        conn.setRequestProperty("Authorization", "Basic " + authEncoding);
        conn.addRequestProperty("Expect", "100-continue");
        conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
        conn.addRequestProperty("label", label);
        conn.addRequestProperty("max_filter_ratio", "0");
        conn.addRequestProperty("strict_mode", "true");
        conn.addRequestProperty("columns", columns);
        conn.addRequestProperty("format", "json");
        conn.addRequestProperty("jsonpaths", jsonformat);
        conn.addRequestProperty("strip_outer_array", "true");
        conn.setDoOutput(true);
        conn.setDoInput(true);

        return conn;
    }

    public static class LoadResponse {
        public int status;
        public String respMsg;
        public String respContent;

        public LoadResponse(int status, String respMsg, String respContent) {
            this.status = status;
            this.respMsg = respMsg;
            this.respContent = respContent;
        }

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("status: ").append(status);
            sb.append(", resp msg: ").append(respMsg);
            sb.append(", resp content: ").append(respContent);
            return sb.toString();
        }
    }

    // execute the load
    public LoadResponse loadBatch(String data, String columns, String jsonformat) {
        Calendar calendar = Calendar.getInstance();
        // load label, must be globally unique
        String label = String.format("flink_import_%s%02d%02d_%02d%02d%02d_%s",
                calendar.get(Calendar.YEAR), calendar.get(Calendar.MONTH) + 1, calendar.get(Calendar.DAY_OF_MONTH),
                calendar.get(Calendar.HOUR_OF_DAY), calendar.get(Calendar.MINUTE), calendar.get(Calendar.SECOND),
                UUID.randomUUID().toString().replaceAll("-", ""));

        HttpURLConnection feConn = null;
        HttpURLConnection beConn = null;
        try {
            // build request and send to fe
            feConn = getConnection(loadUrlStr, label, columns, jsonformat);
            int status = feConn.getResponseCode();
            // fe sends back http response code TEMPORARY_REDIRECT 307 and the new be location
            if (status != 307) {
                throw new Exception("status is not TEMPORARY_REDIRECT 307, status: " + status);
            }
            String location = feConn.getHeaderField("Location");
            if (location == null) {
                throw new Exception("redirect location is null");
            }
            // build request and send to new be location
            beConn = getConnection(location, label, columns, jsonformat);
            // send data to be
            BufferedOutputStream bos = new BufferedOutputStream(beConn.getOutputStream());
            bos.write(data.getBytes());
            bos.close();

            // get response
            status = beConn.getResponseCode();
            String respMsg = beConn.getResponseMessage();
            InputStream stream = (InputStream) beConn.getContent();
            BufferedReader br = new BufferedReader(new InputStreamReader(stream));
            StringBuilder response = new StringBuilder();
            String line;
            while ((line = br.readLine()) != null) {
                response.append(line);
            }
            return new LoadResponse(status, respMsg, response.toString());
        } catch (Exception e) {
            e.printStackTrace();
            String err = "failed to load data with label: " + label;
            log.warn(err, e);
            return new LoadResponse(-1, e.getMessage(), err);
        } finally {
            if (feConn != null) {
                feConn.disconnect();
            }
            if (beConn != null) {
                beConn.disconnect();
            }
        }
    }
}

3.2.3 The Flink job

This example handles a single table. If Canal is capturing multiple tables, you need to distinguish the records by table name, maintain the mapping between your MySQL tables and the Doris tables, and parse the data accordingly (see the routing sketch after the job code below).

import org.apache.doris.demo.flink.DorisSink;
import org.apache.doris.demo.flink.DorisStreamLoad;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

/**
 * This example mainly demonstrates how to use Flink to stream Kafka data,
 * and use the Doris Stream Load method to write the data into the table specified by Doris.
 * <p>
 * The Kafka data format is an array, for example: [{"id":1,"name":"root"}]
 */
public class FlinkKafka2Doris {
    //kafka address
    private static final String bootstrapServer = "xxx:9092,xxx:9092,xxx:9092";
    //kafka groupName
    private static final String groupName = "test_flink_doris_group";
    //kafka topicName
    private static final String topicName = "test_flink_doris";
    //doris ip port
    private static final String hostPort = "xxx:8030";
    //doris dbName
    private static final String dbName = "db1";
    //doris tbName
    private static final String tbName = "tb1";
    //doris userName
    private static final String userName = "root";
    //doris password
    private static final String password = "";
    //doris columns
    private static final String columns = "name,age,price,sale";
    //json format
    private static final String jsonFormat = "[\"$.name\",\"$.age\",\"$.price\",\"$.sale\"]";

    public static void main(String[] args) throws Exception {

        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServer);
        props.put("group.id", groupName);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "earliest");
        props.put("max.poll.records", "10000");

        StreamExecutionEnvironment blinkStreamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        blinkStreamEnv.enableCheckpointing(10000);
        blinkStreamEnv.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        FlinkKafkaConsumer<String> flinkKafkaConsumer = new FlinkKafkaConsumer<>(topicName,
                new SimpleStringSchema(), props);

        DataStreamSource<String> dataStreamSource = blinkStreamEnv.addSource(flinkKafkaConsumer);

        DorisStreamLoad dorisStreamLoad = new DorisStreamLoad(hostPort, dbName, tbName, userName, password);

        dataStreamSource.addSink(new DorisSink(dorisStreamLoad, columns, jsonFormat));

        blinkStreamEnv.execute("flink kafka to doris");
    }
}
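For the multi-table case mentioned above, each Canal message has to be routed to the Doris table that corresponds to its source MySQL table. The sketch below is one possible way to do that; it is not from the original article. It assumes Canal is writing flat-message JSON to Kafka (objects with database, table, and data fields) and that the caller has prepared one DorisStreamLoad, column list, and jsonpaths string per source table.

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.util.Map;

/**
 * Hypothetical routing sink for the multi-table case.
 * Assumes each Kafka record is a Canal flat-message JSON object such as:
 *   {"database":"db1","table":"tb1","type":"INSERT","data":[{"name":"a","age":1}], ...}
 */
public class RoutingDorisSink extends RichSinkFunction<String> {

    private final Map<String, DorisStreamLoad> loaders;    // mysql table name -> Doris loader
    private final Map<String, String> columnsByTable;      // mysql table name -> doris columns
    private final Map<String, String> jsonFormatByTable;   // mysql table name -> jsonpaths

    public RoutingDorisSink(Map<String, DorisStreamLoad> loaders,
                            Map<String, String> columnsByTable,
                            Map<String, String> jsonFormatByTable) {
        this.loaders = loaders;
        this.columnsByTable = columnsByTable;
        this.jsonFormatByTable = jsonFormatByTable;
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        JSONObject message = JSON.parseObject(value);
        String table = message.getString("table");
        JSONArray rows = message.getJSONArray("data");
        DorisStreamLoad loader = loaders.get(table);
        if (loader == null || rows == null || rows.isEmpty()) {
            // unknown table or DDL/heartbeat message: skip (or send it to a dead-letter topic)
            return;
        }
        // "data" is already a JSON array of row objects, which matches strip_outer_array=true
        loader.loadBatch(rows.toJSONString(), columnsByTable.get(table), jsonFormatByTable.get(table));
    }
}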

Then submit the Flink job to the cluster and it will run; the data will be loaded into Doris in real time.

What we have here is really a micro-batch load. You can improve it yourself in the following ways:

- Set a maximum number of records per batch, or load every N seconds, and trigger the load on whichever condition is reached first; this works whether your real-time data volume is small or large (a sketch of this size-or-time flush follows this list).
- The connection here goes to the FE. You can use the FE REST API to fetch all BE nodes and load directly against a BE; the URL is the same, just replace the FE IP and port with the BE IP and HTTP port.
- To avoid connecting to an FE or BE node that happens to be down, retry against another FE or BE.
- To avoid putting pressure on a single node, round-robin across the BE nodes instead of always connecting to the same one.
- Set a maximum retry count; once it is exceeded, push the failed data to a Kafka queue for later manual handling.
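As a starting point for the first item, here is a minimal sketch of a sink that buffers rows and flushes them when either a row-count threshold or a time interval is reached, whichever comes first. The batchSize and flushIntervalMs parameters and the buffering logic are illustrative assumptions rather than the article's implementation; note that the time check only fires when a new record arrives, and a production version should also flush on checkpoint so buffered rows are not lost.

import com.alibaba.fastjson.JSON;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical micro-batching sink: flush either when batchSize rows have been
 * buffered or when flushIntervalMs has elapsed since the last flush.
 * Assumes each incoming record is a single JSON row object; the buffered rows
 * are combined into one JSON array for Stream Load (matching strip_outer_array=true).
 */
public class BatchingDorisSink extends RichSinkFunction<String> {

    private final DorisStreamLoad dorisStreamLoad;
    private final String columns;
    private final String jsonFormat;
    private final int batchSize;
    private final long flushIntervalMs;

    private transient List<Object> buffer;
    private transient long lastFlushTime;

    public BatchingDorisSink(DorisStreamLoad dorisStreamLoad, String columns, String jsonFormat,
                             int batchSize, long flushIntervalMs) {
        this.dorisStreamLoad = dorisStreamLoad;
        this.columns = columns;
        this.jsonFormat = jsonFormat;
        this.batchSize = batchSize;
        this.flushIntervalMs = flushIntervalMs;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        buffer = new ArrayList<>();
        lastFlushTime = System.currentTimeMillis();
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        buffer.add(JSON.parse(value));
        boolean sizeReached = buffer.size() >= batchSize;
        boolean timeReached = System.currentTimeMillis() - lastFlushTime >= flushIntervalMs;
        if (sizeReached || timeReached) {
            flush();
        }
    }

    @Override
    public void close() throws Exception {
        // flush anything still buffered before the task shuts down
        flush();
        super.close();
    }

    private void flush() {
        if (!buffer.isEmpty()) {
            // combine the buffered rows into a single JSON array and load it in one request
            dorisStreamLoad.loadBatch(JSON.toJSONString(buffer), columns, jsonFormat);
            buffer.clear();
        }
        lastFlushTime = System.currentTimeMillis();
    }
}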

4. Summary

This article is only meant as a starting point, giving one way to ingest data with Stream Load along with an example. Doris offers many other ingestion methods for you to explore.


