From: ijuma@apache.org
To: commits@kafka.apache.org
Subject: kafka git commit: MINOR: Fix deadlock between StreamThread and KafkaStreams
Date: Mon, 3 Apr 2017 11:55:06 +0000 (UTC)

Repository: kafka
Updated Branches:
  refs/heads/trunk bdf4cba04 -> 3364f12bc


MINOR: Fix deadlock between StreamThread and KafkaStreams

This may be a reason why we see Jenkins jobs time out at times. I can reproduce it locally.
With the current trunk it is possible to run into this:

```sh
"kafka-streams-close-thread" #585 daemon prio=5 os_prio=0 tid=0x00007f66d052d800 nid=0x7e02 waiting for monitor entry [0x00007f66ae2e5000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at org.apache.kafka.streams.processor.internals.StreamThread.close(StreamThread.java:345)
    - waiting to lock <0x000000077d33c538> (a org.apache.kafka.streams.processor.internals.StreamThread)
    at org.apache.kafka.streams.KafkaStreams$1.run(KafkaStreams.java:474)
    at java.lang.Thread.run(Thread.java:745)

"appId-bd262a91-5155-4a35-bc46-c6432552c2c5-StreamThread-97" #583 prio=5 os_prio=0 tid=0x00007f66d052f000 nid=0x7e01 waiting for monitor entry [0x00007f66ae4e6000]
   java.lang.Thread.State: BLOCKED (on object monitor)
    at org.apache.kafka.streams.KafkaStreams.setState(KafkaStreams.java:219)
    - waiting to lock <0x000000077d335760> (a org.apache.kafka.streams.KafkaStreams)
    at org.apache.kafka.streams.KafkaStreams.access$100(KafkaStreams.java:117)
    at org.apache.kafka.streams.KafkaStreams$StreamStateListener.onChange(KafkaStreams.java:259)
    - locked <0x000000077d42f138> (a org.apache.kafka.streams.KafkaStreams$StreamStateListener)
    at org.apache.kafka.streams.processor.internals.StreamThread.setState(StreamThread.java:168)
    - locked <0x000000077d33c538> (a org.apache.kafka.streams.processor.internals.StreamThread)
    at org.apache.kafka.streams.processor.internals.StreamThread.setStateWhenNotInPendingShutdown(StreamThread.java:176)
    - locked <0x000000077d33c538> (a org.apache.kafka.streams.processor.internals.StreamThread)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$1600(StreamThread.java:70)
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsRevoked(StreamThread.java:1321)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:406)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:349)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:296)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1037)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1002)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:531)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:669)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:326)
```

In a nutshell: the `StreamThread` holds its own monitor and waits for the `KafkaStreams` monitor in `KafkaStreams.setState`, while the close thread waits for the `StreamThread` monitor. At the same time, another intermittent `close` (e.g. from a test) comes along, holds the `KafkaStreams` monitor and joins the close thread:

```sh
"main" #1 prio=5 os_prio=0 tid=0x00007f66d000c800 nid=0x78bb in Object.wait() [0x00007f66d7a15000]
   java.lang.Thread.State: WAITING (on object monitor)
    at java.lang.Object.wait(Native Method)
    at java.lang.Thread.join(Thread.java:1249)
    - locked <0x000000077d45a590> (a java.lang.Thread)
    at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:503)
    - locked <0x000000077d335760> (a org.apache.kafka.streams.KafkaStreams)
    at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:447)
    at org.apache.kafka.streams.KafkaStreamsTest.testCannotStartOnceClosed(KafkaStreamsTest.java:115)
```

=> causing a deadlock.

Fixed this with finer-grained locking on the state change: a dedicated lock object guarantees atomic changes to the state but does not lock the whole `KafkaStreams` object (I, at least, could not find any method other than `setState` that needs more than atomically locked access).
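The lock-ordering problem can be reduced to two threads in a minimal, self-contained sketch; this is not Kafka code, and the class `StateLockSketch` and its methods are hypothetical. `closeAndJoin` stands in for a `synchronized close()` that holds the owner's monitor while joining another thread. The joined thread performs one state change, either through a `synchronized` method (the pre-patch shape of `KafkaStreams.setState`) or through a dedicated `stateLock` (the patched shape).

```java
/**
 * Hypothetical sketch (not Kafka code): an owner whose close() is synchronized
 * and joins a worker thread, while the worker reports a state change back.
 */
public class StateLockSketch {

    enum State { CREATED, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING }

    // Dedicated monitor guarding only the state transition, like the
    // `stateLock` field added by the patch.
    private final Object stateLock = new Object();
    private volatile State state = State.CREATED;

    // Patched shape: does not need this object's monitor, so it can run
    // while another thread holds it (e.g. inside a synchronized close()).
    void setState(final State newState) {
        synchronized (stateLock) {
            state = newState;
        }
    }

    // Pre-patch shape: needs this object's monitor.
    synchronized void setStateOnThis(final State newState) {
        state = newState;
    }

    /**
     * Holds this object's monitor (like a synchronized close()) and waits up to
     * timeoutMs for a worker that performs one state change.
     * Returns true if the worker finished within the timeout.
     */
    synchronized boolean closeAndJoin(final boolean useDedicatedLock, final long timeoutMs)
            throws InterruptedException {
        final Thread worker = new Thread(() -> {
            if (useDedicatedLock) {
                setState(State.NOT_RUNNING);
            } else {
                // Blocks: the calling thread still holds `this` while joining.
                setStateOnThis(State.NOT_RUNNING);
            }
        });
        worker.setDaemon(true); // a blocked worker must not keep the JVM alive
        worker.start();
        worker.join(timeoutMs);
        return !worker.isAlive();
    }

    State state() {
        return state;
    }

    public static void main(final String[] args) throws InterruptedException {
        System.out.println("dedicated lock finished: "
                + new StateLockSketch().closeAndJoin(true, 1000));
        System.out.println("synchronized(this) finished: "
                + new StateLockSketch().closeAndJoin(false, 200));
    }
}
```

Running `main` should report `true` for the dedicated lock and `false` for the `synchronized` variant, because `Thread.join` only waits on the joined thread's own monitor and never releases monitors the caller already holds. The trade-off matches the one described above: `stateLock` keeps each transition atomic, but `setState` is no longer serialized against the owner's other `synchronized` methods.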
Also qualified the state listeners with their outer class to make the code flow around this more readable (two interfaces that share the same interface and method names, used back and forth between their two outer classes, are crazy hard to read imo :)).

Easy to reproduce yourself by running `org.apache.kafka.streams.KafkaStreamsTest` in a loop for a bit (save yourself some time by running 2-4 in parallel :)). Eventually it will deadlock in one of the tests (for me this takes less than 1 min with 4 parallel runs).

Author: Armin Braun
Author: Armin

Reviewers: Eno Thereska, Damian Guy, Ismael Juma

Closes #2791 from original-brownbear/fix-streams-deadlock


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3364f12b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3364f12b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3364f12b

Branch: refs/heads/trunk
Commit: 3364f12bc240e3fefa6a467519ef608fa768917c
Parents: bdf4cba
Author: Armin Braun
Authored: Mon Apr 3 12:50:51 2017 +0100
Committer: Ismael Juma
Committed: Mon Apr 3 12:50:57 2017 +0100

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   |  2 +-
 .../org/apache/kafka/streams/KafkaStreams.java  | 36 +++++++++++---------
 .../processor/internals/StreamThread.java       |  8 ++---
 .../apache/kafka/streams/KafkaStreamsTest.java  |  2 +-
 4 files changed, 26 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3364f12b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 9342791..8fe99dd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -769,7 +769,7 @@ public class KafkaProducer implements Producer {
         log.info("Closing the Kafka producer with timeoutMillis = {} ms.", timeUnit.toMillis(timeout));
 
         // this will keep track of the first encountered exception
-        AtomicReference firstException = new AtomicReference();
+        AtomicReference firstException = new AtomicReference<>();
         boolean invokedFromCallback = Thread.currentThread() == this.ioThread;
         if (timeout > 0) {
             if (invokedFromCallback) {


http://git-wip-us.apache.org/repos/asf/kafka/blob/3364f12b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 2c116d9..6ddf2a1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -189,8 +189,12 @@ public class KafkaStreams {
             return validTransitions.contains(newState.ordinal());
         }
     }
+
+    private final Object stateLock = new Object();
+
     private volatile State state = State.CREATED;
-    private StateListener stateListener = null;
+
+    private KafkaStreams.StateListener stateListener = null;
 
 
     /**
@@ -208,25 +212,25 @@ public class KafkaStreams {
     }
 
     /**
-     * An app can set a single {@link StateListener} so that the app is notified when state changes.
+     * An app can set a single {@link KafkaStreams.StateListener} so that the app is notified when state changes.
      * @param listener a new state listener
      */
-    public void setStateListener(final StateListener listener) {
+    public void setStateListener(final KafkaStreams.StateListener listener) {
         stateListener = listener;
     }
 
-    private synchronized void setState(final State newState) {
-        final State oldState = state;
-        if (!state.isValidTransition(newState)) {
-            log.warn("{} Unexpected state transition from {} to {}.", logPrefix, oldState, newState);
-        } else {
-            log.info("{} State transition from {} to {}.", logPrefix, oldState, newState);
-        }
-
-        state = newState;
-
-        if (stateListener != null) {
-            stateListener.onChange(state, oldState);
+    private void setState(final State newState) {
+        synchronized (stateLock) {
+            final State oldState = state;
+            if (!state.isValidTransition(newState)) {
+                log.warn("{} Unexpected state transition from {} to {}.", logPrefix, oldState, newState);
+            } else {
+                log.info("{} State transition from {} to {}.", logPrefix, oldState, newState);
+            }
+            state = newState;
+            if (stateListener != null) {
+                stateListener.onChange(state, oldState);
+            }
         }
     }
 
@@ -248,7 +252,7 @@ public class KafkaStreams {
         return Collections.unmodifiableMap(metrics.metrics());
     }
 
-    private class StreamStateListener implements StreamThread.StateListener {
+    private final class StreamStateListener implements StreamThread.StateListener {
         @Override
         public synchronized void onChange(final StreamThread thread,
                                           final StreamThread.State newState,


http://git-wip-us.apache.org/repos/asf/kafka/blob/3364f12b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 46704b9..9791a0a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -124,7 +124,7 @@ public class StreamThread extends Thread {
     }
 
     private volatile State state = State.NOT_RUNNING;
-    private StateListener stateListener = null;
+    private StreamThread.StateListener stateListener = null;
 
     /**
      * Listen to state change events
@@ -141,10 +141,10 @@ public class StreamThread extends Thread {
     }
 
     /**
-     * Set the {@link StateListener} to be notified when state changes. Note this API is internal to
+     * Set the {@link StreamThread.StateListener} to be notified when state changes. Note this API is internal to
      * Kafka Streams and is not intended to be used by an external application.
      */
-    public void setStateListener(final StateListener listener) {
+    public void setStateListener(final StreamThread.StateListener listener) {
         this.stateListener = listener;
     }
 
@@ -463,7 +463,7 @@ public class StreamThread extends Thread {
                     action.apply(task);
                 } catch (RuntimeException t) {
                     log.error("{} Failed while executing {} {} due to {}: ",
-                        StreamThread.this.logPrefix,
+                        logPrefix,
                         task.getClass().getSimpleName(),
                         task.id(),
                         exceptionMessage,


http://git-wip-us.apache.org/repos/asf/kafka/blob/3364f12b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index eebbde9..efa484e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -282,7 +282,7 @@ public class KafkaStreamsTest {
         try {
             streams.cleanUp();
         } catch (final IllegalStateException e) {
-            Assert.assertEquals("Cannot clean up while running.", e.getMessage());
+            assertEquals("Cannot clean up while running.", e.getMessage());
             throw e;
         } finally {
             streams.close();