
Window Functions in Flink (Part 1): Basic Functions

(1) ReduceFunction

A ReduceFunction specifies how two elements from the input are combined to produce an output element of the same type. Flink uses a ReduceFunction to incrementally aggregate the elements of a window: the window keeps a single running value instead of buffering every element.

```java
DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
        public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
            return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
        }
    });
```
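The incremental-aggregation behavior can be illustrated without Flink at all. The following is a minimal plain-Java sketch (the names are ours, not Flink's): the "window state" is a single accumulated value that a ReduceFunction-style binary operation updates as each element arrives.

```java
import java.util.List;
import java.util.function.BinaryOperator;

public class ReduceSketch {

    // Simulates Flink's incremental reduce: the window keeps a single
    // accumulated value rather than buffering every element it receives.
    static <T> T incrementalReduce(List<T> windowElements, BinaryOperator<T> reduceFn) {
        T acc = null;
        for (T element : windowElements) {
            acc = (acc == null) ? element : reduceFn.apply(acc, element);
        }
        return acc;
    }

    public static void main(String[] args) {
        // Same shape as the Tuple2<String, Long> snippet: keep the first
        // field, sum the second. A long[]{key, value} stands in for Tuple2.
        long[] result = incrementalReduce(
                List.of(new long[]{1, 100}, new long[]{1, 30}),
                (v1, v2) -> new long[]{v1[0], v1[1] + v2[1]});
        System.out.println(result[1]); // prints 130
    }
}
```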

Example:

```java
/**
 * ReduceFunction demo: per-class total score over a count window.
 */
public class TestReduceFunctionOnWindow {

    public static void main(String[] args) throws Exception {
        // Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read the input data
        DataStream<Tuple3<String, String, Integer>> input = env.fromElements(ENGLISH);

        // keyBy(0): key by class name (field 0) to compute the per-class total score
        // countWindow(2): slice the stream by element count; the window fires once it holds 2 elements
        DataStream<Tuple3<String, String, Integer>> totalPoints = input
            .keyBy(0)
            .countWindow(2)
            .reduce(new ReduceFunction<Tuple3<String, String, Integer>>() {
                @Override
                public Tuple3<String, String, Integer> reduce(Tuple3<String, String, Integer> value1,
                                                              Tuple3<String, String, Integer> value2) throws Exception {
                    // Output looks like:
                    // (class1,张三,100)
                    // (class1,李四,30)
                    // ==============
                    System.out.println("" + value1);
                    System.out.println("" + value2);
                    System.out.println("==============");
                    return new Tuple3<>(value1.f0, value1.f1, value1.f2 + value2.f2);
                }
            });

        // Print the result, e.g.:
        // 2> (class1,张三,130)
        totalPoints.print();

        env.execute("TestReduceFunctionOnWindow");
    }

    /**
     * Test data: (class, name, score) triples
     */
    public static final Tuple3[] ENGLISH = new Tuple3[]{
        Tuple3.of("class1", "张三", 100),
        Tuple3.of("class1", "李四", 30),
        Tuple3.of("class1", "王五", 70),
        Tuple3.of("class2", "赵六", 50),
        Tuple3.of("class2", "小七", 40),
        Tuple3.of("class2", "小八", 10),
    };
}
```

Result:

```
(class2,赵六,50)
(class2,小七,40)
==============
1> (class2,赵六,90)
(class1,张三,100)
(class1,李四,30)
==============
2> (class1,张三,130)
```

(2) AggregateFunction

AggregateFunction is more general than ReduceFunction. It has three type parameters: an input type (IN), an accumulator type (ACC), and an output type (OUT).

The input type is the type of elements in the input stream; AggregateFunction has an add method that adds one input element to an accumulator.

The interface also has methods for creating an initial accumulator (createAccumulator), merging two accumulators into one (merge), and extracting the output (of type OUT) from an accumulator (getResult).

```java
/**
 * The accumulator is used to keep a running sum and a count. The {@code getResult} method
 * computes the average.
 */
private static class AverageAggregate
    implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>, Double> {

    @Override
    public Tuple2<Long, Long> createAccumulator() {
        return new Tuple2<>(0L, 0L);
    }

    @Override
    public Tuple2<Long, Long> add(Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
        return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
    }

    @Override
    public Double getResult(Tuple2<Long, Long> accumulator) {
        return ((double) accumulator.f0) / accumulator.f1;
    }

    @Override
    public Tuple2<Long, Long> merge(Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
        return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
    }
}

DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .aggregate(new AverageAggregate());
```
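The four-method lifecycle can be exercised outside Flink. Below is a hand-rolled sketch with deliberately simplified types (IN = long, ACC = long[]{sum, count}, OUT = double; not Flink's actual generics). It also shows why merge exists: two accumulators built independently, for example when Flink merges session windows, can be combined without reprocessing elements.

```java
public class AggregateSketch {

    // createAccumulator(): the empty state {sum = 0, count = 0}
    static long[] createAccumulator() { return new long[]{0L, 0L}; }

    // add(): fold one input value into the accumulator
    static long[] add(long value, long[] acc) {
        return new long[]{acc[0] + value, acc[1] + 1L};
    }

    // merge(): combine two independently built accumulators
    static long[] merge(long[] a, long[] b) {
        return new long[]{a[0] + b[0], a[1] + b[1]};
    }

    // getResult(): extract the OUT value (the average) from the accumulator
    static double getResult(long[] acc) {
        return (double) acc[0] / acc[1];
    }

    public static void main(String[] args) {
        // Two partial aggregations built separately, then merged.
        long[] acc1 = add(40L, add(100L, createAccumulator()));
        long[] acc2 = add(70L, createAccumulator());
        System.out.println(getResult(merge(acc1, acc2))); // prints 70.0
    }
}
```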

Example:

```java
/**
 * AggregateFunction demo: average English score per class.
 */
public class TestAggFunctionOnWindow {

    public static void main(String[] args) throws Exception {
        // Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read the input data
        DataStream<Tuple3<String, String, Long>> input = env.fromElements(ENGLISH);

        // Average English score per class
        DataStream<Double> avgScore = input
            .keyBy(0)
            .countWindow(3)
            .aggregate(new AverageAggregate());

        avgScore.print();
        env.execute("TestAggFunctionOnWindow");
    }

    public static final Tuple3[] ENGLISH = new Tuple3[]{
        Tuple3.of("class1", "张三", 100L),
        Tuple3.of("class1", "李四", 40L),
        Tuple3.of("class1", "王五", 60L),
        Tuple3.of("class2", "赵六", 20L),
        Tuple3.of("class2", "小七", 30L),
        Tuple3.of("class2", "小八", 50L),
    };

    // Tuple3<String, String, Long>: input type (IN)
    // Tuple2<Long, Long>: accumulator type (ACC), holds the intermediate state
    // Double: output type (OUT)
    public static class AverageAggregate
        implements AggregateFunction<Tuple3<String, String, Long>, Tuple2<Long, Long>, Double> {

        /**
         * Create the accumulator that holds the intermediate state (sum, count):
         * sum is the total English score, count is the number of students.
         */
        @Override
        public Tuple2<Long, Long> createAccumulator() {
            return new Tuple2<>(0L, 0L);
        }

        /**
         * Add an element to the accumulator and return the new accumulator.
         *
         * @param value the input element
         * @param acc   the current accumulator
         * @return the new accumulator
         */
        @Override
        public Tuple2<Long, Long> add(Tuple3<String, String, Long> value, Tuple2<Long, Long> acc) {
            // acc.f0: total score, value.f2: this student's score, acc.f1: student count
            return new Tuple2<>(acc.f0 + value.f2, acc.f1 + 1L);
        }

        /**
         * Extract the result (the average) from the accumulator.
         */
        @Override
        public Double getResult(Tuple2<Long, Long> acc) {
            return ((double) acc.f0) / acc.f1;
        }

        /**
         * Merge two accumulators.
         */
        @Override
        public Tuple2<Long, Long> merge(Tuple2<Long, Long> acc1, Tuple2<Long, Long> acc2) {
            return new Tuple2<>(acc1.f0 + acc2.f0, acc1.f1 + acc2.f1);
        }
    }
}
```

(3) ProcessWindowFunction / ProcessAllWindowFunction

Full aggregation: the window keeps all of the raw elements it has received and aggregates them in one pass when the window fires.

In some cases, computing a more complex metric depends on all of the elements in the window, or requires access to the window's state data and metadata. That is when you need ProcessWindowFunction: it flexibly supports computing results over the complete set of window elements. For example, sorting an entire window's data to take the top N can only be done with a ProcessWindowFunction.

A ProcessWindowFunction gets an Iterable containing all elements of the window, plus a Context object with access to time and state information, which gives it more flexibility than the other window functions. This comes at the cost of performance and resource consumption, because elements cannot be aggregated incrementally; they must instead be buffered internally until the window is ready to be processed.
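For instance, the top-N computation mentioned above genuinely needs every buffered element. Here is a plain-Java sketch of what a process() body might do with the window's Iterable (the helper name topN is illustrative, not part of any API):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class TopNSketch {

    // What a ProcessWindowFunction body could do: copy the fully buffered
    // window elements, sort them descending, and keep only the top n.
    static List<Long> topN(Iterable<Long> windowElements, int n) {
        List<Long> all = new ArrayList<>();
        windowElements.forEach(all::add);          // the whole window is in memory
        all.sort(Comparator.reverseOrder());
        return all.subList(0, Math.min(n, all.size()));
    }

    public static void main(String[] args) {
        System.out.println(topN(List.of(70L, 100L, 30L, 81L), 2)); // prints [100, 81]
    }
}
```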

It is the upgraded version of WindowFunction, and it can be combined with a ReduceFunction/AggregateFunction/FoldFunction (the recommended usage).
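The point of the combination is that the window state stays small: the ReduceFunction pre-aggregates incrementally, and when the window fires the ProcessWindowFunction receives an Iterable holding just that single pre-aggregated element, plus the window metadata via the Context. A plain-Java sketch of these semantics, with illustrative names rather than Flink's API:

```java
import java.util.List;
import java.util.function.BinaryOperator;
import java.util.function.Function;

public class CombinedSketch {

    // The window stores only the pre-reduced value, not the element list.
    static <T, R> R fireWindow(List<T> arrivals,
                               BinaryOperator<T> reduceFn,
                               Function<Iterable<T>, R> processFn) {
        T acc = null;
        for (T e : arrivals) {                    // incremental part: ReduceFunction
            acc = (acc == null) ? e : reduceFn.apply(acc, e);
        }
        // "Full" part: the ProcessWindowFunction sees an Iterable with ONE element.
        return processFn.apply(List.of(acc));
    }

    public static void main(String[] args) {
        long max = fireWindow(List.of(3L, 9L, 5L), Math::max,
                it -> it.iterator().next());
        System.out.println(max); // prints 9
    }
}
```

In Flink itself this is expressed by passing both functions together, e.g. to reduce(...) or aggregate(...) on a windowed stream.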

```java
public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {

    /**
     * Evaluates the window and outputs none or several elements.
     *
     * @param key      The key for which this window is evaluated.
     * @param context  The context in which the window is being evaluated.
     * @param elements The elements in the window being evaluated.
     * @param out      A collector for emitting elements.
     *
     * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
     */
    public abstract void process(
            KEY key,
            Context context,
            Iterable<IN> elements,
            Collector<OUT> out) throws Exception;

    /**
     * The context holding window metadata.
     */
    public abstract class Context implements java.io.Serializable {

        /** Returns the window that is being evaluated. */
        public abstract W window();

        /** Returns the current processing time. */
        public abstract long currentProcessingTime();

        /** Returns the current event-time watermark. */
        public abstract long currentWatermark();

        /**
         * State accessor for per-key and per-window state.
         *
         * <p><b>NOTE:</b> If you use per-window state you have to ensure that you clean it up
         * by implementing {@link ProcessWindowFunction#clear(Context)}.
         */
        public abstract KeyedStateStore windowState();

        /**
         * State accessor for per-key global state.
         */
        public abstract KeyedStateStore globalState();
    }
}
```

Example (average English score per class):

```java
public class TestProcessWinFunctionOnWindow {

    public static void main(String[] args) throws Exception {
        // Get the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Read the input data
        DataStream<Tuple3<String, String, Long>> input = env.fromElements(ENGLISH);

        // Average English score per class
        DataStream<Double> avgScore = input
            .keyBy(0)
            .countWindow(2)
            .process(new MyProcessWindowFunction());

        avgScore.print();
        env.execute("TestProcessWinFunctionOnWindow");
    }

    public static class MyProcessWindowFunction
        extends ProcessWindowFunction<Tuple3<String, String, Long>, Double, Tuple, GlobalWindow> {

        // iterable holds every element buffered in the window
        @Override
        public void process(Tuple tuple, Context context,
                            Iterable<Tuple3<String, String, Long>> iterable,
                            Collector<Double> out) throws Exception {
            long sum = 0;
            long count = 0;
            for (Tuple3<String, String, Long> in : iterable) {
                sum += in.f2;
                count++;
            }
            // Cast before dividing; (double)(sum / count) would truncate to an integer average
            out.collect((double) sum / count);
        }
    }

    public static final Tuple3[] ENGLISH = new Tuple3[]{
        Tuple3.of("class1", "张三", 100L),
        Tuple3.of("class1", "李四", 78L),
        Tuple3.of("class1", "王五", 99L),
        Tuple3.of("class2", "赵六", 81L),
        Tuple3.of("class2", "小七", 59L),
        Tuple3.of("class2", "小八", 97L),
    };
}
```


