From commits-return-10364-archive-asf-public=cust-asf.ponee.io@kafka.apache.org Fri Sep 14 21:05:35 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 853B5180647 for ; Fri, 14 Sep 2018 21:05:34 +0200 (CEST) Received: (qmail 23975 invoked by uid 500); 14 Sep 2018 19:05:33 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 23966 invoked by uid 99); 14 Sep 2018 19:05:33 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 14 Sep 2018 19:05:33 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id D750982DC3; Fri, 14 Sep 2018 19:05:32 +0000 (UTC) Date: Fri, 14 Sep 2018 19:05:32 +0000 To: "commits@kafka.apache.org" Subject: [kafka] branch 0.11.0 updated: KAFKA-7192: Wipe out state store if EOS is turned on and checkpoint file does not exist (#5641) MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <153695193156.25182.3189865396712416577@gitbox.apache.org> From: mjsax@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: kafka X-Git-Refname: refs/heads/0.11.0 X-Git-Reftype: branch X-Git-Oldrev: 379211134740268b570fc8edd59ae78df0dffee9 X-Git-Newrev: 4c9d49bd3bb8381e95bba4e3223c0d6a3c3c8e22 X-Git-Rev: 4c9d49bd3bb8381e95bba4e3223c0d6a3c3c8e22 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. mjsax pushed a commit to branch 0.11.0 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/0.11.0 by this push: new 4c9d49b KAFKA-7192: Wipe out state store if EOS is turned on and checkpoint file does not exist (#5641) 4c9d49b is described below commit 4c9d49bd3bb8381e95bba4e3223c0d6a3c3c8e22 Author: Matthias J. Sax AuthorDate: Fri Sep 14 12:05:24 2018 -0700 KAFKA-7192: Wipe out state store if EOS is turned on and checkpoint file does not exist (#5641) Reviews: Guozhang Wang , Bill Bejeck , John Roesler --- .../internals/AbstractProcessorContext.java | 6 ++ .../streams/processor/internals/AbstractTask.java | 4 ++ .../streams/processor/internals/AssignedTasks.java | 2 +- .../processor/internals/ChangelogReader.java | 2 +- .../internals/InternalProcessorContext.java | 5 ++ .../processor/internals/ProcessorStateManager.java | 63 +++++++++++++++++++- .../streams/processor/internals/StateRestorer.java | 19 ++++-- .../processor/internals/StoreChangelogReader.java | 64 ++++++++++++++------ .../streams/processor/internals/StreamThread.java | 2 +- .../streams/integration/EosIntegrationTest.java | 4 +- .../processor/internals/StateRestorerTest.java | 4 +- .../internals/StoreChangelogReaderTest.java | 68 +++++++++++----------- .../org/apache/kafka/test/MockChangelogReader.java | 3 +- .../apache/kafka/test/MockProcessorContext.java | 3 + 14 files changed, 184 insertions(+), 65 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java index 04af9f2..c094b83 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java @@ -193,4 +193,10 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte public void initialized() { initialized = true; } + + @Override + public void uninitialize() { + initialized = false; + } + } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index 7f6ac7c..f8f6416 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -226,6 +226,10 @@ public abstract class AbstractTask { } } + void reinitializeStateStoresForPartitions(final TopicPartition partitions) { + stateMgr.reinitializeStateStoresForPartitions(partitions, processorContext); + } + /** * @throws ProcessorStateException if there is an error while closing the state manager * @param writeCheckpoint boolean indicating if a checkpoint file should be written diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java index d59ec2b..ad4868f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java @@ -298,7 +298,7 @@ class AssignedTasks { return suspended.values(); } - Collection restoringTasks() { + public Collection restoringTasks() { return Collections.unmodifiableCollection(restoring.values()); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java index 5ebc34c..e82ee2c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ChangelogReader.java @@ -37,7 +37,7 @@ public interface ChangelogReader { * Restore all registered state stores by reading from their changelogs. * @return all topic partitions that have been restored */ - Collection restore(); + Collection restore(final Collection restoringTasks); /** * @return the restored offsets for all persistent stores. diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java index 57bb3ac..b5719b1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java @@ -53,4 +53,9 @@ public interface InternalProcessorContext extends ProcessorContext { * Mark this contex as being initialized */ void initialized(); + + /** + * Mark this context as being uninitialized + */ + void uninitialize(); } diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 34a87ce..93e7ffc 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.errors.LockException; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.errors.StreamsException; @@ -33,9 +34,11 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Set; public class ProcessorStateManager implements StateManager { @@ -50,6 +53,7 @@ public class ProcessorStateManager implements StateManager { private final String logPrefix; private final boolean isStandby; private final ChangelogReader changelogReader; + private final boolean eosEnabled; private final Map stores; private final Map globalStores; private final Map offsetLimits; @@ -106,6 +110,7 @@ public class ProcessorStateManager implements StateManager { checkpoint = new OffsetCheckpoint(new File(baseDir, CHECKPOINT_FILE_NAME)); checkpointedOffsets = new HashMap<>(checkpoint.read()); + this.eosEnabled = eosEnabled; if (eosEnabled) { // delete the checkpoint file after finish loading its stored offsets checkpoint.delete(); @@ -169,7 +174,8 @@ public class ProcessorStateManager implements StateManager { stateRestoreCallback, checkpointedOffsets.get(storePartition), offsetLimit(storePartition), - store.persistent() + store.persistent(), + store.name() ); changelogReader.register(restorer); @@ -178,6 +184,61 @@ public class ProcessorStateManager implements StateManager { stores.put(store.name(), store); } + void reinitializeStateStoresForPartitions(final TopicPartition topicPartition, + final InternalProcessorContext processorContext) { + final Map changelogTopicToStore = inverseOneToOneMap(storeToChangelogTopic); + final Set storeToBeReinitialized = new HashSet<>(); + final Map storesCopy = new HashMap<>(stores); + + checkpointedOffsets.remove(topicPartition); + storeToBeReinitialized.add(changelogTopicToStore.get(topicPartition.topic())); + + if (!eosEnabled) { + try { + checkpoint.write(checkpointedOffsets); + } catch (final IOException fatalException) { + log.error("Failed to write offset checkpoint file to {} while re-initializing {}: {}", checkpoint, stores, fatalException); + throw new StreamsException("Failed to reinitialize stores.", fatalException); + } + } + + for (final Map.Entry entry : storesCopy.entrySet()) { + final StateStore stateStore = entry.getValue(); + final String storeName = stateStore.name(); + if (storeToBeReinitialized.contains(storeName)) { + try { + stateStore.close(); + } catch (final RuntimeException ignoreAndSwallow) { /* ignore */ } + processorContext.uninitialize(); + stores.remove(entry.getKey()); + + try { + Utils.delete(new File(baseDir + File.separator + "rocksdb" + File.separator + storeName)); + } catch (final IOException fatalException) { + log.error("Failed to reinitialize store {}.", storeName, fatalException); + throw new StreamsException(String.format("Failed to reinitialize store %s.", storeName), fatalException); + } + + try { + Utils.delete(new File(baseDir + File.separator + storeName)); + } catch (final IOException fatalException) { + log.error("Failed to reinitialize store {}.", storeName, fatalException); + throw new StreamsException(String.format("Failed to reinitialize store %s.", storeName), fatalException); + } + + stateStore.init(processorContext, stateStore); + } + } + } + + private Map inverseOneToOneMap(final Map origin) { + final Map reversedMap = new HashMap<>(); + for (final Map.Entry entry : origin.entrySet()) { + reversedMap.put(entry.getValue(), entry.getKey()); + } + return reversedMap; + } + @Override public Map checkpointed() { final Map partitionsAndOffsets = new HashMap<>(); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java index 79bfd1d..3c5efe8 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateRestorer.java @@ -22,12 +22,13 @@ import org.apache.kafka.streams.processor.StateRestoreCallback; public class StateRestorer { static final int NO_CHECKPOINT = -1; - private final Long checkpoint; private final long offsetLimit; private final boolean persistent; + private final String storeName; private final TopicPartition partition; private final StateRestoreCallback stateRestoreCallback; + private long checkpointOffset; private long restoredOffset; private long startingOffset; @@ -35,12 +36,14 @@ public class StateRestorer { final StateRestoreCallback stateRestoreCallback, final Long checkpoint, final long offsetLimit, - final boolean persistent) { + final boolean persistent, + final String storeName) { this.partition = partition; this.stateRestoreCallback = stateRestoreCallback; - this.checkpoint = checkpoint; + this.checkpointOffset = checkpoint == null ? NO_CHECKPOINT : checkpoint; this.offsetLimit = offsetLimit; this.persistent = persistent; + this.storeName = storeName; } public TopicPartition partition() { @@ -48,7 +51,15 @@ public class StateRestorer { } long checkpoint() { - return checkpoint == null ? NO_CHECKPOINT : checkpoint; + return checkpointOffset; + } + + void setCheckpointOffset(final long checkpointOffset) { + this.checkpointOffset = checkpointOffset; + } + + public String storeName() { + return storeName; } void restore(final byte[] key, final byte[] value) { diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java index 34dcb75..305bf10 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java @@ -60,13 +60,16 @@ public class StoreChangelogReader implements ChangelogReader { @Override public void register(final StateRestorer restorer) { - stateRestorers.put(restorer.partition(), restorer); + if (!stateRestorers.containsKey(restorer.partition())) { + stateRestorers.put(restorer.partition(), restorer); + log.trace("Added restorer for changelog {}", restorer.partition()); + } needsInitializing.put(restorer.partition(), restorer); } - public Collection restore() { + public Collection restore(final Collection restoringTasks) { if (!needsInitializing.isEmpty()) { - initialize(); + initialize(restoringTasks); } if (needsRestoring.isEmpty()) { @@ -87,7 +90,7 @@ public class StoreChangelogReader implements ChangelogReader { return completed(); } - private void initialize() { + private void initialize(final Collection restoringTasks) { if (!consumer.subscription().isEmpty()) { throw new IllegalStateException("Restore consumer should not be subscribed to any topics (" + consumer.subscription() + ")"); } @@ -139,11 +142,12 @@ public class StoreChangelogReader implements ChangelogReader { // set up restorer for those initializable if (!initializable.isEmpty()) { - startRestoration(initializable); + startRestoration(initializable, restoringTasks); } } - private void startRestoration(final Map initialized) { + private void startRestoration(final Map initialized, + final Collection restoringTasks) { log.debug("{} Start restoring state stores from changelog topics {}", logPrefix, initialized.keySet()); final Set assignment = new HashSet<>(consumer.assignment()); @@ -152,24 +156,48 @@ public class StoreChangelogReader implements ChangelogReader { final List needsPositionUpdate = new ArrayList<>(); for (final StateRestorer restorer : initialized.values()) { + final TopicPartition restoringPartition = restorer.partition(); if (restorer.checkpoint() != StateRestorer.NO_CHECKPOINT) { - consumer.seek(restorer.partition(), restorer.checkpoint()); - logRestoreOffsets(restorer.partition(), - restorer.checkpoint(), - endOffsets.get(restorer.partition())); - restorer.setStartingOffset(consumer.position(restorer.partition())); + consumer.seek(restoringPartition, restorer.checkpoint()); + logRestoreOffsets( + restoringPartition, + restorer.checkpoint(), + endOffsets.get(restoringPartition)); + restorer.setStartingOffset(consumer.position(restoringPartition)); } else { - consumer.seekToBeginning(Collections.singletonList(restorer.partition())); + consumer.seekToBeginning(Collections.singletonList(restoringPartition)); needsPositionUpdate.add(restorer); } } for (final StateRestorer restorer : needsPositionUpdate) { - final long position = consumer.position(restorer.partition()); - logRestoreOffsets(restorer.partition(), - position, - endOffsets.get(restorer.partition())); - restorer.setStartingOffset(position); + final TopicPartition restoringPartition = restorer.partition(); + + for (final StreamTask task : restoringTasks) { + if (task.changelogPartitions().contains(restoringPartition) || task.partitions().contains(restoringPartition)) { + if (task.eosEnabled) { + log.info("No checkpoint found for task {} state store {} changelog {} with EOS turned on. " + + "Reinitializing the task and restore its state from the beginning.", task.id, restorer.storeName(), restorer.partition()); + + needsInitializing.remove(restoringPartition); + initialized.put(restoringPartition, restorer); + restorer.setCheckpointOffset(consumer.position(restoringPartition)); + + task.reinitializeStateStoresForPartitions(restoringPartition); + } else { + log.info("Restoring task {}'s state store {} from beginning of the changelog {} ", task.id, restorer.storeName(), restorer.partition()); + + final long position = consumer.position(restoringPartition); + logRestoreOffsets( + restoringPartition, + position, + endOffsets.get(restoringPartition)); + restorer.setStartingOffset(position); + } + } + } + + } needsRestoring.putAll(initialized); @@ -220,7 +248,7 @@ public class StoreChangelogReader implements ChangelogReader { } private void restorePartition(final ConsumerRecords allRecords, - final TopicPartition topicPartition) { + final TopicPartition topicPartition) { final StateRestorer restorer = stateRestorers.get(topicPartition); final Long endOffset = endOffsets.get(topicPartition); final long pos = processNext(allRecords.records(topicPartition), restorer, endOffset); diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 210b070..6b7f4f1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -532,7 +532,7 @@ public class StreamThread extends Thread { active.initializeNewTasks(); standby.initializeNewTasks(); - final Collection restored = storeChangelogReader.restore(); + final Collection restored = storeChangelogReader.restore(active.restoringTasks()); final Set resumed = active.updateRestored(restored); if (!resumed.isEmpty()) { diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java index 0c3b36a..7f70950 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java @@ -392,7 +392,7 @@ public class EosIntegrationTest { // the app commits after each 10 records per partition, and thus will have 2*5 uncommitted writes // and store updates (ie, another 5 uncommitted writes to a changelog topic per partition) // - // the failure gets inject after 20 committed and 30 uncommitted records got received + // the failure gets inject after 20 committed and 10 uncommitted records got received // -> the failure only kills one thread // after fail over, we should read 40 committed records and the state stores should contain the correct sums // per key (even if some records got processed twice) @@ -402,7 +402,7 @@ public class EosIntegrationTest { streams.start(); final List> committedDataBeforeFailure = prepareData(0L, 10L, 0L, 1L); - final List> uncommittedDataBeforeFailure = prepareData(10L, 15L, 0L, 1L); + final List> uncommittedDataBeforeFailure = prepareData(10L, 15L, 0L, 1L, 2L, 3L); final List> dataBeforeFailure = new ArrayList<>(); dataBeforeFailure.addAll(committedDataBeforeFailure); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java index 6968f33..3a8ebda 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateRestorerTest.java @@ -28,7 +28,7 @@ public class StateRestorerTest { private static final long OFFSET_LIMIT = 50; private final MockRestoreCallback callback = new MockRestoreCallback(); - private final StateRestorer restorer = new StateRestorer(new TopicPartition("topic", 1), callback, null, OFFSET_LIMIT, true); + private final StateRestorer restorer = new StateRestorer(new TopicPartition("topic", 1), callback, null, OFFSET_LIMIT, true, "store"); @Test public void shouldCallRestoreOnRestoreCallback() throws Exception { @@ -53,7 +53,7 @@ public class StateRestorerTest { @Test public void shouldBeCompletedIfOffsetAndOffsetLimitAreZero() throws Exception { - final StateRestorer restorer = new StateRestorer(new TopicPartition("topic", 1), callback, null, 0, true); + final StateRestorer restorer = new StateRestorer(new TopicPartition("topic", 1), callback, null, 0, true, "store"); assertTrue(restorer.hasCompleted(0, 10)); } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java index a43f083..24820f8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java @@ -59,8 +59,8 @@ public class StoreChangelogReaderTest { }; final StoreChangelogReader changelogReader = new StoreChangelogReader(consumer); - changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true)); - changelogReader.restore(); + changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true, "store")); + changelogReader.restore(Collections.emptySet()); assertTrue(functionCalled.get()); } @@ -68,7 +68,7 @@ public class StoreChangelogReaderTest { public void shouldThrowExceptionIfConsumerHasCurrentSubscription() throws Exception { consumer.subscribe(Collections.singleton("sometopic")); try { - changelogReader.restore(); + changelogReader.restore(Collections.emptySet()); fail("Should have thrown IllegalStateException"); } catch (final IllegalStateException e) { // ok @@ -79,9 +79,9 @@ public class StoreChangelogReaderTest { public void shouldRestoreAllMessagesFromBeginningWhenCheckpointNull() throws Exception { final int messages = 10; setupConsumer(messages, topicPartition); - changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true)); + changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true, "store")); - changelogReader.restore(); + changelogReader.restore(Collections.emptySet()); assertThat(callback.restored.size(), equalTo(messages)); } @@ -89,9 +89,9 @@ public class StoreChangelogReaderTest { public void shouldRestoreMessagesFromCheckpoint() throws Exception { final int messages = 10; setupConsumer(messages, topicPartition); - changelogReader.register(new StateRestorer(topicPartition, callback, 5L, Long.MAX_VALUE, true)); + changelogReader.register(new StateRestorer(topicPartition, callback, 5L, Long.MAX_VALUE, true, "store")); - changelogReader.restore(); + changelogReader.restore(Collections.emptySet()); assertThat(callback.restored.size(), equalTo(5)); } @@ -99,18 +99,18 @@ public class StoreChangelogReaderTest { public void shouldClearAssignmentAtEndOfRestore() throws Exception { final int messages = 1; setupConsumer(messages, topicPartition); - changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true)); + changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true, "store")); - changelogReader.restore(); + changelogReader.restore(Collections.emptySet()); assertThat(consumer.assignment(), equalTo(Collections.emptySet())); } @Test public void shouldRestoreToLimitWhenSupplied() throws Exception { setupConsumer(10, topicPartition); - final StateRestorer restorer = new StateRestorer(topicPartition, callback, null, 3, true); + final StateRestorer restorer = new StateRestorer(topicPartition, callback, null, 3, true, "store"); changelogReader.register(restorer); - changelogReader.restore(); + changelogReader.restore(Collections.emptySet()); assertThat(callback.restored.size(), equalTo(3)); assertThat(restorer.restoredOffset(), equalTo(3L)); } @@ -125,11 +125,11 @@ public class StoreChangelogReaderTest { setupConsumer(5, one); setupConsumer(3, two); - changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true)); - changelogReader.register(new StateRestorer(one, callbackOne, null, Long.MAX_VALUE, true)); - changelogReader.register(new StateRestorer(two, callbackTwo, null, Long.MAX_VALUE, true)); + changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true, "store")); + changelogReader.register(new StateRestorer(one, callbackOne, null, Long.MAX_VALUE, true, "store")); + changelogReader.register(new StateRestorer(two, callbackTwo, null, Long.MAX_VALUE, true, "store")); - changelogReader.restore(); + changelogReader.restore(Collections.emptySet()); assertThat(callback.restored.size(), equalTo(10)); assertThat(callbackOne.restored.size(), equalTo(5)); @@ -138,11 +138,11 @@ public class StoreChangelogReaderTest { @Test public void shouldNotRestoreAnythingWhenPartitionIsEmpty() throws Exception { - final StateRestorer restorer = new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true); + final StateRestorer restorer = new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true, "store"); setupConsumer(0, topicPartition); changelogReader.register(restorer); - changelogReader.restore(); + changelogReader.restore(Collections.emptySet()); assertThat(callback.restored.size(), equalTo(0)); assertThat(restorer.restoredOffset(), equalTo(0L)); } @@ -151,11 +151,11 @@ public class StoreChangelogReaderTest { public void shouldNotRestoreAnythingWhenCheckpointAtEndOffset() throws Exception { final Long endOffset = 10L; setupConsumer(endOffset, topicPartition); - final StateRestorer restorer = new StateRestorer(topicPartition, callback, endOffset, Long.MAX_VALUE, true); + final StateRestorer restorer = new StateRestorer(topicPartition, callback, endOffset, Long.MAX_VALUE, true, "store"); changelogReader.register(restorer); - changelogReader.restore(); + changelogReader.restore(Collections.emptySet()); assertThat(callback.restored.size(), equalTo(0)); assertThat(restorer.restoredOffset(), equalTo(endOffset)); } @@ -163,8 +163,8 @@ public class StoreChangelogReaderTest { @Test public void shouldReturnRestoredOffsetsForPersistentStores() throws Exception { setupConsumer(10, topicPartition); - changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true)); - changelogReader.restore(); + changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true, "store")); + changelogReader.restore(Collections.emptySet()); final Map restoredOffsets = changelogReader.restoredOffsets(); assertThat(restoredOffsets, equalTo(Collections.singletonMap(topicPartition, 10L))); } @@ -172,8 +172,8 @@ public class StoreChangelogReaderTest { @Test public void shouldNotReturnRestoredOffsetsForNonPersistentStore() throws Exception { setupConsumer(10, topicPartition); - changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false)); - changelogReader.restore(); + changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false, "store")); + changelogReader.restore(Collections.emptySet()); final Map restoredOffsets = changelogReader.restoredOffsets(); assertThat(restoredOffsets, equalTo(Collections.emptyMap())); } @@ -186,8 +186,8 @@ public class StoreChangelogReaderTest { consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), 1, (byte[]) null, bytes)); consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), 2, bytes, bytes)); consumer.assign(Collections.singletonList(topicPartition)); - changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false)); - changelogReader.restore(); + changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false, "store")); + changelogReader.restore(Collections.emptySet()); assertThat(callback.restored, CoreMatchers.equalTo(Utils.mkList(KeyValue.pair(bytes, bytes), KeyValue.pair(bytes, bytes)))); } @@ -200,15 +200,15 @@ public class StoreChangelogReaderTest { consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), i, bytes, bytes)); } consumer.assign(Collections.singletonList(topicPartition)); - changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false)); + changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false, "store")); - final Collection completedFirstTime = changelogReader.restore(); + final Collection completedFirstTime = changelogReader.restore(Collections.emptySet()); assertTrue(completedFirstTime.isEmpty()); for (int i = 5; i < 10; i++) { consumer.addRecord(new ConsumerRecord<>(topicPartition.topic(), topicPartition.partition(), i, bytes, bytes)); } final Collection expected = Collections.singleton(topicPartition); - assertThat(changelogReader.restore(), equalTo(expected)); + assertThat(changelogReader.restore(Collections.emptySet()), equalTo(expected)); } private void setupConsumer(final long messages, final TopicPartition topicPartition) { @@ -237,8 +237,8 @@ public class StoreChangelogReaderTest { public void shouldCompleteImmediatelyWhenEndOffsetIs0() { final Collection expected = Collections.singleton(topicPartition); setupConsumer(0, topicPartition); - changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true)); - final Collection restored = changelogReader.restore(); + changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, true, "store")); + final Collection restored = changelogReader.restore(Collections.emptySet()); assertThat(restored, equalTo(expected)); } @@ -248,9 +248,9 @@ public class StoreChangelogReaderTest { setupConsumer(1, topicPartition); consumer.updateEndOffsets(Collections.singletonMap(topicPartition, 10L)); - changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false)); + changelogReader.register(new StateRestorer(topicPartition, callback, null, Long.MAX_VALUE, false, "store")); - assertTrue(changelogReader.restore().isEmpty()); + assertTrue(changelogReader.restore(Collections.emptySet()).isEmpty()); addRecords(9, topicPartition, 1); @@ -259,12 +259,12 @@ public class StoreChangelogReaderTest { consumer.updateBeginningOffsets(Collections.singletonMap(postInitialization, 0L)); consumer.updateEndOffsets(Collections.singletonMap(postInitialization, 3L)); - changelogReader.register(new StateRestorer(postInitialization, callbackTwo, null, Long.MAX_VALUE, false)); + changelogReader.register(new StateRestorer(postInitialization, callbackTwo, null, Long.MAX_VALUE, false, "store")); final Collection expected = Utils.mkSet(topicPartition, postInitialization); consumer.assign(expected); - assertThat(changelogReader.restore(), equalTo(expected)); + assertThat(changelogReader.restore(Collections.emptySet()), equalTo(expected)); assertThat(callback.restored.size(), equalTo(10)); assertThat(callbackTwo.restored.size(), equalTo(3)); } diff --git a/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java b/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java index 54fd858..93ce801 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java +++ b/streams/src/test/java/org/apache/kafka/test/MockChangelogReader.java @@ -19,6 +19,7 @@ package org.apache.kafka.test; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.streams.processor.internals.ChangelogReader; import org.apache.kafka.streams.processor.internals.StateRestorer; +import org.apache.kafka.streams.processor.internals.StreamTask; import java.util.Collection; import java.util.Collections; @@ -35,7 +36,7 @@ public class MockChangelogReader implements ChangelogReader { } @Override - public Collection restore() { + public Collection restore(final Collection restoringTasks) { return registered; } diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java index cb56fa1..fed8752 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java @@ -143,6 +143,9 @@ public class MockProcessorContext implements InternalProcessorContext, RecordCol public void initialized() {} @Override + public void uninitialize() {} + + @Override public File stateDir() { if (stateDir == null) { throw new UnsupportedOperationException("State directory not specified");