flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Unusual log message - Emitter thread got interrupted
Date Mon, 09 Oct 2017 14:36:15 GMT
Hi,

In my understanding this is the expected behaviour of the code. The only way to shut down
the Emitter is via an interrupt because it is otherwise blocking on the queue. If the Emitter
had been interrupted while the operator is still running it would have gone down a different
code path: https://github.com/apache/flink/blob/40cec17f4303b43bbf65d8be542f0646eada57e8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/Emitter.java#L89

Did you see any other faulty behaviour or only this log message.

Best,
Aljoscha

> On 6. Oct 2017, at 18:17, Fabian Hueske <fhueske@gmail.com> wrote:
> 
> Hi Ken,
> 
> I don't have much experience with streaming iterations.
> Maybe Aljoscha (in CC) has an idea what is happening and if it can be prevented.
> 
> Best, Fabian
> 
> 2017-10-05 1:33 GMT+02:00 Ken Krugler <kkrugler_lists@transpac.com <mailto:kkrugler_lists@transpac.com>>:
> Hi all,
> 
> I’ve got a streaming topology with an iteration, and a RichAsyncFunction in that iteration.
> 
> When the iteration terminates due to no activity, I see this message in the logs:
> 
> 17/10/04 16:01:36 DEBUG async.Emitter:91 - Emitter thread got interrupted. This indicates
that the emitter should shut down.
> java.lang.InterruptedException
> 	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
> 	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
> 	at org.apache.flink.streaming.api.operators.async.queue.UnorderedStreamElementQueue.peekBlockingly(UnorderedStreamElementQueue.java:147)
> 	at org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:82)
> 	at java.lang.Thread.run(Thread.java:748)
> 
> I read through https://issues.apache.org/jira/browse/FLINK-5638 <https://issues.apache.org/jira/browse/FLINK-5638>,
which makes me wonder if there’s a different but related issue involving an async function
in an iteration.
> 
> Or perhaps I need to do something in my RichAsyncFunction to avoid this situation?
> 
> Or is this expected and just the way things are currently?
> 
> Just FYI, my topology is here: https://s3.amazonaws.com/su-public/flink-crawler+topology.pdf
<https://s3.amazonaws.com/su-public/flink-crawler+topology.pdf>
> 
> Thanks,
> 
> — Ken
> 
> --------------------------
> Ken Krugler
> +1 530-210-6378 <tel:(530)%20210-6378>
> http://www.scaleunlimited.com <http://www.scaleunlimited.com/>
> custom big data solutions & training
> Hadoop, Cascading, Cassandra & Solr
> 
> 


Mime
View raw message