flink onTimer定时器实现定时需求
< 返回列表时间: 2020-07-15来源:OSCHINA
1. 业务需求
接收实时数据流数据,实时更新状态,并且每隔一定的时间,将所有状态数据输出。
实时数据类型:("张", 1)
状态更新:第一个元素为key,将第二个元素全部缓存起来,放到list中,最后将key和其对应的list全部输出。
2. 实现方案
使用processFunction算子,在processElement函数中仅注册一次定时器,然后在onTimer函数中处理定时器任务,并且重新注册定时器。
3. 实现代码
3.1 source /** * 每隔1秒发送一个tuple2类型的数据,第一个字段值为随机的一个姓氏,第二个字段为自增的数字 **/ class MySourceTuple2 extends SourceFunction[(String, Long)] { var isRunning: Boolean = true val names: List[String] = List("张", "王", "李", "赵") private val random = new Random() var number: Long = 1 override def run(ctx: SourceFunction.SourceContext[(String, Long)]): Unit = { while (true) { val index: Int = random.nextInt(4) ctx.collect((names(index), number)) number += 1 Thread.sleep(1000) } } override def cancel(): Unit = { isRunning = false } }
3.2 流处理 object TimerMain2 { def main(args: Array[String]): Unit = { val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) env .addSource(new MySourceTuple2) .keyBy(_._1) .process(new KeyedProcessFunction[String, (String, Long), String] { //缓存流数据 private val cache: mutable.Map[String, ListBuffer[Long]] = mutable.Map[String, ListBuffer[Long]]() private var first: Boolean = true /** * 定时器触发时回调该函数 * * @param timestamp 定时器触发时间 */ override def onTimer(timestamp: Long, ctx: KeyedProcessFunction[String, (String, Long), String]#OnTimerContext, out: Collector[String]): Unit = { println("定时器触发:" + timestamp) //将缓存中的数据组织成需要的格式 val builder = new StringBuilder() for (entry: (String, ListBuffer[Long]) <- cache) { builder.append(entry._1).append(":") for (ele <- entry._2) { builder.append(ele).append(",") } builder.delete(builder.size - 1, builder.size).append(";") cache(entry._1).clear() } println("定时器注册:" + timestamp) //该定时器执行完任务之后,重新注册一个定时器 ctx.timerService().registerProcessingTimeTimer(timestamp + 5000) out.collect(builder.toString()) } /** * 处理每一个流数据 */ override def processElement(value: (String, Long), ctx: KeyedProcessFunction[String, (String, Long), String]#Context, out: Collector[String]): Unit = { //仅在该算子接收到第一个数据时,注册一个定时器 if (first) { first = false val time: Long = System.currentTimeMillis() println("定时器第一次注册:" + time) ctx.timerService().registerProcessingTimeTimer(time + 5000) } //将流数据更新到缓存中 if (cache.contains(value._1)) { cache(value._1).append(value._2) } else { cache.put(value._1, ListBuffer[Long](value._2)) } } } ) .print("处理结果:") env.execute() } }
所有代码解释均在注释中。
4. 运行结果

可以看到,定时器注册之后,过5秒就会被触发,同时注册下个5秒的注册器,然后将数据发送到下个算子打印出来。
注意:该实例中算子并行度为1,所以“定时器第一次注册”只会触发一次,如果是多个并行度的话,则会在每个并行度里面进行“定时器第一次注册”,并且每个算子维护自己的定时器,算子之间互不影响。
热门排行