kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: KAFKA-6349: Fix concurrent modification exception in AbstractStateManager during restore
Date Wed, 13 Dec 2017 03:25:05 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk fe3699ce3 -> d1a9252ca


KAFKA-6349: Fix concurrent modification exception in AbstractStateManager during restore

Fixes a `ConcurrentModificationException` in `AbstractStateManager` that is triggered when
a `StateStore` is re-initialized and there are multiple stores in the context.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bill@confluent.io>,
Guozhang Wang <wangguoz@gmail.com>

Closes #4317 from dguy/kafka-6349


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d1a9252c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d1a9252c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d1a9252c

Branch: refs/heads/trunk
Commit: d1a9252ca1808711882ae4c3043b0576620f8177
Parents: fe3699c
Author: Damian Guy <damian.guy@gmail.com>
Authored: Tue Dec 12 19:25:01 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Tue Dec 12 19:25:01 2017 -0800

----------------------------------------------------------------------
 .../internals/AbstractStateManager.java         |  9 ++--
 .../internals/ProcessorStateManagerTest.java    | 48 ++++++++++++++++++++
 2 files changed, 52 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d1a9252c/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
index 777e46b..d387762 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractStateManager.java
@@ -28,7 +28,6 @@ import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
 import java.util.Set;
@@ -57,6 +56,7 @@ abstract class AbstractStateManager implements StateManager {
                                                      final InternalProcessorContext processorContext)
{
         final Map<String, String> changelogTopicToStore = inverseOneToOneMap(storeToChangelogTopic);
         final Set<String> storeToBeReinitialized = new HashSet<>();
+        final Map<String, StateStore> storesCopy = new HashMap<>(stateStores);
 
         for (final TopicPartition topicPartition : partitions) {
             checkpointableOffsets.remove(topicPartition);
@@ -69,16 +69,15 @@ abstract class AbstractStateManager implements StateManager {
             throw new StreamsException("Failed to reinitialize global store.", fatalException);
         }
 
-        final Iterator<Map.Entry<String, StateStore>> it = stateStores.entrySet().iterator();
-        while (it.hasNext()) {
-            final StateStore stateStore = it.next().getValue();
+        for (final Map.Entry<String, StateStore> entry : storesCopy.entrySet()) {
+            final StateStore stateStore = entry.getValue();
             final String storeName = stateStore.name();
             if (storeToBeReinitialized.contains(storeName)) {
                 try {
                     stateStore.close();
                 } catch (final RuntimeException ignoreAndSwallow) { /* ignore */ }
                 processorContext.uninitialize();
-                it.remove();
+                stateStores.remove(entry.getKey());
 
                 // TODO remove this eventually
                 // -> (only after we are sure, we don't need it for backward compatibility
reasons anymore; maybe 2.0 release?)

http://git-wip-us.apache.org/repos/asf/kafka/blob/d1a9252c/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index 6ed2245..ab9abc3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -25,10 +25,13 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.test.MockBatchingStateRestoreListener;
 import org.apache.kafka.test.MockStateStore;
+import org.apache.kafka.test.NoOpProcessorContext;
 import org.apache.kafka.test.TestUtils;
 import org.junit.After;
 import org.junit.Assert;
@@ -38,8 +41,10 @@ import org.junit.Test;
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.Charset;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -648,6 +653,49 @@ public class ProcessorStateManagerTest {
         }
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldSuccessfullyReInitializeStateStores() throws IOException {
+        final String store2Name = "store2";
+        final String store2Changelog = "store2-changelog";
+        final TopicPartition store2Partition = new TopicPartition(store2Changelog, 0);
+        final List<TopicPartition> changelogPartitions = Arrays.asList(changelogTopicPartition,
store2Partition);
+        Map<String, String> storeToChangelog = new HashMap() {
+            {
+                put(storeName, changelogTopic);
+                put(store2Name, store2Changelog);
+            }
+        };
+        final ProcessorStateManager stateManager = new ProcessorStateManager(
+                taskId,
+                changelogPartitions,
+                false,
+                stateDirectory,
+                storeToChangelog,
+                changelogReader,
+                false,
+                logContext);
+
+        final MockStateStore stateStore = new MockStateStore(storeName, true);
+        final MockStateStore stateStore2 = new MockStateStore(store2Name, true);
+
+        stateManager.register(stateStore, stateStore.stateRestoreCallback);
+        stateManager.register(stateStore2, stateStore2.stateRestoreCallback);
+
+        stateStore.initialized = false;
+        stateStore2.initialized = false;
+
+        stateManager.reinitializeStateStoresForPartitions(changelogPartitions, new NoOpProcessorContext()
{
+            @Override
+            public void register(final StateStore store, final boolean deprecatedAndIgnoredLoggingEnabled,
final StateRestoreCallback stateRestoreCallback) {
+                stateManager.register(store, stateRestoreCallback);
+            }
+        });
+
+        assertTrue(stateStore.initialized);
+        assertTrue(stateStore2.initialized);
+    }
+
     private ProcessorStateManager getStandByStateManager(TaskId taskId) throws IOException
{
         return new ProcessorStateManager(
             taskId,


Mime
View raw message