From commits-return-8726-archive-asf-public=cust-asf.ponee.io@kafka.apache.org Wed Jan 17 01:57:37 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id 27C2018065B for ; Wed, 17 Jan 2018 01:57:37 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 17BD6160C46; Wed, 17 Jan 2018 00:57:37 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 10412160C34 for ; Wed, 17 Jan 2018 01:57:35 +0100 (CET) Received: (qmail 85288 invoked by uid 500); 17 Jan 2018 00:57:35 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 85279 invoked by uid 99); 17 Jan 2018 00:57:35 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 17 Jan 2018 00:57:35 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 57C3181F9B; Wed, 17 Jan 2018 00:57:33 +0000 (UTC) Date: Wed, 17 Jan 2018 00:57:33 +0000 To: "commits@kafka.apache.org" Subject: [kafka] branch trunk updated: KAFKA-6328: Sort node groups considering global stores in InternalTopologyBuilder#makeNodeGroups MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <151615065291.14991.11557811977433616889@gitbox.apache.org> From: guozhang@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: kafka X-Git-Refname: refs/heads/trunk X-Git-Reftype: branch X-Git-Oldrev: 8ded496c9be619fe29d00b406b3c1e094abc202c X-Git-Newrev: 2771f52ea7070a3147058e0d6661e0a7e4158161 X-Git-Rev: 2771f52ea7070a3147058e0d6661e0a7e4158161 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. guozhang pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/trunk by this push: new 2771f52 KAFKA-6328: Sort node groups considering global stores in InternalTopologyBuilder#makeNodeGroups 2771f52 is described below commit 2771f52ea7070a3147058e0d6661e0a7e4158161 Author: RichardYuSTUG AuthorDate: Tue Jan 16 16:44:44 2018 -0800 KAFKA-6328: Sort node groups considering global stores in InternalTopologyBuilder#makeNodeGroups Author: RichardYuSTUG Reviewers: Guozhang Wang Closes #4339 from ConcurrencyPractitioner/kafka-6238 Minor edits on description --- .../apache/kafka/streams/TopologyDescription.java | 2 + .../internals/InternalTopologyBuilder.java | 139 +++++++++++---------- .../org/apache/kafka/streams/TopologyTest.java | 12 +- 3 files changed, 85 insertions(+), 68 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java index 01af8bf..131e8d3 100644 --- a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java +++ b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java @@ -76,6 +76,8 @@ public interface TopologyDescription { * @return the "global" processor node */ Processor processor(); + + int id(); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 3901002..2f3f7f4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -1289,13 +1289,6 @@ public class InternalTopologyBuilder { public TopologyDescription describe() { final TopologyDescription description = new TopologyDescription(); - describeSubtopologies(description); - describeGlobalStores(description); - - return description; - } - - private void describeSubtopologies(final TopologyDescription description) { for (final Map.Entry> nodeGroup : makeNodeGroups().entrySet()) { final Set allNodesOfGroups = nodeGroup.getValue(); @@ -1303,6 +1296,32 @@ public class InternalTopologyBuilder { if (!isNodeGroupOfGlobalStores) { describeSubtopology(description, nodeGroup.getKey(), allNodesOfGroups); + } else { + describeGlobalStore(description, allNodesOfGroups, nodeGroup.getKey()); + } + } + + return description; + } + + private void describeGlobalStore(final TopologyDescription description, final Set nodes, int id) { + final Iterator it = nodes.iterator(); + while (it.hasNext()) { + final String node = it.next(); + + if (isGlobalSource(node)) { + // we found a GlobalStore node group; those contain exactly two node: {sourceNode,processorNode} + it.remove(); // remove sourceNode from group + final String processorNode = nodes.iterator().next(); // get remaining processorNode + + description.addGlobalStore(new GlobalStore( + node, + processorNode, + ((ProcessorNodeFactory) nodeFactories.get(processorNode)).stateStoreNames.iterator().next(), + nodeToSourceTopics.get(node).get(0), + id + )); + break; } } } @@ -1370,43 +1389,26 @@ public class InternalTopologyBuilder { new HashSet(nodesByName.values()))); } - private void describeGlobalStores(final TopologyDescription description) { - for (final Map.Entry> nodeGroup : makeNodeGroups().entrySet()) { - final Set nodes = nodeGroup.getValue(); - - final Iterator it = nodes.iterator(); - while (it.hasNext()) { - final String node = it.next(); - - if (isGlobalSource(node)) { - // we found a GlobalStore node group; those contain exactly two node: {sourceNode,processorNode} - it.remove(); // remove sourceNode from group - final String processorNode = nodes.iterator().next(); // get remaining processorNode - - description.addGlobalStore(new GlobalStore( - node, - processorNode, - ((ProcessorNodeFactory) nodeFactories.get(processorNode)).stateStoreNames.iterator().next(), - nodeToSourceTopics.get(node).get(0) - )); - break; - } - } - } - } - public final static class GlobalStore implements TopologyDescription.GlobalStore { private final Source source; private final Processor processor; + private final int id; public GlobalStore(final String sourceName, final String processorName, final String storeName, - final String topicName) { + final String topicName, + final int id) { source = new Source(sourceName, topicName); processor = new Processor(processorName, Collections.singleton(storeName)); source.successors.add(processor); processor.predecessors.add(source); + this.id = id; + } + + @Override + public int id() { + return id; } @Override @@ -1421,8 +1423,9 @@ public class InternalTopologyBuilder { @Override public String toString() { - return "GlobalStore: " + source.name + " (topic: " + source.topics + ")\n --> " - + processor.name + " (store: " + processor.stores.iterator().next() + ")\n"; + return "Sub-topology: " + id + " for global store (will not generate tasks)\n" + + " " + source.toString() + "\n" + + " " + processor.toString() + "\n"; } @Override @@ -1643,7 +1646,7 @@ public class InternalTopologyBuilder { @Override public String toString() { - return "Sub-topology: " + id + "\n" + nodesAsString(); + return "Sub-topology: " + id + "\n" + nodesAsString() + "\n"; } private String nodesAsString() { @@ -1723,7 +1726,7 @@ public class InternalTopologyBuilder { @Override public int compare(final TopologyDescription.GlobalStore globalStore1, final TopologyDescription.GlobalStore globalStore2) { - return globalStore1.source().name().compareTo(globalStore2.source().name()); + return globalStore1.id() - globalStore2.id(); } } @@ -1740,8 +1743,8 @@ public class InternalTopologyBuilder { private final static SubtopologyComparator SUBTOPOLOGY_COMPARATOR = new SubtopologyComparator(); public final static class TopologyDescription implements org.apache.kafka.streams.TopologyDescription { - private final Set subtopologies = new TreeSet<>(SUBTOPOLOGY_COMPARATOR); - private final Set globalStores = new TreeSet<>(GLOBALSTORE_COMPARATOR); + private final TreeSet subtopologies = new TreeSet<>(SUBTOPOLOGY_COMPARATOR); + private final TreeSet globalStores = new TreeSet<>(GLOBALSTORE_COMPARATOR); public void addSubtopology(final TopologyDescription.Subtopology subtopology) { subtopologies.add(subtopology); @@ -1763,33 +1766,43 @@ public class InternalTopologyBuilder { @Override public String toString() { - return subtopologiesAsString() + "\n" + globalStoresAsString(); - } - - private String subtopologiesAsString() { final StringBuilder sb = new StringBuilder(); - sb.append("Sub-topologies:\n"); - if (subtopologies.isEmpty()) { - sb.append(" none\n"); - } else { - for (final TopologyDescription.Subtopology st : subtopologies) { - sb.append(" "); - sb.append(st); + sb.append("Topologies:\n "); + final TopologyDescription.Subtopology[] sortedSubtopologies = + subtopologies.descendingSet().toArray(new TopologyDescription.Subtopology[subtopologies.size()]); + final TopologyDescription.GlobalStore[] sortedGlobalStores = + globalStores.descendingSet().toArray(new TopologyDescription.GlobalStore[globalStores.size()]); + int expectedId = 0; + int subtopologiesIndex = sortedSubtopologies.length - 1; + int globalStoresIndex = sortedGlobalStores.length - 1; + while (subtopologiesIndex != -1 && globalStoresIndex != -1) { + sb.append(" "); + final TopologyDescription.Subtopology subtopology = + sortedSubtopologies[subtopologiesIndex]; + final TopologyDescription.GlobalStore globalStore = + sortedGlobalStores[globalStoresIndex]; + if (subtopology.id() == expectedId) { + sb.append(subtopology); + subtopologiesIndex--; + } else { + sb.append(globalStore); + globalStoresIndex--; } + expectedId++; } - return sb.toString(); - } - - private String globalStoresAsString() { - final StringBuilder sb = new StringBuilder(); - sb.append("Global Stores:\n"); - if (globalStores.isEmpty()) { - sb.append(" none\n"); - } else { - for (final TopologyDescription.GlobalStore gs : globalStores) { - sb.append(" "); - sb.append(gs); - } + while (subtopologiesIndex != -1) { + final TopologyDescription.Subtopology subtopology = + sortedSubtopologies[subtopologiesIndex]; + sb.append(" "); + sb.append(subtopology); + subtopologiesIndex--; + } + while (globalStoresIndex != -1) { + final TopologyDescription.GlobalStore globalStore = + sortedGlobalStores[globalStoresIndex]; + sb.append(" "); + sb.append(globalStore); + globalStoresIndex--; } return sb.toString(); } diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java index 3ba7803..0a45803 100644 --- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java @@ -579,14 +579,14 @@ public class TopologyTest { @Test public void shouldDescribeGlobalStoreTopology() { - addGlobalStoreToTopologyAndExpectedDescription("globalStore", "source", "globalTopic", "processor"); + addGlobalStoreToTopologyAndExpectedDescription("globalStore", "source", "globalTopic", "processor", 0); assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription)); } @Test public void shouldDescribeMultipleGlobalStoreTopology() { - addGlobalStoreToTopologyAndExpectedDescription("globalStore1", "source1", "globalTopic1", "processor1"); - addGlobalStoreToTopologyAndExpectedDescription("globalStore2", "source2", "globalTopic2", "processor2"); + addGlobalStoreToTopologyAndExpectedDescription("globalStore1", "source1", "globalTopic1", "processor1", 0); + addGlobalStoreToTopologyAndExpectedDescription("globalStore2", "source2", "globalTopic2", "processor2", 1); assertThat(topology.describe(), equalTo((TopologyDescription) expectedDescription)); } @@ -677,7 +677,8 @@ public class TopologyTest { private void addGlobalStoreToTopologyAndExpectedDescription(final String globalStoreName, final String sourceName, final String globalTopicName, - final String processorName) { + final String processorName, + final int id) { final KeyValueStoreBuilder globalStoreBuilder = EasyMock.createNiceMock(KeyValueStoreBuilder.class); EasyMock.expect(globalStoreBuilder.name()).andReturn(globalStoreName).anyTimes(); EasyMock.replay(globalStoreBuilder); @@ -695,7 +696,8 @@ public class TopologyTest { sourceName, processorName, globalStoreName, - globalTopicName); + globalTopicName, + id); expectedDescription.addGlobalStore(expectedGlobalStore); } -- To stop receiving notification emails like this one, please contact ['"commits@kafka.apache.org" '].