flink-user mailing list archives

From Vijay Bhaskar <bhaskar.eba...@gmail.com>
Subject Re: Question regarding state cleaning using timer
Date Mon, 17 Sep 2018 15:39:18 GMT
Thanks Kostas!

Regards
Bhaskar

On Mon, Sep 17, 2018 at 9:05 PM Kostas Kloudas <k.kloudas@data-artisans.com>
wrote:

> Hi Bhaskar,
>
> If you want different TTLs per key, then you should use timers with a
> process function
> as shown in [1]. That presentation is old, though, so what it calls
> RichProcessFunction is now KeyedProcessFunction.
> Also please have a look at the training material in [2] and the process
> function documentation in [3].
>
> Cheers,
> Kostas
>
> [1]
> https://www.slideshare.net/dataArtisans/apache-flink-training-datastream-api-processfunction
> [2] https://training.data-artisans.com/
> [3]
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/process_function.html
>
> On Sep 17, 2018, at 8:50 AM, Vijay Bhaskar <bhaskar.ebay77@gmail.com>
> wrote:
>
> Thanks Hequn. But I want to give a random TTL to each partitioned key. How
> can I achieve it?
>
> Regards
> Bhaskar
>
> On Mon, Sep 17, 2018 at 7:30 AM Hequn Cheng <chenghequn@gmail.com> wrote:
>
>> Hi bhaskar,
>>
>> You need to change nothing if you want to handle multiple keys. Flink will do
>> it for you. The ValueState is a keyed state. You can think of Keyed State
>> <https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#using-managed-keyed-state>
>> as Operator State that has been partitioned, or sharded, with exactly one
>> state-partition per key.
>> TTL can be used in the same way.
>>
>> Best, Hequn
>>
>>
>> On Fri, Sep 14, 2018 at 10:29 PM bhaskar.ebay77@gmail.com <
>> bhaskar.ebay77@gmail.com> wrote:
>>
>>> Hi,
>>> In the following example given in Flink:
>>> object ExampleCountWindowAverage extends App {
>>>   val env = StreamExecutionEnvironment.getExecutionEnvironment
>>>
>>>   env.fromCollection(List(
>>>     (1L, 3L),
>>>     (1L, 5L),
>>>     (1L, 7L),
>>>     (1L, 4L),
>>>     (1L, 2L)
>>>   )).keyBy(_._1)
>>>     .flatMap(new CountWindowAverage())
>>>     .print()
>>>   // the printed output will be (1,4) and (1,5)
>>>
>>>   env.execute("ExampleManagedState")
>>> }
>>>
>>> There is only one state because there is only one key. In the
>>> CountWindowAverage class there is one state descriptor: new
>>> ValueStateDescriptor[(Long, Long)]("average", createTypeInformation[(Long,
>>> Long)])
>>> with the name "average". To implement this in a generic way,
>>> should I modify the class to
>>>
>>> CountWindowAverage(keyName: String) so that new
>>> ValueStateDescriptor[(Long, Long)](keyName, createTypeInformation[(Long,
>>> Long)]) is created? And how do I configure TTL for this? Inside this class?
>>> In the example at
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/state.html#state-time-to-live-ttl
>>> you have given a standalone ValueStateDescriptor. How can I use the
>>> TTL inside CountWindowAverage() per key?
>>>
>>> Regards
>>> Bhaskar
>>>
>>
>
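
The approach Kostas describes (a different TTL per key, implemented with timers
in a KeyedProcessFunction) might look roughly like the sketch below. It is only
an illustration, not code from the thread or from the linked material. It
assumes the Flink 1.x Scala DataStream API from around the time of this thread
(1.6, where timers can be deleted) and the (Long, Long) tuples of the example
above. The names PerKeyTtlFunction, lastValue, expiryTime and ttlMillisFor are
hypothetical, and the TTL rule itself is a placeholder.

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical function: remembers the latest element per key and clears it
// once a key-specific TTL has passed without a newer element for that key.
class PerKeyTtlFunction
    extends KeyedProcessFunction[Long, (Long, Long), (Long, Long)] {

  // Keyed state: Flink scopes both handles to the key of the current element.
  private var lastValue: ValueState[(Long, Long)] = _
  private var expiryTime: ValueState[java.lang.Long] = _

  override def open(parameters: Configuration): Unit = {
    lastValue = getRuntimeContext.getState(
      new ValueStateDescriptor[(Long, Long)](
        "lastValue", createTypeInformation[(Long, Long)]))
    expiryTime = getRuntimeContext.getState(
      new ValueStateDescriptor[java.lang.Long](
        "expiryTime", classOf[java.lang.Long]))
  }

  // Placeholder rule (an assumption): 60 s plus up to 9 s depending on the key.
  // Replace with whatever per-key TTL the application needs.
  private def ttlMillisFor(key: Long): Long =
    60000L + java.lang.Math.floorMod(key, 10L) * 1000L

  override def processElement(
      value: (Long, Long),
      ctx: KeyedProcessFunction[Long, (Long, Long), (Long, Long)]#Context,
      out: Collector[(Long, Long)]): Unit = {
    val timers = ctx.timerService()

    // Drop the timer left over from the previous element of this key, if any.
    val previous = expiryTime.value()
    if (previous != null) {
      timers.deleteProcessingTimeTimer(previous.longValue)
    }

    // Store the element and schedule the cleanup for this key.
    val expiresAt =
      timers.currentProcessingTime() + ttlMillisFor(ctx.getCurrentKey)
    lastValue.update(value)
    expiryTime.update(expiresAt)
    timers.registerProcessingTimeTimer(expiresAt)

    out.collect(value)
  }

  override def onTimer(
      timestamp: Long,
      ctx: KeyedProcessFunction[Long, (Long, Long), (Long, Long)]#OnTimerContext,
      out: Collector[(Long, Long)]): Unit = {
    // Clear only if this timer is still the most recent one for the key.
    val expected = expiryTime.value()
    if (expected != null && expected.longValue == timestamp) {
      lastValue.clear()
      expiryTime.clear()
    }
  }
}
```

Under these assumptions it would be attached to the keyed stream in place of the
flatMap, for example .keyBy(_._1).process(new PerKeyTtlFunction). The sketch
uses processing-time timers; event-time timers follow the same pattern with
registerEventTimeTimer, provided the job assigns timestamps and watermarks.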
