kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch 1.0 updated: KAFKA-6711: GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint file (#5219)
Date Thu, 14 Jun 2018 20:51:23 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new 6517c7c  KAFKA-6711: GlobalStateManagerImpl should not write offsets of in-memory
stores in checkpoint file (#5219)
6517c7c is described below

commit 6517c7c02c20f4d76d60bb80b11b0914e9fdf8c3
Author: Matthias J. Sax <mjsax@apache.org>
AuthorDate: Thu Jun 14 04:44:09 2018 -0700

    KAFKA-6711: GlobalStateManagerImpl should not write offsets of in-memory stores in checkpoint
file (#5219)
---
 .../internals/GlobalStateManagerImpl.java          | 28 ++++++++++++++++++----
 .../integration/GlobalKTableIntegrationTest.java   | 26 +++++++++++++++++++-
 .../internals/GlobalStateManagerImplTest.java      | 19 +++++++++++++--
 .../org/apache/kafka/test/NoOpReadOnlyStore.java   | 11 +++++++--
 4 files changed, 75 insertions(+), 9 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 036bb1d..fad8833 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -63,6 +63,7 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
     private final Set<String> globalStoreNames = new HashSet<>();
     private final Map<TopicPartition, Long> checkpointableOffsets = new HashMap<>();
     private final StateRestoreListener stateRestoreListener;
+    private final Set<String> globalNonPersistentStoresTopics = new HashSet<>();
 
     public GlobalStateManagerImpl(final ProcessorTopology topology,
                                   final Consumer<byte[], byte[]> consumer,
@@ -74,6 +75,14 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
         this.baseDir = stateDirectory.globalStateDir();
         this.checkpoint = new OffsetCheckpoint(new File(this.baseDir, CHECKPOINT_FILE_NAME));
         this.stateRestoreListener = stateRestoreListener;
+
+        // Find non persistent store's topics
+        final Map<String, String> storeToChangelogTopic = topology.storeToChangelogTopic();
+        for (final StateStore store : topology.globalStateStores()) {
+            if (!store.persistent()) {
+                globalNonPersistentStoresTopics.add(storeToChangelogTopic.get(store.name()));
+            }
+        }
     }
 
     @Override
@@ -250,11 +259,22 @@ public class GlobalStateManagerImpl implements GlobalStateManager {
     @Override
     public void checkpoint(final Map<TopicPartition, Long> offsets) {
         checkpointableOffsets.putAll(offsets);
-        if (!checkpointableOffsets.isEmpty()) {
+
+        final Map<TopicPartition, Long> filteredOffsets = new HashMap<>();
+
+        // Skip non persistent store
+        for (final Map.Entry<TopicPartition, Long> topicPartitionOffset : checkpointableOffsets.entrySet())
{
+            final String topic = topicPartitionOffset.getKey().topic();
+            if (!globalNonPersistentStoresTopics.contains(topic)) {
+                filteredOffsets.put(topicPartitionOffset.getKey(), topicPartitionOffset.getValue());
+            }
+        }
+
+        if (!filteredOffsets.isEmpty()) {
             try {
-                checkpoint.write(checkpointableOffsets);
-            } catch (IOException e) {
-                log.warn("Failed to write offsets checkpoint for global stores", e);
+                checkpoint.write(filteredOffsets);
+            } catch (final IOException e) {
+                log.warn("Failed to write offset checkpoint file to {} for global stores:
{}", checkpoint, e);
             }
         }
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index 0816aba..8892b90 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -40,6 +40,7 @@ import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.IntegrationTest;
 import org.apache.kafka.test.TestCondition;
 import org.apache.kafka.test.TestUtils;
@@ -55,6 +56,9 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Properties;
 
+import static org.hamcrest.core.IsEqual.equalTo;
+import static org.junit.Assert.assertThat;
+
 @Category({IntegrationTest.class})
 public class GlobalKTableIntegrationTest {
     private static final int NUM_BROKERS = 1;
@@ -225,7 +229,27 @@ public class GlobalKTableIntegrationTest {
             }
         }, 30000L, "waiting for final values");
     }
-    
+
+    @Test
+    public void shouldRestoreGlobalInMemoryKTableOnRestart() throws Exception {
+        builder = new StreamsBuilder();
+        globalTable = builder.globalTable(
+            globalOne,
+            Consumed.with(Serdes.Long(), Serdes.String()),
+            Materialized.<Long, String>as(Stores.inMemoryKeyValueStore(globalStore)));
+
+        produceInitialGlobalTableValues();
+
+        startStreams();
+        ReadOnlyKeyValueStore<Long, String> store = kafkaStreams.store(globalStore,
QueryableStoreTypes.<Long, String>keyValueStore());
+        assertThat(store.approximateNumEntries(), equalTo(4L));
+        kafkaStreams.close();
+
+        startStreams();
+        store = kafkaStreams.store(globalStore, QueryableStoreTypes.<Long, String>keyValueStore());
+        assertThat(store.approximateNumEntries(), equalTo(4L));
+    }
+
     private void createTopics() throws InterruptedException {
         inputStream = "input-stream-" + testNo;
         inputTable = "input-table-" + testNo;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index e9d61f5..e1fb54e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -68,12 +68,14 @@ public class GlobalStateManagerImplTest {
     private final MockStateRestoreListener stateRestoreListener = new MockStateRestoreListener();
     private final TopicPartition t1 = new TopicPartition("t1", 1);
     private final TopicPartition t2 = new TopicPartition("t2", 1);
+    private final TopicPartition t3 = new TopicPartition("t3", 1);
     private GlobalStateManagerImpl stateManager;
     private NoOpProcessorContext context;
     private StateDirectory stateDirectory;
     private String stateDirPath;
     private NoOpReadOnlyStore<Object, Object> store1;
     private NoOpReadOnlyStore store2;
+    private NoOpReadOnlyStore store3;
     private MockConsumer<byte[], byte[]> consumer;
     private File checkpointFile;
     private ProcessorTopology topology;
@@ -83,18 +85,21 @@ public class GlobalStateManagerImplTest {
         final Map<String, String> storeToTopic = new HashMap<>();
         storeToTopic.put("t1-store", "t1");
         storeToTopic.put("t2-store", "t2");
+        storeToTopic.put("t3-store", "t3");
 
         final Map<StateStore, ProcessorNode> storeToProcessorNode = new HashMap<>();
         store1 = new NoOpReadOnlyStore<>("t1-store");
         storeToProcessorNode.put(store1, new MockProcessorNode(-1));
         store2 = new NoOpReadOnlyStore("t2-store");
         storeToProcessorNode.put(store2, new MockProcessorNode(-1));
+        store3 = new NoOpReadOnlyStore("t3-store", false);
+        storeToProcessorNode.put(store2, new MockProcessorNode(-1));
         topology = new ProcessorTopology(Collections.<ProcessorNode>emptyList(),
                                          Collections.<String, SourceNode>emptyMap(),
                                          Collections.<String, SinkNode>emptyMap(),
                                          Collections.<StateStore>emptyList(),
                                          storeToTopic,
-                                         Arrays.<StateStore>asList(store1, store2));
+                                         Arrays.<StateStore>asList(store1, store2,
store3));
 
         context = new NoOpProcessorContext();
         stateDirPath = TestUtils.tempDirectory().getPath();
@@ -158,7 +163,7 @@ public class GlobalStateManagerImplTest {
     @Test
     public void shouldReturnInitializedStoreNames() {
         final Set<String> storeNames = stateManager.initialize(context);
-        assertEquals(Utils.mkSet(store1.name(), store2.name()), storeNames);
+        assertEquals(Utils.mkSet(store1.name(), store2.name(), store3.name()), storeNames);
     }
 
     @Test
@@ -462,6 +467,16 @@ public class GlobalStateManagerImplTest {
         assertThat(readOffsetsCheckpoint(), equalTo(checkpointMap));
     }
 
+    @Test
+    public void shouldSkipGlobalInMemoryStoreOffsetsToFile() throws IOException {
+        stateManager.initialize(context);
+        initializeConsumer(10, 1, t3);
+        stateManager.register(store3, stateRestoreCallback);
+        stateManager.close(Collections.<TopicPartition, Long>emptyMap());
+
+        assertThat(readOffsetsCheckpoint(), equalTo(Collections.<TopicPartition, Long>emptyMap()));
+    }
+
     private Map<TopicPartition, Long> readOffsetsCheckpoint() throws IOException {
         final OffsetCheckpoint offsetCheckpoint = new OffsetCheckpoint(new File(stateManager.baseDir(),
                                                                                 ProcessorStateManager.CHECKPOINT_FILE_NAME));
diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
index 0ada2e4..892a4dc 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpReadOnlyStore.java
@@ -25,17 +25,24 @@ public class NoOpReadOnlyStore<K, V>
         implements ReadOnlyKeyValueStore<K, V>, StateStore {
 
     private final String name;
+    private final boolean persistent;
     private boolean open = true;
     public boolean initialized;
     public boolean flushed;
 
 
     public NoOpReadOnlyStore() {
-        this("");
+        this("", true);
     }
 
     public NoOpReadOnlyStore(final String name) {
+        this(name, true);
+    }
+
+    public NoOpReadOnlyStore(final String name,
+                             final boolean persistent) {
         this.name = name;
+        this.persistent = persistent;
     }
 
     @Override
@@ -80,7 +87,7 @@ public class NoOpReadOnlyStore<K, V>
 
     @Override
     public boolean persistent() {
-        return false;
+        return persistent;
     }
 
     @Override

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.

Mime
View raw message