flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Utopia <gejunwei...@gmail.com>
Subject Cannot get value from ValueState in ProcessingTimeTrigger
Date Wed, 18 Dec 2019 11:20:20 GMT
Hi,

I want to get the last value stored in ValueState when processing element in Trigger.

But as the log shows that sometimes I can get the value, sometimes not.

Only one key in my data(SensorReading).

ValueState:
class ProcessingTimeTrigger extends Trigger[SensorReading, TimeWindow] {

private lazy val descriptor = new ValueStateDescriptor[Long]("desc", classOf[Long])
var value = 1
override def onElement( r: SensorReading, timestamp: Long, window: TimeWindow, ctx: Trigger.TriggerContext):
TriggerResult = {

  println("before update value : " + ctx.getPartitionedState(descriptor).value())

   ctx.getPartitionedState(descriptor).update(value)

   value += 1

   println("after update value: " + ctx.getPartitionedState(descriptor).value())

   ctx.registerProcessingTimeTimer(window.maxTimestamp)
   TriggerResult.CONTINUE
}

override def onEventTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext) = TriggerResult.CONTINUE

override def onProcessingTime(time: Long, window: TimeWindow, ctx: Trigger.TriggerContext)
= TriggerResult.FIRE

override def clear(window: TimeWindow, ctx: Trigger.TriggerContext): Unit = {
   ctx.deleteProcessingTimeTimer(window.maxTimestamp)
 }

override def onMerge(window: TimeWindow, ctx: Trigger.OnMergeContext): Unit = {
   val windowMaxTimestamp = window.maxTimestamp
   if (windowMaxTimestamp > ctx.getCurrentProcessingTime) ctx.registerProcessingTimeTimer(windowMaxTimestamp)
 }

override def canMerge: Boolean = true

}

Main process:
object MyCustomWindows {

def main(args: Array[String]): Unit = {

   val env = StreamExecutionEnvironment.getExecutionEnvironment
   env.getCheckpointConfig.setCheckpointInterval(10 * 1000)
   env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
   env.getConfig.setAutoWatermarkInterval(1000L)

   val sensorData: DataStream[SensorReading] = env
     .addSource(new SensorSource)
     .assignTimestampsAndWatermarks(new SensorTimeAssigner)

   val countsPerThirtySecs = sensorData
     .keyBy(_.id)
    .window(ProcessingTimeSessionWindows.withGap(Time.seconds(1000)))
     .trigger(new ProcessingTimeTrigger)
     .process(new CountFunction)

   env.execute()
 }
}

Log results:

	before update value : null
after update value: 1
before update value : null
after update value: 2
before update value : null
after update value: 3
before update value : 3
after update value: 4
before update value : null
after update value: 5
before update value : null
after update value: 6
before update value : null
after update value: 7
before update value : null
after update value: 8
before update value : null
after update value: 9
before update value : 9
after update value: 10


Best  regards
Utopia

Mime
View raw message