kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject kafka git commit: MINOR: Fix multiple KafkaStreams.StreamStateListener being instantiated
Date Wed, 05 Apr 2017 10:39:44 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk d4c4bcf01 -> 97e61d4ae


MINOR: Fix multiple KafkaStreams.StreamStateListener being instantiated

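There should only be a single `KafkaStreams.StreamStateListener` instance,
shared by all stream threads, to ensure synchronization of operations on
`KafkaStreams.StreamStateListener#threadState`. The listener's `onChange`
method is `synchronized`, which locks on the listener instance itself, so
giving each thread its own listener meant the threads locked on different
objects and their updates to the shared map were not actually serialized.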

Author: Armin Braun <me@obrown.io>

Reviewers: Damian Guy <damian.guy@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #2801 from original-brownbear/fix-stream-state-listener


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/97e61d4a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/97e61d4a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/97e61d4a

Branch: refs/heads/trunk
Commit: 97e61d4ae2feaf0551e75fa8cdd041f49f42a9a5
Parents: d4c4bcf
Author: Armin Braun <armin.braun@1und1.de>
Authored: Wed Apr 5 11:38:52 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Wed Apr 5 11:39:18 2017 +0100

----------------------------------------------------------------------
 .../java/org/apache/kafka/streams/KafkaStreams.java    | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/97e61d4a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
index 6ddf2a1..bc2a433 100644
--- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
+++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
@@ -122,7 +122,6 @@ public class KafkaStreams {
     private GlobalStreamThread globalStreamThread;
 
     private final StreamThread[] threads;
-    private final Map<Long, StreamThread.State> threadState;
     private final Metrics metrics;
     private final QueryableStoreProvider queryableStoreProvider;
 
@@ -253,6 +252,13 @@ public class KafkaStreams {
     }
 
     private final class StreamStateListener implements StreamThread.StateListener {
+
+        private final Map<Long, StreamThread.State> threadState;
+
+        StreamStateListener(Map<Long, StreamThread.State> threadState) {
+            this.threadState = threadState;
+        }
+
         @Override
         public synchronized void onChange(final StreamThread thread,
                                           final StreamThread.State newState,
@@ -333,7 +339,7 @@ public class KafkaStreams {
         metrics = new Metrics(metricConfig, reporters, time);
 
         threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)];
-        threadState = new HashMap<>(threads.length);
+        final Map<Long, StreamThread.State> threadState = new HashMap<>(threads.length);
         final ArrayList<StateStoreProvider> storeProviders = new ArrayList<>();
         streamsMetadataState = new StreamsMetadataState(builder, parseHostInfo(config.getString(StreamsConfig.APPLICATION_SERVER_CONFIG)));
 
@@ -358,6 +364,7 @@ public class KafkaStreams {
                                                         globalThreadId);
         }
 
+        final StreamStateListener streamStateListener = new StreamStateListener(threadState);
         for (int i = 0; i < threads.length; i++) {
             threads[i] = new StreamThread(builder,
                                           config,
@@ -369,7 +376,7 @@ public class KafkaStreams {
                                           time,
                                           streamsMetadataState,
                                           cacheSizeBytes);
-            threads[i].setStateListener(new StreamStateListener());
+            threads[i].setStateListener(streamStateListener);
             threadState.put(threads[i].getId(), threads[i].state());
             storeProviders.add(new StreamThreadStateStoreProvider(threads[i]));
         }


Mime
View raw message