kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 1.0 updated: KAFKA-6205: initialize topology after state stores restoration completed
Date Fri, 26 Jan 2018 18:09:03 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 1.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/1.0 by this push:
     new 82a92b7  KAFKA-6205: initialize topology after state stores restoration completed
82a92b7 is described below

commit 82a92b7ec9ec18e8fd269340e6c370d1a25722a5
Author: Bill Bejeck <bill@confluent.io>
AuthorDate: Fri Jan 26 09:24:40 2018 -0800

    KAFKA-6205: initialize topology after state stores restoration completed
    
    Initialize topology after state store restoration.
    IMHO the updates to some of the existing tests already demonstrate the correct order of
    operations. I'll probably add an integration test as well, but I wanted to get this PR in
    for feedback on the approach.
    
    Author: Bill Bejeck <bill@confluent.io>
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>, Matthias J. Sax <mjsax@apache.org>
    
    Closes #4415 from bbejeck/KAFKA-6205_restore_state_stores_before_initializing_topology
    
    minor log4j edits
---
 .../streams/processor/internals/AbstractTask.java  |  5 ++--
 .../streams/processor/internals/AssignedTasks.java |  3 ++-
 .../streams/processor/internals/StandbyTask.java   | 10 ++++++--
 .../streams/processor/internals/StreamTask.java    | 13 ++++++----
 .../kafka/streams/processor/internals/Task.java    | 18 +++++++------
 .../streams/processor/internals/TaskManager.java   |  1 +
 .../processor/internals/AbstractTaskTest.java      | 10 +++++---
 .../processor/internals/AssignedTasksTest.java     | 30 ++++++++++++++++------
 .../processor/internals/StandbyTaskTest.java       | 10 ++++----
 .../processor/internals/StreamTaskTest.java        | 20 ++++++++++-----
 .../StreamThreadStateStoreProviderTest.java        |  4 +--
 .../kafka/test/ProcessorTopologyTestDriver.java    |  3 ++-
 12 files changed, 82 insertions(+), 45 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 52465ed..47d1e58 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -197,10 +197,11 @@ public abstract class AbstractTask implements Task {
     }
 
     /**
-     * @throws IllegalStateException If store gets registered after initialized is already
finished
      * @throws StreamsException if the store's change log does not contain the partition
+     *
+     * Package-private for testing only
      */
-    void initializeStateStores() {
+    void registerStateStores() {
         if (topology.stateStores().isEmpty()) {
             return;
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
index 5a8ad6b..6aa9c03 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AssignedTasks.java
@@ -119,7 +119,7 @@ class AssignedTasks implements RestoringTasks {
         for (final Iterator<Map.Entry<TaskId, Task>> it = created.entrySet().iterator();
it.hasNext(); ) {
             final Map.Entry<TaskId, Task> entry = it.next();
             try {
-                if (!entry.getValue().initialize()) {
+                if (!entry.getValue().initializeStateStores()) {
                     log.debug("Transitioning {} {} to restoring", taskTypeName, entry.getKey());
                     addToRestoring(entry.getValue());
                 } else {
@@ -289,6 +289,7 @@ class AssignedTasks implements RestoringTasks {
     private void transitionToRunning(final Task task, final Set<TopicPartition> readyPartitions)
{
         log.debug("transitioning {} {} to running", taskTypeName, task.id());
         running.put(task.id(), task);
+        task.initializeTopology();
         for (TopicPartition topicPartition : task.partitions()) {
             runningByPartition.put(topicPartition, task);
             if (task.hasStateStores()) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 98ec810..fb21ff4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -64,14 +64,20 @@ public class StandbyTask extends AbstractTask {
     }
 
     @Override
-    public boolean initialize() {
-        initializeStateStores();
+    public boolean initializeStateStores() {
+        log.trace("Initializing state stores");
+        registerStateStores();
         checkpointedOffsets = Collections.unmodifiableMap(stateMgr.checkpointed());
         processorContext.initialized();
         taskInitialized = true;
         return true;
     }
 
+    @Override
+    public void initializeTopology() {
+        //no-op
+    }
+
     /**
      * <pre>
      * - update offset limits
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 8180b2c..8766e64 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -158,16 +158,19 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
     }
 
     @Override
-    public boolean initialize() {
-        log.trace("Initializing");
-        initializeStateStores();
+    public boolean initializeStateStores() {
+        log.trace("Initializing state stores");
+        registerStateStores();
+        return changelogPartitions().isEmpty();
+    }
+
+    @Override
+    public void initializeTopology() {
         initTopology();
         processorContext.initialized();
         taskInitialized = true;
-        return changelogPartitions().isEmpty();
     }
 
-
     /**
      * <pre>
      * - re-initialize the task
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
index 80e5423..afd719b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -29,6 +29,16 @@ import java.util.Map;
 import java.util.Set;
 
 public interface Task {
+    /**
+     * Initialize the task and return {@code true} if the task is ready to run, i.e., it has no state stores
+     * @return true if this task has no state stores that may need restoring.
+     * @throws IllegalStateException If store gets registered after initialized is already
finished
+     * @throws StreamsException if the store's change log does not contain the partition
+     */
+    boolean initializeStateStores();
+
+    void initializeTopology();
+
     void resume();
 
     void commit();
@@ -70,14 +80,6 @@ public interface Task {
     boolean hasStateStores();
 
     /**
-     * initialize the task and return true if the task is ready to run, i.e, it has not state
stores
-     * @return true if this task has no state stores that may need restoring.
-     * @throws IllegalStateException If store gets registered after initialized is already
finished
-     * @throws StreamsException if the store's change log does not contain the partition
-     */
-    boolean initialize();
-
-    /**
      * @return any changelog partitions associated with this task
      */
     Collection<TopicPartition> changelogPartitions();
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 5387425..385729f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -261,6 +261,7 @@ class TaskManager {
         standby.initializeNewTasks();
 
         final Collection<TopicPartition> restored = changelogReader.restore(active);
+
         resumed.addAll(active.updateRestored(restored));
 
         if (!resumed.isEmpty()) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
index 02aa0a0..e64aa21 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractTaskTest.java
@@ -86,7 +86,7 @@ public class AbstractTaskTest {
         final AbstractTask task = createTask(consumer, Collections.singletonList(store));
 
         try {
-            task.initializeStateStores();
+            task.registerStateStores();
             fail("Should have thrown LockException");
         } catch (final LockException e) {
             // ok
@@ -101,7 +101,7 @@ public class AbstractTaskTest {
 
         final AbstractTask task = createTask(consumer, Collections.<StateStore>emptyList());
 
-        task.initializeStateStores();
+        task.registerStateStores();
 
         // should fail if lock is called
         EasyMock.verify(stateDirectory);
@@ -176,10 +176,12 @@ public class AbstractTaskTest {
                 return 0;
             }
 
-            @Override
-            public boolean initialize() {
+            public boolean initializeStateStores() {
                 return false;
             }
+
+            @Override
+            public void initializeTopology() {}
         };
     }
 
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
index a721936..81a2ca3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AssignedTasksTest.java
@@ -89,7 +89,7 @@ public class AssignedTasksTest {
 
     @Test
     public void shouldInitializeNewTasks() {
-        EasyMock.expect(t1.initialize()).andReturn(false);
+        EasyMock.expect(t1.initializeStateStores()).andReturn(false);
         EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
         EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet());
         EasyMock.replay(t1);
@@ -101,10 +101,14 @@ public class AssignedTasksTest {
 
     @Test
     public void shouldMoveInitializedTasksNeedingRestoreToRestoring() {
-        EasyMock.expect(t1.initialize()).andReturn(false);
+        EasyMock.expect(t1.initializeStateStores()).andReturn(false);
+        t1.initializeTopology();
+        EasyMock.expectLastCall().once();
         EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
         EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet());
-        EasyMock.expect(t2.initialize()).andReturn(true);
+        EasyMock.expect(t2.initializeStateStores()).andReturn(true);
+        t2.initializeTopology();
+        EasyMock.expectLastCall().once();
         final Set<TopicPartition> t2partitions = Collections.singleton(tp2);
         EasyMock.expect(t2.partitions()).andReturn(t2partitions);
         EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
@@ -125,7 +129,9 @@ public class AssignedTasksTest {
 
     @Test
     public void shouldMoveInitializedTasksThatDontNeedRestoringToRunning() {
-        EasyMock.expect(t2.initialize()).andReturn(true);
+        EasyMock.expect(t2.initializeStateStores()).andReturn(true);
+        t2.initializeTopology();
+        EasyMock.expectLastCall().once();
         EasyMock.expect(t2.partitions()).andReturn(Collections.singleton(tp2));
         EasyMock.expect(t2.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
         EasyMock.expect(t2.hasStateStores()).andReturn(false);
@@ -142,10 +148,12 @@ public class AssignedTasksTest {
     @Test
     public void shouldTransitionFullyRestoredTasksToRunning() {
         final Set<TopicPartition> task1Partitions = Utils.mkSet(tp1);
-        EasyMock.expect(t1.initialize()).andReturn(false);
+        EasyMock.expect(t1.initializeStateStores()).andReturn(false);
         EasyMock.expect(t1.partitions()).andReturn(task1Partitions).anyTimes();
         EasyMock.expect(t1.changelogPartitions()).andReturn(Utils.mkSet(changeLog1, changeLog2)).anyTimes();
         EasyMock.expect(t1.hasStateStores()).andReturn(true).anyTimes();
+        t1.initializeTopology();
+        EasyMock.expectLastCall().once();
         EasyMock.replay(t1);
 
         addAndInitTask();
@@ -169,7 +177,7 @@ public class AssignedTasksTest {
 
     @Test
     public void shouldCloseRestoringTasks() {
-        EasyMock.expect(t1.initialize()).andReturn(false);
+        EasyMock.expect(t1.initializeStateStores()).andReturn(false);
         EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
         EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptySet());
         t1.close(false, false);
@@ -236,6 +244,8 @@ public class AssignedTasksTest {
         mockRunningTaskSuspension();
         t1.resume();
         EasyMock.expectLastCall();
+        t1.initializeTopology();
+        EasyMock.expectLastCall().once();
         EasyMock.replay(t1);
 
         assertThat(suspendTask(), nullValue());
@@ -266,7 +276,9 @@ public class AssignedTasksTest {
     }
 
     private void mockTaskInitialization() {
-        EasyMock.expect(t1.initialize()).andReturn(true);
+        EasyMock.expect(t1.initializeStateStores()).andReturn(true);
+        t1.initializeTopology();
+        EasyMock.expectLastCall().once();
         EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1));
         EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList());
         EasyMock.expect(t1.hasStateStores()).andReturn(false);
@@ -449,7 +461,9 @@ public class AssignedTasksTest {
     }
 
     private void mockRunningTaskSuspension() {
-        EasyMock.expect(t1.initialize()).andReturn(true);
+        EasyMock.expect(t1.initializeStateStores()).andReturn(true);
+        t1.initializeTopology();
+        EasyMock.expectLastCall().once();
         EasyMock.expect(t1.hasStateStores()).andReturn(false).anyTimes();
         EasyMock.expect(t1.partitions()).andReturn(Collections.singleton(tp1)).anyTimes();
         EasyMock.expect(t1.changelogPartitions()).andReturn(Collections.<TopicPartition>emptyList()).anyTimes();
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index 3ecdbb4..e2d9c94 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -166,7 +166,7 @@ public class StandbyTaskTest {
     public void testStorePartitions() throws IOException {
         StreamsConfig config = createConfig(baseDir);
         StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology,
consumer, changelogReader, config, null, stateDirectory);
-        task.initialize();
+        task.initializeStateStores();
         assertEquals(Utils.mkSet(partition2, partition1), new HashSet<>(task.checkpointedOffsets().keySet()));
     }
 
@@ -188,7 +188,7 @@ public class StandbyTaskTest {
     public void testUpdate() throws IOException {
         StreamsConfig config = createConfig(baseDir);
         StandbyTask task = new StandbyTask(taskId, applicationId, topicPartitions, topology,
consumer, changelogReader, config, null, stateDirectory);
-        task.initialize();
+        task.initializeStateStores();
         final Set<TopicPartition> partition = Collections.singleton(partition2);
         restoreStateConsumer.assign(partition);
 
@@ -236,7 +236,7 @@ public class StandbyTaskTest {
 
         StreamsConfig config = createConfig(baseDir);
         StandbyTask task = new StandbyTask(taskId, applicationId, ktablePartitions, ktableTopology,
consumer, changelogReader, config, null, stateDirectory);
-        task.initialize();
+        task.initializeStateStores();
         restoreStateConsumer.assign(new ArrayList<>(task.checkpointedOffsets().keySet()));
 
         for (ConsumerRecord<Integer, Integer> record : Arrays.asList(
@@ -358,7 +358,7 @@ public class StandbyTaskTest {
                                                  null,
                                                  stateDirectory
         );
-        task.initialize();
+        task.initializeStateStores();
 
 
         restoreStateConsumer.assign(new ArrayList<>(task.checkpointedOffsets().keySet()));
@@ -411,7 +411,7 @@ public class StandbyTaskTest {
                 closedStateManager.set(true);
             }
         };
-        task.initialize();
+        task.initializeStateStores();
         try {
             task.close(true, false);
             fail("should have thrown exception");
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index 897a2c5..4bbd0d6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -165,7 +165,8 @@ public class StreamTaskTest {
         stateDirectory = new StateDirectory("applicationId", baseDir.getPath(), new MockTime());
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer,
                               changelogReader, config, streamsMetrics, stateDirectory, null,
time, producer);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
     }
 
     @After
@@ -458,7 +459,9 @@ public class StreamTaskTest {
 
         task = new StreamTask(taskId00, applicationId, partitions, topology, consumer, changelogReader,
config,
             streamsMetrics, stateDirectory, null, time, producer);
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
+
         final int offset = 20;
         task.addRecords(partition1, Collections.singletonList(
                 new ConsumerRecord<>(partition1.topic(), partition1.partition(), offset,
0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)));
@@ -616,7 +619,8 @@ public class StreamTaskTest {
                 };
             }
         };
-        streamTask.initialize();
+        streamTask.initializeStateStores();
+        streamTask.initializeTopology();
 
         time.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG));
 
@@ -743,7 +747,8 @@ public class StreamTaskTest {
     public void shouldThrowExceptionIfAnyExceptionsRaisedDuringCloseButStillCloseAllProcessorNodesTopology()
{
         task.close(true, false);
         task = createTaskThatThrowsExceptionOnClose();
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
         try {
             task.close(true, false);
             fail("should have thrown runtime exception");
@@ -971,7 +976,8 @@ public class StreamTaskTest {
         final StreamTask task = new StreamTask(taskId00, applicationId, Utils.mkSet(partition1),
topology, consumer,
                                                changelogReader, eosConfig, streamsMetrics,
stateDirectory, null, time, producer);
 
-        task.initialize();
+        task.initializeStateStores();
+        task.initializeTopology();
         try {
             task.suspend();
             fail("should have thrown an exception");
@@ -1043,7 +1049,7 @@ public class StreamTaskTest {
                                                time,
                                                producer);
 
-        assertTrue(task.initialize());
+        assertTrue(task.initializeStateStores());
     }
 
     @Test
@@ -1071,7 +1077,7 @@ public class StreamTaskTest {
                                                time,
                                                producer);
 
-        assertFalse(task.initialize());
+        assertFalse(task.initializeStateStores());
     }
 
     @SuppressWarnings("unchecked")
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index 1368051..900c8d4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -106,12 +106,12 @@ public class StreamThreadStateStoreProviderTest {
         stateDirectory = new StateDirectory(applicationId, stateConfigDir, new MockTime());
         taskOne = createStreamsTask(applicationId, streamsConfig, clientSupplier, topology,
                                     new TaskId(0, 0));
-        taskOne.initialize();
+        taskOne.initializeStateStores();
         tasks.put(new TaskId(0, 0),
                   taskOne);
         taskTwo = createStreamsTask(applicationId, streamsConfig, clientSupplier, topology,
                                     new TaskId(0, 1));
-        taskTwo.initialize();
+        taskTwo.initializeStateStores();
         tasks.put(new TaskId(0, 1),
                   taskTwo);
 
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 1d91b52..7d6dde5 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -248,7 +248,8 @@ public class ProcessorTopologyTestDriver {
                                   cache,
                                   new MockTime(),
                                   producer);
-            task.initialize();
+            task.initializeStateStores();
+            task.initializeTopology();
         }
     }
 

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.

Mime
View raw message