kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-5818: KafkaStreams state transitions not correct
Date Sun, 03 Sep 2017 22:25:30 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 99e08412c -> 7b6e5f96c


KAFKA-5818: KafkaStreams state transitions not correct

Author: Matthias J. Sax <matthias@confluent.io>

Reviewers: Ted Yu <yuzhihong@gmail.com>, Guozhang Wang <wangguoz@gmail.com>

Closes #3779 from mjsax/kafka-5818-kafkaStreams-state-transition-01101


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7b6e5f96
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7b6e5f96
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7b6e5f96

Branch: refs/heads/0.11.0
Commit: 7b6e5f96ce8e724fb513dfc7f2da2eaf7bb82328
Parents: 99e0841
Author: Matthias J. Sax <matthias@confluent.io>
Authored: Sun Sep 3 15:25:26 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Sun Sep 3 15:25:26 2017 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/streams/KafkaStreams.java  | 26 ++++++++++++----
 .../apache/kafka/streams/KafkaStreamsTest.java  | 32 +++++++++++---------
 2 files changed, 38 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/7b6e5f96/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 846348f..08ca34c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -79,7 +79,6 @@ import static org.apache.kafka.common.utils.Utils.getPort;
 import static org.apache.kafka.streams.KafkaStreams.State.ERROR;
 import static org.apache.kafka.streams.KafkaStreams.State.NOT_RUNNING;
 import static org.apache.kafka.streams.KafkaStreams.State.PENDING_SHUTDOWN;
-import static org.apache.kafka.streams.KafkaStreams.State.RUNNING;
 import static org.apache.kafka.streams.StreamsConfig.EXACTLY_ONCE;
 import static org.apache.kafka.streams.StreamsConfig.PROCESSING_GUARANTEE_CONFIG;
 
@@ -277,6 +276,22 @@ public class KafkaStreams {
         return true;
     }
 
+    private void setRunningFromCreated() {
+        synchronized (stateLock) {
+            if (state != State.CREATED) {
+                log.error("{} Unexpected state transition from {} to {}", logPrefix, state, State.RUNNING);
+                throw new IllegalStateException(logPrefix + " Unexpected state transition from " + state + " to " + State.RUNNING);
+            }
+            state = State.RUNNING;
+            stateLock.notifyAll();
+        }
+
+        // we need to call the user customized state listener outside the state lock to avoid potential deadlocks
+        if (stateListener != null) {
+            stateListener.onChange(State.RUNNING, State.CREATED);
+        }
+    }
+
     /**
      * Return the current {@link State} of this {@code KafkaStreams} instance.
      *
@@ -505,8 +520,8 @@ public class KafkaStreams {
         if (globalTaskTopology != null) {
             globalStreamThread.setStateListener(streamStateListener);
         }
-        for (int i = 0; i < threads.length; i++) {
-            threads[i].setStateListener(streamStateListener);
+        for (final StreamThread thread :  threads) {
+            thread.setStateListener(streamStateListener);
         }
         final GlobalStateStoreProvider globalStateStoreProvider = new GlobalStateStoreProvider(builder.globalStateStores());
         queryableStoreProvider = new QueryableStoreProvider(storeProviders, globalStateStoreProvider);
@@ -558,9 +573,8 @@ public class KafkaStreams {
 
     private void validateStartOnce() {
         try {
-            if (setState(RUNNING)) {
-                return;
-            }
+            setRunningFromCreated();
+            return;
         } catch (StreamsException e) {
             // do nothing, will throw
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7b6e5f96/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index d2eaca7..8eea60c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -52,6 +52,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 @Category({IntegrationTest.class})
 public class KafkaStreamsTest {
@@ -229,29 +230,29 @@ public class KafkaStreamsTest {
             closeCount, MockMetricsReporter.CLOSE_COUNT.get());
     }
 
-    @Test(expected = IllegalStateException.class)
+    @Test
     public void testCannotStartOnceClosed() throws Exception {
         streams.start();
         streams.close();
         try {
             streams.start();
-        } catch (final IllegalStateException e) {
-            Assert.assertEquals("Cannot start again.", e.getMessage());
-            throw e;
+            fail("Should have thrown IllegalStateException.");
+        } catch (final IllegalStateException expected) {
+            // ignore
         } finally {
             streams.close();
         }
     }
 
-    @Test(expected = IllegalStateException.class)
+    @Test
     public void testCannotStartTwice() throws Exception {
         streams.start();
 
         try {
             streams.start();
-        } catch (final IllegalStateException e) {
-            Assert.assertEquals("Cannot start again.", e.getMessage());
-            throw e;
+            fail("Should have thrown IllegalStateException.");
+        } catch (final IllegalStateException expected) {
+            // ignore
         } finally {
             streams.close();
         }
@@ -265,15 +266,18 @@ public class KafkaStreamsTest {
         assertEquals(metrics.size(), 16);
     }
 
-    @Test(expected = ConfigException.class)
+    @Test
     public void testIllegalMetricsConfig() {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "illegalConfig");
         final KStreamBuilder builder = new KStreamBuilder();
-        final KafkaStreams streams = new KafkaStreams(builder, props);
 
+        try {
+            new KafkaStreams(builder, props);
+            fail("Should have thrown ConfigException.");
+        } catch (final ConfigException expected) { /* ignore */ }
     }
 
     @Test
@@ -389,7 +393,7 @@ public class KafkaStreamsTest {
         streams.cleanUp();
     }
 
-    @Test(expected = IllegalStateException.class)
+    @Test
     public void testCannotCleanupWhileRunning() throws Exception {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotCleanupWhileRunning");
@@ -401,9 +405,9 @@ public class KafkaStreamsTest {
         streams.start();
         try {
             streams.cleanUp();
-        } catch (final IllegalStateException e) {
-            assertEquals("Cannot clean up while running.", e.getMessage());
-            throw e;
+            fail("Should have thrown IllegalStateException.");
+        } catch (final IllegalStateException expected) {
+            // ignore
         } finally {
             streams.close();
         }


Mime
View raw message