irpas技术客

基于Flink 1.13.2问题集锦_cuichunchi

网络投稿 7517

问题:

问题:flink sql 自定义函数时,使用map作为缓存,但是每次有时候会被清空缓存数据。打印的线程id也是相同的,后面有时间看看源码下。

解决:设置为静态缓存类private static final ......

package com.nj.snx.app.functions; import com.nj.snx.common.LruLinkedHashMap; import org.apache.commons.lang3.StringUtils; import org.apache.flink.table.functions.ScalarFunction; import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.function.Consumer; /** * @author :cuicc * @date : 2022/5/16 9:32 */ public class ContainsIndexFunction extends ScalarFunction { //设置静态 private static final LruLinkedHashMap<String,List<String>> quotaIdMaps = new LruLinkedHashMap<>(50); /** * @param businessKey 流水号 * @param quotas 以逗号隔开的 需要计算的衍生指标id PC_CODE-INDEX_ID * @param indexId 所有配置中的衍生指标id * @return */ public boolean eval(String businessKey,String quotas,String indexId){ // System.out.println("自定义函数【需要加工的衍生指标ID】"+quotas+",获取的indexId:"+indexId); if(StringUtils.isNotEmpty(quotas)){ List<String> strings = quotaIdMaps.get(businessKey); System.out.println("map----->"+strings+":businessKey:"+businessKey+";quotas:"+quotas+";indexId:"+indexId+":当前线程:"+Thread.currentThread().getId()); if(strings != null && strings.contains(indexId)){ System.out.println("命中:"+indexId+":businessKey:"+businessKey+";quotas:"+quotas+":当前线程:"+Thread.currentThread().getId()); return true; }else if(strings == null){ System.out.println("为null:"+strings+":businessKey:"+businessKey+";quotas:"+quotas+":当前线程:"+Thread.currentThread().getId()); String[] quotaIds = quotas.split(","); List<String> quotaIdsList = Arrays.asList(quotaIds); for (int i = 0; i < quotaIdsList.size(); i++) { quotaIdsList.set(i,quotaIdsList.get(i).split("-")[0]); } quotaIdMaps.put(businessKey,quotaIdsList); if(quotaIdsList.contains(indexId)){ return true; } } } return false; } }

?

问题:使用flink sql的if(条件,a,b)的时候,如果b设置为null的话,报错:?org.apache.calcite.sql.validate.SqlValidatorException: Illegal use of 'NULL'。

解决:IF(type=1,2,CAST(NULL AS INT))或者IF(type=1,2,CAST(NULL AS STRING))即可解决。

问题:使用over()开窗函数:执行后报Flink Window TOPN: The window can only be ordered in ASCENDING mode.

解决:外套一层,取TOP-N之后就不会报错了。

1、使用flink mysql cdc 发现bigint unsigned类型的字段,capture以后转成了字符串类型,用的这个解析吧JsonDebeziumDeserializationSchema。

解决:在设置debeziumProperties方法里传入以下参数:

properties.setProperty("bigint.unsigned.handling.mode","long");

properties.setProperty("decimal.handling.mode","double");

2、使用Flink-connector-kafka-2.11的连接器去消费某个topic时,在kafka服务器中查询不到消费者组?

解:

bin/kafka-consumer-groups.sh --bootstrap-server s203:9092 --group ddd --describe

没有对应的消费者。[FLINK-11325] Flink Consumer Kafka Topic Not Found ConsumerID - ASF JIRAhttps://issues.apache.org/jira/browse/FLINK-11325

上面链接就是反映这个问题,其实就是flink没有使用kafka的特性,直接控制kafka的消费,而不是交由kafka去消费,connector自己实现了FlinkKafkaConsumer,且没有按照kafka的feature实现coordinator以及JOIN_GROUOP的逻辑,导致在服务器中没有查询到消费者组。?

从kafka服务器查询不到,但是这些指标可以在Flink Web的Metrics指标中可以查询到,

?

currentOffsets:当前偏移量committedOffsets:已提交的偏移量。


1.本站遵循行业规范,任何转载的稿件都会明确标注作者和来源;2.本站的原创文章,会注明原创字样,如未注明都非原创,如有侵权请联系删除!;3.作者投稿可能会经我们编辑修改或补充;4.本站不提供任何储存功能只提供收集或者投稿人的网盘链接。

标签: #基于flink #1132问题集锦 #问题1使用flink #MySQL #CDC #发现bigint