kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: refactored code duplicates in several files (Streams project) (#4357)
Date Tue, 02 Jan 2018 18:54:22 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d61a592  MINOR: refactored code duplicates in several files (Streams project) (#4357)
d61a592 is described below

commit d61a592c8a5ee2279760c2c7c72550160032bf8a
Author: Wladimir Schmidt <wlsc@users.noreply.github.com>
AuthorDate: Tue Jan 2 19:54:20 2018 +0100

    MINOR: refactored code duplicates in several files (Streams project) (#4357)
    
    * Removed code duplicate from GlobalProcessorContextImpl and ProcessorContextImpl to parent class AbstractProcessorContext
    
    * Exchanged concrete implementations with interfaces to make code more maintainable
    
    * Refactored major code duplicates in InternalTopologyBuilder
    
    * Formatted function parameters as per code review
    Added final to code introduced in this PR
    
    * Added missing finals to putNodeGroupName function
    Rearranged parameters for resetTopicsPattern function
    
    Reviewers: Matthias J. Sax <mjsax@apache.org>, Guozhang Wang <wangguoz@gmail.com>, Bill Bejeck <bbejeck@gmail.com>
---
 .../internals/AbstractProcessorContext.java        | 15 +++++
 .../internals/GlobalProcessorContextImpl.java      | 17 -----
 .../internals/InternalTopologyBuilder.java         | 72 ++++++++++++----------
 .../processor/internals/ProcessorContextImpl.java  | 14 -----
 4 files changed, 53 insertions(+), 65 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 87408c6..e9b5a4c 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -26,6 +26,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import java.io.File;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 
@@ -163,6 +164,20 @@ public abstract class AbstractProcessorContext implements InternalProcessorConte
         return combined;
     }
 
+    @SuppressWarnings("unchecked")
+    @Override
+    public <K, V> void forward(final K key, final V value) {
+        final ProcessorNode previousNode = currentNode();
+        try {
+            for (final ProcessorNode child : (List<ProcessorNode>) currentNode().children()) {
+                setCurrentNode(child);
+                child.process(key, value);
+            }
+        } finally {
+            setCurrentNode(previousNode);
+        }
+    }
+
     @Override
     public Map<String, Object> appConfigsWithPrefix(final String prefix) {
         return config.originalsWithPrefix(prefix);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index fbb4cb6..37e7cb5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -25,8 +25,6 @@ import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
-import java.util.List;
-
 public class GlobalProcessorContextImpl extends AbstractProcessorContext {
 
 
@@ -42,21 +40,6 @@ public class GlobalProcessorContextImpl extends AbstractProcessorContext {
         return stateManager.getGlobalStore(name);
     }
 
-    @SuppressWarnings("unchecked")
-    @Override
-    public <K, V> void forward(K key, V value) {
-        final ProcessorNode previousNode = currentNode();
-        try {
-            for (ProcessorNode child : (List<ProcessorNode>) currentNode().children()) {
-                setCurrentNode(child);
-                child.process(key, value);
-            }
-        } finally {
-            setCurrentNode(previousNode);
-        }
-    }
-
-
     /**
      * @throws UnsupportedOperationException on every invocation
      */
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index e71959b..39ea44f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -77,17 +77,17 @@ public class InternalTopologyBuilder {
     private final List<Set<String>> copartitionSourceGroups = new ArrayList<>();
 
     // map from source processor names to subscribed topics (without application-id prefix for internal topics)
-    private final HashMap<String, List<String>> nodeToSourceTopics = new HashMap<>();
+    private final Map<String, List<String>> nodeToSourceTopics = new HashMap<>();
 
     // map from source processor names to regex subscription patterns
-    private final HashMap<String, Pattern> nodeToSourcePatterns = new LinkedHashMap<>();
+    private final Map<String, Pattern> nodeToSourcePatterns = new LinkedHashMap<>();
 
     // map from sink processor names to subscribed topic (without application-id prefix for internal topics)
-    private final HashMap<String, String> nodeToSinkTopic = new HashMap<>();
+    private final Map<String, String> nodeToSinkTopic = new HashMap<>();
 
     // map from topics to their matched regex patterns, this is to ensure one topic is passed through on source node
     // even if it can be matched by multiple regex patterns
-    private final HashMap<String, Pattern> topicToPatterns = new HashMap<>();
+    private final Map<String, Pattern> topicToPatterns = new HashMap<>();
 
     // map from state store names to all the topics subscribed from source processors that
     // are connected to these state stores
@@ -804,43 +804,45 @@ public class InternalTopologyBuilder {
     }
 
     private Map<Integer, Set<String>> makeNodeGroups() {
-        final HashMap<Integer, Set<String>> nodeGroups = new LinkedHashMap<>();
-        final HashMap<String, Set<String>> rootToNodeGroup = new HashMap<>();
+        final Map<Integer, Set<String>> nodeGroups = new LinkedHashMap<>();
+        final Map<String, Set<String>> rootToNodeGroup = new HashMap<>();
 
         int nodeGroupId = 0;
 
         // Go through source nodes first. This makes the group id assignment easy to predict in tests
-        final HashSet<String> allSourceNodes = new HashSet<>(nodeToSourceTopics.keySet());
+        final Set<String> allSourceNodes = new HashSet<>(nodeToSourceTopics.keySet());
         allSourceNodes.addAll(nodeToSourcePatterns.keySet());
 
         for (final String nodeName : Utils.sorted(allSourceNodes)) {
-            final String root = nodeGrouper.root(nodeName);
-            Set<String> nodeGroup = rootToNodeGroup.get(root);
-            if (nodeGroup == null) {
-                nodeGroup = new HashSet<>();
-                rootToNodeGroup.put(root, nodeGroup);
-                nodeGroups.put(nodeGroupId++, nodeGroup);
-            }
-            nodeGroup.add(nodeName);
+            nodeGroupId = putNodeGroupName(nodeName, nodeGroupId, nodeGroups, rootToNodeGroup);
         }
 
         // Go through non-source nodes
         for (final String nodeName : Utils.sorted(nodeFactories.keySet())) {
             if (!nodeToSourceTopics.containsKey(nodeName)) {
-                final String root = nodeGrouper.root(nodeName);
-                Set<String> nodeGroup = rootToNodeGroup.get(root);
-                if (nodeGroup == null) {
-                    nodeGroup = new HashSet<>();
-                    rootToNodeGroup.put(root, nodeGroup);
-                    nodeGroups.put(nodeGroupId++, nodeGroup);
-                }
-                nodeGroup.add(nodeName);
+                nodeGroupId = putNodeGroupName(nodeName, nodeGroupId, nodeGroups, rootToNodeGroup);
             }
         }
 
         return nodeGroups;
     }
 
+    private int putNodeGroupName(final String nodeName,
+                                 final int nodeGroupId,
+                                 final Map<Integer, Set<String>> nodeGroups,
+                                 final Map<String, Set<String>> rootToNodeGroup) {
+        int newNodeGroupId = nodeGroupId;
+        final String root = nodeGrouper.root(nodeName);
+        Set<String> nodeGroup = rootToNodeGroup.get(root);
+        if (nodeGroup == null) {
+            nodeGroup = new HashSet<>();
+            rootToNodeGroup.put(root, nodeGroup);
+            nodeGroups.put(newNodeGroupId++, nodeGroup);
+        }
+        nodeGroup.add(nodeName);
+        return newNodeGroupId;
+    }
+
     public synchronized ProcessorTopology build() {
         return build((Integer) null);
     }
@@ -1127,21 +1129,23 @@ public class InternalTopologyBuilder {
     }
 
     public synchronized Pattern earliestResetTopicsPattern() {
-        final List<String> topics = maybeDecorateInternalSourceTopics(earliestResetTopics);
-        final Pattern earliestPattern = buildPatternForOffsetResetTopics(topics, earliestResetPatterns);
-
-        ensureNoRegexOverlap(earliestPattern, latestResetPatterns, latestResetTopics);
-
-        return earliestPattern;
+        return resetTopicsPattern(earliestResetTopics, earliestResetPatterns, latestResetTopics, latestResetPatterns);
     }
 
     public synchronized Pattern latestResetTopicsPattern() {
-        final List<String> topics = maybeDecorateInternalSourceTopics(latestResetTopics);
-        final Pattern latestPattern = buildPatternForOffsetResetTopics(topics, latestResetPatterns);
+        return resetTopicsPattern(latestResetTopics, latestResetPatterns, earliestResetTopics, earliestResetPatterns);
+    }
+
+    private Pattern resetTopicsPattern(final Set<String> resetTopics,
+                                       final Set<Pattern> resetPatterns,
+                                       final Set<String> otherResetTopics,
+                                       final Set<Pattern> otherResetPatterns) {
+        final List<String> topics = maybeDecorateInternalSourceTopics(resetTopics);
+        final Pattern pattern = buildPatternForOffsetResetTopics(topics, resetPatterns);
 
-        ensureNoRegexOverlap(latestPattern, earliestResetPatterns, earliestResetTopics);
+        ensureNoRegexOverlap(pattern, otherResetPatterns, otherResetTopics);
 
-        return  latestPattern;
+        return pattern;
     }
 
     // TODO: we should check regex overlap at topology construction time and then throw TopologyException
@@ -1338,7 +1342,7 @@ public class InternalTopologyBuilder {
                                      final Integer subtopologyId,
                                      final Set<String> nodeNames) {
 
-        final HashMap<String, AbstractNode> nodesByName = new HashMap<>();
+        final Map<String, AbstractNode> nodesByName = new HashMap<>();
 
         // add all nodes
         for (final String nodeName : nodeNames) {
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 5010cf1..317581a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -77,20 +77,6 @@ public class ProcessorContextImpl extends AbstractProcessorContext implements Re
 
     @SuppressWarnings("unchecked")
     @Override
-    public <K, V> void forward(final K key, final V value) {
-        final ProcessorNode previousNode = currentNode();
-        try {
-            for (ProcessorNode child : (List<ProcessorNode>) currentNode().children()) {
-                setCurrentNode(child);
-                child.process(key, value);
-            }
-        } finally {
-            setCurrentNode(previousNode);
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
     public <K, V> void forward(final K key, final V value, final int childIndex) {
         final ProcessorNode previousNode = currentNode();
         final ProcessorNode child = (ProcessorNode<K, V>) currentNode().children().get(childIndex);

-- 
To stop receiving notification emails like this one, please contact
['"commits@kafka.apache.org" <commits@kafka.apache.org>'].

Mime
View raw message