kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] 03/06: KAFKA-6711: Write global stores' offsets as -1
Date Wed, 13 Jun 2018 23:54:53 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch kafka-6711-GlobalStateManagerImpl-no-checkpoint-in-memory
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit b5c5fc37e9d374ecc80c70ba48c0c6e3b4066400
Author: Cemo <cemalettin.koc@gmail.com>
AuthorDate: Wed Mar 28 15:32:39 2018 +0300

    KAFKA-6711: Write global stores' offsets as -1
---
 .../kafka/streams/processor/internals/GlobalStateManagerImpl.java       | 2 +-
 .../kafka/streams/processor/internals/GlobalStateManagerImplTest.java   | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
index 26cfcf7..7c5c874 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
@@ -355,7 +355,7 @@ public class GlobalStateManagerImpl extends AbstractStateManager implements
Glob
         for (final Map.Entry<TopicPartition, Long> topicPartitionOffset : checkpointableOffsets.entrySet())
{
             final String topic = topicPartitionOffset.getKey().topic();
             if (globalNonPersistentStoresTopics.contains(topic)) {
-                log.debug("Skipping global store' topic {}", topic);
+                filteredOffsets.put(topicPartitionOffset.getKey(), (long) StateRestorer.NO_CHECKPOINT);
             } else {
                 filteredOffsets.put(topicPartitionOffset.getKey(), topicPartitionOffset.getValue());
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
index 7769c0a..19af5f4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
@@ -495,7 +495,7 @@ public class GlobalStateManagerImplTest {
         stateManager.register(store3, stateRestoreCallback);
         stateManager.close(Collections.<TopicPartition, Long>emptyMap());
 
-        assertThat(readOffsetsCheckpoint(), equalTo(Collections.<TopicPartition, Long>emptyMap()));
+        assertThat(readOffsetsCheckpoint(), equalTo(Collections.singletonMap(t3, (long) StateRestorer.NO_CHECKPOINT)));
     }
 
     private Map<TopicPartition, Long> readOffsetsCheckpoint() throws IOException {

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.

Mime
View raw message