kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mj...@apache.org
Subject [kafka] branch 1.0 updated: KAFKA-6367: StateRestoreListener use actual last restored offset for restored batch (#4507)
Date Wed, 07 Feb 2018 19:24:00 GMT
This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new b3cfc6f  KAFKA-6367: StateRestoreListener use actual last restored offset for restored batch (#4507)
b3cfc6f is described below

commit b3cfc6fb3dfd029c5d9914419408492edd6919a7
Author: Bill Bejeck <bbejeck@gmail.com>
AuthorDate: Wed Feb 7 14:07:32 2018 -0500

    KAFKA-6367: StateRestoreListener use actual last restored offset for restored batch (#4507)
    
    Author: Bill Bejeck <bill@confluent.io>
    
    Reviewers: Guozhang Wang <guozhang@confluent.io>, Matthias J. Sax <matthias@confluent.io>
---
 .../streams/processor/StateRestoreListener.java    |  4 ++--
 .../processor/internals/StoreChangelogReader.java  |  5 +++--
 .../internals/StoreChangelogReaderTest.java        | 24 +++++++++++++++++-----
 3 files changed, 24 insertions(+), 9 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java
index c80a736..ea1c288 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StateRestoreListener.java
@@ -43,7 +43,7 @@ public interface StateRestoreListener {
      * @param topicPartition the TopicPartition containing the values to restore
      * @param storeName      the name of the store undergoing restoration
      * @param startingOffset the starting offset of the entire restoration process for this TopicPartition
-     * @param endingOffset   the ending offset of the entire restoration process for this TopicPartition
+     * @param endingOffset   the exclusive ending offset of the entire restoration process for this TopicPartition
      */
     void onRestoreStart(final TopicPartition topicPartition,
                         final String storeName,
@@ -62,7 +62,7 @@ public interface StateRestoreListener {
      *
      * @param topicPartition the TopicPartition containing the values to restore
      * @param storeName the name of the store undergoing restoration
-     * @param batchEndOffset the ending offset for the current restored batch for this TopicPartition
+     * @param batchEndOffset the inclusive ending offset for the current restored batch for this TopicPartition
      * @param numRestored the total number of records restored in this batch for this TopicPartition
      */
     void onBatchRestored(final TopicPartition topicPartition,
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
index 950a2c6..8d85b1d 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java
@@ -260,12 +260,14 @@ public class StoreChangelogReader implements ChangelogReader {
         long nextPosition = -1;
         int numberRecords = records.size();
         int numberRestored = 0;
+        long lastRestoredOffset = -1;
         for (final ConsumerRecord<byte[], byte[]> record : records) {
             final long offset = record.offset();
             if (restorer.hasCompleted(offset, endOffset)) {
                 nextPosition = record.offset();
                 break;
             }
+            lastRestoredOffset = offset;
             numberRestored++;
             if (record.key() != null) {
                 restoreRecords.add(KeyValue.pair(record.key(), record.value()));
@@ -281,8 +283,7 @@ public class StoreChangelogReader implements ChangelogReader {
 
         if (!restoreRecords.isEmpty()) {
             restorer.restore(restoreRecords);
-            restorer.restoreBatchCompleted(nextPosition, records.size());
-
+            restorer.restoreBatchCompleted(lastRestoredOffset, records.size());
         }
 
         return nextPosition;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
index 9401029..bb0c51e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StoreChangelogReaderTest.java
@@ -198,15 +198,28 @@ public class StoreChangelogReaderTest {
         assertThat(callbackTwo.restored.size(), equalTo(3));
 
         assertAllCallbackStatesExecuted(callback, "storeName1");
-        assertCorrectOffsetsReportedByListener(callback, 0L, 10L, 10L);
+        assertCorrectOffsetsReportedByListener(callback, 0L, 9L, 10L);
 
         assertAllCallbackStatesExecuted(callbackOne, "storeName2");
-        assertCorrectOffsetsReportedByListener(callbackOne, 0L, 5L, 5L);
+        assertCorrectOffsetsReportedByListener(callbackOne, 0L, 4L, 5L);
 
         assertAllCallbackStatesExecuted(callbackTwo, "storeName3");
-        assertCorrectOffsetsReportedByListener(callbackTwo, 0L, 3L, 3L);
+        assertCorrectOffsetsReportedByListener(callbackTwo, 0L, 2L, 3L);
     }
 
+    @Test
+    public void shouldOnlyReportTheLastRestoredOffset() {
+        setupConsumer(10, topicPartition);
+        changelogReader
+            .register(new StateRestorer(topicPartition, restoreListener, null, 5, true, "storeName1"));
+        changelogReader.restore(active);
+
+        assertThat(callback.restored.size(), equalTo(5));
+        assertAllCallbackStatesExecuted(callback, "storeName1");
+        assertCorrectOffsetsReportedByListener(callback, 0L, 4L, 5L);
+    }
+
+
     private void assertAllCallbackStatesExecuted(final MockStateRestoreListener restoreListener,
                                                  final String storeName) {
         assertThat(restoreListener.storeNameCalledStates.get(RESTORE_START), equalTo(storeName));
@@ -217,11 +230,12 @@ public class StoreChangelogReaderTest {
 
     private void assertCorrectOffsetsReportedByListener(final MockStateRestoreListener restoreListener,
                                                         final long startOffset,
-                                                        final long batchOffset, final long endOffset) {
+                                                        final long batchOffset,
+                                                        final long totalRestored) {
 
         assertThat(restoreListener.restoreStartOffset, equalTo(startOffset));
         assertThat(restoreListener.restoredBatchOffset, equalTo(batchOffset));
-        assertThat(restoreListener.restoreEndOffset, equalTo(endOffset));
+        assertThat(restoreListener.totalNumRestored, equalTo(totalRestored));
     }
 
     @Test

-- 
To stop receiving notification emails like this one, please contact
mjsax@apache.org.

Mime
View raw message