Flink 中的窗口函数(一):基础函数

(1)ReduceFunction

A ReduceFunction specifies how two elements from the input are combined to produce an output element of the same type. Flink uses a ReduceFunction to incrementally aggregate(增量聚合) the elements of a window.


DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    // Combines two elements of a window into one of the same type: the first field
    // is taken from v1 and the Long fields of both elements are added together.
    // Note the "()" after the type: an anonymous class is created through a
    // constructor call, so omitting the parentheses does not compile.
    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
        @Override
        public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
            return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
        }
    });


/** * 测试ReduceFunction * */ public class TestReduceFunctionOnWindow { public static void main(String[] args) throws Exception{ //获取执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //读取数据 DataStream<Tuple3<String,String,Integer>> input = env.fromElements(ENGLISH); //keyBy(0) 计算班级总成绩,下标0表示班级 //countWindow(2) 根据元素个数对数据流进行分组切片,达到2个,触发窗口进行计算 DataStream<Tuple3<String,String,Integer>> totalPoints = input.keyBy(0).countWindow(2).reduce(new ReduceFunction<Tuple3<String, String, Integer>>() { @Override public Tuple3<String, String, Integer> reduce(Tuple3<String, String, Integer> value1, Tuple3<String, String, Integer> value2) throws Exception { //效果如下: //(class1,张三,100) //(class1,李四,30) //============== System.out.println("" + value1); System.out.println("" + value2); System.out.println("=============="); return new Tuple3<>(value1.f0, value1.f1, value1.f2+value2.f2); } }); //输出结果 //效果如下: //2> (class1,张三,130) totalPoints.print(); env.execute("TestReduceFunctionOnWindow"); } /** * 定义班级的三元数组 */ public static final Tuple3[] ENGLISH = new Tuple3[]{ //班级 姓名 成绩 Tuple3.of("class1","张三",100), Tuple3.of("class1","李四",30), Tuple3.of("class1","王五",70), Tuple3.of("class2","赵六",50), Tuple3.of("class2","小七",40), Tuple3.of("class2","小八",10), }; } 结果: (class2,赵六,50) (class2,小七,40) ============== 1> (class2,赵六,90) (class1,张三,100) (class1,李四,30) ============== 2> (class1,张三,130) (2)AggregateFunction

AggregateFunction 比 ReduceFunction 更加通用,它有三个类型参数:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。



/**
 * Computes the average of the {@code Long} field of the incoming tuples.
 *
 * <p>The accumulator is a pair (running sum, element count); {@code getResult}
 * divides the sum by the count.
 */
private static class AverageAggregate
        implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {

    /** Starts with a sum of 0 and a count of 0. */
    @Override
    public Tuple2<Long, Long> createAccumulator() {
        return new Tuple2<>(0L, 0L);
    }

    /** Returns a new accumulator with {@code value.f1} added to the sum and the count incremented. */
    @Override
    public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
        long runningSum = accumulator.f0 + value.f1;
        long elementCount = accumulator.f1 + 1L;
        return new Tuple2<>(runningSum, elementCount);
    }

    /** Returns sum / count, computed in floating point. */
    @Override
    public Double getResult(Tuple2<Long, Long> accumulator) {
        double runningSum = accumulator.f0;
        return runningSum / accumulator.f1;
    }

    /** Merges two accumulators by adding their sums and their counts. */
    @Override
    public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
        long mergedSum = a.f0 + b.f0;
        long mergedCount = a.f1 + b.f1;
        return new Tuple2<>(mergedSum, mergedCount);
    }
}

DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate());


/** * 测试AggFunction——求各个班级英语成绩平均分 * */ public class TestAggFunctionOnWindow { public static void main(String[] args) throws Exception { // 获取执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 读取数据 DataStream<Tuple3<String, String, Long>> input = env.fromElements(ENGLISH); // 求各个班级英语成绩平均分 DataStream<Double> avgScore = input.keyBy(0).countWindow(3).aggregate(new AverageAggrate()); avgScore.print(); env.execute("TestAggFunctionOnWindow"); } public static final Tuple3[] ENGLISH = new Tuple3[] { Tuple3.of("class1", "张三", 100L), Tuple3.of("class1", "李四", 40L), Tuple3.of("class1", "王五", 60L), Tuple3.of("class2", "赵六", 20L), Tuple3.of("class2", "小七", 30L), Tuple3.of("class2", "小八", 50L), }; //Tuple3<String, String, Long> 输入类型 //Tuple2<Long, Long> 累加器ACC类型,保存中间状态 //Double 输出类型 public static class AverageAggrate implements AggregateFunction<Tuple3<String, String, Long>, Tuple2<Long, Long>, Double> { /** * 创建累加器保存中间状态(sum count) * * sum 英语总成绩 * count 学生个数 * * @return */ @Override public Tuple2<Long, Long> createAccumulator() { return new Tuple2<>(0L, 0L); } /** * 将元素添加到累加器并返回新的累加器 * * @param value 输入类型 * @param acc 累加器ACC类型 * * @return 返回新的累加器 */ @Override public Tuple2<Long, Long> add(Tuple3<String, String, Long> value, Tuple2<Long, Long> acc) { //acc.f0 总成绩 //value.f2 表示成绩 //acc.f1 人数 return new Tuple2<>(acc.f0 + value.f2, acc.f1 + 1L); } /** * 从累加器提取结果 * * @param longLongTuple2 * @return */ @Override public Double getResult(Tuple2<Long, Long> acc) { return ((double) acc.f0) / acc.f1; } /** * 累加器合并 * * @param longLongTuple2 * @param acc1 * @return */ @Override public Tuple2<Long, Long> merge(Tuple2<Long, Long> acc1, Tuple2<Long, Long> acc2) { return new Tuple2<>(acc1.f0 + acc2.f0, acc1.f1 + acc2.f1); } } } (3)ProcessWindowFunction/ProcessAllWindowFunction

全量聚合: 窗口需要维护全部原始数据,窗口触发进行全量聚合。

在某些情况下,统计更复杂的指标可能需要依赖于窗口中所有的数据元素,或需要操作窗口中的状态数据和窗口元数据,这时就需要使用 ProcessWindowFunction。ProcessWindowFunction 能够更加灵活地支持基于窗口全部数据元素的结果计算,例如对整个窗口数据排序取 TopN,这样的需求就必须使用 ProcessWindowFunction。

ProcessWindowFunction 获得一个包含窗口所有元素的可迭代对象(Iterable),以及一个具有时间和状态信息访问权的上下文对象,这使得它比其他窗口函数提供更大的灵活性。这是以性能和资源消耗为代价的,因为元素不能增量地聚合,而是需要在内部缓冲,直到认为窗口可以处理为止。

ProcessWindowFunction 是 WindowFunction 的升级版,可以跟 ReduceFunction/AggregateFunction/FoldFunction 结合使用(推荐用法)。

public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function { /** * Evaluates the window and outputs none or several elements. * * @param key The key for which this window is evaluated. * @param context The context in which the window is being evaluated. * @param elements The elements in the window being evaluated. * @param out A collector for emitting elements. * * @throws Exception The function may throw exceptions to fail the program and trigger recovery. */ public abstract void process( KEY key, Context context, Iterable<IN> elements, Collector<OUT> out) throws Exception; /** * The context holding window metadata. */ public abstract class Context implements { /** * Returns the window that is being evaluated. */ public abstract W window(); /** Returns the current processing time. */ public abstract long currentProcessingTime(); /** Returns the current event-time watermark. */ public abstract long currentWatermark(); /** * State accessor for per-key and per-window state. * * <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up * by implementing {@link ProcessWindowFunction#clear(Context)}. */ public abstract KeyedStateStore windowState(); /** * State accessor for per-key global state. */ public abstract KeyedStateStore globalState(); } }


public class TestProcessWinFunctionOnWindow { public static void main(String[] args) throws Exception{ //获取执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //读取数据 DataStream<Tuple3<String,String,Long>> input = env.fromElements(ENGLISH); //求各班级英语成绩平均分 DataStream<Double> avgScore = input.keyBy(0) .countWindow(2) .process(new MyProcessWindowFunction()); avgScore.print(); env.execute("TestProcessWinFunctionOnWindow"); } public static class MyProcessWindowFunction extends ProcessWindowFunction<Tuple3<String,String,Long>,Double, Tuple, GlobalWindow>{ //iterable 输入流中的元素类型集合 @Override public void process(Tuple tuple, Context context, Iterable<Tuple3<String, String, Long>> iterable, Collector<Double> out) throws Exception { long sum = 0; long count = 0; for (Tuple3<String,String,Long> in :iterable){ sum+=in.f2; count++; } out.collect((double)(sum/count)); } } public static final Tuple3[] ENGLISH = new Tuple3[]{ Tuple3.of("class1","张三",100L), Tuple3.of("class1","李四",78L), Tuple3.of("class1","王五",99L), Tuple3.of("class2","赵六",81L), Tuple3.of("class2","小七",59L), Tuple3.of("class2","小八",97L), }; }


