flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bhaskar.ebay77@gmail.com <bhaskar.eba...@gmail.com>
Subject Question regarding state cleaning using timer
Date Fri, 14 Sep 2018 14:29:23 GMT
Hi
In the following example given in flink:
object ExampleCountWindowAverage extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  env.fromCollection(List(
    (1L, 3L),
    (1L, 5L),
    (1L, 7L),
    (1L, 4L),
    (1L, 2L)
  )).keyBy(_._1)
    .flatMap(new CountWindowAverage())
    .print()
  // the printed output will be (1,4) and (1,5)

  env.execute("ExampleManagedState")
}

There is only 1 state because there is one key. In the CountWindowAverage method there is
one state descriptor :  new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long,
Long)])
Name given as "average". In order to implement this is generic way, shall i modify the  method:

CountWindowAverage(keyName:String)  so that  new ValueStateDescriptor[(Long, Long)](keyName,
createTypeInformation[(Long, Long)]) is created. But how to configure TTL for this? Inside
this method?
In the eample: https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
,   you have given a stand alone ValueStateDescriptor.  How can i use the TTL inside CountWindowAverage()
per Key?

Regards
Bhaskar

Mime
View raw message