kafka-commits mailing list archives

From ij...@apache.org
Subject kafka git commit: MINOR: Fix deadlock between StreamThread and KafkaStreams
Date Mon, 03 Apr 2017 11:55:06 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk bdf4cba04 -> 3364f12bc


MINOR: Fix deadlock between StreamThread and KafkaStreams

This may be a reason why we see Jenkins jobs time out at times.
I can reproduce it locally.

With current trunk there is a possibility to run into this:

```sh
"kafka-streams-close-thread" #585 daemon prio=5 os_prio=0 tid=0x00007f66d052d800 nid=0x7e02
waiting for monitor entry [0x00007f66ae2e5000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.kafka.streams.processor.internals.StreamThread.close(StreamThread.java:345)
	- waiting to lock <0x000000077d33c538> (a org.apache.kafka.streams.processor.internals.StreamThread)
	at org.apache.kafka.streams.KafkaStreams$1.run(KafkaStreams.java:474)
	at java.lang.Thread.run(Thread.java:745)

"appId-bd262a91-5155-4a35-bc46-c6432552c2c5-StreamThread-97" #583 prio=5 os_prio=0 tid=0x00007f66d052f000
nid=0x7e01 waiting for monitor entry [0x00007f66ae4e6000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.kafka.streams.KafkaStreams.setState(KafkaStreams.java:219)
	- waiting to lock <0x000000077d335760> (a org.apache.kafka.streams.KafkaStreams)
	at org.apache.kafka.streams.KafkaStreams.access$100(KafkaStreams.java:117)
	at org.apache.kafka.streams.KafkaStreams$StreamStateListener.onChange(KafkaStreams.java:259)
	- locked <0x000000077d42f138> (a org.apache.kafka.streams.KafkaStreams$StreamStateListener)
	at org.apache.kafka.streams.processor.internals.StreamThread.setState(StreamThread.java:168)
	- locked <0x000000077d33c538> (a org.apache.kafka.streams.processor.internals.StreamThread)
	at org.apache.kafka.streams.processor.internals.StreamThread.setStateWhenNotInPendingShutdown(StreamThread.java:176)
	- locked <0x000000077d33c538> (a org.apache.kafka.streams.processor.internals.StreamThread)
	at org.apache.kafka.streams.processor.internals.StreamThread.access$1600(StreamThread.java:70)
	at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsRevoked(StreamThread.java:1321)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:406)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:349)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:296)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1037)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1002)
	at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:531)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:669)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:326)

```

In a nutshell: `KafkaStreams` and `StreamThread` end up waiting
for each other, because a concurrent `close` (e.g. from a test)
comes along that also locks on `KafkaStreams`:

```sh
"main" #1 prio=5 os_prio=0 tid=0x00007f66d000c800 nid=0x78bb in Object.wait() [0x00007f66d7a15000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1249)
	- locked <0x000000077d45a590> (a java.lang.Thread)
	at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:503)
	- locked <0x000000077d335760> (a org.apache.kafka.streams.KafkaStreams)
	at org.apache.kafka.streams.KafkaStreams.close(KafkaStreams.java:447)
	at org.apache.kafka.streams.KafkaStreamsTest.testCannotStartOnceClosed(KafkaStreamsTest.java:115)
```

=> causing a deadlock.

Fixed this with softer locking on the state change, which guarantees
atomic changes to the state but does not lock on the whole object
(at least I could not find another method besides `setState` that
would require more than atomically locked access).
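
A minimal sketch of the idea, not the actual Kafka code: the class and
method names below (`StateLockSketch`, `Service`, `close(long)`) are
hypothetical, and the real cycle additionally passes through the
`StreamThread` monitor. Here `close` holds the object monitor while it
joins a worker thread, and the worker can still change the state because
`setState` only takes the dedicated `stateLock`.

```java
public class StateLockSketch {
    enum State { CREATED, NOT_RUNNING }

    // Hypothetical stand-in for a class whose close() holds the object monitor.
    static final class Service {
        // Dedicated lock: guards state changes only, not the whole object.
        private final Object stateLock = new Object();
        private volatile State state = State.CREATED;

        void setState(final State newState) {
            synchronized (stateLock) {
                state = newState;
            }
        }

        State state() {
            return state;
        }

        // Holds the monitor of 'this' while waiting for the worker to finish.
        // Returns true if the worker terminated within the timeout.
        synchronized boolean close(final long timeoutMs) {
            final Thread worker = new Thread(() -> setState(State.NOT_RUNNING), "worker");
            worker.start();
            try {
                worker.join(timeoutMs);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
            return !worker.isAlive();
        }
    }

    public static void main(final String[] args) {
        final Service service = new Service();
        final boolean finished = service.close(5000);
        System.out.println("worker finished: " + finished + ", state: " + service.state());
    }
}
```

If `setState` were declared `synchronized` instead, the worker in this
sketch would block on the monitor held by `close` until the join timed out.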

Also qualified the state listeners with their outer-class to make
the whole code-flow around this more readable (having two
interfaces with the same naming for interface and method and then
using them between their two outer classes is crazy hard to read
imo :)).
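
To illustrate the readability point, here is a small sketch with two
nested interfaces that share the name `StateListener`. The outer classes
`App` and `Worker` are hypothetical stand-ins, not the Kafka classes;
qualifying each listener with its outer class makes it obvious which one
is meant when one forwards to the other.

```java
public class QualifiedListenerSketch {
    // Hypothetical stand-in for the application-facing class.
    static final class App {
        enum State { CREATED, RUNNING }

        interface StateListener {
            void onChange(State newState, State oldState);
        }
    }

    // Hypothetical stand-in for the internal worker class.
    static final class Worker {
        enum State { NOT_RUNNING, RUNNING }

        interface StateListener {
            void onChange(Worker worker, State newState, State oldState);
        }
    }

    // Wires a worker listener to an app listener and records what each one saw.
    static String describe() {
        final StringBuilder seen = new StringBuilder();
        final App.StateListener appListener = (newState, oldState) ->
            seen.append("app:").append(oldState).append("->").append(newState);
        final Worker.StateListener workerListener = (worker, newState, oldState) -> {
            seen.append("worker:").append(oldState).append("->").append(newState).append(";");
            // Forward the worker event as an application-level state change.
            appListener.onChange(App.State.RUNNING, App.State.CREATED);
        };
        workerListener.onChange(new Worker(), Worker.State.RUNNING, Worker.State.NOT_RUNNING);
        return seen.toString();
    }

    public static void main(final String[] args) {
        System.out.println(describe());
    }
}
```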

Easy to reproduce yourself by running
`org.apache.kafka.streams.KafkaStreamsTest` in a loop for a bit
(save yourself some time by running 2-4 in parallel :)). Eventually
it will lock on one of the tests (for me this takes less than 1 min
with 4 parallel runs).

Author: Armin Braun <me@obrown.io>
Author: Armin <me@obrown.io>

Reviewers: Eno Thereska <eno@confluent.io>, Damian Guy <damian.guy@gmail.com>,
Ismael Juma <ismael@juma.me.uk>

Closes #2791 from original-brownbear/fix-streams-deadlock


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3364f12b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3364f12b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3364f12b

Branch: refs/heads/trunk
Commit: 3364f12bc240e3fefa6a467519ef608fa768917c
Parents: bdf4cba
Author: Armin Braun <me@obrown.io>
Authored: Mon Apr 3 12:50:51 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Mon Apr 3 12:50:57 2017 +0100

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   |  2 +-
 .../org/apache/kafka/streams/KafkaStreams.java  | 36 +++++++++++---------
 .../processor/internals/StreamThread.java       |  8 ++---
 .../apache/kafka/streams/KafkaStreamsTest.java  |  2 +-
 4 files changed, 26 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3364f12b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 9342791..8fe99dd 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -769,7 +769,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
 
         log.info("Closing the Kafka producer with timeoutMillis = {} ms.", timeUnit.toMillis(timeout));
         // this will keep track of the first encountered exception
-        AtomicReference<Throwable> firstException = new AtomicReference<Throwable>();
+        AtomicReference<Throwable> firstException = new AtomicReference<>();
         boolean invokedFromCallback = Thread.currentThread() == this.ioThread;
         if (timeout > 0) {
             if (invokedFromCallback) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/3364f12b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 2c116d9..6ddf2a1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -189,8 +189,12 @@ public class KafkaStreams {
             return validTransitions.contains(newState.ordinal());
         }
     }
+
+    private final Object stateLock = new Object();
+
     private volatile State state = State.CREATED;
-    private StateListener stateListener = null;
+
+    private KafkaStreams.StateListener stateListener = null;
 
 
     /**
@@ -208,25 +212,25 @@ public class KafkaStreams {
     }
 
     /**
-     * An app can set a single {@link StateListener} so that the app is notified when state changes.
+     * An app can set a single {@link KafkaStreams.StateListener} so that the app is notified when state changes.
      * @param listener a new state listener
      */
-    public void setStateListener(final StateListener listener) {
+    public void setStateListener(final KafkaStreams.StateListener listener) {
         stateListener = listener;
     }
 
-    private synchronized void setState(final State newState) {
-        final State oldState = state;
-        if (!state.isValidTransition(newState)) {
-            log.warn("{} Unexpected state transition from {} to {}.", logPrefix, oldState, newState);
-        } else {
-            log.info("{} State transition from {} to {}.", logPrefix, oldState, newState);
-        }
-
-        state = newState;
-
-        if (stateListener != null) {
-            stateListener.onChange(state, oldState);
+    private void setState(final State newState) {
+        synchronized (stateLock) {
+            final State oldState = state;
+            if (!state.isValidTransition(newState)) {
+                log.warn("{} Unexpected state transition from {} to {}.", logPrefix, oldState, newState);
+            } else {
+                log.info("{} State transition from {} to {}.", logPrefix, oldState, newState);
+            }
+            state = newState;
+            if (stateListener != null) {
+                stateListener.onChange(state, oldState);
+            }
         }
     }
 
@@ -248,7 +252,7 @@ public class KafkaStreams {
         return Collections.unmodifiableMap(metrics.metrics());
     }
 
-    private class StreamStateListener implements StreamThread.StateListener {
+    private final class StreamStateListener implements StreamThread.StateListener {
         @Override
         public synchronized void onChange(final StreamThread thread,
                                           final StreamThread.State newState,

http://git-wip-us.apache.org/repos/asf/kafka/blob/3364f12b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 46704b9..9791a0a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -124,7 +124,7 @@ public class StreamThread extends Thread {
     }
 
     private volatile State state = State.NOT_RUNNING;
-    private StateListener stateListener = null;
+    private StreamThread.StateListener stateListener = null;
 
     /**
      * Listen to state change events
@@ -141,10 +141,10 @@ public class StreamThread extends Thread {
     }
 
     /**
-     * Set the {@link StateListener} to be notified when state changes. Note this API is internal to
+     * Set the {@link StreamThread.StateListener} to be notified when state changes. Note this API is internal to
      * Kafka Streams and is not intended to be used by an external application.
      */
-    public void setStateListener(final StateListener listener) {
+    public void setStateListener(final StreamThread.StateListener listener) {
         this.stateListener = listener;
     }
 
@@ -463,7 +463,7 @@ public class StreamThread extends Thread {
                 action.apply(task);
             } catch (RuntimeException t) {
                 log.error("{} Failed while executing {} {} due to {}: ",
-                        StreamThread.this.logPrefix,
+                        logPrefix,
                         task.getClass().getSimpleName(),
                         task.id(),
                         exceptionMessage,

http://git-wip-us.apache.org/repos/asf/kafka/blob/3364f12b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index eebbde9..efa484e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -282,7 +282,7 @@ public class KafkaStreamsTest {
         try {
             streams.cleanUp();
         } catch (final IllegalStateException e) {
-            Assert.assertEquals("Cannot clean up while running.", e.getMessage());
+            assertEquals("Cannot clean up while running.", e.getMessage());
             throw e;
         } finally {
             streams.close();

