Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id A3954200D1A for ; Mon, 9 Oct 2017 16:36:21 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id A1D401609CE; Mon, 9 Oct 2017 14:36:21 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id C0D8C1609BB for ; Mon, 9 Oct 2017 16:36:20 +0200 (CEST) Received: (qmail 38822 invoked by uid 500); 9 Oct 2017 14:36:19 -0000 Mailing-List: contact user-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list user@flink.apache.org Received: (qmail 38811 invoked by uid 99); 9 Oct 2017 14:36:19 -0000 Received: from mail-relay.apache.org (HELO mail-relay.apache.org) (140.211.11.15) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 09 Oct 2017 14:36:19 +0000 Received: from aljoschas-mbp.fritz.box (ip-2-205-80-51.web.vodafone.de [2.205.80.51]) by mail-relay.apache.org (ASF Mail Server at mail-relay.apache.org) with ESMTPSA id 292601A0117; Mon, 9 Oct 2017 14:36:17 +0000 (UTC) From: Aljoscha Krettek Message-Id: Content-Type: multipart/alternative; boundary="Apple-Mail=_F075F255-5742-4C9B-B461-D5B90BA6DDFE" Mime-Version: 1.0 (Mac OS X Mail 11.0 \(3445.1.7\)) Subject: Re: Unusual log message - Emitter thread got interrupted Date: Mon, 9 Oct 2017 16:36:15 +0200 In-Reply-To: Cc: Ken Krugler , user To: =?utf-8?Q?Fabian_H=C3=BCske?= References: <60AD9E07-AE96-47F5-8DF9-28BE6BBE99A4@transpac.com> X-Mailer: Apple Mail (2.3445.1.7) archived-at: Mon, 09 Oct 2017 14:36:21 -0000 --Apple-Mail=_F075F255-5742-4C9B-B461-D5B90BA6DDFE Content-Transfer-Encoding: quoted-printable Content-Type: text/plain; charset=utf-8 Hi, In my understanding this is the expected behaviour of the code. The only = way to shut down the Emitter is via an interrupt because it is otherwise = blocking on the queue. If the Emitter had been interrupted while the = operator is still running it would have gone down a different code path: = https://github.com/apache/flink/blob/40cec17f4303b43bbf65d8be542f0646eada5= 7e8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/oper= ators/async/Emitter.java#L89 Did you see any other faulty behaviour or only this log message. Best, Aljoscha > On 6. Oct 2017, at 18:17, Fabian Hueske wrote: >=20 > Hi Ken, >=20 > I don't have much experience with streaming iterations. > Maybe Aljoscha (in CC) has an idea what is happening and if it can be = prevented. >=20 > Best, Fabian >=20 > 2017-10-05 1:33 GMT+02:00 Ken Krugler >: > Hi all, >=20 > I=E2=80=99ve got a streaming topology with an iteration, and a = RichAsyncFunction in that iteration. >=20 > When the iteration terminates due to no activity, I see this message = in the logs: >=20 > 17/10/04 16:01:36 DEBUG async.Emitter:91 - Emitter thread got = interrupted. This indicates that the emitter should shut down. > java.lang.InterruptedException > at = java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.repo= rtInterruptAfterWait(AbstractQueuedSynchronizer.java:2014) > at = java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awai= t(AbstractQueuedSynchronizer.java:2048) > at = org.apache.flink.streaming.api.operators.async.queue.UnorderedStreamElemen= tQueue.peekBlockingly(UnorderedStreamElementQueue.java:147) > at = org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:82= ) > at java.lang.Thread.run(Thread.java:748) >=20 > I read through https://issues.apache.org/jira/browse/FLINK-5638 = , which makes me = wonder if there=E2=80=99s a different but related issue involving an = async function in an iteration. >=20 > Or perhaps I need to do something in my RichAsyncFunction to avoid = this situation? >=20 > Or is this expected and just the way things are currently? >=20 > Just FYI, my topology is here: = https://s3.amazonaws.com/su-public/flink-crawler+topology.pdf = >=20 > Thanks, >=20 > =E2=80=94 Ken >=20 > -------------------------- > Ken Krugler > +1 530-210-6378 > http://www.scaleunlimited.com > custom big data solutions & training > Hadoop, Cascading, Cassandra & Solr >=20 >=20 --Apple-Mail=_F075F255-5742-4C9B-B461-D5B90BA6DDFE Content-Transfer-Encoding: quoted-printable Content-Type: text/html; charset=utf-8 Hi,

In my = understanding this is the expected behaviour of the code. The only way = to shut down the Emitter is via an interrupt because it is otherwise = blocking on the queue. If the Emitter had been interrupted while the = operator is still running it would have gone down a different code = path: https://github.com/apache/flink/blob/40cec17f4303b43bbf65d8be54= 2f0646eada57e8/flink-streaming-java/src/main/java/org/apache/flink/streami= ng/api/operators/async/Emitter.java#L89

Did you see any other faulty behaviour = or only this log message.

Best,
Aljoscha

On 6. = Oct 2017, at 18:17, Fabian Hueske <fhueske@gmail.com> wrote:

Hi Ken,

I don't have much experience with = streaming iterations.
Maybe Aljoscha (in CC) has an = idea what is happening and if it can be prevented.

Best, Fabian

2017-10-05= 1:33 GMT+02:00 Ken Krugler <kkrugler_lists@transpac.com>:
Hi all,

I=E2=80=99ve got a streaming topology = with an iteration, and a RichAsyncFunction in that = iteration.

When = the iteration terminates due to no activity, I see this message in the = logs:

17/10/04 = 16:01:36 DEBUG async.Emitter:91 - Emitter thread got interrupted. This = indicates that the emitter should shut down.
java.lang.InterruptedException
at = java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:2014)
at = java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2048)
at = org.apache.flink.streaming.api.operators.async.queue.UnorderedStreamElementQueue.peekBlockingly(UnorderedStreamElementQueue.java:147)
at = org.apache.flink.streaming.api.operators.async.Emitter.run(Emitter.java:82)
at = java.lang.Thread.run(Thread.java:748)

I read through https://issues.apache.org/jira/browse/FLINK-5638, which makes me wonder if = there=E2=80=99s a different but related issue involving an async = function in an iteration.

Or perhaps I need to do something in my RichAsyncFunction to = avoid this situation?

Or is this expected and just the way things are = currently?


Thanks,

=E2=80=94 Ken

--------------------------
Ken Krugler
+1 530-210-6378
http://www.scaleunlimited.com
custom big = data solutions & training
Hadoop, Cascading, Cassandra = & Solr



= --Apple-Mail=_F075F255-5742-4C9B-B461-D5B90BA6DDFE--