kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] 05/06: Github comments Moved immutable part into constructor Don't write non-persistent partitions into checkpoint file
Date Wed, 13 Jun 2018 23:54:55 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch kafka-6711-GlobalStateManagerImpl-no-checkpoint-in-memory
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 1a349142cd6daf1dfeece02cc0553b1b7d82c2c8
Author: Matthias J. Sax <matthias@confluent.io>
AuthorDate: Wed Jun 13 16:30:19 2018 -0700

    Github comments
    Moved immutable part into constructor
    Don't write non-persistent partitions into checkpoint file
---
 .../internals/GlobalStateManagerImpl.java          | 23 +++++++++++-----------
 .../internals/GlobalStateManagerImplTest.java      |  4 ++--
 2 files changed, 13 insertions(+), 14 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index fac1526..05c82d4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -63,6 +63,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements
Glob
     private final int retries;
     private final long retryBackoffMs;
     private final Duration pollTime;
+    private final Set<String> globalNonPersistentStoresTopics = new HashSet<>();
 
     public GlobalStateManagerImpl(final LogContext logContext,
                                   final ProcessorTopology topology,
@@ -72,6 +73,14 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements
Glob
                                   final StreamsConfig config) {
         super(stateDirectory.globalStateDir(), StreamsConfig.EXACTLY_ONCE.equals(config.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG)));
 
+        // Find non persistent store's topics
+        final Map<String, String> storeToChangelogTopic = topology.storeToChangelogTopic();
+        for (final StateStore store : topology.globalStateStores()) {
+            if (!store.persistent()) {
+                globalNonPersistentStoresTopics.add(storeToChangelogTopic.get(store.name()));
+            }
+        }
+
         this.log = logContext.logger(GlobalStateManagerImpl.class);
         this.topology = topology;
         this.globalConsumer = globalConsumer;
@@ -246,7 +255,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements
Glob
         for (final TopicPartition topicPartition : topicPartitions) {
             globalConsumer.assign(Collections.singletonList(topicPartition));
             final Long checkpoint = checkpointableOffsets.get(topicPartition);
-            if (checkpoint != null && checkpoint > StateRestorer.NO_CHECKPOINT)
{
+            if (checkpoint != null) {
                 globalConsumer.seek(topicPartition, checkpoint);
             } else {
                 globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
@@ -337,16 +346,6 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements
Glob
 
     @Override
     public void checkpoint(final Map<TopicPartition, Long> offsets) {
-
-        // Find non persistent store's topics
-        final Map<String, String> storeToChangelogTopic = topology.storeToChangelogTopic();
-        final Set<String> globalNonPersistentStoresTopics = new HashSet<>();
-        for (final StateStore store : topology.globalStateStores()) {
-            if (!store.persistent() && storeToChangelogTopic.containsKey(store.name()))
{
-                globalNonPersistentStoresTopics.add(storeToChangelogTopic.get(store.name()));
-            }
-        }
-
         checkpointableOffsets.putAll(offsets);
 
         final Map<TopicPartition, Long> filteredOffsets = new HashMap<>();
@@ -355,7 +354,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements
Glob
         for (final Map.Entry<TopicPartition, Long> topicPartitionOffset : checkpointableOffsets.entrySet())
{
             final String topic = topicPartitionOffset.getKey().topic();
             if (globalNonPersistentStoresTopics.contains(topic)) {
-                filteredOffsets.put(topicPartitionOffset.getKey(), (long) StateRestorer.NO_CHECKPOINT);
+                filteredOffsets.remove(topicPartitionOffset.getKey());
             } else {
                 filteredOffsets.put(topicPartitionOffset.getKey(), topicPartitionOffset.getValue());
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 19af5f4..e37f6a6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -493,9 +493,9 @@ public class GlobalStateManagerImplTest {
         stateManager.initialize();
         initializeConsumer(10, 1, t3);
         stateManager.register(store3, stateRestoreCallback);
-        stateManager.close(Collections.<TopicPartition, Long>emptyMap());
+        stateManager.close(Collections.emptyMap());
 
-        assertThat(readOffsetsCheckpoint(), equalTo(Collections.singletonMap(t3, (long) StateRestorer.NO_CHECKPOINT)));
+        assertThat(readOffsetsCheckpoint(), equalTo(Collections.emptyMap()));
     }
 
     private Map<TopicPartition, Long> readOffsetsCheckpoint() throws IOException {

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.

Mime
View raw message