2024-04-11 21:23:58.0|分类: flink|浏览量: 286
package com.conca.flink.bean; /** * 传感器实体类 * @author conca */ public class SensorTemperaturer { public String sensor;//传感器id public Double temperaturer;//温度 public SensorTemperaturer() { super(); } public SensorTemperaturer(String sensor, Double temperaturer) { super(); this.sensor = sensor; this.temperaturer = temperaturer; } } package com.conca.flink.source; import java.util.Random; import org.apache.flink.streaming.api.functions.source.SourceFunction; import com.conca.flink.bean.SensorTemperaturer; /* * 传感器温度发生器,产生测试数据源 */ public class SensorTemperaturerSource implements SourceFunction<SensorTemperaturer>{ private boolean running = true; private Random random =new Random(); @Override public void cancel(){ running = false; } @Override public void run(SourceContext<SensorTemperaturer> ctx) throws Exception { while(running){ for (int i = 1; i < 4; i++) { ctx.collect(new SensorTemperaturer("snsor"+i, random.nextGaussian())); } Thread.sleep( 300L); } } } package com.conca.flink.vedio.keyedProcessFunction; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.KeyedProcessFunction; import org.apache.flink.util.Collector; import com.conca.flink.bean.SensorTemperaturer; import com.conca.flink.source.SensorTemperaturerSource; /** * 针对keyBy之后的键控流(KeyedStream),可以使用KeyedProcessFunetion * Flink中输入流中的每一条数据都会触发KeyedProcessFunction类的processElement方法调用 通过这个方法的Context参数可以设置定时器,在开启定时器后会程序会定时调用onTimer方法 由于KeyedProcessFunction实现了RichFunction接口, 因此是可以通过RuntimeContext上下文对象管理状态state的开启和释放 需要注意的是,只有在KeyedStream里才能够访问state和定时器, 通俗点来说就是这个函数要用在keyBy这个函数的后面 * @author conca */ public class ContinuousIncreaseMain { public static void main(String[] args) throws Exception { // 创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.addSource(new SensorTemperaturerSource()) .keyBy(r->r.sensor) .process(new StatisticsProcess()) .print(); env.execute(); } public static class StatisticsProcess extends KeyedProcessFunction<String, SensorTemperaturer, String>{ private static final long serialVersionUID = 1L; private ValueState<Double> lastTemperaturer; private ValueState<Long> time; @Override public void open(Configuration parameters) throws Exception { lastTemperaturer = getRuntimeContext() .getState( new ValueStateDescriptor<Double> ("xx", Types.DOUBLE)); time = getRuntimeContext() .getState( new ValueStateDescriptor<Long> ("yyy", Types.LONG)); } @Override public void processElement(SensorTemperaturer in, Context context, Collector<String> out) throws Exception { Double prev = lastTemperaturer.value(); lastTemperaturer.update(in.temperaturer); if(prev != null) { if(in.temperaturer > prev) { if(time.value() == null) { time.update(context.timerService() .currentProcessingTime()+1000L); context.timerService() .registerProcessingTimeTimer(time.value()); } }else if(in.temperaturer < prev) { if(time.value() != null) { context.timerService() .deleteProcessingTimeTimer(time.value()); time.clear(); } } } } @Override public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception { out.collect(ctx.getCurrentKey()+ "温度连续上涨1s,现在温度是:"+lastTemperaturer.value()); time.clear(); } } } |