2024-04-11 21:26:14.0|分类: flink|浏览量: 279
package com.conca.flink.source;

import java.util.Random;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

/**
 * Unbounded demo source: emits one pseudo-random integer in {@code [0, 1000)}
 * roughly once per second until {@link #cancel()} is called.
 */
public class IntSource implements SourceFunction<Integer> {

    private static final long serialVersionUID = 1L;

    /** Exclusive upper bound of the generated values. */
    private static final int UPPER_BOUND = 1000;

    /** Pause between two emitted values, in milliseconds. */
    private static final long EMIT_INTERVAL_MS = 1000L;

    /**
     * Loop flag. Must be {@code volatile}: {@link #cancel()} is invoked from a
     * different thread than {@link #run(SourceContext)}, and without it the
     * emitting loop is not guaranteed to ever observe the update.
     */
    private volatile boolean running = true;

    private final Random random = new Random();

    /** Requests termination of the emitting loop in {@link #run(SourceContext)}. */
    @Override
    public void cancel() {
        running = false;
    }

    /**
     * Emits random integers until the source is cancelled.
     *
     * @param ctx context used to emit the generated values
     * @throws Exception if the sleeping thread is interrupted
     */
    @Override
    public void run(SourceContext<Integer> ctx) throws Exception {
        while (running) {
            ctx.collect(random.nextInt(UPPER_BOUND));
            Thread.sleep(EMIT_INTERVAL_MS);
        }
    }
}

package com.conca.flink.vedio.keyedProcessFunction;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

import com.conca.flink.source.IntSource;
import com.conca.flink.vedio.keyBy.StatisticsBean;

/**
 * Demonstrates {@link KeyedProcessFunction} on the keyed stream produced by
 * {@code keyBy}.
 *
 * <p>Every record of the input stream triggers a call to
 * {@code processElement}. Through that method's {@code Context} argument a
 * timer can be registered; when the timer fires, {@code onTimer} is invoked.
 * Because {@code KeyedProcessFunction} is a rich function, keyed state can be
 * obtained through the runtime context. State and timers are only available on
 * a keyed stream, i.e. this function has to be applied after {@code keyBy}.
 *
 * @author conca
 */
public class StatisticsMain2 {

    /**
     * Builds and runs the job: random integers are routed to a single constant
     * key, aggregated by {@link StatisticsProcess}, and the results printed.
     *
     * @param args unused
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        // Create the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        env.addSource(new IntSource())
                // Constant key: every element ends up in the same keyed state.
                .keyBy(r -> "count")
                .process(new StatisticsProcess())
                .print();

        env.execute();
    }

    /**
     * Keeps running statistics of the integers seen for a key and emits the
     * current snapshot when a processing-time timer fires.
     *
     * <p>A timer is registered {@link #EMIT_DELAY_MS} ms ahead by the first
     * element that arrives while no timer is pending; firing the timer emits
     * the accumulated bean and re-arms registration. The accumulator itself is
     * never cleared, so the statistics cover all elements seen so far.
     */
    public static class StatisticsProcess
            extends KeyedProcessFunction<String, Integer, StatisticsBean> {

        private static final long serialVersionUID = 1L;

        /** Delay between registering a timer and emitting the snapshot, in ms. */
        private static final long EMIT_DELAY_MS = 10_000L;

        /** Running statistics for the current key. */
        private ValueState<StatisticsBean> accumulator;

        /** Non-null while a timer is pending for the current key. */
        private ValueState<Boolean> tag;

        /**
         * Obtains the keyed state handles.
         *
         * @param parameters configuration passed by the runtime (unused)
         * @throws Exception if the state cannot be obtained
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            accumulator = getRuntimeContext().getState(
                    new ValueStateDescriptor<StatisticsBean>(
                            "xx", Types.POJO(StatisticsBean.class)));
            tag = getRuntimeContext().getState(
                    new ValueStateDescriptor<Boolean>("yyy", Types.BOOLEAN));
        }

        /**
         * Folds {@code in} into the accumulator and makes sure a timer is
         * pending so that the result is eventually emitted.
         *
         * @param in      the incoming value
         * @param context gives access to the timer service
         * @param out     unused here; results are emitted from {@code onTimer}
         * @throws Exception if state access fails
         */
        @Override
        public void processElement(Integer in, Context context, Collector<StatisticsBean> out)
                throws Exception {
            // Read the state once instead of once for the check and once for the use.
            StatisticsBean old = accumulator.value();
            if (old == null) {
                // First element for this key: it is max, min, sum and the last
                // argument (presumably the average) at the same time.
                accumulator.update(new StatisticsBean(in, in, 1, in, in));
            } else {
                // NOTE(review): if sum and count are integral types in
                // StatisticsBean, the last argument is truncated by integer
                // division - confirm against StatisticsBean.
                StatisticsBean updated = new StatisticsBean(
                        Math.max(in, old.max),
                        Math.min(in, old.min),
                        1 + old.count,
                        in + old.sum,
                        (in + old.sum) / (1 + old.count));
                accumulator.update(updated);
            }

            // Register at most one pending timer per key.
            if (tag.value() == null) {
                long fireAt = context.timerService().currentProcessingTime() + EMIT_DELAY_MS;
                context.timerService().registerProcessingTimeTimer(fireAt);
                tag.update(true);
            }
        }

        /**
         * Emits the current statistics and allows the next element to register
         * a new timer.
         *
         * @param timestamp the time for which the timer was registered
         * @param ctx       timer context (unused)
         * @param out       collector receiving the statistics snapshot
         * @throws Exception if state access fails
         */
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<StatisticsBean> out)
                throws Exception {
            out.collect(accumulator.value());
            tag.clear();
        }
    }
}