kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6328: Sort node groups considering global stores in InternalTopologyBuilder#makeNodeGroups
Date Wed, 17 Jan 2018 00:57:33 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 2771f52  KAFKA-6328: Sort node groups considering global stores in InternalTopologyBuilder#makeNodeGroups
2771f52 is described below

commit 2771f52ea7070a3147058e0d6661e0a7e4158161
Author: RichardYuSTUG <yohan.richard.yu2@gmail.com>
AuthorDate: Tue Jan 16 16:44:44 2018 -0800

    KAFKA-6328: Sort node groups considering global stores in InternalTopologyBuilder#makeNodeGroups
    
    Author: RichardYuSTUG <yohan.richard.yu2@gmail.com>
    
    Reviewers: Guozhang Wang <wangguoz@gmail.com>
    
    Closes #4339 from ConcurrencyPractitioner/kafka-6238
    
    Minor edits on description
---
 .../apache/kafka/streams/TopologyDescription.java  |   2 +
 .../internals/InternalTopologyBuilder.java         | 139 +++++++++++----------
 .../org/apache/kafka/streams/TopologyTest.java     |  12 +-
 3 files changed, 85 insertions(+), 68 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
index 01af8bf..131e8d3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
+++ b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
@@ -76,6 +76,8 @@ public interface TopologyDescription {
          * @return the "global" processor node
          */
         Processor processor();
+
+        int id();
     }
 
     /**
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 3901002..2f3f7f4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -1289,13 +1289,6 @@ public class InternalTopologyBuilder {
     public TopologyDescription describe() {
         final TopologyDescription description = new TopologyDescription();
 
-        describeSubtopologies(description);
-        describeGlobalStores(description);
-
-        return description;
-    }
-
-    private void describeSubtopologies(final TopologyDescription description) {
         for (final Map.Entry<Integer, Set<String>> nodeGroup : makeNodeGroups().entrySet()) {
 
             final Set<String> allNodesOfGroups = nodeGroup.getValue();
@@ -1303,6 +1296,32 @@ public class InternalTopologyBuilder {
 
             if (!isNodeGroupOfGlobalStores) {
                 describeSubtopology(description, nodeGroup.getKey(), allNodesOfGroups);
+            } else {
+                describeGlobalStore(description, allNodesOfGroups, nodeGroup.getKey());
+            }
+        }
+
+        return description;
+    }
+
+    private void describeGlobalStore(final TopologyDescription description, final Set<String> nodes, int id) {
+        final Iterator<String> it = nodes.iterator();
+        while (it.hasNext()) {
+            final String node = it.next();
+
+            if (isGlobalSource(node)) {
+                // we found a GlobalStore node group; those contain exactly two node: {sourceNode,processorNode}
+                it.remove(); // remove sourceNode from group
+                final String processorNode = nodes.iterator().next(); // get remaining processorNode
+
+                description.addGlobalStore(new GlobalStore(
+                    node,
+                    processorNode,
+                    ((ProcessorNodeFactory) nodeFactories.get(processorNode)).stateStoreNames.iterator().next(),
+                    nodeToSourceTopics.get(node).get(0),
+                    id
+                ));
+                break;
             }
         }
     }
@@ -1370,43 +1389,26 @@ public class InternalTopologyBuilder {
                 new HashSet<TopologyDescription.Node>(nodesByName.values())));
     }
 
-    private void describeGlobalStores(final TopologyDescription description) {
-        for (final Map.Entry<Integer, Set<String>> nodeGroup : makeNodeGroups().entrySet()) {
-            final Set<String> nodes = nodeGroup.getValue();
-
-            final Iterator<String> it = nodes.iterator();
-            while (it.hasNext()) {
-                final String node = it.next();
-
-                if (isGlobalSource(node)) {
-                    // we found a GlobalStore node group; those contain exactly two node: {sourceNode,processorNode}
-                    it.remove(); // remove sourceNode from group
-                    final String processorNode = nodes.iterator().next(); // get remaining processorNode
-
-                    description.addGlobalStore(new GlobalStore(
-                        node,
-                        processorNode,
-                        ((ProcessorNodeFactory) nodeFactories.get(processorNode)).stateStoreNames.iterator().next(),
-                        nodeToSourceTopics.get(node).get(0)
-                    ));
-                    break;
-                }
-            }
-        }
-    }
-
     public final static class GlobalStore implements TopologyDescription.GlobalStore {
         private final Source source;
         private final Processor processor;
+        private final int id;
 
         public GlobalStore(final String sourceName,
                            final String processorName,
                            final String storeName,
-                           final String topicName) {
+                           final String topicName,
+                           final int id) {
             source = new Source(sourceName, topicName);
             processor = new Processor(processorName, Collections.singleton(storeName));
             source.successors.add(processor);
             processor.predecessors.add(source);
+            this.id = id;
+        }
+
+        @Override
+        public int id() {
+            return id;
         }
 
         @Override
@@ -1421,8 +1423,9 @@ public class InternalTopologyBuilder {
 
         @Override
         public String toString() {
-            return "GlobalStore: " + source.name + " (topic: " + source.topics + ")\n      --> "
-                + processor.name + " (store: " + processor.stores.iterator().next() + ")\n";
+            return "Sub-topology: " + id + " for global store (will not generate tasks)\n"
+                    + "    " + source.toString() + "\n"
+                    + "    " + processor.toString() + "\n";
         }
 
         @Override
@@ -1643,7 +1646,7 @@ public class InternalTopologyBuilder {
 
         @Override
         public String toString() {
-            return "Sub-topology: " + id + "\n" + nodesAsString();
+            return "Sub-topology: " + id + "\n" + nodesAsString() + "\n";
         }
 
         private String nodesAsString() {
@@ -1723,7 +1726,7 @@ public class InternalTopologyBuilder {
         @Override
         public int compare(final TopologyDescription.GlobalStore globalStore1,
                            final TopologyDescription.GlobalStore globalStore2) {
-            return globalStore1.source().name().compareTo(globalStore2.source().name());
+            return globalStore1.id() - globalStore2.id();
         }
     }
 
@@ -1740,8 +1743,8 @@ public class InternalTopologyBuilder {
     private final static SubtopologyComparator SUBTOPOLOGY_COMPARATOR = new SubtopologyComparator();
 
     public final static class TopologyDescription implements org.apache.kafka.streams.TopologyDescription {
-        private final Set<TopologyDescription.Subtopology> subtopologies = new TreeSet<>(SUBTOPOLOGY_COMPARATOR);
-        private final Set<TopologyDescription.GlobalStore> globalStores = new TreeSet<>(GLOBALSTORE_COMPARATOR);
+        private final TreeSet<TopologyDescription.Subtopology> subtopologies = new TreeSet<>(SUBTOPOLOGY_COMPARATOR);
+        private final TreeSet<TopologyDescription.GlobalStore> globalStores = new TreeSet<>(GLOBALSTORE_COMPARATOR);
 
         public void addSubtopology(final TopologyDescription.Subtopology subtopology) {
             subtopologies.add(subtopology);
@@ -1763,33 +1766,43 @@ public class InternalTopologyBuilder {
 
         @Override
         public String toString() {
-            return subtopologiesAsString() + "\n" + globalStoresAsString();
-        }
-
-        private String subtopologiesAsString() {
             final StringBuilder sb = new StringBuilder();
-            sb.append("Sub-topologies:\n");
-            if (subtopologies.isEmpty()) {
-                sb.append("  none\n");
-            } else {
-                for (final TopologyDescription.Subtopology st : subtopologies) {
-                    sb.append("  ");
-                    sb.append(st);
+            sb.append("Topologies:\n ");
+            final TopologyDescription.Subtopology[] sortedSubtopologies = 
+                subtopologies.descendingSet().toArray(new TopologyDescription.Subtopology[subtopologies.size()]);
+            final TopologyDescription.GlobalStore[] sortedGlobalStores = 
+                globalStores.descendingSet().toArray(new TopologyDescription.GlobalStore[globalStores.size()]);
+            int expectedId = 0;
+            int subtopologiesIndex = sortedSubtopologies.length - 1;
+            int globalStoresIndex = sortedGlobalStores.length - 1;
+            while (subtopologiesIndex != -1 && globalStoresIndex != -1) {
+                sb.append("  ");
+                final TopologyDescription.Subtopology subtopology = 
+                    sortedSubtopologies[subtopologiesIndex];
+                final TopologyDescription.GlobalStore globalStore = 
+                    sortedGlobalStores[globalStoresIndex];
+                if (subtopology.id() == expectedId) {
+                    sb.append(subtopology);
+                    subtopologiesIndex--;
+                } else {
+                    sb.append(globalStore);
+                    globalStoresIndex--;
                 }
+                expectedId++;
             }
-            return sb.toString();
-        }
-
-        private String globalStoresAsString() {
-            final StringBuilder sb = new StringBuilder();
-            sb.append("Global Stores:\n");
-            if (globalStores.isEmpty()) {
-                sb.append("  none\n");
-            } else {
-                for (final TopologyDescription.GlobalStore gs : globalStores) {
-                    sb.append("  ");
-                    sb.append(gs);
-                }
+            while (subtopologiesIndex != -1) {
+                final TopologyDescription.Subtopology subtopology = 
+                    sortedSubtopologies[subtopologiesIndex];
+                sb.append("  ");
+                sb.append(subtopology);
+                subtopologiesIndex--;
+            }
+            while (globalStoresIndex != -1) {
+                final TopologyDescription.GlobalStore globalStore = 
+                    sortedGlobalStores[globalStoresIndex];
+                sb.append("  ");
+                sb.append(globalStore);
+                globalStoresIndex--;
             }
             return sb.toString();
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index 3ba7803..0a45803 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -579,14 +579,14 @@ public class TopologyTest {
 
     @Test
     public void shouldDescribeGlobalStoreTopology() {
-        addGlobalStoreToTopologyAndExpectedDescription("globalStore", "source", "globalTopic", "processor");
+        addGlobalStoreToTopologyAndExpectedDescription("globalStore", "source", "globalTopic", "processor", 0);
         assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
     }
 
     @Test
     public void shouldDescribeMultipleGlobalStoreTopology() {
-        addGlobalStoreToTopologyAndExpectedDescription("globalStore1", "source1", "globalTopic1", "processor1");
-        addGlobalStoreToTopologyAndExpectedDescription("globalStore2", "source2", "globalTopic2", "processor2");
+        addGlobalStoreToTopologyAndExpectedDescription("globalStore1", "source1", "globalTopic1", "processor1", 0);
+        addGlobalStoreToTopologyAndExpectedDescription("globalStore2", "source2", "globalTopic2", "processor2", 1);
         assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription));
     }
 
@@ -677,7 +677,8 @@ public class TopologyTest {
     private void addGlobalStoreToTopologyAndExpectedDescription(final String globalStoreName,
                                                                 final String sourceName,
                                                                 final String globalTopicName,
-                                                                final String processorName) {
+                                                                final String processorName,
+                                                                final int id) {
         final KeyValueStoreBuilder globalStoreBuilder = EasyMock.createNiceMock(KeyValueStoreBuilder.class);
         EasyMock.expect(globalStoreBuilder.name()).andReturn(globalStoreName).anyTimes();
         EasyMock.replay(globalStoreBuilder);
@@ -695,7 +696,8 @@ public class TopologyTest {
             sourceName,
             processorName,
             globalStoreName,
-            globalTopicName);
+            globalTopicName,
+            id);
 
         expectedDescription.addGlobalStore(expectedGlobalStore);
     }

-- 
To stop receiving notification emails like this one, please contact
"commits@kafka.apache.org" <commits@kafka.apache.org>.

Mime
View raw message