flink-user mailing list archives

From bhaskar.ebay77@gmail.com <bhaskar.eba...@gmail.com>
Subject Question regarding state cleaning using timer
Date Fri, 14 Sep 2018 14:29:23 GMT
In the following example given in the Flink documentation:
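import org.apache.flink.streaming.api.scala._

object ExampleCountWindowAverage extends App {
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  env.fromCollection(List(
    (1L, 3L),
    (1L, 5L),
    (1L, 7L),
    (1L, 4L),
    (1L, 2L)
  )).keyBy(_._1)
    .flatMap(new CountWindowAverage())
    .print()
  // the printed output will be (1,4) and (1,5)

  env.execute("ExampleManagedState")
}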


There is only one state because there is only one key. In the CountWindowAverage class there is
one state descriptor: new ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long, Long)]),
with the name given as "average". To implement this in a generic way, should I modify the constructor to:

CountWindowAverage(keyName: String), so that new ValueStateDescriptor[(Long, Long)](keyName,
createTypeInformation[(Long, Long)]) is created? And how do I configure TTL for this? Inside
this class?
In the example at https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
you have given a standalone ValueStateDescriptor. How can I use TTL inside CountWindowAverage()
per key?
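
To make the question concrete, here is a minimal sketch of what this might look like. It assumes the Flink 1.6-era Scala API shown on the linked documentation page (StateTtlConfig.newBuilder taking org.apache.flink.api.common.time.Time). The constructor parameters stateName and ttl and the helper ttlDescriptor are hypothetical names introduced only for illustration. Since the state is obtained through getRuntimeContext on a keyed stream, it is scoped to the current key, so the TTL should apply to each key's entry separately.

```scala
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.state.{StateTtlConfig, ValueState, ValueStateDescriptor}
import org.apache.flink.api.common.time.Time
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object CountWindowAverage {
  // Hypothetical helper: builds a named descriptor with TTL enabled.
  def ttlDescriptor(stateName: String, ttl: Time): ValueStateDescriptor[(Long, Long)] = {
    val ttlConfig = StateTtlConfig
      .newBuilder(ttl)
      // Refresh the TTL timestamp when the state is created or written.
      .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
      // Never hand back a value whose TTL has already expired.
      .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
      .build()

    val descriptor = new ValueStateDescriptor[(Long, Long)](
      stateName, createTypeInformation[(Long, Long)])
    descriptor.enableTimeToLive(ttlConfig)
    descriptor
  }
}

// stateName and ttl are assumed constructor parameters, not part of the documented example.
class CountWindowAverage(stateName: String, ttl: Time)
    extends RichFlatMapFunction[(Long, Long), (Long, Long)] {

  // Holds (count, running sum) for the current key.
  private var sum: ValueState[(Long, Long)] = _

  override def open(parameters: Configuration): Unit = {
    // The descriptor, including its TTL, is configured inside the function itself.
    sum = getRuntimeContext.getState(CountWindowAverage.ttlDescriptor(stateName, ttl))
  }

  override def flatMap(input: (Long, Long), out: Collector[(Long, Long)]): Unit = {
    // value() returns null when nothing is stored for this key (or it has expired).
    val current = Option(sum.value()).getOrElse((0L, 0L))
    val updated = (current._1 + 1, current._2 + input._2)

    if (updated._1 >= 2) {
      // Emit the average of every two elements, then reset the state.
      out.collect((input._1, updated._2 / updated._1))
      sum.clear()
    } else {
      sum.update(updated)
    }
  }
}
```

With this sketch the job would call .flatMap(new CountWindowAverage("average", Time.hours(1))) instead of .flatMap(new CountWindowAverage()); the one-hour TTL is only an illustrative value.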

