
A Simple Experiment with Flink SQL + the Stream API (TOP N, JOIN, Sliding Windows, Dynamic Windowing), by 大小不是白



Hypothetical requirement: for each key, sum the amount over a time range that differs per key.

Candidate approaches:
1. TableA stores the key, the time interval, and the last update time, so the latest interval per key can be picked out. TableB holds the data; attach TableA's interval to each TableB row, then filter with WHERE and compute the SUM.

2. Same TableA and TableB; attach TableA's interval to TableB to build TableC, then run a sliding window with a dynamic size over TableC and compute the SUM.

3. Skip Flink SQL and implement it with the Flink Stream API plus keyed state.

4. Write a custom WindowAssigner.

Results:
Approach 1: the data behind the SUM is never cleaned up, so the value keeps accumulating forever. Fails.
Approach 2: Flink SQL windows do not support a dynamic window size, so the final aggregation cannot be expressed. Fails.
Approach 3: works.
Approach 4: works.

All ideas are welcome; if you have time, feel free to try these out as well.

Approaches 1 and 2: the code. TableA: key and time interval

SingleOutputStreamOperator<TableInterval> keyTimeInterval = env
        .socketTextStream("localhost", 9999)
        .map(new MapFunction<String, TableInterval>() {
            @Override
            public TableInterval map(String value) throws Exception {
                String[] strings = value.split(",");
                return new TableInterval(strings[0], Long.parseLong(strings[1]), Long.parseLong(strings[2]));
            }
        });
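The TableInterval POJO is not shown in the post; a minimal sketch of what the parser above seems to assume (field names interval_b and updata_time chosen to match the SQL further down; this is an assumption, not the author's original class) could look like this:

// Hypothetical POJO assumed by the parser above (not shown in the original post)
public static class TableInterval {
    public String key;          // business key
    public Long interval_b;     // per-key window size
    public Long updata_time;    // last update time, used to pick the latest row per key

    public TableInterval() {
    }

    public TableInterval(String key, Long interval_b, Long updata_time) {
        this.key = key;
        this.interval_b = interval_b;
        this.updata_time = updata_time;
    }
}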

TableB: the base data

SingleOutputStreamOperator<TableData> keyData = env
        .socketTextStream("localhost", 9998)
        .map(new MapFunction<String, TableData>() {
            @Override
            public TableData map(String value) throws Exception {
                String[] strings = value.split(",");
                return new TableData(strings[0], Integer.parseInt(strings[1]), Long.parseLong(strings[2]));
            }
        });
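TableData is not shown either; a sketch consistent with the parser above and the money / pay_time columns used later (again an assumption):

// Hypothetical POJO assumed by the parser above (not shown in the original post)
public static class TableData {
    public String key;       // business key
    public Integer money;    // amount to be summed
    public Long pay_time;    // payment timestamp

    public TableData() {
    }

    public TableData(String key, Integer money, Long pay_time) {
        this.key = key;
        this.money = money;
        this.pay_time = pay_time;
    }
}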

TableC: base data JOINed with the time interval

Table tableA = tableEnv.fromDataStream(keyTimeInterval);
Table tableB = tableEnv.fromDataStream(keyData);
tableEnv.createTemporaryView("tableA", tableA);
tableEnv.createTemporaryView("tableB", tableB);

// TOP N / deduplication: keep only the latest interval per key
Table tableA2 = tableEnv.sqlQuery(
        " select " +
        "   a.key, " +
        "   a.interval_b " +
        " from ( " +
        "   select " +
        "     key, " +
        "     interval_b, " +
        "     ROW_NUMBER() over (partition by key order by updata_time desc) as rn " +
        "   from tableA " +
        " ) a " +
        " where a.rn = 1 ");

// Join the latest per-key interval onto the data table
Table tableA3 = tableEnv.sqlQuery(
        " select " +
        "   a.*, " +
        "   b.interval_b " +
        " from tableB a " +
        " inner join " + tableA2 + " b " +
        " on a.key = b.key ");
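Since the ROW_NUMBER deduplication makes tableA2 an updating table, the join result is updating as well. For quick inspection during testing, one option (assuming the bridging StreamTableEnvironment API used above and org.apache.flink.types.Row; the job name is arbitrary) is a retract stream:

// Print the joined result; the Boolean flag marks inserts (true) vs. retractions (false)
tableEnv.toRetractStream(tableA3, Row.class)
        .print("tableA3");
env.execute("flink-sql-dynamic-window-test");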

Sliding window aggregation

tableEnv.createTemporaryView(
        "tableA3",
        tableDetailDataSingleOutputStreamOperator,
        $("key"), $("money"), $("pay_time"), $("interval_b"), $("pt").proctime());

// This is where approach 2 breaks down: HOP expects constant INTERVAL literals,
// so the window size cannot be taken from the interval_b column.
Table tableC = tableEnv.sqlQuery(
        " select " +
        "   key, " +
        "   sum(money), " +
        "   hop_start(pt, INTERVAL '1' minute, INTERVAL interval_b minute) as winstart " +
        " from tableA3 " +
        " group by " +
        "   key, " +
        "   hop(pt, INTERVAL '1' minute, INTERVAL interval_b minute) ");
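For contrast, this is the shape Flink SQL does accept: HOP wants constant INTERVAL literals for both the slide and the size, which is exactly why passing interval_b as a column fails. A sketch with a fixed 5-minute window and a 1-minute slide (the 5-minute size is just an example value):

// Works: both HOP arguments are constant INTERVAL literals
Table fixedWindow = tableEnv.sqlQuery(
        " select " +
        "   key, " +
        "   sum(money) as total_money, " +
        "   hop_start(pt, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) as winstart " +
        " from tableA3 " +
        " group by " +
        "   key, " +
        "   hop(pt, INTERVAL '1' MINUTE, INTERVAL '5' MINUTE) ");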

Approach 3: Flink Stream API + state. Streams A and B carry the same data as TableA and TableB.

.process(new KeyedProcessFunction<String, TableDetailData, Tuple2<String, Long>>() {

    // State: pay_time -> money (kept deliberately simple here)
    private MapState<Long, Long> mapState;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Initialize the state handle
        MapStateDescriptor<Long, Long> dataMapStateDescriptor =
                new MapStateDescriptor<>("map-state", Long.class, Long.class);
        mapState = getRuntimeContext().getMapState(dataMapStateDescriptor);
    }

    @Override
    public void processElement(TableDetailData value, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
        Long sumMoney = 0L;
        ArrayList<Long> keyList = new ArrayList<>();

        mapState.put(value.getPay_time(), (long) value.getMoney());

        long timeEnd = ctx.timerService().currentProcessingTime();
        long timeStart = timeEnd - value.getInterval_b();
        String timeRange = "timeStart:" + timeStart + " ~ timeEnd:" + timeEnd;
        System.out.println(timeRange);

        Iterator<Map.Entry<Long, Long>> iterator = mapState.entries().iterator();
        while (iterator.hasNext()) {
            Map.Entry<Long, Long> next = iterator.next();
            Long key = next.getKey();
            if (key < timeStart) {
                // Calling mapState.remove(key) here, while iterating, is not allowed,
                // so collect the expired keys and remove them afterwards.
                keyList.add(key);
            } else {
                sumMoney += next.getValue();
            }
        }

        // Remove the expired entries from state
        for (Long aLong : keyList) {
            mapState.remove(aLong);
        }

        out.collect(Tuple2.of(timeRange, sumMoney));
    }
})
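The post does not show how this function is wired into the pipeline. A plausible sketch, assuming a DataStream<TableDetailData> named detailStream whose records already carry interval_b (e.g. attached by combining the A and B streams upstream), a hypothetical getKey() accessor, and the function above extracted into a named class IntervalSumFunction:

// Hypothetical wiring: key by the business key, apply the stateful function above, print the rolling sums
detailStream
        .keyBy(TableDetailData::getKey)
        .process(new IntervalSumFunction())
        .print();
env.execute("stream-api-dynamic-interval-sum");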

Approach 4: a custom WindowAssigner, adapted from the window-assigner code in the Flink sources. Streams A and B carry the same data as TableA and TableB.

public static class MySlidingProcessingTimeWindows<T> extends WindowAssigner<Object, TimeWindow> {

    private static final long serialVersionUID = 1L;

    // Window size
    private long size;
    // Window offset
    private long offset;
    // Slide
    private long slide;

    public MySlidingProcessingTimeWindows() {
    }

    public MySlidingProcessingTimeWindows(long size, long slide, long offset) {
        if (Math.abs(offset) >= slide || size <= 0) {
            throw new IllegalArgumentException(
                    "SlidingProcessingTimeWindows parameters must satisfy "
                            + "abs(offset) < slide and size > 0");
        }
        this.size = size;
        this.slide = slide;
        this.offset = offset;
    }

    @SneakyThrows
    @Override
    public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
        // The parameter must stay Object to override correctly; cast to read the per-record window size
        size = ((ClassA) element).getInterval();
        // Slide: 3 seconds
        slide = 3000L;
        // Offset: 0
        offset = 0L;
        timestamp = context.getCurrentProcessingTime();
        List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
        long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            windows.add(new TimeWindow(start, start + size));
        }
        return windows;
    }

    public long getSize() {
        return size;
    }

    public long getSlide() {
        return slide;
    }

    @Override
    public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
        return ProcessingTimeTrigger.create();
    }

    @Override
    public String toString() {
        return "SlidingProcessingTimeWindows(" + size + ", " + slide + ")";
    }

    @Override
    public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
        return new TimeWindow.Serializer();
    }

    @Override
    public boolean isEventTime() {
        return false;
    }
}
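Plugging the assigner in then looks like any other window pipeline; a sketch assuming a DataStream<ClassA> named classAStream with a hypothetical getKey() accessor and a money field to aggregate:

// Hypothetical usage of the custom assigner: the window size is read from each record's getInterval()
classAStream
        .keyBy(ClassA::getKey)
        .window(new MySlidingProcessingTimeWindows<>())
        .sum("money")
        .print();
env.execute("custom-window-assigner-demo");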



Tags: #Flink #SQL #stream #JOIN