kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: KAFKA-5797: Delay checking of partition existence in StoreChangelogReader
Date Wed, 30 Aug 2017 15:43:24 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 76c9a6dcb -> 6682abe4a


KAFKA-5797: Delay checking of partition existence in StoreChangelogReader

1. Remove the timeout-based validatePartitionExists from StoreChangelogReader; instead, refresh
metadata only once, after all tasks have been created and their topologies initialized
(hence all stores have been registered).
2. Add logic to refresh partition metadata at the end of initialization if some restorers
needing initialization cannot find their changelogs, in the hope that these stores can find
their changelogs in the next run loop.

As a result, after `initialize` is called we may not yet be able to start initializing all of
the `needsInitializing` restorers.

As an optimization, we no longer call `consumer#partitionsFor`, but only `consumer#listTopics`,
which fetches all topic metadata in one request; the only remaining blocking calls are `listTopics`
and `endOffsets`. We always catch timeout exceptions around these two calls and delay the retry
to the next run loop, after refreshing the metadata. This also reduces the number of request
round trips between the consumer and the brokers.
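The run-loop retry described above can be sketched as follows. This is a simplified, hypothetical model, not the actual Streams implementation: the class `ChangelogInitSketch` and its methods are invented for illustration, and a plain `Map` stands in for the metadata returned by `consumer#listTopics`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Minimal sketch of the delayed-initialization idea: instead of blocking until a
// changelog partition shows up, keep it in a "needs initializing" set and retry on
// the next run loop after refreshing metadata. All names here are illustrative.
class ChangelogInitSketch {
    private final Set<String> needsInitializing = new HashSet<>();
    private final Set<String> restoring = new HashSet<>();
    private final Map<String, Integer> brokerMetadata; // topic -> partition count

    ChangelogInitSketch(final Map<String, Integer> brokerMetadata) {
        this.brokerMetadata = brokerMetadata;
    }

    void register(final String changelogTopic) {
        needsInitializing.add(changelogTopic);
    }

    // One pass of the run loop: refresh metadata once, promote changelogs whose
    // metadata is now known, and leave the rest to be retried next time.
    void runLoopOnce() {
        final Map<String, Integer> metadata = refreshMetadata();
        needsInitializing.removeIf(topic -> {
            if (metadata.containsKey(topic)) {
                restoring.add(topic);
                return true;
            }
            return false; // metadata not available yet; retry in the next run loop
        });
    }

    private Map<String, Integer> refreshMetadata() {
        // In the real reader this is consumer.listTopics(), with TimeoutException
        // caught and logged so the thread keeps making progress.
        return new HashMap<>(brokerMetadata);
    }

    Set<String> restoring() {
        return restoring;
    }

    Set<String> stillWaiting() {
        return needsInitializing;
    }
}
```

A restorer whose changelog is missing from the metadata simply stays queued; no exception is thrown and no per-partition blocking lookup is made, which mirrors the commit's shift from fail-fast validation to retry-on-next-loop.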

Author: Guozhang Wang <wangguoz@gmail.com>

Reviewers: Damian Guy <damian.guy@gmail.com>, Matthias J. Sax <matthias@confluent.io>

Closes #3748 from guozhangwang/K5797-handle-metadata-available


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/6682abe4
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/6682abe4
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/6682abe4

Branch: refs/heads/trunk
Commit: 6682abe4ae68fdbf0eb362e45f43ea14e2aba847
Parents: 76c9a6d
Author: Guozhang Wang <wangguoz@gmail.com>
Authored: Wed Aug 30 08:43:22 2017 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Aug 30 08:43:22 2017 -0700

----------------------------------------------------------------------
 .../processor/internals/AssignedTasks.java      |   2 +-
 .../processor/internals/ChangelogReader.java    |  12 +-
 .../internals/ProcessorStateManager.java        |   1 -
 .../processor/internals/StateRestorer.java      |  10 +-
 .../internals/StoreChangelogReader.java         | 154 ++++++++++---------
 .../processor/internals/StreamThread.java       |   2 -
 .../processor/internals/AbstractTaskTest.java   |   3 +-
 .../processor/internals/StandbyTaskTest.java    |   3 +-
 .../internals/StoreChangelogReaderTest.java     |  99 ++----------
 .../processor/internals/StreamTaskTest.java     |   3 +-
 .../StreamThreadStateStoreProviderTest.java     |   3 +-
 .../apache/kafka/test/MockChangelogReader.java  |   9 +-
 .../kafka/test/ProcessorTopologyTestDriver.java |   2 -
 13 files changed, 112 insertions(+), 191 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/6682abe4/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index c0c9ccc..f09c48e 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -124,7 +124,7 @@ class AssignedTasks {
                 it.remove();
             } catch (final LockException e) {
                 // made this trace as it will spam the logs in the poll loop.
-                log.trace("{} Could not create {} {} due to {}; will retry", logPrefix, taskTypeName, entry.getKey(), e.getMessage());
+                log.trace("{} Could not create {} {} due to {}; will retry in the next run loop", logPrefix, taskTypeName, entry.getKey(), e.getMessage());
             }
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6682abe4/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
index d8ed35e..5ebc34c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java
@@ -28,18 +28,10 @@ import java.util.Map;
  */
 public interface ChangelogReader {
     /**
-     * Validate that the partition exists on the cluster.
-     * @param topicPartition    partition to validate.
-     * @param storeName         name of the store the partition is for.
-     * @throws org.apache.kafka.streams.errors.StreamsException if partition doesn't exist
-     */
-    void validatePartitionExists(final TopicPartition topicPartition, final String storeName);
-
-    /**
      * Register a state store and its partition for later restoration.
-     * @param restorationInfo
+     * @param restorer the state restorer to register
      */
-    void register(final StateRestorer restorationInfo);
+    void register(final StateRestorer restorer);
 
     /**
      * Restore all registered state stores by reading from their changelogs.

http://git-wip-us.apache.org/repos/asf/kafka/blob/6682abe4/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index 978e24b..acd7674 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -157,7 +157,6 @@ public class ProcessorStateManager implements StateManager {
         }
 
         final TopicPartition storePartition = new TopicPartition(topic, getPartition(topic));
-        changelogReader.validatePartitionExists(storePartition, store.name());
 
         if (isStandby) {
             if (store.persistent()) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6682abe4/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
index ae68fd6..579561f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java
@@ -29,11 +29,13 @@ public class StateRestorer {
     private final Long checkpoint;
     private final long offsetLimit;
     private final boolean persistent;
-    private final TopicPartition partition;
     private final String storeName;
+    private final TopicPartition partition;
     private final CompositeRestoreListener compositeRestoreListener;
+
     private long restoredOffset;
     private long startingOffset;
+    private long endingOffset;
 
     StateRestorer(final TopicPartition partition,
                   final CompositeRestoreListener compositeRestoreListener,
@@ -57,7 +59,7 @@ public class StateRestorer {
         return checkpoint == null ? NO_CHECKPOINT : checkpoint;
     }
 
-    void restoreStarted(long startingOffset, long endingOffset) {
+    void restoreStarted() {
         compositeRestoreListener.onRestoreStart(partition, storeName, startingOffset, endingOffset);
     }
 
@@ -89,6 +91,10 @@ public class StateRestorer {
         this.startingOffset = Math.min(offsetLimit, startingOffset);
     }
 
+    void setEndingOffset(final long endingOffset) {
+        this.endingOffset = Math.min(offsetLimit, endingOffset);
+    }
+
     long startingOffset() {
         return startingOffset;
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6682abe4/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index e2cb3a2..57dff64 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -22,9 +22,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,6 +32,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -41,62 +40,27 @@ import java.util.Set;
 public class StoreChangelogReader implements ChangelogReader {
     private static final Logger log = LoggerFactory.getLogger(StoreChangelogReader.class);
 
-    private final Consumer<byte[], byte[]> consumer;
     private final String logPrefix;
-    private final Time time;
-    private final long partitionValidationTimeoutMs;
-    private final Map<String, List<PartitionInfo>> partitionInfo = new HashMap<>();
+    private final Consumer<byte[], byte[]> consumer;
     private final StateRestoreListener stateRestoreListener;
+    private final Map<TopicPartition, Long> endOffsets = new HashMap<>();
+    private final Map<String, List<PartitionInfo>> partitionInfo = new HashMap<>();
     private final Map<TopicPartition, StateRestorer> stateRestorers = new HashMap<>();
     private final Map<TopicPartition, StateRestorer> needsRestoring = new HashMap<>();
     private final Map<TopicPartition, StateRestorer> needsInitializing = new HashMap<>();
-    private final Map<TopicPartition, Long> endOffsets = new HashMap<>();
 
-    public StoreChangelogReader(final String threadId, final Consumer<byte[], byte[]> consumer, final Time time,
-                                final long partitionValidationTimeoutMs, final StateRestoreListener stateRestoreListener) {
-        this.time = time;
+    public StoreChangelogReader(final String threadId,
+                                final Consumer<byte[], byte[]> consumer,
+                                final StateRestoreListener stateRestoreListener) {
         this.consumer = consumer;
-        this.partitionValidationTimeoutMs = partitionValidationTimeoutMs;
 
         this.logPrefix = String.format("stream-thread [%s]", threadId);
         this.stateRestoreListener = stateRestoreListener;
     }
 
-    public StoreChangelogReader(final Consumer<byte[], byte[]> consumer, final Time time,
-                                long partitionValidationTimeoutMs, final StateRestoreListener stateRestoreListener) {
-        this("", consumer, time, partitionValidationTimeoutMs, stateRestoreListener);
-    }
-
-    @Override
-    public void validatePartitionExists(final TopicPartition topicPartition, final String storeName) {
-        final long start = time.milliseconds();
-        // fetch all info on all topics to avoid multiple remote calls
-        if (partitionInfo.isEmpty()) {
-            try {
-                partitionInfo.putAll(consumer.listTopics());
-            } catch (final TimeoutException e) {
-                log.warn("{} Could not list topics so will fall back to partition by partition fetching", logPrefix);
-            }
-        }
-
-        final long endTime = time.milliseconds() + partitionValidationTimeoutMs;
-        while (!hasPartition(topicPartition) && time.milliseconds() < endTime) {
-            try {
-                final List<PartitionInfo> partitions = consumer.partitionsFor(topicPartition.topic());
-                if (partitions != null) {
-                    partitionInfo.put(topicPartition.topic(), partitions);
-                }
-            } catch (final TimeoutException e) {
-                throw new StreamsException(String.format("Could not fetch partition info for topic: %s before expiration of the configured request timeout",
-                                                         topicPartition.topic()));
-            }
-        }
-
-        if (!hasPartition(topicPartition)) {
-            throw new StreamsException(String.format("Store %s's change log (%s) does not contain partition %s",
-                                                     storeName, topicPartition.topic(), topicPartition.partition()));
-        }
-        log.debug("{} Took {}ms to validate that partition {} exists", logPrefix, time.milliseconds() - start, topicPartition);
+    public StoreChangelogReader(final Consumer<byte[], byte[]> consumer,
+                                final StateRestoreListener stateRestoreListener) {
+        this("", consumer, stateRestoreListener);
     }
 
     @Override
@@ -106,7 +70,6 @@ public class StoreChangelogReader implements ChangelogReader {
         needsInitializing.put(restorer.partition(), restorer);
     }
 
-
     public Collection<TopicPartition> restore() {
         if (!needsInitializing.isEmpty()) {
             initialize();
@@ -131,45 +94,79 @@ public class StoreChangelogReader implements ChangelogReader {
     }
 
     private void initialize() {
-        final Map<TopicPartition, StateRestorer> newTasksNeedingRestoration = new HashMap<>();
-
         if (!consumer.subscription().isEmpty()) {
-            throw new IllegalStateException(String.format("Restore consumer should have not subscribed to any partitions (%s) beforehand", consumer.subscription()));
+            throw new IllegalStateException("Restore consumer should not be subscribed to any topics (" + consumer.subscription() + ")");
         }
-        endOffsets.putAll(consumer.endOffsets(needsInitializing.keySet()));
-
-        // remove any partitions where we already have all of the data
-        for (final Map.Entry<TopicPartition, Long> entry : endOffsets.entrySet()) {
-            TopicPartition topicPartition = entry.getKey();
-            Long offset = entry.getValue();
-            final StateRestorer restorer = needsInitializing.get(topicPartition);
-            // might be null as has was initialized in a previous invocation.
-            if (restorer != null) {
-                if (restorer.checkpoint() >= offset) {
+
+        // first refresh the changelog partition information from brokers, since initialize is only called when
+        // the needsInitializing map is not empty, meaning we do not know the metadata for some of them yet
+        refreshChangelogInfo();
+
+        Map<TopicPartition, StateRestorer> initializable = new HashMap<>();
+        for (Map.Entry<TopicPartition, StateRestorer> entry : needsInitializing.entrySet()) {
+            final TopicPartition topicPartition = entry.getKey();
+            if (hasPartition(topicPartition)) {
+                initializable.put(entry.getKey(), entry.getValue());
+            }
+        }
+
+        // try to fetch end offsets for the initializable restorers and remove any partitions
+        // where we already have all of the data
+        try {
+            endOffsets.putAll(consumer.endOffsets(initializable.keySet()));
+        } catch (final TimeoutException e) {
+            // if timeout exception gets thrown we just give up this time and retry in the next run loop
+            log.debug("{} Could not fetch end offset for {}; will fall back to partition by partition fetching", logPrefix, initializable);
+            return;
+        }
+
+        final Iterator<TopicPartition> iter = initializable.keySet().iterator();
+        while (iter.hasNext()) {
+            final TopicPartition topicPartition = iter.next();
+            final Long endOffset = endOffsets.get(topicPartition);
+
+            // offset should not be null; but since the consumer API does not guarantee it
+            // we add this check just in case
+            if (endOffset != null) {
+                final StateRestorer restorer = needsInitializing.get(topicPartition);
+                if (restorer.checkpoint() >= endOffset) {
                     restorer.setRestoredOffset(restorer.checkpoint());
-                } else if (restorer.offsetLimit() == 0 || endOffsets.get(topicPartition) == 0) {
+                    iter.remove();
+                } else if (restorer.offsetLimit() == 0 || endOffset == 0) {
                     restorer.setRestoredOffset(0);
+                    iter.remove();
                 } else {
-                    newTasksNeedingRestoration.put(topicPartition, restorer);
-                    final Long endOffset = endOffsets.get(topicPartition);
-                    restorer.restoreStarted(restorer.startingOffset(), endOffset);
+                    restorer.setEndingOffset(endOffset);
                 }
+                needsInitializing.remove(topicPartition);
+            } else {
+                log.info("{} End offset cannot be found from the returned metadata; removing this partition from the current run loop", logPrefix);
+                iter.remove();
             }
         }
 
-        log.debug("{} Starting restoring state stores from changelog topics {}", logPrefix, newTasksNeedingRestoration.keySet());
+        // set up restorer for those initializable
+        if (!initializable.isEmpty()) {
+            startRestoration(initializable);
+        }
+    }
+
+    private void startRestoration(final Map<TopicPartition, StateRestorer> initialized) {
+        log.debug("{} Start restoring state stores from changelog topics {}", logPrefix, initialized.keySet());
 
         final Set<TopicPartition> assignment = new HashSet<>(consumer.assignment());
-        assignment.addAll(newTasksNeedingRestoration.keySet());
+        assignment.addAll(initialized.keySet());
         consumer.assign(assignment);
+
         final List<StateRestorer> needsPositionUpdate = new ArrayList<>();
-        for (final StateRestorer restorer : newTasksNeedingRestoration.values()) {
+        for (final StateRestorer restorer : initialized.values()) {
             if (restorer.checkpoint() != StateRestorer.NO_CHECKPOINT) {
                 consumer.seek(restorer.partition(), restorer.checkpoint());
                 logRestoreOffsets(restorer.partition(),
-                                  restorer.checkpoint(),
-                                  endOffsets.get(restorer.partition()));
+                        restorer.checkpoint(),
+                        endOffsets.get(restorer.partition()));
                 restorer.setStartingOffset(consumer.position(restorer.partition()));
+                restorer.restoreStarted();
             } else {
                 consumer.seekToBeginning(Collections.singletonList(restorer.partition()));
                 needsPositionUpdate.add(restorer);
@@ -178,14 +175,14 @@ public class StoreChangelogReader implements ChangelogReader {
 
         for (final StateRestorer restorer : needsPositionUpdate) {
             final long position = consumer.position(restorer.partition());
-            restorer.setStartingOffset(position);
             logRestoreOffsets(restorer.partition(),
-                              position,
-                              endOffsets.get(restorer.partition()));
+                    position,
+                    endOffsets.get(restorer.partition()));
+            restorer.setStartingOffset(position);
+            restorer.restoreStarted();
         }
 
-        needsRestoring.putAll(newTasksNeedingRestoration);
-        needsInitializing.clear();
+        needsRestoring.putAll(initialized);
     }
 
     private void logRestoreOffsets(final TopicPartition partition, final long startingOffset, final Long endOffset) {
@@ -203,6 +200,14 @@ public class StoreChangelogReader implements ChangelogReader {
         return completed;
     }
 
+    private void refreshChangelogInfo() {
+        try {
+            partitionInfo.putAll(consumer.listTopics());
+        } catch (final TimeoutException e) {
+            log.debug("{} Could not fetch topic metadata within the timeout, will retry in the next run loop", logPrefix);
+        }
+    }
+
     @Override
     public Map<TopicPartition, Long> restoredOffsets() {
         final Map<TopicPartition, Long> restoredOffsets = new HashMap<>();
@@ -294,6 +299,5 @@ public class StoreChangelogReader implements ChangelogReader {
         }
 
         return false;
-
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/6682abe4/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index a848172..d978f3d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -638,8 +638,6 @@ public class StreamThread extends Thread implements ThreadDataProvider {
         final Consumer<byte[], byte[]> restoreConsumer = clientSupplier.getRestoreConsumer(consumerConfigs);
         final StoreChangelogReader changelogReader = new StoreChangelogReader(threadClientId,
                                                                               restoreConsumer,
-                                                                              time,
-                                                                              config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
                                                                               stateRestoreListener);
 
         Producer<byte[], byte[]> threadProducer = null;

http://git-wip-us.apache.org/repos/asf/kafka/blob/6682abe4/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index d6709b8..43fe24f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.LockException;
 import org.apache.kafka.streams.errors.ProcessorStateException;
@@ -122,7 +121,7 @@ public class AbstractTaskTest {
                                                       Collections.<String, String>emptyMap(),
                                                       Collections.<StateStore>emptyList()),
                                 consumer,
-                                new StoreChangelogReader(consumer, Time.SYSTEM, 5000, new MockStateRestoreListener()),
+                                new StoreChangelogReader(consumer, new MockStateRestoreListener()),
                                 false,
                                 stateDirectory,
                                 config) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/6682abe4/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index f22e773..40a66da 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -29,7 +29,6 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.ProcessorStateException;
@@ -133,7 +132,7 @@ public class StandbyTaskTest {
 
     private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
     private final MockRestoreConsumer restoreStateConsumer = new MockRestoreConsumer();
-    private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, Time.SYSTEM, 5000, stateRestoreListener);
+    private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, stateRestoreListener);
 
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);

http://git-wip-us.apache.org/repos/asf/kafka/blob/6682abe4/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 8ac9a62..a480ec1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -22,11 +22,8 @@ import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.test.MockRestoreCallback;
 import org.apache.kafka.test.MockStateRestoreListener;
@@ -38,6 +35,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_BATCH;
 import static org.apache.kafka.test.MockStateRestoreListener.RESTORE_END;
@@ -53,94 +51,32 @@ public class StoreChangelogReaderTest {
     private final CompositeRestoreListener restoreListener = new CompositeRestoreListener(callback);
     private final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
     private final StateRestoreListener stateRestoreListener = new MockStateRestoreListener();
-    private final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, new MockTime(), 0, stateRestoreListener);
+    private final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, stateRestoreListener);
     private final TopicPartition topicPartition = new TopicPartition("topic", 0);
-    private final PartitionInfo partitionInfo = new PartitionInfo(topicPartition.topic(), 0, null, null, null);
 
     @Before
     public void setUp() {
         restoreListener.setGlobalRestoreListener(stateRestoreListener);
     }
 
-    @SuppressWarnings("unchecked")
     @Test
-    public void shouldThrowStreamsExceptionWhenTimeoutExceptionThrown() throws Exception {
-        final MockConsumer<byte[], byte[]> consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) {
+    public void shouldRequestTopicsAndHandleTimeoutException() throws Exception {
+        final AtomicBoolean functionCalled = new AtomicBoolean(false);
+        final MockConsumer<byte[], byte[]> consumer = new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST) {
             @Override
             public Map<String, List<PartitionInfo>> listTopics() {
+                functionCalled.set(true);
                 throw new TimeoutException("KABOOM!");
             }
         };
-        final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, new MockTime(), 0, stateRestoreListener);
-        try {
-            changelogReader.validatePartitionExists(topicPartition, "store");
-            fail("Should have thrown streams exception");
-        } catch (final StreamsException e) {
-            // pass
-        }
-    }
-
-    @Test(expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionIfPartitionDoesntExistAfterMaxWait() throws Exception {
-        changelogReader.validatePartitionExists(topicPartition, "store");
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void shouldFallbackToPartitionsForIfPartitionNotInAllPartitionsList() throws Exception {
-        final MockConsumer<byte[], byte[]> consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) {
-            @Override
-            public List<PartitionInfo> partitionsFor(final String topic) {
-                return Collections.singletonList(partitionInfo);
-            }
-        };
-
-        final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, new
-            MockTime(), 10, stateRestoreListener);
-        changelogReader.validatePartitionExists(topicPartition, "store");
-    }
 
-    @SuppressWarnings("unchecked")
-    @Test
-    public void shouldThrowStreamsExceptionIfTimeoutOccursDuringPartitionsFor() throws Exception {
-        final MockConsumer<byte[], byte[]> consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) {
-            @Override
-            public List<PartitionInfo> partitionsFor(final String topic) {
-                throw new TimeoutException("KABOOM!");
-            }
-        };
-        final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, new
-            MockTime(), 5, stateRestoreListener);
-        try {
-            changelogReader.validatePartitionExists(topicPartition, "store");
-            fail("Should have thrown streams exception");
-        } catch (final StreamsException e) {
-            // pass
-        }
-    }
-
-    @Test
-    public void shouldPassIfTopicPartitionExists() throws Exception {
-        consumer.updatePartitions(topicPartition.topic(), Collections.singletonList(partitionInfo));
-        changelogReader.validatePartitionExists(topicPartition, "store");
-    }
-
-    @SuppressWarnings("unchecked")
-    @Test
-    public void shouldRequestPartitionInfoIfItDoesntExist() throws Exception {
-        final MockConsumer<byte[], byte[]> consumer = new MockConsumer(OffsetResetStrategy.EARLIEST) {
-            @Override
-            public Map<String, List<PartitionInfo>> listTopics() {
-                return Collections.emptyMap();
-            }
-        };
-
-        consumer.updatePartitions(topicPartition.topic(), Collections.singletonList(partitionInfo));
-        final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, Time.SYSTEM, 5000, stateRestoreListener);
-        changelogReader.validatePartitionExists(topicPartition, "store");
+        final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, stateRestoreListener);
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
+                "storeName"));
+        changelogReader.restore();
+        assertTrue(functionCalled.get());
     }
 
-
     @Test
     public void shouldThrowExceptionIfConsumerHasCurrentSubscription() throws Exception {
         consumer.subscribe(Collections.singleton("sometopic"));
@@ -158,7 +94,6 @@ public class StoreChangelogReaderTest {
         setupConsumer(messages, topicPartition);
         changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, true,
                                                    "storeName"));
-
         changelogReader.restore();
         assertThat(callback.restored.size(), equalTo(messages));
     }
@@ -191,7 +126,6 @@ public class StoreChangelogReaderTest {
         final StateRestorer restorer = new StateRestorer(topicPartition, restoreListener, null, 3, true,
                                                          "storeName");
         changelogReader.register(restorer);
-
         changelogReader.restore();
         assertThat(callback.restored.size(), equalTo(3));
         assertThat(restorer.restoredOffset(), equalTo(3L));
@@ -350,23 +284,22 @@ public class StoreChangelogReaderTest {
 
         setupConsumer(1, topicPartition);
         consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 10L));
-        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false,
-                                                   "storeName"));
+        changelogReader.register(new StateRestorer(topicPartition, restoreListener, null, Long.MAX_VALUE, false, "storeName"));
 
         assertTrue(changelogReader.restore().isEmpty());
 
+        addRecords(9, topicPartition, 1);
+
         final TopicPartition postInitialization = new TopicPartition("other", 0);
+        setupConsumer(3, postInitialization);
         consumer.updateBeginningOffsets(Collections.singletonMap(postInitialization, 0L));
         consumer.updateEndOffsets(Collections.singletonMap(postInitialization, 3L));
 
+        changelogReader.register(new StateRestorer(postInitialization, restoreListener2, null, Long.MAX_VALUE, false, "otherStore"));
 
-        addRecords(9, topicPartition, 1);
-
         final Collection<TopicPartition> expected = Utils.mkSet(topicPartition, postInitialization);
-
         consumer.assign(expected);
-        addRecords(3, postInitialization, 0);
+
         assertThat(changelogReader.restore(), equalTo(expected));
         assertThat(callback.restored.size(), equalTo(10));
         assertThat(callbackTwo.restored.size(), equalTo(3));

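The removed `validatePartitionExists` tests above reflect the behavioral change in this commit: rather than blocking per partition until a changelog topic is confirmed to exist, the reader keeps unresolved restorers in a `needsInitializing` set, refreshes its metadata once, and retries on the next run loop. A minimal plain-Java sketch of that delayed-retry idea (all class and method names here are illustrative stand-ins, not the actual Kafka Streams API):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Illustrative sketch only: restorers whose changelog topic is missing from
// the locally cached metadata stay in "needsInitializing"; instead of failing
// fast, the reader refreshes its metadata once (akin to a single
// consumer#listTopics call) and retries on the next run loop.
class DelayedInitSketch {
    private final Set<String> needsInitializing = new HashSet<>();
    private Map<String, Integer> metadata = new HashMap<>(); // topic -> partition count

    void register(final String changelogTopic) {
        needsInitializing.add(changelogTopic);
    }

    // One pass of the run loop: initialize what we can, defer the rest.
    Set<String> restore(final Map<String, Integer> brokerMetadata) {
        final Set<String> initialized = new HashSet<>();
        for (final String topic : needsInitializing) {
            if (metadata.containsKey(topic)) {
                initialized.add(topic);
            }
        }
        needsInitializing.removeAll(initialized);
        if (!needsInitializing.isEmpty()) {
            metadata = brokerMetadata; // refresh once; retry on the next loop
        }
        return initialized;
    }

    public static void main(final String[] args) {
        final DelayedInitSketch reader = new DelayedInitSketch();
        reader.register("store-changelog");
        final Map<String, Integer> broker = Collections.singletonMap("store-changelog", 1);
        System.out.println(reader.restore(broker).isEmpty());                   // first loop: nothing found yet
        System.out.println(reader.restore(broker).contains("store-changelog")); // second loop: found after refresh
    }
}
```

Note how the first `restore` call returns empty even though the broker metadata contains the topic: the reader only refreshes after a miss, which is why, as the commit message puts it, "after `initialize` is called we may not be able to start initializing all the `needsInitializing` ones."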
http://git-wip-us.apache.org/repos/asf/kafka/blob/6682abe4/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index a9d3cac..4246b17 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -34,7 +34,6 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsMetrics;
@@ -115,7 +114,7 @@ public class StreamTaskTest {
     private final MockProducer<byte[], byte[]> producer = new MockProducer<>(false, bytesSerializer, bytesSerializer);
     private final MockConsumer<byte[], byte[]> restoreStateConsumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
     private final StateRestoreListener stateRestoreListener = new MockStateRestoreListener();
-    private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, Time.SYSTEM, 5000, stateRestoreListener);
+    private final StoreChangelogReader changelogReader = new StoreChangelogReader(restoreStateConsumer, stateRestoreListener);
     private final byte[] recordValue = intSerializer.serialize(null, 10);
     private final byte[] recordKey = intSerializer.serialize(null, 1);
     private final String applicationId = "applicationId";

http://git-wip-us.apache.org/repos/asf/kafka/blob/6682abe4/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index ea89494..fa22d7e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
@@ -188,7 +187,7 @@ public class StreamThreadStateStoreProviderTest {
             Collections.singletonList(new TopicPartition(topicName, taskId.partition)),
             topology,
             clientSupplier.consumer,
-            new StoreChangelogReader(clientSupplier.restoreConsumer, Time.SYSTEM, 5000, new MockStateRestoreListener()),
+            new StoreChangelogReader(clientSupplier.restoreConsumer, new MockStateRestoreListener()),
             streamsConfig,
             new MockStreamsMetrics(new Metrics()),
             stateDirectory,

http://git-wip-us.apache.org/repos/asf/kafka/blob/6682abe4/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java b/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java
index 86c0eb5..54fd858 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java
@@ -30,13 +30,8 @@ public class MockChangelogReader implements ChangelogReader {
     private final Set<TopicPartition> registered = new HashSet<>();
 
     @Override
-    public void validatePartitionExists(final TopicPartition topicPartition, final String storeName) {
-
-    }
-
-    @Override
-    public void register(final StateRestorer restorationInfo) {
-        registered.add(restorationInfo.partition());
+    public void register(final StateRestorer restorer) {
+        registered.add(restorer.partition());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/6682abe4/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index f0e245c..47124ed 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -222,8 +222,6 @@ public class ProcessorTopologyTestDriver {
                                   consumer,
                                   new StoreChangelogReader(
                                       createRestoreConsumer(topology.storeToChangelogTopic()),
-                                      Time.SYSTEM,
-                                      5000,
                                       stateRestoreListener),
                                   config,
                                   streamsMetrics, stateDirectory,

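The constructor simplification repeated across these test files (dropping the `Time` and retry-timeout arguments) follows from the strategy in the commit message: the only remaining blocking calls are `listTopics` and `endOffsets`, and timeout exceptions around them are captured and deferred rather than escalated. A generic, hedged sketch of that capture-and-defer pattern (names are illustrative; Kafka's own `org.apache.kafka.common.errors.TimeoutException` is an unchecked `RetriableException`):

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

// Illustrative pattern only: wrap a blocking metadata call, swallow the
// (unchecked) timeout, and signal "try again next run loop" instead of
// propagating a failure to the caller.
class MetadataFetch {
    static <T> Optional<T> tryFetch(final Supplier<T> blockingCall) {
        try {
            return Optional.of(blockingCall.get());
        } catch (final RuntimeException e) { // stand-in for a retriable TimeoutException
            return Optional.empty();         // caller retries on the next loop
        }
    }

    public static void main(final String[] args) {
        final Optional<Map<String, Integer>> ok =
            tryFetch(() -> Map.of("topic", 1));
        final Optional<Map<String, Integer>> timedOut =
            tryFetch(() -> { throw new RuntimeException("timeout"); });
        System.out.println(ok.isPresent() + " " + timedOut.isPresent()); // true false
    }
}
```

With retries delegated to the run loop, the reader no longer needs its own clock or timeout bookkeeping, which is what makes the two-argument constructor sufficient.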
