flink-user mailing list archives

From Juan Rodríguez Hortalá <juan.rodriguez.hort...@gmail.com>
Subject IterativeStream seems to ignore maxWaitTimeMillis
Date Mon, 21 Nov 2016 06:57:40 GMT

I wrote a proof of concept for a Java version of mapWithState with
time-based state eviction. The idea is:

 - Convert an input KeyedStream with key K and value V into a KeyedStream
of Either<V, K>, with the original values as Left.
 - Replace the ValueState<S> with a ValueState for a POJO that, besides S,
stores the timestamp of the last time that state was accessed.
 - Define an IterativeStream from the Either stream, and apply a
transformation function that periodically sends "tombstone" events as Right
events in the closeWith of the IterativeStream. When a tombstone is
received, delete the state with clear() if the time since it was last
accessed is greater than a configured time to live.
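
To make the eviction rule above concrete, here is a minimal plain-Java sketch of
the logic only. It is not the actual proof of concept and it does not use the
Flink API: a HashMap stands in for the keyed ValueState, and the names
TtlStateSketch, Timestamped, onValue and onTombstone are hypothetical, chosen
just for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

/**
 * Sketch of time-based state eviction. A HashMap stands in for Flink's
 * keyed ValueState (an assumption made to keep the example self-contained).
 */
class TtlStateSketch<K, S> {

    /** The user state S together with the timestamp of its last access. */
    static final class Timestamped<S> {
        final S value;
        final long lastAccessMillis;

        Timestamped(S value, long lastAccessMillis) {
            this.value = value;
            this.lastAccessMillis = lastAccessMillis;
        }
    }

    private final Map<K, Timestamped<S>> states = new HashMap<>();
    private final long ttlMillis;

    TtlStateSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /**
     * Handles a Left event (an original value): applies the update function
     * to the current state (null if there is none) and refreshes the
     * last-access timestamp.
     */
    S onValue(K key, long nowMillis, UnaryOperator<S> update) {
        Timestamped<S> old = states.get(key);
        S next = update.apply(old == null ? null : old.value);
        states.put(key, new Timestamped<>(next, nowMillis));
        return next;
    }

    /**
     * Handles a Right event (a tombstone): clears the state only if it has
     * been idle for longer than the time to live. Returns true if evicted.
     */
    boolean onTombstone(K key, long nowMillis) {
        Timestamped<S> current = states.get(key);
        if (current != null && nowMillis - current.lastAccessMillis > ttlMillis) {
            states.remove(key);
            return true;
        }
        return false;
    }

    boolean hasState(K key) {
        return states.containsKey(key);
    }
}
```

In the real job the tombstones would arrive through the feedback edge of the
IterativeStream rather than through direct method calls, and the timestamps
would come from whatever clock the transformation function uses.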

This seems to work so far, but there are some things that look weird to me:

 - The program never seems to stop, even though I have defined the
IterativeStream with a maxWaitTimeMillis value. That value seems to be
ignored. I'm using a custom source function, but it seems like the method
SourceFunction.cancel() is not being called.

 - I'm getting several messages "WARN MetricGroup: Name collision: Group
already contains a Metric with the name 'numRecordsOut'. Metric will not be
reported. (null)". What does that mean?


