2024-04-11 21:27:40.0|分类: flink|浏览量: 681
|
package com.conca.flink.source;
import java.util.Random;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
public class IntSource implements SourceFunction<Integer>{
private boolean running = true;
private Random random =new Random();
@Override
public void cancel(){
running = false;
}
@Override
public void run(SourceContext<Integer> ctx) throws Exception {
while(running){
ctx.collect(random.nextInt(1000));
Thread.sleep( 1000L);
}
}
}
package com.conca.flink.vedio.keyedProcessFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import com.conca.flink.source.IntSource;
import com.conca.flink.vedio.keyBy.StatisticsBean;
/**
* 针对keyBy之后的键控流(KeyedStream),可以使用KeyedProcessFunetion
* Flink中输入流中的每一条数据都会触发KeyedProcessFunction类的processElement方法调用
通过这个方法的Context参数可以设置定时器,在开启定时器后会程序会定时调用onTimer方法
由于KeyedProcessFunction实现了RichFunction接口,因此是可以通过RuntimeContext上下文对象管理状态state的开启和释放
需要注意的是,只有在KeyedStream里才能够访问state和定时器,通俗点来说就是这个函数要用在keyBy这个函数的后面
* @author conca
*/
public class StatisticsMain {
public static void main(String[] args) throws Exception {
// 创建执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment();
env.setParallelism(1);
env.addSource(new IntSource())
.keyBy(r->"count")
.process(new StatisticsProcess())
.print();
env.execute();
}
public static class StatisticsProcess extends
KeyedProcessFunction<String, Integer, StatisticsBean>{
private static final long serialVersionUID = 1L;
private ValueState<StatisticsBean> accumulator;
@Override
public void open(Configuration parameters) throws Exception {
accumulator = getRuntimeContext()
.getState(
new ValueStateDescriptor<StatisticsBean>
("xx", org.apache.flink.api.common.typeinfo.Types
.POJO(StatisticsBean.class)));
}
@Override
public void processElement(Integer in, Context context,
Collector<StatisticsBean> out) throws Exception {
if(accumulator.value() == null) {
accumulator.update(new StatisticsBean(in, in, 1, in, in));
}else {
StatisticsBean old =
accumulator.value();
StatisticsBean newBean = new StatisticsBean
(Math.max(in, old.max),
Math.min(in, old.min),
1+old.count,
in+old.sum,
(in+old.sum)/(1+old.count));
accumulator.update(newBean);
}
out.collect(accumulator.value());
}
}
} |
