irpas技术客

Flink CDC Oracle 完整踩坑指南_ABECCDF_flink oracle

大大的周 4840

Flink CDC Oracle 完整踩坑指南 1. flink-cdc同步oracle表的增量数据

试用环境:

**Oracle:**11.2.0.4.0(RAC 部署)

**Flink:**1.12.0

通过 Flink 集群方式部署使用。

完整代码实现:

package com.nari.cdc.job; /** * 同步oracle指定表 发送到kafka * * @author gym * @version v1.0 * @description: * @date: 2022/3/31 14:25 */ import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.serializer.SerializerFeature; import com.nari.cdc.domain.OracleDataObj; import com.nari.cdc.func.KafkaSink; import com.nari.cdc.utils.LocalFileConfigParam; import com.ververica.cdc.connectors.oracle.OracleSource; import com.ververica.cdc.connectors.oracle.table.StartupOptions; import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema; import org.apache.commons.collections.MapUtils; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.codehaus.plexus.util.StringUtils; import java.io.IOException; import java.util.Arrays; import java.util.Properties; import java.util.stream.Collectors; public class OracleToKafka { public static void main(String[] args) throws Exception { Properties properties = new Properties(); properties.setProperty("debezium.database.tablename.case.insensitive", "false"); properties.setProperty("debezium.log.mining.strategy", "online_catalog"); properties.setProperty("debezium.log.mining.continuous.mine", "true"); properties.setProperty("scan.startup.mode", "latest-offset"); //properties.setProperty("debezium.snapshot.mode", "latest-offset"); String user = LocalFileConfigParam.getPropertiesString("dataSource.user", "SEA3000"); String password = LocalFileConfigParam.getPropertiesString("dataSource.password", "SEA3000"); String tableStr = LocalFileConfigParam.getPropertiesString("monitor.tableList", ""); String host = LocalFileConfigParam.getPropertiesString("dataSource.host", "localhost"); Integer port = LocalFileConfigParam.getPropertiesInt("dataSource.port", 1521); String serviceName = LocalFileConfigParam.getPropertiesString("dataSource.serviceName", "ORCL"); String[] tableArr; if (tableStr.indexOf(",") > 0) { tableArr = Arrays.stream(tableStr.split(",")).map(s -> user + "." + s).toArray(String[]::new); } else { tableArr = new String[]{user + "." + tableStr}; } SourceFunction<String> sourceFunction = OracleSource.<String>builder() .hostname(host) .port(port) .database(serviceName) // monitor XE database .schemaList(user) // monitor inventory schema NH_CJ_DYDLQX,NH_CJ_DNSZQX,NH_CJ_GLQX .tableList(tableArr) // monitor products table .username(user) .password(password) .debeziumProperties(properties) .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String //只读取增量的 注意:不设置默认是先全量读取表然后增量读取日志中的变化 .startupOptions(StartupOptions.latest()) .build(); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); setEnvProperties(env); DataStreamSource<String> dataStreamSource = env.addSource(sourceFunction).setParallelism(1); SingleOutputStreamOperator<Tuple2<String, String>> streamOperator = dataStreamSource.filter(s -> StringUtils.isNotEmpty(s)).map(new MapFunction<String, OracleDataObj>() { @Override public OracleDataObj map(String s) throws Exception { //转换数据 return JSONObject.parseObject(s, OracleDataObj.class); } }).filter(s -> MapUtils.isEmpty(s.getBefore())) //过滤掉非insert操作 .map(new MapFunction<OracleDataObj, Tuple2<String, String>>() { @Override public Tuple2<String, String> map(OracleDataObj oracleDataObj) throws Exception { //封装topic和数据 String tableName = MapUtils.getString(oracleDataObj.getSource(), "table"); String jsonString = JSONObject.toJSONString(oracleDataObj.getAfter()); return Tuple2.of("NH_" + tableName, jsonString); } }); streamOperator.setParallelism(1).print(); //发送到kafka streamOperator.addSink(new KafkaSink()); env.execute("FlinkCDCOracle"); } private static void setEnvProperties(StreamExecutionEnvironment env) throws IOException { // 1. 状态后端配置 //env.setStateBackend(new MemoryStateBackend()); //env.setStateBackend(new FsStateBackend("")); // 这个需要另外导入依赖 env.setStateBackend(new RocksDBStateBackend("file:///rocksDb/fink-checkpoints")); //应用挂了的话,它默认会删除之前checkpoint数据,当然我们可以在代码中设置应用退出时保留checkpoint数据 env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 2. 检查点配置 (每300ms让jobManager进行一次checkpoint检查) env.enableCheckpointing(300); // 高级选项 env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); //Checkpoint的处理超时时间 env.getCheckpointConfig().setCheckpointTimeout(60000L); // 最大允许同时处理几个Checkpoint(比如上一个处理到一半,这里又收到一个待处理的Checkpoint事件) env.getCheckpointConfig().setMaxConcurrentCheckpoints(2); // 与上面setMaxConcurrentCheckpoints(2) 冲突,这个时间间隔是 当前checkpoint的处理完成时间与接收最新一个checkpoint之间的时间间隔 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000L); // 如果同时开启了savepoint且有更新的备份,是否倾向于使用更老的自动备份checkpoint来恢复,默认false env.getCheckpointConfig().setPreferCheckpointForRecovery(true); // 最多能容忍几次checkpoint处理失败(默认0,即checkpoint处理失败,就当作程序执行异常) env.getCheckpointConfig().setTolerableCheckpointFailureNumber(0); // 3. 重启策略配置 // 固定延迟重启(最多尝试3次,每次间隔10s) env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10000L)); // 失败率重启(在10分钟内最多尝试3次,每次至少间隔1分钟) env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.minutes(10), Time.minutes(1))); } } package com.nari.cdc.utils; import org.apache.commons.lang3.StringUtils; import java.io.IOException; import java.util.Properties; /** * 现场本地配置参数,系统启动时加载 * @author K.Zhu * 2016-10-17 */ public final class LocalFileConfigParam { private static LocalFileConfigParam localFileConfigParam = null; private Properties p = new Properties(); /** * 禁止new对象使用 */ private LocalFileConfigParam(){ try { p.load(LocalFileConfigParam.class.getResourceAsStream("/conf.properties")); } catch (IOException e) { e.printStackTrace(); } } /** * 禁止new对象使用 */ private LocalFileConfigParam(String filePath){ loadParam(filePath); } /** * 根据key值获取配置信息 * @param key * @return */ public String getConfigValue(String key){ return p.getProperty(key); } /** * 获取系统变量唯一对象句柄 * @return */ public static LocalFileConfigParam getInstance(){ if(localFileConfigParam == null){ synchronized(LocalFileConfigParam.class){ if(localFileConfigParam == null){ localFileConfigParam = new LocalFileConfigParam(); } } } return localFileConfigParam; } /** * 加载配置文件对全局变量赋值 * @param filePath */ public void loadParam(String filePath) { try { p.load(LocalFileConfigParam.class.getResourceAsStream(filePath)); } catch (IOException e) { e.printStackTrace(); } } public String toString(){ String value = ""; for(Object key : p.keySet()){ value = value + key + ":" + p.get(key) + "\n"; } return value; } public static String getPropertiesString(String key, String defaultValue) { String value = LocalFileConfigParam.getInstance().getConfigValue(key); return StringUtils.isBlank(value) ? defaultValue : value; } public static Integer getPropertiesInt(String key, Integer defaultValue) { String value = LocalFileConfigParam.getInstance().getConfigValue(key); return StringUtils.isEmpty(value) ? defaultValue : Integer.parseInt(value); } } package com.nari.cdc.domain; import java.io.Serializable; import java.util.Map; /** * Oracle数据实体 * * @author gym * @version v1.0 * @description: * @date: 2022/4/1 10:32 */ public class OracleDataObj implements Serializable { private static final long serialVersionUID = -3797899893684335135L; //更改之前的数据 private Map<String, Object> before; //更改之后的数据 private Map<String, Object> after; //数据源信息 private Map<String, Object> source; public Map<String, Object> getBefore() { return before; } public void setBefore(Map<String, Object> before) { this.before = before; } public Map<String, Object> getAfter() { return after; } public void setAfter(Map<String, Object> after) { this.after = after; } public Map<String, Object> getSource() { return source; } public void setSource(Map<String, Object> source) { this.source = source; } } package com.nari.cdc.func; import com.nari.cdc.utils.LocalFileConfigParam; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; /** * 发送到kafka * @author gym * @version v1.0 * @description: * @date: 2022/4/1 15:57 */ public class KafkaSink extends RichSinkFunction<Tuple2<String, String>> { Producer producer = null; @Override public void invoke(Tuple2<String, String> value, Context context) throws Exception { producer.send(new ProducerRecord(value.f0, value.f1)); } @Override public void open(Configuration parameters) throws Exception { producer = createProducer(); } @Override public void close() throws Exception { if (producer != null) { producer.close(); } } /** * 创建Producer实例 */ public Producer<String, String> createProducer() { Properties properties = new Properties(); //配置文件里面的变量都是静态final类型的,并且都有默认的值 //用于建立与 kafka 集群连接的 host/port //继承的hashtable,保证了线程安全 properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, LocalFileConfigParam.getPropertiesString("kafka.bootstrap.servers", "localhost:9092")); /** * producer 需要 server 接收到数据之后发出的确认接收的信号,此项配置就是指 procuder需要多少个这样的确认信号。此配置实际上代表 * 了数据备份的可用性。以下设置为常用选项: * (1)acks=0: 设置为 0 表示 producer 不需要等待任何确认收到的信息。副本将立即加到socket buffer 并认为已经发送。没有任何保 * 障可以保证此种情况下 server 已经成功接收数据,同时重试配置不会发生作用(因为客户端不知道是否失败)回馈的 offset 会总是设置为-1; * (2)acks=1: 这意味着至少要等待 leader已经成功将数据写入本地 log,但是并没有等待所有 follower 是否成功写入。这种情况下,如 * 果 follower 没有成功备份数据,而此时 leader又挂掉,则消息会丢失。 * (3)acks=all: 这意味着 leader 需要等待所有备份都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的保证。 * (4)其他的设置,例如 acks=2 也是可以的,这将需要给定的 acks 数量,但是这种策略一般很少用 **/ properties.put(ProducerConfig.ACKS_CONFIG, "1"); /** 设置大于 0 的值将使客户端重新发送任何数据,一旦这些数据发送失败。注意,这些重试与客户端接收到发送错误时的重试没有什么不同。允许 重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个 partition,则第一个消息失败第二个发送成功,则第二条消息会比第一 条消息出现要早 **/ properties.put(ProducerConfig.RETRIES_CONFIG, "0"); /** * producer 将试图批处理消息记录,以减少请求次数。这将改善 client 与 server 之间的性能。这项配置控制默认的批量处理消息字节数。 * 不会试图处理大于这个字节数的消息字节数。发送到 brokers 的请求将包含多个批量处理,其中会包含对每个 partition 的一个请求。 * 较小的批量处理数值比较少用,并且可能降低吞吐量(0 则会仅用批量处理)。较大的批量处理数值将会浪费更多内存空间,这样就需要分配特 * 定批量处理数值的内存大小 **/ properties.put(ProducerConfig.BATCH_SIZE_CONFIG, "16384"); /** * producer 组将会汇总任何在请求与发送之间到达的消息记录一个单独批量的请求。通常来说,这只有在记录产生速度大于发送速度的时候才 * 能发生。然而,在某些条件下,客户端将希望降低请求的数量,甚至降低到中等负载一下。这项设置将通过增加小的延迟来完成--即,不是立即 * 发送一条记录,producer 将会等待给定的延迟时间以允许其他消息记录发送,这些消息记录可以批量处理。这可以认为是 TCP 种 Nagle 的算 * 法类似。这项设置设定了批量处理的更高的延迟边界:一旦我们获得某个 partition 的batch.size,他将会立即发送而不顾这项设置, * 然而如果我们获得消息字节数比这项设置要小的多,我们需要“linger”特定的时间以获取更多的消息。 这个设置默认为 0,即没有延迟。设 * 定 linger.ms=5,例如,将会减少请求数目,但是同时会增加 5ms 的延迟 **/ properties.put(ProducerConfig.LINGER_MS_CONFIG, "1"); /** * producer 可以用来缓存数据的内存大小。如果数据产生速度大于向 broker 发送的速度,将会耗尽这个缓存空间,producer * 会阻塞或者抛出异常,以“block.on.buffer.full”来表明。这项设置将和 producer 能够使用的总内存相关,但并不是一个 * 硬性的限制,因为不是producer 使用的所有内存都是用于缓存。一些额外的内存会用于压缩(如果引入压缩机制),同样还有一些 * 用于维护请求当缓存空间耗尽,其他发送调用将被阻塞,阻塞时间的阈值通过max.block.ms设定,之后它将抛出一个TimeoutException。 **/ properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, "33554432"); /** * 该配置控制 KafkaProducer's send(),partitionsFor(),inittransaction (),sendOffsetsToTransaction(),commitTransaction() " * 和abortTransaction()方法将阻塞。对于send(),此超时限制了获取元数据和分配缓冲区的总等待时间" **/ properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "5000"); //将消息发送到kafka server, 所以肯定需要用到序列化的操作 我们这里发送的消息是string类型的,所以使用string的序列化类 properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); return new KafkaProducer<>(properties); } }

pom.xml

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://·blogs.com/liugh/p/8367671.html


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #Flink #oracle #FlinkCDC完整踩坑指南