flink-user mailing list archives

From Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com>
Subject Re: IterativeStream seems to ignore maxWaitTimeMillis
Date Wed, 23 Nov 2016 04:07:51 GMT
Thanks for your answer Aljoscha,

The source does stop: when I comment out all the transformed streams and just
print the input, the program completes. However, this is a custom
SourceFunction; could that be related? Maybe I need to call emitWatermark? I'm
using ingestion time, so I assumed this wasn't needed.
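
For context, a common shape for a bounded custom source is a run loop guarded by a volatile flag that cancel() clears. The sketch below is plain Java with no Flink dependency: `Emitter` and `BoundedSource` are hypothetical stand-ins for Flink's SourceContext and for a SourceFunction implementation. It is not the source used in the PoC, only an illustration of a source whose run() returns on its own.

```java
/** Hypothetical stand-in for Flink's SourceContext; only collect() is modelled. */
interface Emitter<T> {
    void collect(T element);
}

/** Hypothetical bounded source: emits 0..count-1 and then returns from run(). */
class BoundedSource {
    private final int count;
    // volatile so that a cancel() issued from another thread is seen by the run loop
    private volatile boolean running = true;

    BoundedSource(int count) {
        this.count = count;
    }

    /** Emits elements until the bound is reached or cancel() has been called. */
    void run(Emitter<Integer> out) {
        for (int i = 0; i < count && running; i++) {
            out.collect(i);
        }
    }

    /** Asks the run loop to stop before its next iteration. */
    void cancel() {
        running = false;
    }
}
```

With this shape, run() returning is what marks the source as finished; cancel() only matters when the job is stopped from outside before the bound is reached.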



On Mon, Nov 21, 2016 at 9:17 AM, Aljoscha Krettek <aljoscha@apache.org> wrote:

> Might it be that your initial source never stops? A loop will only
> terminate if both the original source stops and the loop timeout is reached.
> On Mon, 21 Nov 2016 at 07:58 Juan Rodríguez Hortalá <
> juan.rodriguez.hortala@gmail.com> wrote:
>> Hi,
>> I wrote a proof of concept for a Java version of mapWithState with
>> time-based state eviction:
>> https://github.com/juanrh/flink-state-eviction/blob/a6bb0d4ca0908d2f4350209a4a41e381e99c76c5/src/main/java/com/github/juanrh/streaming/MapWithStateIterPoC.java
>> The idea is:
>>  - Convert an input KeyedStream with key K and value V into a KeyedStream
>> of Either<V, K>, with the original values as Left.
>>  - Replace a ValueState<S> with a ValueState for a POJO that, besides S,
>> stores the timestamp of the last time that state was accessed.
>>  - Define an IterativeStream from the Either stream, and apply a
>> transformation function that periodically sends "tombstone" events as Right
>> events in the closeWith of the IterativeStream. When a tombstone is
>> received, delete the state with clear if the time since it was last
>> accessed is greater than a configured time to live.
>> This seems to work so far, but there are some things that look weird to
>> me:
>>  - The program never seems to stop, even though I have defined the
>> IterativeStream with
>> https://ci.apache.org/projects/flink/flink-docs-master/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#iterate-long-
>> The value of maxWaitTimeMillis seems to be ignored. I'm using a custom
>> source function, but it seems like the method SourceFunction.cancel() is
>> not being called.
>>  - I'm getting several messages "WARN MetricGroup: Name collision: Group
>> already contains a Metric with the name 'numRecordsOut'. Metric will not be
>> reported. (null)". What does that mean?
>> Thanks,
>> Juan
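
The eviction rule described in the quoted message (keep a last-access timestamp next to the state, and clear the state when a tombstone arrives after the time to live has elapsed) can be sketched in plain Java. `TimestampedState` and its methods are hypothetical names chosen for illustration, not taken from the linked PoC. Time is passed in explicitly so the check is deterministic; in a Flink job the object would presumably live inside a ValueState and a true result from `isExpired` would be followed by clear().

```java
/** Hypothetical POJO pairing a state value with the time it was last accessed. */
class TimestampedState<S> {
    private S value;
    private long lastAccessMillis;

    TimestampedState(S value, long nowMillis) {
        this.value = value;
        this.lastAccessMillis = nowMillis;
    }

    /** Reads the value and records the access time. */
    S get(long nowMillis) {
        lastAccessMillis = nowMillis;
        return value;
    }

    /** Writes a new value and records the access time. */
    void update(S newValue, long nowMillis) {
        value = newValue;
        lastAccessMillis = nowMillis;
    }

    /** True if the state has been idle for strictly longer than ttlMillis. */
    boolean isExpired(long nowMillis, long ttlMillis) {
        return nowMillis - lastAccessMillis > ttlMillis;
    }
}
```

On a tombstone, the handler would call `isExpired(now, ttl)` without touching the value, so that the check itself does not count as an access.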
