irpas技术客

flink 源码分析1之RichSinkFunction_普通网友_flink richsinkfunction

irpas 7154

flink sink 2 mysql demo

我们先看一个自定义sink 的demo,将 nc 的数据写入到mysql 中。

import myflink.learn.model.Student; import myflink.learn.sink.SinkToMySQL; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.util.concurrent.atomic.AtomicInteger; /** * @Author wtx * @Date 2019/1/23 */ public class Flink2MysqlDemo { public static void main(String[] args) throws Exception { AtomicInteger atomicInteger = new AtomicInteger(0); final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.setParallelism(1); // 设置数据源 DataStream<String> text = env.socketTextStream("localhost", 9000, "\n"); // DataStream<Student> studentDataStream = text.map(new MapFunction<String, Student>() { // @Override // public Student map(String s) throws Exception { // Student student = new Student(); // student.setName(s); // student.setId(atomicInteger.addAndGet(1)); // return student; // } // }); DataStream<Student> studentDataStream = text.map((str) -> { Student student = new Student(); student.setName(str); student.setId(atomicInteger.addAndGet(1)); return student; }); studentDataStream.addSink(new SinkToMySQL()); env.execute(); } } @Slf4j public class SinkToMySQL extends RichSinkFunction<Student> { PreparedStatement ps; private Connection connection; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); connection = getConnection(); String sql = "insert into student(id, name) values(?, ?);"; ps = this.connection.prepareStatement(sql); } @Override public void close() throws Exception { super.close(); //关闭连接和释放资源 if (connection != null) { connection.close(); } if (ps != null) { ps.close(); } } @Override public void invoke(Student value, Context context) throws Exception { //组装数据,执行插入操作 ps.setInt(1, value.getId()); ps.setString(2, value.getName()); ps.executeUpdate(); } private static Connection getConnection() { Connection con = null; try { Class.forName("com.mysql.jdbc.Driver"); con = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", ""); } catch (Exception e) { log.error("exception e:", e); } return con; } } RichSinkFunction 的类结构

可以看到自定义的sink 继承自RichSinkFunction. 来看 RichSinkFunction 的类结构

?

/** * The base interface for all user-defined functions. * * <p>This interface is empty in order to allow extending interfaces to * be SAM (single abstract method) interfaces that can be implemented via Java 8 lambdas.</p> */ @Public public interface Function extends java.io.Serializable { } public interface SinkFunction<IN> extends Function, Serializable{ default void invoke(IN value, Context context) throws Exception { invoke(value); } @Public interface Context<T> { long currentProcessingTime(); long currentWatermark(); Long timestamp(); } }

在上面的 SinkFunction 接口中实际只有一个方法,invoke(),将类型为IN 的value 写入到sink 中。Context: 写入value 时的上下文?

@Public public abstract class AbstractRichFunction implements RichFunction, Serializable { private transient RuntimeContext runtimeContext; @Override public void open(Configuration parameters) throws Exception {} @Override public void close() throws Exception {} }

?

而在AbstractRichFunction 只有默认的生命周期方法 open() 和 close() 的空实现。 留给我们自己的比如上面的 SinkToMySQL那样 实现 对于mysql 的 open() close() 另外可以类似的实现对于redis 的sink 类。查看flink-connector-redis 发现已经有了RedisSink 类。 我们先来看看简单的使用:只需要将 new SinkToMySQL() -> new RedisSink

studentDataStream.addSink(new SinkToMySQL()); ==> FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").build(); studentDataStream.addSink(new RedisSink<Tuple2<String, Integer>>(conf, new RedisExampleMapper())); public static final class RedisExampleMapper implements RedisMapper<Tuple2<String, Integer>> { public RedisCommandDescription getCommandDescription() { return new RedisCommandDescription(RedisCommand.HSET, "flink"); } public String getKeyFromData(Tuple2<String, Integer> data) { return data.f0; } public String getValueFromData(Tuple2<String, Integer> data) { return data.f1.toString(); } }

?回到 RedisSink,也是通过继承自RichSinkFunction:

public class RedisSink<IN> extends RichSinkFunction<IN> { public RedisSink(FlinkJedisConfigBase flinkJedisConfigBase, RedisMapper<IN> redisSinkMapper) { Preconditions.checkNotNull(flinkJedisConfigBase, "Redis connection pool config should not be null"); Preconditions.checkNotNull(redisSinkMapper, "Redis Mapper can not be null"); Preconditions.checkNotNull(redisSinkMapper.getCommandDescription(), "Redis Mapper data type description can not be null"); this.flinkJedisConfigBase = flinkJedisConfigBase; this.redisSinkMapper = redisSinkMapper; RedisCommandDescription redisCommandDescription = redisSinkMapper.getCommandDescription(); this.redisCommand = redisCommandDescription.getCommand(); this.additionalKey = redisCommandDescription.getAdditionalKey(); } }

?通过传入: conf 和 redisSinkMapper 构造出来RedisSink,然后 override invoke().

@Override public void invoke(IN input) throws Exception { String key = redisSinkMapper.getKeyFromData(input); String value = redisSinkMapper.getValueFromData(input); switch (redisCommand) { case RPUSH: this.redisCommandsContainer.rpush(key, value); break; case LPUSH: this.redisCommandsContainer.lpush(key, value); break; case SADD: this.redisCommandsContainer.sadd(key, value); break; case SET: this.redisCommandsContainer.set(key, value); break; case PFADD: this.redisCommandsContainer.pfadd(key, value); break; case PUBLISH: this.redisCommandsContainer.publish(key, value); break; case ZADD: this.redisCommandsContainer.zadd(this.additionalKey, value, key); break; case HSET: this.redisCommandsContainer.hset(this.additionalKey, key, value); break; default: throw new IllegalArgumentException("Cannot process such data type: " + redisCommand); } }

?

通过上面的 switch 我们知道,flink -> redis 目前只支持8个最基础的redisCommand,要想调用其他的redisCommand,目前看还是需要自己实现。 另外在 RedisSink 中 还Override 了open 和 close 方法实现了对于redis 的连接和关闭。

RedisSink open() 方法

我们来看看open()

@Override public void open(Configuration parameters) throws Exception { this.redisCommandsContainer = RedisCommandsContainerBuilder.build(this.flinkJedisConfigBase); }

?

这里使用 RedisCommandsContainerBuilder 构造了一个 redisCommandsContainer,RedisCommandsContainer 是一个 接口(The container for all available Redis commands.) 刚好对应上面的 switch (redisCommand)

void hset(String key, String hashField, String value); void rpush(String listName, String value); void lpush(String listName, String value); void sadd(String setName, String value); void publish(String channelName, String message); void set(String key, String value); void pfadd(String key, String element); void zadd(String key, String score, String element); void close() throws IOException;

?

在来看 RedisCommandsContainerBuilder。它通过flinkJedisConfigBase 构造出来redisCommandsContainer。其中 FlinkJedisConfigBase 定义了4个redis 连接时常用的属性connectionTimeout,maxTotal,maxIdle,minIdle

public abstract class FlinkJedisConfigBase implements Serializable { private static final long serialVersionUID = 1L; protected final int maxTotal; protected final int maxIdle; protected final int minIdle; protected final int connectionTimeout; protected FlinkJedisConfigBase(int connectionTimeout, int maxTotal, int maxIdle, int minIdle){ Preconditions.checkArgument(connectionTimeout >= 0, "connection timeout can not be negative"); Preconditions.checkArgument(maxTotal >= 0, "maxTotal value can not be negative"); Preconditions.checkArgument(maxIdle >= 0, "maxIdle value can not be negative"); Preconditions.checkArgument(minIdle >= 0, "minIdle value can not be negative"); this.connectionTimeout = connectionTimeout; this.maxTotal = maxTotal; this.maxIdle = maxIdle; this.minIdle = minIdle; }

?回到RedisCommandsContainerBuilder,可以看到FlinkJedisPoolConfig 的实现类有3种,对应

FlinkJedisPoolConfig jedis 连接池的方式 FlinkJedisClusterConfig redis cluster 方式 FlinkJedisSentinelConfig redis sentinel 方式

public static RedisCommandsContainer build(FlinkJedisConfigBase flinkJedisConfigBase){ if(flinkJedisConfigBase instanceof FlinkJedisPoolConfig){ FlinkJedisPoolConfig flinkJedisPoolConfig = (FlinkJedisPoolConfig) flinkJedisConfigBase; return RedisCommandsContainerBuilder.build(flinkJedisPoolConfig); } else if (flinkJedisConfigBase instanceof FlinkJedisClusterConfig) { FlinkJedisClusterConfig flinkJedisClusterConfig = (FlinkJedisClusterConfig) flinkJedisConfigBase; return RedisCommandsContainerBuilder.build(flinkJedisClusterConfig); } else if (flinkJedisConfigBase instanceof FlinkJedisSentinelConfig) { FlinkJedisSentinelConfig flinkJedisSentinelConfig = (FlinkJedisSentinelConfig) flinkJedisConfigBase; return RedisCommandsContainerBuilder.build(flinkJedisSentinelConfig); } else { throw new IllegalArgumentException("Jedis configuration not found"); } }

?我们看 jedis pool 的方式,最终生成一个 由jedisPool 构造的 RedisContainer

public static RedisCommandsContainer build(FlinkJedisPoolConfig jedisPoolConfig) { Preconditions.checkNotNull(jedisPoolConfig, "Redis pool config should not be Null"); GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig(); genericObjectPoolConfig.setMaxIdle(jedisPoolConfig.getMaxIdle()); genericObjectPoolConfig.setMaxTotal(jedisPoolConfig.getMaxTotal()); genericObjectPoolConfig.setMinIdle(jedisPoolConfig.getMinIdle()); JedisPool jedisPool = new JedisPool(genericObjectPoolConfig, jedisPoolConfig.getHost(), jedisPoolConfig.getPort(), jedisPoolConfig.getConnectionTimeout(), jedisPoolConfig.getPassword(), jedisPoolConfig.getDatabase()); return new RedisContainer(jedisPool); }

?

这里的RedisContainer 实现了 上面 RedisCommandsContainer 接口,并且通过jedis pool 的方式真正实现了接口中的hset 等8个方法。

同样的 redis cluster 方式 最终生成的一个 RedisClusterContainer

?

public static RedisCommandsContainer build(FlinkJedisClusterConfig jedisClusterConfig) { Preconditions.checkNotNull(jedisClusterConfig, "Redis cluster config should not be Null"); GenericObjectPoolConfig genericObjectPoolConfig = new GenericObjectPoolConfig(); genericObjectPoolConfig.setMaxIdle(jedisClusterConfig.getMaxIdle()); genericObjectPoolConfig.setMaxTotal(jedisClusterConfig.getMaxTotal()); genericObjectPoolConfig.setMinIdle(jedisClusterConfig.getMinIdle()); JedisCluster jedisCluster = new JedisCluster(jedisClusterConfig.getNodes(), jedisClusterConfig.getConnectionTimeout(), jedisClusterConfig.getMaxRedirections(), genericObjectPoolConfig); return new RedisClusterContainer(jedisCluster); }

同样: RedisClusterContainer 实现了 上面 RedisCommandsContainer 接口,并且通过jedis cluster 的方式真正实现了接口中的hset 等8个方法。

sink 2 kafka

flink 同样实现了 到 kafka 的写入,先将 SinkToMySQL 换成 FlinkKafkaProducer

Properties properties = new Properties(); properties.put("bootstrap.servers", "127.0.0.1:9092"); text.addSink(new FlinkKafkaProducer<>("flink_2_kafka_demo", new SimpleStringSchema(), properties));

?

除了 kafka 所需的 properties 外,还有个 SimpleStringSchema,按照上面的 Sink2Mysql 和 RedisSink,我们可以很容易的想到 FlinkKafkaProducer 实现了 RichSinkFunction,来看 源码:

public class FlinkKafkaProducer<IN> extends TwoPhaseCommitSinkFunction<IN, FlinkKafkaProducer.KafkaTransactionState, FlinkKafkaProducer.KafkaTransactionContext> { }

?

?

sink 2 kafka 比较复杂,其中 TwoPhaseCommitSinkFunction 除了实现写入kafka 消息外,还有 两阶段提交协议的实现。 <基本上还是依赖kafka的事务处理实现的,下篇文章在详细分析>

先来看构造函数

?

public FlinkKafkaProducer( String defaultTopicId, KeyedSerializationSchema<IN> serializationSchema, Properties producerConfig, Optional<FlinkKafkaPartitioner<IN>> customPartitioner, FlinkKafkaProducer.Semantic semantic, int kafkaProducersPoolSize) { super(new FlinkKafkaProducer.TransactionStateSerializer(), new FlinkKafkaProducer.ContextStateSerializer()); this.defaultTopicId = checkNotNull(defaultTopicId, "defaultTopicId is null"); this.schema = checkNotNull(serializationSchema, "serializationSchema is null"); this.producerConfig = checkNotNull(producerConfig, "producerConfig is null"); this.flinkKafkaPartitioner = checkNotNull(customPartitioner, "customPartitioner is null").orElse(null); this.semantic = checkNotNull(semantic, "semantic is null"); this.kafkaProducersPoolSize = kafkaProducersPoolSize; checkState(kafkaProducersPoolSize > 0, "kafkaProducersPoolSize must be non empty"); ClosureCleaner.clean(this.flinkKafkaPartitioner, true); ClosureCleaner.ensureSerializable(serializationSchema); // set the producer configuration properties for kafka record key value serializers. if (!producerConfig.containsKey(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)) { this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); } else { LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG); } if (!producerConfig.containsKey(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)) { this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName()); } else { LOG.warn("Overwriting the '{}' is not recommended", ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG); } // eagerly ensure that bootstrap servers are set. if (!this.producerConfig.containsKey(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)) { throw new IllegalArgumentException(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG + " must be supplied in the producer config properties."); } if (!producerConfig.containsKey(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG)) { long timeout = DEFAULT_KAFKA_TRANSACTION_TIMEOUT.toMilliseconds(); checkState(timeout < Integer.MAX_VALUE && timeout > 0, "timeout does not fit into 32 bit integer"); this.producerConfig.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, (int) timeout); LOG.warn("Property [{}] not specified. Setting it to {}", ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, DEFAULT_KAFKA_TRANSACTION_TIMEOUT); } // Enable transactionTimeoutWarnings to avoid silent data loss // See KAFKA-6119 (affects versions 0.11.0.0 and 0.11.0.1): // The KafkaProducer may not throw an exception if the transaction failed to commit if (semantic == FlinkKafkaProducer.Semantic.EXACTLY_ONCE) { final Object object = this.producerConfig.get(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG); final long transactionTimeout; if (object instanceof String && StringUtils.isNumeric((String) object)) { transactionTimeout = Long.parseLong((String) object); } else if (object instanceof Number) { transactionTimeout = ((Number) object).longValue(); } else { throw new IllegalArgumentException(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG + " must be numeric, was " + object); } super.setTransactionTimeout(transactionTimeout); super.enableTransactionTimeoutWarnings(0.8); } this.topicPartitionsMap = new HashMap<>(); }

构造函数看起来很长,基本都是在给属性赋值。其中:

defaultTopicId: kafka的 tpoicId serializationSchema producerConfig customPartitioner semantic: kafkaProducersPoolSize: default KafkaProducers pool size

这里面有个 enum Semantic:

public enum Semantic { EXACTLY_ONCE, AT_LEAST_ONCE, NONE }

?

. Semantic.EXACTLY_ONCE 有且仅有一次 the Flink producer will write all messages in a Kafka transaction that will be committed to Kafka on a checkpoint. . Semantic.AT_LEAST_ONCE 最少一次 the Flink producer will wait for all outstanding messages in the Kafka buffers to be acknowledged by the Kafka producer on a checkpoint

代码量很大,我们先看两个简单的close和open,与mysql 和 redis 的close,open相比,代码行数也是比较大的。 close 方法中对于EXACTLY_ONCE 的Semantic,首先拿到 currentTransaction,如果不为空,flush(),对于AT_LEAST_ONCE 和 NONE 类型的,需要手动调用 currentTransaction.producer.close(); 然后 将pendingTransactions 的transaction closeQuietly。

@Override public void close() throws FlinkKafkaException { final FlinkKafkaProducer.KafkaTransactionState currentTransaction = currentTransaction(); if (currentTransaction != null) { // to avoid exceptions on aborting transactions with some pending records flush(currentTransaction); // normal abort for AT_LEAST_ONCE and NONE do not clean up resources because of producer reusing, thus // we need to close it manually switch (semantic) { case EXACTLY_ONCE: break; case AT_LEAST_ONCE: case NONE: currentTransaction.producer.close(); break; } } try { super.close(); } catch (Exception e) { asyncException = ExceptionUtils.firstOrSuppressed(e, asyncException); } // make sure we propagate pending errors checkErroneous(); pendingTransactions().forEach(transaction -> IOUtils.closeQuietly(transaction.getValue().producer) ); }

?open 方法比较简单,根据是否logFailuresOnly,构造不同的 Callback,用于在 发送给kafka消息成功后,调用不用的 Callback

@Override public void open(Configuration configuration) throws Exception { if (logFailuresOnly) { callback = new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception e) { if (e != null) { LOG.error("Error while sending record to Kafka: " + e.getMessage(), e); } acknowledgeMessage(); } }; } else { callback = new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (exception != null && asyncException == null) { asyncException = exception; } acknowledgeMessage(); } }; } super.open(configuration); }

在来看看invoke方法,最终调用transaction里面的producer 去发送消息给kafka。 transaction.producer.send(record, callback);

?

@Override public void invoke(FlinkKafkaProducer.KafkaTransactionState transaction, IN next, Context context) throws FlinkKafkaException { checkErroneous(); byte[] serializedKey = schema.serializeKey(next); byte[] serializedValue = schema.serializeValue(next); String targetTopic = schema.getTargetTopic(next); if (targetTopic == null) { targetTopic = defaultTopicId; } Long timestamp = null; if (this.writeTimestampToKafka) { timestamp = context.timestamp(); } ProducerRecord<byte[], byte[]> record; int[] partitions = topicPartitionsMap.get(targetTopic); if (null == partitions) { partitions = getPartitionsByTopic(targetTopic, transaction.producer); topicPartitionsMap.put(targetTopic, partitions); } if (flinkKafkaPartitioner != null) { record = new ProducerRecord<>( targetTopic, flinkKafkaPartitioner.partition(next, serializedKey, serializedValue, targetTopic, partitions), timestamp, serializedKey, serializedValue); } else { record = new ProducerRecord<>(targetTopic, null, timestamp, serializedKey, serializedValue); } /** * pendingRecords 是一个AtomicLong * private final AtomicLong pendingRecords = new AtomicLong(); */ pendingRecords.incrementAndGet(); transaction.producer.send(record, callback); }

?里面有个flinkKafkaPartitioner

public abstract class FlinkKafkaPartitioner<T> implements Serializable{ public void open(int parallelInstanceId, int parallelInstances) { // overwrite this method if needed. } // Determine the id of the partition that the record should be written to // 决定了record 应该写入到哪个分区。返回该分区的id public abstract int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions); }

?目前 flink 中只剩下一个具体的实现,partitions[parallelInstanceId % partitions.length];

public class FlinkFixedPartitioner<T> extends FlinkKafkaPartitioner<T> { @Override public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) { return partitions[parallelInstanceId % partitions.length]; } }

?

总结

flink 通过 继承 RichSinkFunction 实现对不同存储的sink。并且只需要 overide 里面的open,close,invoke 三个方法即可

硬核资料:关注即私信或(点击获取)可领取行业经典书籍PDF。 技术互助:技术群大佬指点迷津,你的问题可能不是问题,求资源在(技术群)里喊一声。 面试题库:由P8大佬们共同投稿,热乎的大厂面试真题,持续更新中。(点击获取) 知识体系:含编程语言、算法、大数据生态圈组件(Mysql、Hive、Spark、Flink)、数据仓库? ?


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #Flink #Sink #2 #MySQL #的demo将