kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch 0.11.0 updated: KAFKA-7192: Wipe out state store if EOS is turned on and checkpoint file does not exist (#5641)
Date Fri, 14 Sep 2018 19:05:32 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 0.11.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/0.11.0 by this push:
     new 4c9d49b  KAFKA-7192: Wipe out state store if EOS is turned on and checkpoint file does not exist (#5641)
4c9d49b is described below

commit 4c9d49bd3bb8381e95bba4e3223c0d6a3c3c8e22
Author: Matthias J. Sax <mjsax@apache.org>
AuthorDate: Fri Sep 14 12:05:24 2018 -0700

    KAFKA-7192: Wipe out state store if EOS is turned on and checkpoint file does not exist (#5641)
    
    Reviews: Guozhang Wang <guozhang@confluent.io>, Bill Bejeck <bill@confluent.io>, John Roesler <john@confluent.io>
---
 .../internals/AbstractProcessorContext.java        |  6 ++
 .../streams/processor/internals/AbstractTask.java  |  4 ++
 .../streams/processor/internals/AssignedTasks.java |  2 +-
 .../processor/internals/ChangelogReader.java       |  2 +-
 .../internals/InternalProcessorContext.java        |  5 ++
 .../processor/internals/ProcessorStateManager.java | 63 +++++++++++++++++++-
 .../streams/processor/internals/StateRestorer.java | 19 ++++--
 .../processor/internals/StoreChangelogReader.java  | 64 ++++++++++++++------
 .../streams/processor/internals/StreamThread.java  |  2 +-
 .../streams/integration/EosIntegrationTest.java    |  4 +-
 .../processor/internals/StateRestorerTest.java     |  4 +-
 .../internals/StoreChangelogReaderTest.java        | 68 +++++++++++-----------
 .../org/apache/kafka/test/MockChangelogReader.java |  3 +-
 .../apache/kafka/test/MockProcessorContext.java    |  3 +
 14 files changed, 184 insertions(+), 65 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 04af9f2..c094b83 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -193,4 +193,10 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
     public void initialized() {
         initialized = true;
     }
+
+    @Override
+    public void uninitialize() {
+        initialized = false;
+    }
+
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 7f6ac7c..f8f6416 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -226,6 +226,10 @@ public abstract class AbstractTask {
         }
     }
 
+    void reinitializeStateStoresForPartitions(final TopicPartition partitions) {
+        stateMgr.reinitializeStateStoresForPartitions(partitions, processorContext);
+    }
+
     /**
      * @throws ProcessorStateException if there is an error while closing the state manager
      * @param writeCheckpoint boolean indicating if a checkpoint file should be written
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index d59ec2b..ad4868f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -298,7 +298,7 @@ class AssignedTasks<T extends AbstractTask> {
         return suspended.values();
     }
 
-    Collection<T> restoringTasks() {
+    public Collection<T> restoringTasks() {
         return Collections.unmodifiableCollection(restoring.values());
     }
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
index 5ebc34c..e82ee2c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
@@ -37,7 +37,7 @@ public interface ChangelogReader {
      * Restore all registered state stores by reading from their changelogs.
      * @return all topic partitions that have been restored
      */
-    Collection<TopicPartition> restore();
+    Collection<TopicPartition> restore(final Collection<StreamTask> restoringTasks);
 
     /**
      * @return the restored offsets for all persistent stores.
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
index 57bb3ac..b5719b1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
@@ -53,4 +53,9 @@ public interface InternalProcessorContext extends ProcessorContext {
      * Mark this contex as being initialized
      */
     void initialized();
+
+    /**
+     * Mark this context as being uninitialized
+     */
+    void uninitialize();
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 34a87ce..93e7ffc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
 import org.apache.kafka.streams.errors.StreamsException;
@@ -33,9 +34,11 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 
 public class ProcessorStateManager implements StateManager {
@@ -50,6 +53,7 @@ public class ProcessorStateManager implements StateManager {
     private final String logPrefix;
     private final boolean isStandby;
     private final ChangelogReader changelogReader;
+    private final boolean eosEnabled;
     private final Map<String, StateStore> stores;
     private final Map<String, StateStore> globalStores;
     private final Map<TopicPartition, Long> offsetLimits;
@@ -106,6 +110,7 @@ public class ProcessorStateManager implements StateManager {
         checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME));
         checkpointedOffsets = new HashMap<>(checkpoint.read());
 
+        this.eosEnabled = eosEnabled;
         if (eosEnabled) {
             // delete the checkpoint file after finish loading its stored offsets
             checkpoint.delete();
@@ -169,7 +174,8 @@ public class ProcessorStateManager implements StateManager {
                                                              stateRestoreCallback,
                                                              checkpointedOffsets.get(storePartition),
                                                              offsetLimit(storePartition),
-                                                             store.persistent()
+                                                             store.persistent(),
+                                                             store.name()
             );
 
             changelogReader.register(restorer);
@@ -178,6 +184,61 @@ public class ProcessorStateManager implements StateManager {
         stores.put(store.name(), store);
     }
 
+    void reinitializeStateStoresForPartitions(final TopicPartition topicPartition,
+                                              final InternalProcessorContext processorContext) {
+        final Map<String, String> changelogTopicToStore = inverseOneToOneMap(storeToChangelogTopic);
+        final Set<String> storeToBeReinitialized = new HashSet<>();
+        final Map<String, StateStore> storesCopy = new HashMap<>(stores);
+
+        checkpointedOffsets.remove(topicPartition);
+        storeToBeReinitialized.add(changelogTopicToStore.get(topicPartition.topic()));
+
+        if (!eosEnabled) {
+            try {
+                checkpoint.write(checkpointedOffsets);
+            } catch (final IOException fatalException) {
+                log.error("Failed to write offset checkpoint file to {} while re-initializing {}: {}", checkpoint, stores, fatalException);
+                throw new StreamsException("Failed to reinitialize stores.", fatalException);
+            }
+        }
+
+        for (final Map.Entry<String, StateStore> entry : storesCopy.entrySet()) {
+            final StateStore stateStore = entry.getValue();
+            final String storeName = stateStore.name();
+            if (storeToBeReinitialized.contains(storeName)) {
+                try {
+                    stateStore.close();
+                } catch (final RuntimeException ignoreAndSwallow) { /* ignore */ }
+                processorContext.uninitialize();
+                stores.remove(entry.getKey());
+
+                try {
+                    Utils.delete(new File(baseDir + File.separator + "rocksdb" + File.separator + storeName));
+                } catch (final IOException fatalException) {
+                    log.error("Failed to reinitialize store {}.", storeName, fatalException);
+                    throw new StreamsException(String.format("Failed to reinitialize store %s.", storeName), fatalException);
+                }
+
+                try {
+                    Utils.delete(new File(baseDir + File.separator + storeName));
+                } catch (final IOException fatalException) {
+                    log.error("Failed to reinitialize store {}.", storeName, fatalException);
+                    throw new StreamsException(String.format("Failed to reinitialize store %s.", storeName), fatalException);
+                }
+
+                stateStore.init(processorContext, stateStore);
+            }
+        }
+    }
+
+    private Map<String, String> inverseOneToOneMap(final Map<String, String> origin) {
+        final Map<String, String> reversedMap = new HashMap<>();
+        for (final Map.Entry<String, String> entry : origin.entrySet()) {
+            reversedMap.put(entry.getValue(), entry.getKey());
+        }
+        return reversedMap;
+    }
+
     @Override
     public Map<TopicPartition, Long> checkpointed() {
         final Map<TopicPartition, Long> partitionsAndOffsets = new HashMap<>();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
index 79bfd1d..3c5efe8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
@@ -22,12 +22,13 @@ import org.apache.kafka.streams.processor.StateRestoreCallback;
 public class StateRestorer {
     static final int NO_CHECKPOINT = -1;
 
-    private final Long checkpoint;
     private final long offsetLimit;
     private final boolean persistent;
+    private final String storeName;
     private final TopicPartition partition;
     private final StateRestoreCallback stateRestoreCallback;
 
+    private long checkpointOffset;
     private long restoredOffset;
     private long startingOffset;
 
@@ -35,12 +36,14 @@ public class StateRestorer {
                   final StateRestoreCallback stateRestoreCallback,
                   final Long checkpoint,
                   final long offsetLimit,
-                  final boolean persistent) {
+                  final boolean persistent,
+                  final String storeName) {
         this.partition = partition;
         this.stateRestoreCallback = stateRestoreCallback;
-        this.checkpoint = checkpoint;
+        this.checkpointOffset = checkpoint == null ? NO_CHECKPOINT : checkpoint;
         this.offsetLimit = offsetLimit;
         this.persistent = persistent;
+        this.storeName = storeName;
     }
 
     public TopicPartition partition() {
@@ -48,7 +51,15 @@ public class StateRestorer {
     }
 
     long checkpoint() {
-        return checkpoint == null ? NO_CHECKPOINT : checkpoint;
+        return checkpointOffset;
+    }
+
+    void setCheckpointOffset(final long checkpointOffset) {
+        this.checkpointOffset = checkpointOffset;
+    }
+
+    public String storeName() {
+        return storeName;
     }
 
     void restore(final byte[] key, final byte[] value) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 34dcb75..305bf10 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -60,13 +60,16 @@ public class StoreChangelogReader implements ChangelogReader {
 
     @Override
     public void register(final StateRestorer restorer) {
-        stateRestorers.put(restorer.partition(), restorer);
+        if (!stateRestorers.containsKey(restorer.partition())) {
+            stateRestorers.put(restorer.partition(), restorer);
+            log.trace("Added restorer for changelog {}", restorer.partition());
+        }
         needsInitializing.put(restorer.partition(), restorer);
     }
 
-    public Collection<TopicPartition> restore() {
+    public Collection<TopicPartition> restore(final Collection<StreamTask> restoringTasks) {
         if (!needsInitializing.isEmpty()) {
-            initialize();
+            initialize(restoringTasks);
         }
 
         if (needsRestoring.isEmpty()) {
@@ -87,7 +90,7 @@ public class StoreChangelogReader implements ChangelogReader {
         return completed();
     }
 
-    private void initialize() {
+    private void initialize(final Collection<StreamTask> restoringTasks) {
         if (!consumer.subscription().isEmpty()) {
             throw new IllegalStateException("Restore consumer should not be subscribed to any topics (" + consumer.subscription() + ")");
         }
@@ -139,11 +142,12 @@ public class StoreChangelogReader implements ChangelogReader {
 
         // set up restorer for those initializable
         if (!initializable.isEmpty()) {
-            startRestoration(initializable);
+            startRestoration(initializable, restoringTasks);
         }
     }
 
-    private void startRestoration(final Map<TopicPartition, StateRestorer> initialized) {
+    private void startRestoration(final Map<TopicPartition, StateRestorer> initialized,
+                                  final Collection<StreamTask> restoringTasks) {
         log.debug("{} Start restoring state stores from changelog topics {}", logPrefix, initialized.keySet());
 
         final Set<TopicPartition> assignment = new HashSet<>(consumer.assignment());
@@ -152,24 +156,48 @@ public class StoreChangelogReader implements ChangelogReader {
 
         final List<StateRestorer> needsPositionUpdate = new ArrayList<>();
         for (final StateRestorer restorer : initialized.values()) {
+            final TopicPartition restoringPartition = restorer.partition();
             if (restorer.checkpoint() != StateRestorer.NO_CHECKPOINT) {
-                consumer.seek(restorer.partition(), restorer.checkpoint());
-                logRestoreOffsets(restorer.partition(),
-                        restorer.checkpoint(),
-                        endOffsets.get(restorer.partition()));
-                restorer.setStartingOffset(consumer.position(restorer.partition()));
+                consumer.seek(restoringPartition, restorer.checkpoint());
+                logRestoreOffsets(
+                    restoringPartition,
+                    restorer.checkpoint(),
+                    endOffsets.get(restoringPartition));
+                restorer.setStartingOffset(consumer.position(restoringPartition));
             } else {
-                consumer.seekToBeginning(Collections.singletonList(restorer.partition()));
+                consumer.seekToBeginning(Collections.singletonList(restoringPartition));
                 needsPositionUpdate.add(restorer);
             }
         }
 
         for (final StateRestorer restorer : needsPositionUpdate) {
-            final long position = consumer.position(restorer.partition());
-            logRestoreOffsets(restorer.partition(),
-                    position,
-                    endOffsets.get(restorer.partition()));
-            restorer.setStartingOffset(position);
+            final TopicPartition restoringPartition = restorer.partition();
+
+            for (final StreamTask task : restoringTasks) {
+                if (task.changelogPartitions().contains(restoringPartition) || task.partitions().contains(restoringPartition)) {
+                    if (task.eosEnabled) {
+                        log.info("No checkpoint found for task {} state store {} changelog {} with EOS turned on. " +
+                            "Reinitializing the task and restore its state from the beginning.", task.id, restorer.storeName(), restorer.partition());
+
+                        needsInitializing.remove(restoringPartition);
+                        initialized.put(restoringPartition, restorer);
+                        restorer.setCheckpointOffset(consumer.position(restoringPartition));
+
+                        task.reinitializeStateStoresForPartitions(restoringPartition);
+                    } else {
+                        log.info("Restoring task {}'s state store {} from beginning of the changelog {} ", task.id, restorer.storeName(), restorer.partition());
+
+                        final long position = consumer.position(restoringPartition);
+                        logRestoreOffsets(
+                            restoringPartition,
+                            position,
+                            endOffsets.get(restoringPartition));
+                        restorer.setStartingOffset(position);
+                    }
+                }
+            }
+
+
         }
 
         needsRestoring.putAll(initialized);
@@ -220,7 +248,7 @@ public class StoreChangelogReader implements ChangelogReader {
     }
 
     private void restorePartition(final ConsumerRecords<byte[], byte[]> allRecords,
-                                    final TopicPartition topicPartition) {
+                                  final TopicPartition topicPartition) {
         final StateRestorer restorer = stateRestorers.get(topicPartition);
         final Long endOffset = endOffsets.get(topicPartition);
         final long pos = processNext(allRecords.records(topicPartition), restorer, endOffset);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 210b070..6b7f4f1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -532,7 +532,7 @@ public class StreamThread extends Thread {
         active.initializeNewTasks();
         standby.initializeNewTasks();
 
-        final Collection<TopicPartition> restored = storeChangelogReader.restore();
+        final Collection<TopicPartition> restored = storeChangelogReader.restore(active.restoringTasks());
         final Set<TopicPartition> resumed = active.updateRestored(restored);
 
         if (!resumed.isEmpty()) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 0c3b36a..7f70950 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -392,7 +392,7 @@ public class EosIntegrationTest {
         // the app commits after each 10 records per partition, and thus will have 2*5 uncommitted writes
         // and store updates (ie, another 5 uncommitted writes to a changelog topic per partition)
         //
-        // the failure gets inject after 20 committed and 30 uncommitted records got received
+        // the failure gets inject after 20 committed and 10 uncommitted records got received
         // -> the failure only kills one thread
         // after fail over, we should read 40 committed records and the state stores should contain the correct sums
         // per key (even if some records got processed twice)
@@ -402,7 +402,7 @@ public class EosIntegrationTest {
             streams.start();
 
             final List<KeyValue<Long, Long>> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L);
-            final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L, 15L, 0L, 1L);
+            final List<KeyValue<Long, Long>> uncommittedDataBeforeFailure = prepareData(10L, 15L, 0L, 1L, 2L, 3L);
 
             final List<KeyValue<Long, Long>> dataBeforeFailure = new ArrayList<>();
             dataBeforeFailure.addAll(committedDataBeforeFailure);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
index 6968f33..3a8ebda 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java
@@ -28,7 +28,7 @@ public class StateRestorerTest {
 
     private static final long OFFSET_LIMIT = 50;
     private final MockRestoreCallback callback = new MockRestoreCallback();
-    private final StateRestorer restorer = new StateRestorer(new TopicPartition("topic", 1), callback, null, OFFSET_LIMIT, true);
+    private final StateRestorer restorer = new StateRestorer(new TopicPartition("topic", 1), callback, null, OFFSET_LIMIT, true, "store");
 
     @Test
     public void shouldCallRestoreOnRestoreCallback() throws Exception {
@@ -53,7 +53,7 @@ public class StateRestorerTest {
 
     @Test
     public void shouldBeCompletedIfOffsetAndOffsetLimitAreZero() throws Exception {
-        final StateRestorer restorer = new StateRestorer(new TopicPartition("topic", 1), callback, null, 0, true);
+        final StateRestorer restorer = new StateRestorer(new TopicPartition("topic", 1), callback, null, 0, true, "store");
         assertTrue(restorer.hasCompleted(0, 10));
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index a43f083..24820f8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -59,8 +59,8 @@ public class StoreChangelogReaderTest {
         };
 
         final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer);
-        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true));
-        changelogReader.restore();
+        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true, "store"));
+        changelogReader.restore(Collections.<StreamTask>emptySet());
         assertTrue(functionCalled.get());
     }
 
@@ -68,7 +68,7 @@ public class StoreChangelogReaderTest {
     public void shouldThrowExceptionIfConsumerHasCurrentSubscription() throws Exception {
         consumer.subscribe(Collections.singleton("sometopic"));
         try {
-            changelogReader.restore();
+            changelogReader.restore(Collections.<StreamTask>emptySet());
             fail("Should have thrown IllegalStateException");
         } catch (final IllegalStateException e) {
             // ok
@@ -79,9 +79,9 @@ public class StoreChangelogReaderTest {
     public void shouldRestoreAllMessagesFromBeginningWhenCheckpointNull() throws Exception {
         final int messages = 10;
         setupConsumer(messages, topicPartition);
-        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true));
+        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true, "store"));
 
-        changelogReader.restore();
+        changelogReader.restore(Collections.<StreamTask>emptySet());
         assertThat(callback.restored.size(), equalTo(messages));
     }
 
@@ -89,9 +89,9 @@ public class StoreChangelogReaderTest {
     public void shouldRestoreMessagesFromCheckpoint() throws Exception {
         final int messages = 10;
         setupConsumer(messages, topicPartition);
-        changelogReader.register(new StateRestorer(topicPartition, callback, 5L, Long.MAX_VALUE, true));
+        changelogReader.register(new StateRestorer(topicPartition, callback, 5L, Long.MAX_VALUE, true, "store"));
 
-        changelogReader.restore();
+        changelogReader.restore(Collections.<StreamTask>emptySet());
         assertThat(callback.restored.size(), equalTo(5));
     }
 
@@ -99,18 +99,18 @@ public class StoreChangelogReaderTest {
     public void shouldClearAssignmentAtEndOfRestore() throws Exception {
         final int messages = 1;
         setupConsumer(messages, topicPartition);
-        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true));
+        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true, "store"));
 
-        changelogReader.restore();
+        changelogReader.restore(Collections.<StreamTask>emptySet());
         assertThat(consumer.assignment(), equalTo(Collections.<TopicPartition>emptySet()));
     }
 
     @Test
     public void shouldRestoreToLimitWhenSupplied() throws Exception {
         setupConsumer(10, topicPartition);
-        final StateRestorer restorer = new StateRestorer(topicPartition, callback, null, 3, true);
+        final StateRestorer restorer = new StateRestorer(topicPartition, callback, null, 3, true, "store");
         changelogReader.register(restorer);
-        changelogReader.restore();
+        changelogReader.restore(Collections.<StreamTask>emptySet());
         assertThat(callback.restored.size(), equalTo(3));
         assertThat(restorer.restoredOffset(), equalTo(3L));
     }
@@ -125,11 +125,11 @@ public class StoreChangelogReaderTest {
         setupConsumer(5, one);
         setupConsumer(3, two);
 
-        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true));
-        changelogReader.register(new StateRestorer(one, callbackOne, null, Long.MAX_VALUE, true));
-        changelogReader.register(new StateRestorer(two, callbackTwo, null, Long.MAX_VALUE, true));
+        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true, "store"));
+        changelogReader.register(new StateRestorer(one, callbackOne, null, Long.MAX_VALUE, true, "store"));
+        changelogReader.register(new StateRestorer(two, callbackTwo, null, Long.MAX_VALUE, true, "store"));
 
-        changelogReader.restore();
+        changelogReader.restore(Collections.<StreamTask>emptySet());
 
         assertThat(callback.restored.size(), equalTo(10));
         assertThat(callbackOne.restored.size(), equalTo(5));
@@ -138,11 +138,11 @@ public class StoreChangelogReaderTest {
 
     @Test
     public void shouldNotRestoreAnythingWhenPartitionIsEmpty() throws Exception {
-        final StateRestorer restorer = new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true);
+        final StateRestorer restorer = new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true, "store");
         setupConsumer(0, topicPartition);
         changelogReader.register(restorer);
 
-        changelogReader.restore();
+        changelogReader.restore(Collections.<StreamTask>emptySet());
         assertThat(callback.restored.size(), equalTo(0));
         assertThat(restorer.restoredOffset(), equalTo(0L));
     }
@@ -151,11 +151,11 @@ public class StoreChangelogReaderTest {
     public void shouldNotRestoreAnythingWhenCheckpointAtEndOffset() throws Exception {
         final Long endOffset = 10L;
         setupConsumer(endOffset, topicPartition);
-        final StateRestorer restorer = new StateRestorer(topicPartition, callback, endOffset, Long.MAX_VALUE, true);
+        final StateRestorer restorer = new StateRestorer(topicPartition, callback, endOffset, Long.MAX_VALUE, true, "store");
 
         changelogReader.register(restorer);
 
-        changelogReader.restore();
+        changelogReader.restore(Collections.<StreamTask>emptySet());
         assertThat(callback.restored.size(), equalTo(0));
         assertThat(restorer.restoredOffset(), equalTo(endOffset));
     }
@@ -163,8 +163,8 @@ public class StoreChangelogReaderTest {
     @Test
     public void shouldReturnRestoredOffsetsForPersistentStores() throws Exception {
         setupConsumer(10, topicPartition);
-        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true));
-        changelogReader.restore();
+        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true, "store"));
+        changelogReader.restore(Collections.<StreamTask>emptySet());
         final Map<TopicPartition, Long> restoredOffsets = changelogReader.restoredOffsets();
         assertThat(restoredOffsets, equalTo(Collections.singletonMap(topicPartition, 10L)));
     }
@@ -172,8 +172,8 @@ public class StoreChangelogReaderTest {
     @Test
     public void shouldNotReturnRestoredOffsetsForNonPersistentStore() throws Exception {
         setupConsumer(10, topicPartition);
-        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false));
-        changelogReader.restore();
+        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false, "store"));
+        changelogReader.restore(Collections.<StreamTask>emptySet());
         final Map<TopicPartition, Long> restoredOffsets = changelogReader.restoredOffsets();
         assertThat(restoredOffsets, equalTo(Collections.<TopicPartition, Long>emptyMap()));
     }
@@ -186,8 +186,8 @@ public class StoreChangelogReaderTest {
         consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), 1, (byte[]) null, bytes));
         consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), 2, bytes, bytes));
         consumer.assign(Collections.singletonList(topicPartition));
-        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false));
-        changelogReader.restore();
+        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false, "store"));
+        changelogReader.restore(Collections.<StreamTask>emptySet());
 
         assertThat(callback.restored, CoreMatchers.equalTo(Utils.mkList(KeyValue.pair(bytes, bytes), KeyValue.pair(bytes, bytes))));
     }
@@ -200,15 +200,15 @@ public class StoreChangelogReaderTest {
             consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), i, bytes, bytes));
         }
         consumer.assign(Collections.singletonList(topicPartition));
-        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false));
+        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false, "store"));
 
-        final Collection<TopicPartition> completedFirstTime = changelogReader.restore();
+        final Collection<TopicPartition> completedFirstTime = changelogReader.restore(Collections.<StreamTask>emptySet());
         assertTrue(completedFirstTime.isEmpty());
         for (int i = 5; i < 10; i++) {
             consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), i, bytes, bytes));
         }
         final Collection<TopicPartition> expected = Collections.singleton(topicPartition);
-        assertThat(changelogReader.restore(), equalTo(expected));
+        assertThat(changelogReader.restore(Collections.<StreamTask>emptySet()), equalTo(expected));
     }
 
     private void setupConsumer(final long messages, final TopicPartition topicPartition) {
@@ -237,8 +237,8 @@ public class StoreChangelogReaderTest {
     public void shouldCompleteImmediatelyWhenEndOffsetIs0() {
         final Collection<TopicPartition> expected = Collections.singleton(topicPartition);
         setupConsumer(0, topicPartition);
-        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true));
-        final Collection<TopicPartition> restored = changelogReader.restore();
+        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true, "store"));
+        final Collection<TopicPartition> restored = changelogReader.restore(Collections.<StreamTask>emptySet());
         assertThat(restored, equalTo(expected));
     }
 
@@ -248,9 +248,9 @@ public class StoreChangelogReaderTest {
 
         setupConsumer(1, topicPartition);
         consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 10L));
-        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false));
+        changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false, "store"));
 
-        assertTrue(changelogReader.restore().isEmpty());
+        assertTrue(changelogReader.restore(Collections.<StreamTask>emptySet()).isEmpty());
 
         addRecords(9, topicPartition, 1);
 
@@ -259,12 +259,12 @@ public class StoreChangelogReaderTest {
         consumer.updateBeginningOffsets(Collections.singletonMap(postInitialization, 0L));
         consumer.updateEndOffsets(Collections.singletonMap(postInitialization, 3L));
 
-        changelogReader.register(new StateRestorer(postInitialization, callbackTwo, null, Long.MAX_VALUE, false));
+        changelogReader.register(new StateRestorer(postInitialization, callbackTwo, null, Long.MAX_VALUE, false, "store"));
 
         final Collection<TopicPartition> expected = Utils.mkSet(topicPartition, postInitialization);
         consumer.assign(expected);
 
-        assertThat(changelogReader.restore(), equalTo(expected));
+        assertThat(changelogReader.restore(Collections.<StreamTask>emptySet()), equalTo(expected));
         assertThat(callback.restored.size(), equalTo(10));
         assertThat(callbackTwo.restored.size(), equalTo(3));
     }
diff --git a/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java b/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java
index 54fd858..93ce801 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java
@@ -19,6 +19,7 @@ package org.apache.kafka.test;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.streams.processor.internals.ChangelogReader;
 import org.apache.kafka.streams.processor.internals.StateRestorer;
+import org.apache.kafka.streams.processor.internals.StreamTask;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -35,7 +36,7 @@ public class MockChangelogReader implements ChangelogReader {
     }
 
     @Override
-    public Collection<TopicPartition> restore() {
+    public Collection<TopicPartition> restore(final Collection<StreamTask> restoringTasks) {
         return registered;
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
index cb56fa1..fed8752 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java
@@ -143,6 +143,9 @@ public class MockProcessorContext implements InternalProcessorContext, RecordCol
     public void initialized() {}
 
     @Override
+    public void uninitialize() {}
+
+    @Override
     public File stateDir() {
         if (stateDir == null) {
             throw new UnsupportedOperationException("State directory not specified");


Mime
View raw message