kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-7192 Follow-up: update checkpoint to the reset beginning offset (#5430)
Date Sat, 28 Jul 2018 05:12:05 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c8c3a7d  KAFKA-7192 Follow-up: update checkpoint to the reset beginning offset (#5430)
c8c3a7d is described below

commit c8c3a7dc48dc1ec6220798294da123bb617a6cd7
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Fri Jul 27 22:11:55 2018 -0700

    KAFKA-7192 Follow-up: update checkpoint to the reset beginning offset (#5430)
    
    1. When we reinitialize the state store due to no CHECKPOINT with EOS turned on, we should
update the checkpoint to consumer.seekToBeginnning() / consumer.position() to avoid falling
into endless iterations.
    
    2. Fixed a few other logic bugs around needsInitializing and needsRestoring.
    
    Reviewers: Jason Gustafson <jason@confluent.io>, Bill Bejeck <bbejeck@gmail.com>
---
 .../streams/processor/internals/StateRestorer.java |  10 +-
 .../processor/internals/StoreChangelogReader.java  | 117 ++++++++++++---------
 .../streams/processor/internals/StreamThread.java  |   2 +-
 .../internals/StoreChangelogReaderTest.java        |  17 ++-
 4 files changed, 87 insertions(+), 59 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
index c1a41ce..3bbf42e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
@@ -26,13 +26,13 @@ public class StateRestorer {
 
     static final int NO_CHECKPOINT = -1;
 
-    private final Long checkpoint;
     private final long offsetLimit;
     private final boolean persistent;
     private final String storeName;
     private final TopicPartition partition;
     private final CompositeRestoreListener compositeRestoreListener;
 
+    private long checkpointOffset;
     private long restoredOffset;
     private long startingOffset;
     private long endingOffset;
@@ -45,7 +45,7 @@ public class StateRestorer {
                   final String storeName) {
         this.partition = partition;
         this.compositeRestoreListener = compositeRestoreListener;
-        this.checkpoint = checkpoint;
+        this.checkpointOffset = checkpoint == null ? NO_CHECKPOINT : checkpoint;
         this.offsetLimit = offsetLimit;
         this.persistent = persistent;
         this.storeName = storeName;
@@ -60,7 +60,11 @@ public class StateRestorer {
     }
 
     long checkpoint() {
-        return checkpoint == null ? NO_CHECKPOINT : checkpoint;
+        return checkpointOffset;
+    }
+
+    void setCheckpointOffset(final long checkpointOffset) {
+        this.checkpointOffset = checkpointOffset;
     }
 
     void restoreStarted() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 1927b5a..9185920 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -48,8 +48,9 @@ public class StoreChangelogReader implements ChangelogReader {
     private final Map<TopicPartition, Long> endOffsets = new HashMap<>();
     private final Map<String, List<PartitionInfo>> partitionInfo = new HashMap<>();
     private final Map<TopicPartition, StateRestorer> stateRestorers = new HashMap<>();
-    private final Map<TopicPartition, StateRestorer> needsRestoring = new HashMap<>();
-    private final Map<TopicPartition, StateRestorer> needsInitializing = new HashMap<>();
+    private final Set<TopicPartition> needsRestoring = new HashSet<>();
+    private final Set<TopicPartition> needsInitializing = new HashSet<>();
+    private final Set<TopicPartition> completedRestorers = new HashSet<>();
     private final Duration pollTime;
 
     public StoreChangelogReader(final Consumer<byte[], byte[]> restoreConsumer,
@@ -64,9 +65,14 @@ public class StoreChangelogReader implements ChangelogReader {
 
     @Override
     public void register(final StateRestorer restorer) {
-        restorer.setUserRestoreListener(userStateRestoreListener);
-        stateRestorers.put(restorer.partition(), restorer);
-        needsInitializing.put(restorer.partition(), restorer);
+        if (!stateRestorers.containsKey(restorer.partition())) {
+            restorer.setUserRestoreListener(userStateRestoreListener);
+            stateRestorers.put(restorer.partition(), restorer);
+
+            log.trace("Added restorer for changelog {}", restorer.partition());
+        }
+
+        needsInitializing.add(restorer.partition());
     }
 
     public Collection<TopicPartition> restore(final RestoringTasks active) {
@@ -81,16 +87,15 @@ public class StoreChangelogReader implements ChangelogReader {
 
         try {
             final ConsumerRecords<byte[], byte[]> records = restoreConsumer.poll(pollTime);
-            final Iterator<TopicPartition> iterator = needsRestoring.keySet().iterator();
-            while (iterator.hasNext()) {
-                final TopicPartition partition = iterator.next();
+
+            for (final TopicPartition partition : needsRestoring) {
                 final StateRestorer restorer = stateRestorers.get(partition);
                 final long pos = processNext(records.records(partition), restorer, endOffsets.get(partition));
                 restorer.setRestoredOffset(pos);
                 if (restorer.hasCompleted(pos, endOffsets.get(partition))) {
                     restorer.restoreDone();
                     endOffsets.remove(partition);
-                    iterator.remove();
+                    completedRestorers.add(partition);
                 }
             }
         } catch (final InvalidOffsetException recoverableException) {
@@ -98,12 +103,18 @@ public class StoreChangelogReader implements ChangelogReader {
             final Set<TopicPartition> partitions = recoverableException.partitions();
             for (final TopicPartition partition : partitions) {
                 final StreamTask task = active.restoringTaskFor(partition);
-                log.info("Reinitializing StreamTask {}", task);
+                log.info("Reinitializing StreamTask {} for changelog {}", task, partition);
+
+                needsInitializing.remove(partition);
+                needsRestoring.remove(partition);
+
                 task.reinitializeStateStoresForPartitions(recoverableException.partitions());
             }
             restoreConsumer.seekToBeginning(partitions);
         }
 
+        needsRestoring.removeAll(completedRestorers);
+
         if (needsRestoring.isEmpty()) {
             restoreConsumer.unsubscribe();
         }
@@ -120,25 +131,24 @@ public class StoreChangelogReader implements ChangelogReader {
         // the needsInitializing map is not empty, meaning we do not know the metadata for
some of them yet
         refreshChangelogInfo();
 
-        final Map<TopicPartition, StateRestorer> initializable = new HashMap<>();
-        for (final Map.Entry<TopicPartition, StateRestorer> entry : needsInitializing.entrySet())
{
-            final TopicPartition topicPartition = entry.getKey();
+        final Set<TopicPartition> initializable = new HashSet<>();
+        for (final TopicPartition topicPartition : needsInitializing) {
             if (hasPartition(topicPartition)) {
-                initializable.put(entry.getKey(), entry.getValue());
+                initializable.add(topicPartition);
             }
         }
 
         // try to fetch end offsets for the initializable restorers and remove any partitions
         // where we already have all of the data
         try {
-            endOffsets.putAll(restoreConsumer.endOffsets(initializable.keySet()));
+            endOffsets.putAll(restoreConsumer.endOffsets(initializable));
         } catch (final TimeoutException e) {
             // if timeout exception gets thrown we just give up this time and retry in the
next run loop
             log.debug("Could not fetch end offset for {}; will fall back to partition by
partition fetching", initializable);
             return;
         }
 
-        final Iterator<TopicPartition> iter = initializable.keySet().iterator();
+        final Iterator<TopicPartition> iter = initializable.iterator();
         while (iter.hasNext()) {
             final TopicPartition topicPartition = iter.next();
             final Long endOffset = endOffsets.get(topicPartition);
@@ -146,13 +156,15 @@ public class StoreChangelogReader implements ChangelogReader {
             // offset should not be null; but since the consumer API does not guarantee it
             // we add this check just in case
             if (endOffset != null) {
-                final StateRestorer restorer = needsInitializing.get(topicPartition);
+                final StateRestorer restorer = stateRestorers.get(topicPartition);
                 if (restorer.checkpoint() >= endOffset) {
                     restorer.setRestoredOffset(restorer.checkpoint());
                     iter.remove();
+                    completedRestorers.add(topicPartition);
                 } else if (restorer.offsetLimit() == 0 || endOffset == 0) {
                     restorer.setRestoredOffset(0);
                     iter.remove();
+                    completedRestorers.add(topicPartition);
                 } else {
                     restorer.setEndingOffset(endOffset);
                 }
@@ -169,51 +181,59 @@ public class StoreChangelogReader implements ChangelogReader {
         }
     }
 
-    private void startRestoration(final Map<TopicPartition, StateRestorer> initialized,
+    private void startRestoration(final Set<TopicPartition> initialized,
                                   final RestoringTasks active) {
-        log.debug("Start restoring state stores from changelog topics {}", initialized.keySet());
+        log.debug("Start restoring state stores from changelog topics {}", initialized);
 
         final Set<TopicPartition> assignment = new HashSet<>(restoreConsumer.assignment());
-        assignment.addAll(initialized.keySet());
+        assignment.addAll(initialized);
         restoreConsumer.assign(assignment);
 
         final List<StateRestorer> needsPositionUpdate = new ArrayList<>();
-        for (final StateRestorer restorer : initialized.values()) {
+
+        for (final TopicPartition partition : initialized) {
+            final StateRestorer restorer = stateRestorers.get(partition);
             if (restorer.checkpoint() != StateRestorer.NO_CHECKPOINT) {
-                restoreConsumer.seek(restorer.partition(), restorer.checkpoint());
-                logRestoreOffsets(restorer.partition(),
-                                  restorer.checkpoint(),
-                                  endOffsets.get(restorer.partition()));
-                restorer.setStartingOffset(restoreConsumer.position(restorer.partition()));
+                restoreConsumer.seek(partition, restorer.checkpoint());
+                logRestoreOffsets(partition,
+                        restorer.checkpoint(),
+                        endOffsets.get(partition));
+                restorer.setStartingOffset(restoreConsumer.position(partition));
                 restorer.restoreStarted();
             } else {
-                final StreamTask task = active.restoringTaskFor(restorer.partition());
-
-                // If checkpoint does not exist it means the task was not shutdown gracefully
before;
-                // and in this case if EOS is turned on we should wipe out the state and
re-initialize the task
-                if (task.isEosEnabled()) {
-                    log.info("No checkpoint found for task {} state store {} changelog {}
with EOS turned on. " +
-                            "Reinitializing the task and restore its state from the beginning.",
task.id, restorer.storeName(), restorer.partition());
-                    task.reinitializeStateStoresForPartitions(Collections.singleton(restorer.partition()));
-                } else {
-                    log.info("Restoring task {}'s state store {} from beginning of the changelog
{} ", task.id, restorer.storeName(), restorer.partition());
-                }
-
-                restoreConsumer.seekToBeginning(Collections.singletonList(restorer.partition()));
+                restoreConsumer.seekToBeginning(Collections.singletonList(partition));
                 needsPositionUpdate.add(restorer);
             }
         }
 
         for (final StateRestorer restorer : needsPositionUpdate) {
-            final long position = restoreConsumer.position(restorer.partition());
-            logRestoreOffsets(restorer.partition(),
-                              position,
-                              endOffsets.get(restorer.partition()));
-            restorer.setStartingOffset(position);
-            restorer.restoreStarted();
+            final TopicPartition partition = restorer.partition();
+
+            // If checkpoint does not exist it means the task was not shutdown gracefully
before;
+            // and in this case if EOS is turned on we should wipe out the state and re-initialize
the task
+            final StreamTask task = active.restoringTaskFor(partition);
+            if (task.isEosEnabled()) {
+                log.info("No checkpoint found for task {} state store {} changelog {} with
EOS turned on. " +
+                        "Reinitializing the task and restore its state from the beginning.",
task.id, restorer.storeName(), partition);
+
+                needsInitializing.remove(partition);
+                initialized.remove(partition);
+                restorer.setCheckpointOffset(restoreConsumer.position(partition));
+
+                task.reinitializeStateStoresForPartitions(Collections.singleton(partition));
+            } else {
+                log.info("Restoring task {}'s state store {} from beginning of the changelog
{} ", task.id, restorer.storeName(), partition);
+
+                final long position = restoreConsumer.position(restorer.partition());
+                logRestoreOffsets(restorer.partition(),
+                        position,
+                        endOffsets.get(restorer.partition()));
+                restorer.setStartingOffset(position);
+                restorer.restoreStarted();
+            }
         }
 
-        needsRestoring.putAll(initialized);
+        needsRestoring.addAll(initialized);
     }
 
     private void logRestoreOffsets(final TopicPartition partition,
@@ -226,10 +246,7 @@ public class StoreChangelogReader implements ChangelogReader {
     }
 
     private Collection<TopicPartition> completed() {
-        final Set<TopicPartition> completed = new HashSet<>(stateRestorers.keySet());
-        completed.removeAll(needsRestoring.keySet());
-        log.trace("The set of restoration completed partitions so far: {}", completed);
-        return completed;
+        return completedRestorers;
     }
 
     private void refreshChangelogInfo() {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 31de839..428aa1d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -1129,7 +1129,7 @@ public class StreamThread extends Thread {
                         throw new TaskMigratedException(task);
                     }
 
-                    log.info("Reinitializing StandbyTask {}", task);
+                    log.info("Reinitializing StandbyTask {} from changelogs {}", task, recoverableException.partitions());
                     task.reinitializeStateStoresForPartitions(recoverableException.partitions());
                 }
                 restoreConsumer.seekToBeginning(partitions);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 1e74d47..ae48f57 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -100,7 +100,11 @@ public class StoreChangelogReaderTest {
     public void shouldThrowExceptionIfConsumerHasCurrentSubscription() {
         final StateRestorer mockRestorer = EasyMock.mock(StateRestorer.class);
         mockRestorer.setUserRestoreListener(stateRestoreListener);
-        expect(mockRestorer.partition()).andReturn(new TopicPartition("sometopic", 0)).andReturn(new
TopicPartition("sometopic", 0));
+        expect(mockRestorer.partition())
+                .andReturn(new TopicPartition("sometopic", 0))
+                .andReturn(new TopicPartition("sometopic", 0))
+                .andReturn(new TopicPartition("sometopic", 0))
+                .andReturn(new TopicPartition("sometopic", 0));
         EasyMock.replay(mockRestorer);
         changelogReader.register(mockRestorer);
 
@@ -144,6 +148,9 @@ public class StoreChangelogReaderTest {
 
         // first restore call "fails" but we should not die with an exception
         assertEquals(0, changelogReader.restore(active).size());
+
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null,
Long.MAX_VALUE, true,
+                "storeName"));
         // retry restore should succeed
         assertEquals(1, changelogReader.restore(active).size());
         assertThat(callback.restored.size(), equalTo(messages));
@@ -226,9 +233,9 @@ public class StoreChangelogReaderTest {
         setupConsumer(3, two);
 
         changelogReader
-            .register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE,
true, "storeName1"));
-        changelogReader.register(new StateRestorer(one, restoreListener1, null, Long.MAX_VALUE,
true, "storeName2"));
-        changelogReader.register(new StateRestorer(two, restoreListener2, null, Long.MAX_VALUE,
true, "storeName3"));
+            .register(new StateRestorer(topicPartition, restoreListener, 0L, Long.MAX_VALUE,
true, "storeName1"));
+        changelogReader.register(new StateRestorer(one, restoreListener1, 0L, Long.MAX_VALUE,
true, "storeName2"));
+        changelogReader.register(new StateRestorer(two, restoreListener2, 0L, Long.MAX_VALUE,
true, "storeName3"));
 
         expect(active.restoringTaskFor(one)).andReturn(task);
         expect(active.restoringTaskFor(two)).andReturn(task);
@@ -257,7 +264,7 @@ public class StoreChangelogReaderTest {
     public void shouldOnlyReportTheLastRestoredOffset() {
         setupConsumer(10, topicPartition);
         changelogReader
-            .register(new StateRestorer(topicPartition, restoreListener, null, 5, true, "storeName1"));
+            .register(new StateRestorer(topicPartition, restoreListener, 0L, 5, true, "storeName1"));
         expect(active.restoringTaskFor(topicPartition)).andStubReturn(task);
         replay(active, task);
         changelogReader.restore(active);


Mime
View raw message