kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject kafka git commit: MINOR: Remove unused constructor param from ProcessorStateManager
Date Mon, 16 Jan 2017 19:40:50 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 6e42ce8fe -> b62804a25


MINOR: Remove unused constructor param from ProcessorStateManager

Remove applicationId parameter as it is no longer used.

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Guozhang Wang <wangguoz@gmail.com>

Closes #2385 from dguy/minor-remove-unused-param


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b62804a2
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b62804a2
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b62804a2

Branch: refs/heads/trunk
Commit: b62804a252b96932952006a483b3afd7da7363ab
Parents: 6e42ce8
Author: Damian Guy <damian.guy@gmail.com>
Authored: Mon Jan 16 11:40:47 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Mon Jan 16 11:40:47 2017 -0800

----------------------------------------------------------------------
 .../streams/processor/internals/AbstractTask.java   |  2 +-
 .../processor/internals/ProcessorStateManager.java  |  3 +--
 .../internals/ProcessorStateManagerTest.java        | 16 ++++++++--------
 3 files changed, 10 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b62804a2/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
index 0730c68..55418d5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java
@@ -70,7 +70,7 @@ public abstract class AbstractTask {
 
         // create the processor state manager
         try {
-            this.stateMgr = new ProcessorStateManager(applicationId, id, partitions, restoreConsumer, isStandby, stateDirectory, topology.storeToChangelogTopic());
+            this.stateMgr = new ProcessorStateManager(id, partitions, restoreConsumer, isStandby, stateDirectory, topology.storeToChangelogTopic());
 
         } catch (IOException e) {
             throw new ProcessorStateException(String.format("task [%s] Error while creating the state manager", id), e);

http://git-wip-us.apache.org/repos/asf/kafka/blob/b62804a2/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index a21c3e8..ad16c77 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -74,8 +74,7 @@ public class ProcessorStateManager implements StateManager {
      *                       (this might be recoverable by retrying)
      * @throws IOException if any severe error happens while creating or locking the state directory
      */
-    public ProcessorStateManager(final String applicationId,
-                                 final TaskId taskId,
+    public ProcessorStateManager(final TaskId taskId,
                                  final Collection<TopicPartition> sources,
                                  final Consumer<byte[], byte[]> restoreConsumer,
                                  final boolean isStandby,

http://git-wip-us.apache.org/repos/asf/kafka/blob/b62804a2/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
index de54723..602601a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java
@@ -212,7 +212,7 @@ public class ProcessorStateManagerTest {
     public void testNoTopic() throws IOException {
         MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
 
-        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, new HashMap<String, String>() {
+        ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, new HashMap<String, String>() {
             {
                 put(nonPersistentStoreName, nonPersistentStoreName);
             }
@@ -244,7 +244,7 @@ public class ProcessorStateManagerTest {
 
         MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore("persistentStore", true); // persistent store
 
-        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, new HashMap<String, String>() {
+        ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, noPartitions, restoreConsumer, false, stateDirectory, new HashMap<String, String>() {
             {
                 put(persistentStoreName, persistentStoreTopicName);
                 put(nonPersistentStoreName, nonPersistentStoreName);
@@ -298,7 +298,7 @@ public class ProcessorStateManagerTest {
 
         MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false); // non persistent store
 
-        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 2), noPartitions, restoreConsumer, false, stateDirectory, new HashMap<String, String>() {
+        ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 2), noPartitions, restoreConsumer, false, stateDirectory, new HashMap<String, String>() {
             {
                 put(persistentStoreName, persistentStoreTopicName);
                 put(nonPersistentStoreName, nonPersistentStoreTopicName);
@@ -381,7 +381,7 @@ public class ProcessorStateManagerTest {
         // if there is an source partition, inherit the partition id
         Set<TopicPartition> sourcePartitions = Utils.mkSet(new TopicPartition(storeTopicName3, 1));
 
-        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, sourcePartitions, restoreConsumer, true, stateDirectory, storeToChangelogTopic); // standby
+        ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, sourcePartitions, restoreConsumer, true, stateDirectory, storeToChangelogTopic); // standby
         try {
             restoreConsumer.reset();
 
@@ -415,7 +415,7 @@ public class ProcessorStateManagerTest {
 
         MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
 
-        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, restoreConsumer, false, stateDirectory, Collections.<String, String>emptyMap());
+        ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 1), noPartitions, restoreConsumer, false, stateDirectory, Collections.<String, String>emptyMap());
         try {
             stateMgr.register(mockStateStore, true, mockStateStore.stateRestoreCallback);
 
@@ -453,7 +453,7 @@ public class ProcessorStateManagerTest {
         MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true);
         MockStateStoreSupplier.MockStateStore nonPersistentStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
 
-        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, new HashMap<String, String>() {
+        ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, noPartitions, restoreConsumer, false, stateDirectory, new HashMap<String, String>() {
             {
                 put(persistentStoreName, persistentStoreTopicName);
                 put(nonPersistentStoreName, nonPersistentStoreTopicName);
@@ -491,7 +491,7 @@ public class ProcessorStateManagerTest {
     @Test
     public void shouldRegisterStoreWithoutLoggingEnabledAndNotBackedByATopic() throws Exception {
         MockStateStoreSupplier.MockStateStore mockStateStore = new MockStateStoreSupplier.MockStateStore(nonPersistentStoreName, false);
-        ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, Collections.<String, String>emptyMap());
+        ProcessorStateManager stateMgr = new ProcessorStateManager(new TaskId(0, 1), noPartitions, new MockRestoreConsumer(), false, stateDirectory, Collections.<String, String>emptyMap());
         stateMgr.register(mockStateStore, false, mockStateStore.stateRestoreCallback);
         assertNotNull(stateMgr.getStore(nonPersistentStoreName));
     }
@@ -512,7 +512,7 @@ public class ProcessorStateManagerTest {
 
 
         final MockStateStoreSupplier.MockStateStore persistentStore = new MockStateStoreSupplier.MockStateStore(persistentStoreName, true);
-        final ProcessorStateManager stateMgr = new ProcessorStateManager(applicationId, taskId, noPartitions, restoreConsumer, false, stateDirectory, Collections.<String, String>emptyMap());
+        final ProcessorStateManager stateMgr = new ProcessorStateManager(taskId, noPartitions, restoreConsumer, false, stateDirectory, Collections.<String, String>emptyMap());
 
         restoreConsumer.reset();
         stateMgr.register(persistentStore, true, persistentStore.stateRestoreCallback);


Mime
View raw message