kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6761: Reduce streams footprint part IV add optimization (#5451)
Date Wed, 15 Aug 2018 00:46:45 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3e64e5b  KAFKA-6761: Reduce streams footprint part IV add optimization (#5451)
3e64e5b is described below

commit 3e64e5b9c03f13f58e6a049ff1fed039c6a15d69
Author: Bill Bejeck <bbejeck@gmail.com>
AuthorDate: Tue Aug 14 20:46:40 2018 -0400

    KAFKA-6761: Reduce streams footprint part IV add optimization (#5451)
    
    This PR adds an optimization that eliminates redundant repartition topics. When the KStream produced by a key-changing operation feeds several downstream operations that use the new key, their repartition topics are reduced to a single one.
    
    Note that this PR leaves in place the optimization that re-uses a source topic as the changelog topic for source KTable instances. A follow-up PR will move the source-topic optimization into a method within InternalStreamsBuilder, so that both optimizations are performed in the same area of the code.
    
    Additionally, the current value of StreamsConfig.OPTIMIZE is "all"; changing it to "2.1" will require another KIP.
    
    The PR also adds an integration test, RepartitionOptimizingIntegrationTest. It asserts that an optimized topology with one repartition topic produces the same results as the un-optimized version with four repartition topics.
    
    Reviewers: Matthias J. Sax <matthias@confluent.io>, John Roesler <john@confluent.io>, Guozhang Wang <wangguoz@gmail.com>
---
 .../org/apache/kafka/streams/StreamsBuilder.java   |   3 +-
 .../kstream/internals/InternalStreamsBuilder.java  | 220 ++++++++++-
 .../streams/kstream/internals/KStreamImpl.java     |  17 +-
 .../graph/OptimizableRepartitionNode.java          |   8 +
 .../kstream/internals/graph/StreamsGraphNode.java  |  51 ++-
 .../RepartitionOptimizingIntegrationTest.java      | 440 +++++++++++++++++++++
 ...artitionWithMergeOptimizingIntegrationTest.java | 302 ++++++++++++++
 .../kstream/internals/graph/StreamsGraphTest.java  | 133 +++++--
 8 files changed, 1108 insertions(+), 66 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 0442e2b..4c9ee93 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -530,8 +530,7 @@ public class StreamsBuilder {
      * @return the {@link Topology} that represents the specified processing logic
      */
     public synchronized Topology build(final Properties props) {
-        // the props instance will be used once optimization framework merged
-        internalStreamsBuilder.buildAndOptimizeTopology();
+        internalStreamsBuilder.buildAndOptimizeTopology(props);
         return topology;
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index 8895591..e5cd066 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -16,12 +16,16 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.internals.graph.GlobalStoreNode;
+import org.apache.kafka.streams.kstream.internals.graph.OptimizableRepartitionNode;
 import org.apache.kafka.streams.kstream.internals.graph.ProcessorParameters;
 import org.apache.kafka.streams.kstream.internals.graph.StateStoreNode;
 import org.apache.kafka.streams.kstream.internals.graph.StreamSourceNode;
@@ -35,13 +39,18 @@ import org.apache.kafka.streams.state.StoreBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Serializable;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
 import java.util.Objects;
 import java.util.PriorityQueue;
+import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
 import java.util.regex.Pattern;
 
 public class InternalStreamsBuilder implements InternalNameProvider {
@@ -49,8 +58,9 @@ public class InternalStreamsBuilder implements InternalNameProvider {
     final InternalTopologyBuilder internalTopologyBuilder;
     private final AtomicInteger index = new AtomicInteger(0);
 
-    private final AtomicInteger nodeIdCounter = new AtomicInteger(0);
-    private final NodeIdComparator nodeIdComparator = new NodeIdComparator();
+    private final AtomicInteger buildPriorityIndex = new AtomicInteger(0);
+    private final Map<StreamsGraphNode, Set<OptimizableRepartitionNode>> keyChangingOperationsToOptimizableRepartitionNodes = new HashMap<>();
+    private final Set<StreamsGraphNode> mergeNodes = new HashSet<>();
 
     private static final String TOPOLOGY_ROOT = "root";
     private static final Logger LOG = LoggerFactory.getLogger(InternalStreamsBuilder.class);
@@ -205,14 +215,17 @@ public class InternalStreamsBuilder implements InternalNameProvider {
                        stateUpdateSupplier);
     }
 
-    void addGraphNode(final StreamsGraphNode parent, final StreamsGraphNode child) {
+    void addGraphNode(final StreamsGraphNode parent,
+                      final StreamsGraphNode child) {
         Objects.requireNonNull(parent, "parent node can't be null");
         Objects.requireNonNull(child, "child node can't be null");
-        parent.addChildNode(child);
+        parent.addChild(child);
         maybeAddNodeForOptimizationMetadata(child);
     }
 
-    void addGraphNode(final Collection<StreamsGraphNode> parents, final StreamsGraphNode child) {
+
+    void addGraphNode(final Collection<StreamsGraphNode> parents,
+                      final StreamsGraphNode child) {
         Objects.requireNonNull(parents, "parent node can't be null");
         Objects.requireNonNull(child, "child node can't be null");
 
@@ -225,13 +238,37 @@ public class InternalStreamsBuilder implements InternalNameProvider {
         }
     }
 
-    void maybeAddNodeForOptimizationMetadata(final StreamsGraphNode node) {
-        node.setId(nodeIdCounter.getAndIncrement());
+    private void maybeAddNodeForOptimizationMetadata(final StreamsGraphNode node) {
+        node.setBuildPriority(buildPriorityIndex.getAndIncrement());
+
+        if (node.parentNodes().isEmpty() && !node.nodeName().equals(TOPOLOGY_ROOT)) {
+            throw new IllegalStateException(
+                "Nodes should not have a null parent node.  Name: " + node.nodeName() + " Type: "
+                + node.getClass().getSimpleName());
+        }
+
+        if (node.isKeyChangingOperation()) {
+            keyChangingOperationsToOptimizableRepartitionNodes.put(node, new HashSet<>());
+        } else if (node instanceof OptimizableRepartitionNode) {
+            final StreamsGraphNode parentNode = getKeyChangingParentNode(node);
+            if (parentNode != null) {
+                keyChangingOperationsToOptimizableRepartitionNodes.get(parentNode).add((OptimizableRepartitionNode) node);
+            }
+        } else if (node.isMergeNode()) {
+            mergeNodes.add(node);
+        }
     }
 
+    // use this method for testing only
     public void buildAndOptimizeTopology() {
+        buildAndOptimizeTopology(null);
+    }
+
+    public void buildAndOptimizeTopology(final Properties props) {
+
+        maybePerformOptimizations(props);
 
-        final PriorityQueue<StreamsGraphNode> graphNodePriorityQueue = new PriorityQueue<>(5, nodeIdComparator);
+        final PriorityQueue<StreamsGraphNode> graphNodePriorityQueue = new PriorityQueue<>(5, Comparator.comparing(StreamsGraphNode::buildPriority));
 
         graphNodePriorityQueue.offer(root);
 
@@ -253,17 +290,168 @@ public class InternalStreamsBuilder implements InternalNameProvider {
         }
     }
 
+    private void maybePerformOptimizations(final Properties props) {
 
-    public StreamsGraphNode root() {
-        return root;
+        if (props != null && props.getProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION).equals(StreamsConfig.OPTIMIZE)) {
+            LOG.debug("Optimizing the Kafka Streams graph for repartition nodes");
+            maybeOptimizeRepartitionOperations();
+        }
     }
 
-    private static class NodeIdComparator implements Comparator<StreamsGraphNode>, Serializable {
+    @SuppressWarnings("unchecked")
+    private void maybeOptimizeRepartitionOperations() {
+        maybeUpdateKeyChangingRepartitionNodeMap();
 
-        @Override
-        public int compare(final StreamsGraphNode o1,
-                           final StreamsGraphNode o2) {
-            return o1.id().compareTo(o2.id());
+        for (final Map.Entry<StreamsGraphNode, Set<OptimizableRepartitionNode>> entry : keyChangingOperationsToOptimizableRepartitionNodes.entrySet()) {
+
+            final StreamsGraphNode keyChangingNode = entry.getKey();
+
+            if (entry.getValue().isEmpty()) {
+                continue;
+            }
+
+            final SerializedInternal serialized = new SerializedInternal(getRepartitionSerdes(entry.getValue()));
+
+            final StreamsGraphNode optimizedSingleRepartition = createRepartitionNode(keyChangingNode.nodeName(),
+                                                                                      serialized.keySerde(),
+                                                                                      serialized.valueSerde());
+
+            // re-use parent buildPriority to make sure the single repartition graph node is evaluated before downstream nodes
+            optimizedSingleRepartition.setBuildPriority(keyChangingNode.buildPriority());
+
+            for (final OptimizableRepartitionNode repartitionNodeToBeReplaced : entry.getValue()) {
+
+                final StreamsGraphNode keyChangingNodeChild = findParentNodeMatching(repartitionNodeToBeReplaced, gn -> gn.parentNodes().contains(keyChangingNode));
+
+                if (keyChangingNodeChild == null) {
+                    throw new StreamsException(String.format("Found a null keyChangingChild node for %s", repartitionNodeToBeReplaced));
+                }
+
+                LOG.debug("Found the child node of the key changer {} from the repartition {}.", keyChangingNodeChild, repartitionNodeToBeReplaced);
+
+                // need to add children of key-changing node as children of optimized repartition
+                // in order to process records from re-partitioning
+                optimizedSingleRepartition.addChild(keyChangingNodeChild);
+
+                LOG.debug("Removing {} from {}  children {}", keyChangingNodeChild, keyChangingNode, keyChangingNode.children());
+                // now remove children from key-changing node
+                keyChangingNode.removeChild(keyChangingNodeChild);
+
+                // now need to get children of repartition node so we can remove repartition node
+                final Collection<StreamsGraphNode> repartitionNodeToBeReplacedChildren = repartitionNodeToBeReplaced.children();
+                final Collection<StreamsGraphNode> parentsOfRepartitionNodeToBeReplaced = repartitionNodeToBeReplaced.parentNodes();
+
+                for (final StreamsGraphNode repartitionNodeToBeReplacedChild : repartitionNodeToBeReplacedChildren) {
+                    for (final StreamsGraphNode parentNode : parentsOfRepartitionNodeToBeReplaced) {
+                        parentNode.addChild(repartitionNodeToBeReplacedChild);
+                    }
+                }
+
+                for (final StreamsGraphNode parentNode : parentsOfRepartitionNodeToBeReplaced) {
+                    parentNode.removeChild(repartitionNodeToBeReplaced);
+                }
+                repartitionNodeToBeReplaced.clearChildren();
+
+                LOG.debug("Updated node {} children {}", optimizedSingleRepartition, optimizedSingleRepartition.children());
+            }
+
+            keyChangingNode.addChild(optimizedSingleRepartition);
+            keyChangingOperationsToOptimizableRepartitionNodes.remove(entry.getKey());
+        }
+    }
+
+    private void maybeUpdateKeyChangingRepartitionNodeMap() {
+        final Map<StreamsGraphNode, Set<StreamsGraphNode>> mergeNodesToKeyChangers = new HashMap<>();
+        for (final StreamsGraphNode mergeNode : mergeNodes) {
+            mergeNodesToKeyChangers.put(mergeNode, new HashSet<>());
+            final Collection<StreamsGraphNode> keys = keyChangingOperationsToOptimizableRepartitionNodes.keySet();
+            for (final StreamsGraphNode key : keys) {
+                final StreamsGraphNode maybeParentKey = findParentNodeMatching(mergeNode, node -> node.parentNodes().contains(key));
+                if (maybeParentKey != null) {
+                    mergeNodesToKeyChangers.get(mergeNode).add(key);
+                }
+            }
         }
+
+        for (final Map.Entry<StreamsGraphNode, Set<StreamsGraphNode>> entry : mergeNodesToKeyChangers.entrySet()) {
+            final StreamsGraphNode mergeKey = entry.getKey();
+            final Collection<StreamsGraphNode> keyChangingParents = entry.getValue();
+            final Set<OptimizableRepartitionNode> repartitionNodes = new HashSet<>();
+            for (final StreamsGraphNode keyChangingParent : keyChangingParents) {
+                repartitionNodes.addAll(keyChangingOperationsToOptimizableRepartitionNodes.get(keyChangingParent));
+                keyChangingOperationsToOptimizableRepartitionNodes.remove(keyChangingParent);
+            }
+
+            keyChangingOperationsToOptimizableRepartitionNodes.put(mergeKey, repartitionNodes);
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    private OptimizableRepartitionNode createRepartitionNode(final String name,
+                                                             final Serde keySerde,
+                                                             final Serde valueSerde) {
+
+        final OptimizableRepartitionNode.OptimizableRepartitionNodeBuilder repartitionNodeBuilder = OptimizableRepartitionNode.optimizableRepartitionNodeBuilder();
+        KStreamImpl.createRepartitionedSource(this,
+                                              keySerde,
+                                              valueSerde,
+                                              name + "-optimized",
+                                              name,
+                                              repartitionNodeBuilder);
+
+        return repartitionNodeBuilder.build();
+
+    }
+
+    private StreamsGraphNode getKeyChangingParentNode(final StreamsGraphNode repartitionNode) {
+        final StreamsGraphNode shouldBeKeyChangingNode = findParentNodeMatching(repartitionNode, n -> n.isKeyChangingOperation() || n.isValueChangingOperation());
+
+        final StreamsGraphNode keyChangingNode = findParentNodeMatching(repartitionNode, StreamsGraphNode::isKeyChangingOperation);
+        if (shouldBeKeyChangingNode != null && shouldBeKeyChangingNode.equals(keyChangingNode)) {
+            return keyChangingNode;
+        }
+        return null;
+    }
+
+    @SuppressWarnings("unchecked")
+    private SerializedInternal getRepartitionSerdes(final Collection<OptimizableRepartitionNode> repartitionNodes) {
+        Serde keySerde = null;
+        Serde valueSerde = null;
+
+        for (final OptimizableRepartitionNode repartitionNode : repartitionNodes) {
+            if (keySerde == null && repartitionNode.keySerde() != null) {
+                keySerde = repartitionNode.keySerde();
+            }
+
+            if (valueSerde == null && repartitionNode.valueSerde() != null) {
+                valueSerde = repartitionNode.valueSerde();
+            }
+
+            if (keySerde != null && valueSerde != null) {
+                break;
+            }
+        }
+
+        return new SerializedInternal(Serialized.with(keySerde, valueSerde));
+    }
+
+    private StreamsGraphNode findParentNodeMatching(final StreamsGraphNode startSeekingNode,
+                                                    final Predicate<StreamsGraphNode> parentNodePredicate) {
+        if (parentNodePredicate.test(startSeekingNode)) {
+            return startSeekingNode;
+        }
+        StreamsGraphNode foundParentNode = null;
+
+        for (final StreamsGraphNode parentNode : startSeekingNode.parentNodes()) {
+            if (parentNodePredicate.test(parentNode)) {
+                return parentNode;
+            }
+            foundParentNode = findParentNodeMatching(parentNode, parentNodePredicate);
+        }
+        return foundParentNode;
+    }
+
+    public StreamsGraphNode root() {
+        return root;
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 803faf6..42c20a5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -219,6 +219,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         final ProcessorGraphNode<? super K, ? super V> mapValuesProcessorNode = new ProcessorGraphNode<>(name,
                                                                                                          processorParameters,
                                                                                                          repartitionRequired);
+        mapValuesProcessorNode.setValueChangingOperation(true);
         builder.addGraphNode(this.streamsGraphNode, mapValuesProcessorNode);
 
         return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired, mapValuesProcessorNode);
@@ -274,6 +275,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
         final ProcessorGraphNode<? super K, ? super V> flatMapValuesNode = new ProcessorGraphNode<>(name,
                                                                                                     processorParameters,
                                                                                                     repartitionRequired);
+        flatMapValuesNode.setValueChangingOperation(true);
         builder.addGraphNode(this.streamsGraphNode, flatMapValuesNode);
 
         return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired, flatMapValuesNode);
@@ -343,7 +345,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                                                                                             processorParameters,
                                                                                             requireRepartitioning);
 
-
+        mergeNode.setMergeNode(true);
         builder.addGraphNode(Arrays.asList(this.streamsGraphNode, streamImpl.streamsGraphNode), mergeNode);
         return new KStreamImpl<>(builder, name, allSourceNodes, requireRepartitioning, mergeNode);
     }
@@ -491,6 +493,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                                                                                        stateStoreNames,
                                                                                        null,
                                                                                        repartitionRequired);
+        transformNode.setValueChangingOperation(true);
         builder.addGraphNode(this.streamsGraphNode, transformNode);
 
         return new KStreamImpl<>(builder, name, sourceNodes, this.repartitionRequired, transformNode);
@@ -529,7 +532,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                       joiner,
                       windows,
                       joined,
-                      new KStreamImplJoin(false, false, this.streamsGraphNode));
+                      new KStreamImplJoin(false, false));
 
     }
 
@@ -545,7 +548,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
                                              final ValueJoiner<? super V, ? super VO, ? extends VR> joiner,
                                              final JoinWindows windows,
                                              final Joined<K, V, VO> joined) {
-        return doJoin(other, joiner, windows, joined, new KStreamImplJoin(true, true, this.streamsGraphNode));
+        return doJoin(other, joiner, windows, joined, new KStreamImplJoin(true, true));
     }
 
     private <V1, R> KStream<K, R> doJoin(final KStream<K, V1> other,
@@ -656,7 +659,7 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
             joiner,
             windows,
             joined,
-            new KStreamImplJoin(true, false, this.streamsGraphNode)
+            new KStreamImplJoin(true, false)
         );
 
     }
@@ -840,14 +843,12 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
 
         private final boolean leftOuter;
         private final boolean rightOuter;
-        private final StreamsGraphNode parentGraphNode;
+
 
         KStreamImplJoin(final boolean leftOuter,
-                        final boolean rightOuter,
-                        final StreamsGraphNode parentGraphNode) {
+                        final boolean rightOuter) {
             this.leftOuter = leftOuter;
             this.rightOuter = rightOuter;
-            this.parentGraphNode = parentGraphNode;
         }
 
         @SuppressWarnings("unchecked")
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java
index 7198df0..4d81b1f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/OptimizableRepartitionNode.java
@@ -46,6 +46,14 @@ public class OptimizableRepartitionNode<K, V> extends BaseRepartitionNode {
 
     }
 
+    public Serde<K> keySerde() {
+        return keySerde;
+    }
+
+    public Serde<V> valueSerde() {
+        return valueSerde;
+    }
+
     @Override
     Serializer<V> getValueSerializer() {
         return valueSerde != null ? valueSerde.serializer() : null;
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphNode.java
index 902a4e9..fac5923 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphNode.java
@@ -31,7 +31,9 @@ public abstract class StreamsGraphNode {
     private final String nodeName;
     private final boolean repartitionRequired;
     private boolean keyChangingOperation;
-    private Integer id;
+    private boolean valueChangingOperation;
+    private boolean mergeNode;
+    private Integer buildPriority;
     private boolean hasWrittenToTopology = false;
 
     public StreamsGraphNode(final String nodeName,
@@ -44,7 +46,7 @@ public abstract class StreamsGraphNode {
         return parentNodes;
     }
 
-    public String[] parentNodeNames() {
+    String[] parentNodeNames() {
         final String[] parentNames = new String[parentNodes.size()];
         int index = 0;
         for (final StreamsGraphNode parentNode : parentNodes) {
@@ -62,17 +64,24 @@ public abstract class StreamsGraphNode {
         return true;
     }
 
-    public void addParentNode(final StreamsGraphNode parentNode) {
-        parentNodes.add(parentNode);
-    }
-
     public Collection<StreamsGraphNode> children() {
         return new LinkedHashSet<>(childNodes);
     }
 
-    public void addChildNode(final StreamsGraphNode childNode) {
+    public void clearChildren() {
+        for (final StreamsGraphNode childNode : childNodes) {
+            childNode.parentNodes.remove(this);
+        }
+        childNodes.clear();
+    }
+
+    public boolean removeChild(final StreamsGraphNode child) {
+        return childNodes.remove(child) && child.parentNodes.remove(this);
+    }
+
+    public void addChild(final StreamsGraphNode childNode) {
         this.childNodes.add(childNode);
-        childNode.addParentNode(this);
+        childNode.parentNodes.add(this);
     }
 
     public String nodeName() {
@@ -87,16 +96,32 @@ public abstract class StreamsGraphNode {
         return keyChangingOperation;
     }
 
+    public boolean isValueChangingOperation() {
+        return valueChangingOperation;
+    }
+
+    public boolean isMergeNode() {
+        return mergeNode;
+    }
+
+    public void setMergeNode(final boolean mergeNode) {
+        this.mergeNode = mergeNode;
+    }
+
+    public void setValueChangingOperation(final boolean valueChangingOperation) {
+        this.valueChangingOperation = valueChangingOperation;
+    }
+
     public void keyChangingOperation(final boolean keyChangingOperation) {
         this.keyChangingOperation = keyChangingOperation;
     }
 
-    public void setId(final int id) {
-        this.id = id;
+    public void setBuildPriority(final int buildPriority) {
+        this.buildPriority = buildPriority;
     }
 
-    public Integer id() {
-        return this.id;
+    public Integer buildPriority() {
+        return this.buildPriority;
     }
 
     public abstract void writeToTopology(final InternalTopologyBuilder topologyBuilder);
@@ -114,7 +139,7 @@ public abstract class StreamsGraphNode {
         final String[] parentNames = parentNodeNames();
         return "StreamsGraphNode{" +
                "nodeName='" + nodeName + '\'' +
-               ", id=" + id +
+               ", buildPriority=" + buildPriority +
                " parentNodes=" + Arrays.toString(parentNames) + '}';
     }
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java
new file mode 100644
index 0000000..e192c70
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionOptimizingIntegrationTest.java
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.Joined;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import kafka.utils.MockTime;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+@Category({IntegrationTest.class})
+public class RepartitionOptimizingIntegrationTest {
+
+    private static final int NUM_BROKERS = 1;
+    private static final String INPUT_TOPIC = "input";
+    private static final String COUNT_TOPIC = "outputTopic_0";
+    private static final String AGGREGATION_TOPIC = "outputTopic_1";
+    private static final String REDUCE_TOPIC = "outputTopic_2";
+    private static final String JOINED_TOPIC = "joinedOutputTopic";
+
+    private static final int ONE_REPARTITION_TOPIC = 1;
+    private static final int FOUR_REPARTITION_TOPICS = 4;
+
+    private final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
+
+    private Properties streamsConfiguration;
+
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    private final MockTime mockTime = CLUSTER.time;
+
+    @Before
+    public void setUp() throws Exception {
+        final Properties props = new Properties();
+        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1024 * 10);
+        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000);
+        props.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
+
+        streamsConfiguration = StreamsTestUtils.getStreamsConfig(
+            "maybe-optimized-test-app",
+            CLUSTER.bootstrapServers(),
+            Serdes.String().getClass().getName(),
+            Serdes.String().getClass().getName(),
+            props);
+
+        CLUSTER.createTopics(INPUT_TOPIC,
+                             COUNT_TOPIC,
+                             AGGREGATION_TOPIC,
+                             REDUCE_TOPIC,
+                             JOINED_TOPIC);
+
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        CLUSTER.deleteAllTopicsAndWait(30_000L);
+    }
+
+    @Test
+    public void shouldSendCorrectRecords_OPTIMIZED() throws Exception {
+        runIntegrationTest(StreamsConfig.OPTIMIZE,
+                           ONE_REPARTITION_TOPIC);
+    }
+
+    @Test
+    public void shouldSendCorrectResults_NO_OPTIMIZATION() throws Exception {
+        runIntegrationTest(StreamsConfig.NO_OPTIMIZATION,
+                           FOUR_REPARTITION_TOPICS);
+    }
+
+
+    private void runIntegrationTest(final String optimizationConfig,
+                                    final int expectedNumberRepartitionTopics) throws Exception {
+
+        final Initializer<Integer> initializer = () -> 0;
+        final Aggregator<String, String, Integer> aggregator = (k, v, agg) -> agg + v.length();
+
+        final Reducer<String> reducer = (v1, v2) -> v1 + ":" + v2;
+
+        final List<String> processorValueCollector = new ArrayList<>();
+
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<String, String> sourceStream = builder.stream(INPUT_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
+
+        final KStream<String, String> mappedStream = sourceStream.map((k, v) -> KeyValue.pair(k.toUpperCase(Locale.getDefault()), v));
+
+        mappedStream.filter((k, v) -> k.equals("B")).mapValues(v -> v.toUpperCase(Locale.getDefault()))
+            .process(() -> new SimpleProcessor(processorValueCollector));
+
+        final KStream<String, Long> countStream = mappedStream.groupByKey().count(Materialized.with(Serdes.String(), Serdes.Long())).toStream();
+
+        countStream.to(COUNT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
+
+        mappedStream.groupByKey().aggregate(initializer,
+                                            aggregator,
+                                            Materialized.with(Serdes.String(), Serdes.Integer()))
+            .toStream().to(AGGREGATION_TOPIC, Produced.with(Serdes.String(), Serdes.Integer()));
+
+        // adding operators for case where the repartition node is further downstream
+        mappedStream.filter((k, v) -> true).peek((k, v) -> System.out.println(k + ":" + v)).groupByKey()
+            .reduce(reducer, Materialized.with(Serdes.String(), Serdes.String()))
+            .toStream().to(REDUCE_TOPIC, Produced.with(Serdes.String(), Serdes.String()));
+
+        mappedStream.filter((k, v) -> k.equals("A"))
+            .join(countStream, (v1, v2) -> v1 + ":" + v2.toString(),
+                  JoinWindows.of(5000),
+                  Joined.with(Serdes.String(), Serdes.String(), Serdes.Long()))
+            .to(JOINED_TOPIC);
+
+        streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimizationConfig);
+
+        final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
+
+        IntegrationTestUtils.produceKeyValuesSynchronously(INPUT_TOPIC, getKeyValues(), producerConfig, mockTime);
+
+        final Properties consumerConfig1 = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, LongDeserializer.class);
+        final Properties consumerConfig2 = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, IntegerDeserializer.class);
+        final Properties consumerConfig3 = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
+
+        final Topology topology = builder.build(streamsConfiguration);
+        final String topologyString = topology.describe().toString();
+
+        if (optimizationConfig.equals(StreamsConfig.OPTIMIZE)) {
+            assertEquals(EXPECTED_OPTIMIZED_TOPOLOGY, topologyString);
+        } else {
+            assertEquals(EXPECTED_UNOPTIMIZED_TOPOLOGY, topologyString);
+        }
+
+
+        /*
+           confirming number of expected repartition topics here
+         */
+        assertEquals(expectedNumberRepartitionTopics, getCountOfRepartitionTopicsFound(topologyString));
+
+        final KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
+        streams.start();
+
+        final List<KeyValue<String, Long>> expectedCountKeyValues = Arrays.asList(KeyValue.pair("A", 3L), KeyValue.pair("B", 3L), KeyValue.pair("C", 3L));
+        final List<KeyValue<String, Long>> receivedCountKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig1, COUNT_TOPIC, expectedCountKeyValues.size());
+
+        final List<KeyValue<String, Integer>> expectedAggKeyValues = Arrays.asList(KeyValue.pair("A", 9), KeyValue.pair("B", 9), KeyValue.pair("C", 9));
+        final List<KeyValue<String, Integer>> receivedAggKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig2, AGGREGATION_TOPIC, expectedAggKeyValues.size());
+
+        final List<KeyValue<String, String>> expectedReduceKeyValues = Arrays.asList(KeyValue.pair("A", "foo:bar:baz"), KeyValue.pair("B", "foo:bar:baz"), KeyValue.pair("C", "foo:bar:baz"));
+        final List<KeyValue<String, Integer>> receivedReduceKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig3, REDUCE_TOPIC, expectedAggKeyValues.size());
+
+        final List<KeyValue<String, String>> expectedJoinKeyValues = Arrays.asList(KeyValue.pair("A", "foo:3"), KeyValue.pair("A", "bar:3"), KeyValue.pair("A", "baz:3"));
+        final List<KeyValue<String, Integer>> receivedJoinKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig3, JOINED_TOPIC, expectedJoinKeyValues.size());
+
+
+        final List<String> expectedCollectedProcessorValues = Arrays.asList("FOO", "BAR", "BAZ");
+
+        assertThat(receivedCountKeyValues, equalTo(expectedCountKeyValues));
+        assertThat(receivedAggKeyValues, equalTo(expectedAggKeyValues));
+        assertThat(receivedReduceKeyValues, equalTo(expectedReduceKeyValues));
+        assertThat(receivedJoinKeyValues, equalTo(expectedJoinKeyValues));
+
+        assertThat(3, equalTo(processorValueCollector.size()));
+        assertThat(processorValueCollector, equalTo(expectedCollectedProcessorValues));
+
+        streams.close(5, TimeUnit.SECONDS);
+    }
+
+
+    private int getCountOfRepartitionTopicsFound(final String topologyString) {
+        final Matcher matcher = repartitionTopicPattern.matcher(topologyString);
+        final List<String> repartitionTopicsFound = new ArrayList<>();
+        while (matcher.find()) {
+            repartitionTopicsFound.add(matcher.group());
+        }
+        return repartitionTopicsFound.size();
+    }
+
+
+    private List<KeyValue<String, String>> getKeyValues() {
+        final List<KeyValue<String, String>> keyValueList = new ArrayList<>();
+        final String[] keys = new String[]{"a", "b", "c"};
+        final String[] values = new String[]{"foo", "bar", "baz"};
+        for (final String key : keys) {
+            for (final String value : values) {
+                keyValueList.add(KeyValue.pair(key, value));
+            }
+        }
+        return keyValueList;
+    }
+
+
+    private static class SimpleProcessor extends AbstractProcessor<String, String> {
+
+        final List<String> valueList;
+
+        SimpleProcessor(final List<String> valueList) {
+            this.valueList = valueList;
+        }
+
+        @Override
+        public void process(final String key, final String value) {
+            valueList.add(value);
+        }
+    }
+
+
+    private static final String EXPECTED_OPTIMIZED_TOPOLOGY = "Topologies:\n"
+                                                              + "   Sub-topology: 0\n"
+                                                              + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n"
+                                                              + "      --> KSTREAM-MAP-0000000001\n"
+                                                              + "    Processor: KSTREAM-MAP-0000000001 (stores: [])\n"
+                                                              + "      --> KSTREAM-FILTER-0000000002, KSTREAM-FILTER-0000000040\n"
+                                                              + "      <-- KSTREAM-SOURCE-0000000000\n"
+                                                              + "    Processor: KSTREAM-FILTER-0000000002 (stores: [])\n"
+                                                              + "      --> KSTREAM-MAPVALUES-0000000003\n"
+                                                              + "      <-- KSTREAM-MAP-0000000001\n"
+                                                              + "    Processor: KSTREAM-FILTER-0000000040 (stores: [])\n"
+                                                              + "      --> KSTREAM-SINK-0000000039\n"
+                                                              + "      <-- KSTREAM-MAP-0000000001\n"
+                                                              + "    Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])\n"
+                                                              + "      --> KSTREAM-PROCESSOR-0000000004\n"
+                                                              + "      <-- KSTREAM-FILTER-0000000002\n"
+                                                              + "    Processor: KSTREAM-PROCESSOR-0000000004 (stores: [])\n"
+                                                              + "      --> none\n"
+                                                              + "      <-- KSTREAM-MAPVALUES-0000000003\n"
+                                                              + "    Sink: KSTREAM-SINK-0000000039 (topic: KSTREAM-MAP-0000000001-optimized-repartition)\n"
+                                                              + "      <-- KSTREAM-FILTER-0000000040\n"
+                                                              + "\n"
+                                                              + "  Sub-topology: 1\n"
+                                                              + "    Source: KSTREAM-SOURCE-0000000041 (topics: [KSTREAM-MAP-0000000001-optimized-repartition])\n"
+                                                              + "      --> KSTREAM-FILTER-0000000020, KSTREAM-AGGREGATE-0000000007, KSTREAM-AGGREGATE-0000000014, KSTREAM-FILTER-0000000029\n"
+                                                              + "    Processor: KSTREAM-AGGREGATE-0000000007 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000006])\n"
+                                                              + "      --> KTABLE-TOSTREAM-0000000011\n"
+                                                              + "      <-- KSTREAM-SOURCE-0000000041\n"
+                                                              + "    Processor: KTABLE-TOSTREAM-0000000011 (stores: [])\n"
+                                                              + "      --> KSTREAM-SINK-0000000012, KSTREAM-WINDOWED-0000000034\n"
+                                                              + "      <-- KSTREAM-AGGREGATE-0000000007\n"
+                                                              + "    Processor: KSTREAM-FILTER-0000000020 (stores: [])\n"
+                                                              + "      --> KSTREAM-PEEK-0000000021\n"
+                                                              + "      <-- KSTREAM-SOURCE-0000000041\n"
+                                                              + "    Processor: KSTREAM-FILTER-0000000029 (stores: [])\n"
+                                                              + "      --> KSTREAM-WINDOWED-0000000033\n"
+                                                              + "      <-- KSTREAM-SOURCE-0000000041\n"
+                                                              + "    Processor: KSTREAM-PEEK-0000000021 (stores: [])\n"
+                                                              + "      --> KSTREAM-REDUCE-0000000023\n"
+                                                              + "      <-- KSTREAM-FILTER-0000000020\n"
+                                                              + "    Processor: KSTREAM-WINDOWED-0000000033 (stores: [KSTREAM-JOINTHIS-0000000035-store])\n"
+                                                              + "      --> KSTREAM-JOINTHIS-0000000035\n"
+                                                              + "      <-- KSTREAM-FILTER-0000000029\n"
+                                                              + "    Processor: KSTREAM-WINDOWED-0000000034 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n"
+                                                              + "      --> KSTREAM-JOINOTHER-0000000036\n"
+                                                              + "      <-- KTABLE-TOSTREAM-0000000011\n"
+                                                              + "    Processor: KSTREAM-AGGREGATE-0000000014 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000013])\n"
+                                                              + "      --> KTABLE-TOSTREAM-0000000018\n"
+                                                              + "      <-- KSTREAM-SOURCE-0000000041\n"
+                                                              + "    Processor: KSTREAM-JOINOTHER-0000000036 (stores: [KSTREAM-JOINTHIS-0000000035-store])\n"
+                                                              + "      --> KSTREAM-MERGE-0000000037\n"
+                                                              + "      <-- KSTREAM-WINDOWED-0000000034\n"
+                                                              + "    Processor: KSTREAM-JOINTHIS-0000000035 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n"
+                                                              + "      --> KSTREAM-MERGE-0000000037\n"
+                                                              + "      <-- KSTREAM-WINDOWED-0000000033\n"
+                                                              + "    Processor: KSTREAM-REDUCE-0000000023 (stores: [KSTREAM-REDUCE-STATE-STORE-0000000022])\n"
+                                                              + "      --> KTABLE-TOSTREAM-0000000027\n"
+                                                              + "      <-- KSTREAM-PEEK-0000000021\n"
+                                                              + "    Processor: KSTREAM-MERGE-0000000037 (stores: [])\n"
+                                                              + "      --> KSTREAM-SINK-0000000038\n"
+                                                              + "      <-- KSTREAM-JOINTHIS-0000000035, KSTREAM-JOINOTHER-0000000036\n"
+                                                              + "    Processor: KTABLE-TOSTREAM-0000000018 (stores: [])\n"
+                                                              + "      --> KSTREAM-SINK-0000000019\n"
+                                                              + "      <-- KSTREAM-AGGREGATE-0000000014\n"
+                                                              + "    Processor: KTABLE-TOSTREAM-0000000027 (stores: [])\n"
+                                                              + "      --> KSTREAM-SINK-0000000028\n"
+                                                              + "      <-- KSTREAM-REDUCE-0000000023\n"
+                                                              + "    Sink: KSTREAM-SINK-0000000012 (topic: outputTopic_0)\n"
+                                                              + "      <-- KTABLE-TOSTREAM-0000000011\n"
+                                                              + "    Sink: KSTREAM-SINK-0000000019 (topic: outputTopic_1)\n"
+                                                              + "      <-- KTABLE-TOSTREAM-0000000018\n"
+                                                              + "    Sink: KSTREAM-SINK-0000000028 (topic: outputTopic_2)\n"
+                                                              + "      <-- KTABLE-TOSTREAM-0000000027\n"
+                                                              + "    Sink: KSTREAM-SINK-0000000038 (topic: joinedOutputTopic)\n"
+                                                              + "      <-- KSTREAM-MERGE-0000000037\n\n";
+
+
+    private static final String EXPECTED_UNOPTIMIZED_TOPOLOGY = "Topologies:\n"
+                                                                + "   Sub-topology: 0\n"
+                                                                + "    Source: KSTREAM-SOURCE-0000000000 (topics: [input])\n"
+                                                                + "      --> KSTREAM-MAP-0000000001\n"
+                                                                + "    Processor: KSTREAM-MAP-0000000001 (stores: [])\n"
+                                                                + "      --> KSTREAM-FILTER-0000000020, KSTREAM-FILTER-0000000002, KSTREAM-FILTER-0000000009, KSTREAM-FILTER-0000000016, KSTREAM-FILTER-0000000029\n"
+                                                                + "      <-- KSTREAM-SOURCE-0000000000\n"
+                                                                + "    Processor: KSTREAM-FILTER-0000000020 (stores: [])\n"
+                                                                + "      --> KSTREAM-PEEK-0000000021\n"
+                                                                + "      <-- KSTREAM-MAP-0000000001\n"
+                                                                + "    Processor: KSTREAM-FILTER-0000000002 (stores: [])\n"
+                                                                + "      --> KSTREAM-MAPVALUES-0000000003\n"
+                                                                + "      <-- KSTREAM-MAP-0000000001\n"
+                                                                + "    Processor: KSTREAM-FILTER-0000000029 (stores: [])\n"
+                                                                + "      --> KSTREAM-FILTER-0000000031\n"
+                                                                + "      <-- KSTREAM-MAP-0000000001\n"
+                                                                + "    Processor: KSTREAM-PEEK-0000000021 (stores: [])\n"
+                                                                + "      --> KSTREAM-FILTER-0000000025\n"
+                                                                + "      <-- KSTREAM-FILTER-0000000020\n"
+                                                                + "    Processor: KSTREAM-FILTER-0000000009 (stores: [])\n"
+                                                                + "      --> KSTREAM-SINK-0000000008\n"
+                                                                + "      <-- KSTREAM-MAP-0000000001\n"
+                                                                + "    Processor: KSTREAM-FILTER-0000000016 (stores: [])\n"
+                                                                + "      --> KSTREAM-SINK-0000000015\n"
+                                                                + "      <-- KSTREAM-MAP-0000000001\n"
+                                                                + "    Processor: KSTREAM-FILTER-0000000025 (stores: [])\n"
+                                                                + "      --> KSTREAM-SINK-0000000024\n"
+                                                                + "      <-- KSTREAM-PEEK-0000000021\n"
+                                                                + "    Processor: KSTREAM-FILTER-0000000031 (stores: [])\n"
+                                                                + "      --> KSTREAM-SINK-0000000030\n"
+                                                                + "      <-- KSTREAM-FILTER-0000000029\n"
+                                                                + "    Processor: KSTREAM-MAPVALUES-0000000003 (stores: [])\n"
+                                                                + "      --> KSTREAM-PROCESSOR-0000000004\n"
+                                                                + "      <-- KSTREAM-FILTER-0000000002\n"
+                                                                + "    Processor: KSTREAM-PROCESSOR-0000000004 (stores: [])\n"
+                                                                + "      --> none\n"
+                                                                + "      <-- KSTREAM-MAPVALUES-0000000003\n"
+                                                                + "    Sink: KSTREAM-SINK-0000000008 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000006-repartition)\n"
+                                                                + "      <-- KSTREAM-FILTER-0000000009\n"
+                                                                + "    Sink: KSTREAM-SINK-0000000015 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000013-repartition)\n"
+                                                                + "      <-- KSTREAM-FILTER-0000000016\n"
+                                                                + "    Sink: KSTREAM-SINK-0000000024 (topic: KSTREAM-REDUCE-STATE-STORE-0000000022-repartition)\n"
+                                                                + "      <-- KSTREAM-FILTER-0000000025\n"
+                                                                + "    Sink: KSTREAM-SINK-0000000030 (topic: KSTREAM-FILTER-0000000029-repartition)\n"
+                                                                + "      <-- KSTREAM-FILTER-0000000031\n"
+                                                                + "\n"
+                                                                + "  Sub-topology: 1\n"
+                                                                + "    Source: KSTREAM-SOURCE-0000000010 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000006-repartition])\n"
+                                                                + "      --> KSTREAM-AGGREGATE-0000000007\n"
+                                                                + "    Processor: KSTREAM-AGGREGATE-0000000007 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000006])\n"
+                                                                + "      --> KTABLE-TOSTREAM-0000000011\n"
+                                                                + "      <-- KSTREAM-SOURCE-0000000010\n"
+                                                                + "    Processor: KTABLE-TOSTREAM-0000000011 (stores: [])\n"
+                                                                + "      --> KSTREAM-SINK-0000000012, KSTREAM-WINDOWED-0000000034\n"
+                                                                + "      <-- KSTREAM-AGGREGATE-0000000007\n"
+                                                                + "    Source: KSTREAM-SOURCE-0000000032 (topics: [KSTREAM-FILTER-0000000029-repartition])\n"
+                                                                + "      --> KSTREAM-WINDOWED-0000000033\n"
+                                                                + "    Processor: KSTREAM-WINDOWED-0000000033 (stores: [KSTREAM-JOINTHIS-0000000035-store])\n"
+                                                                + "      --> KSTREAM-JOINTHIS-0000000035\n"
+                                                                + "      <-- KSTREAM-SOURCE-0000000032\n"
+                                                                + "    Processor: KSTREAM-WINDOWED-0000000034 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n"
+                                                                + "      --> KSTREAM-JOINOTHER-0000000036\n"
+                                                                + "      <-- KTABLE-TOSTREAM-0000000011\n"
+                                                                + "    Processor: KSTREAM-JOINOTHER-0000000036 (stores: [KSTREAM-JOINTHIS-0000000035-store])\n"
+                                                                + "      --> KSTREAM-MERGE-0000000037\n"
+                                                                + "      <-- KSTREAM-WINDOWED-0000000034\n"
+                                                                + "    Processor: KSTREAM-JOINTHIS-0000000035 (stores: [KSTREAM-JOINOTHER-0000000036-store])\n"
+                                                                + "      --> KSTREAM-MERGE-0000000037\n"
+                                                                + "      <-- KSTREAM-WINDOWED-0000000033\n"
+                                                                + "    Processor: KSTREAM-MERGE-0000000037 (stores: [])\n"
+                                                                + "      --> KSTREAM-SINK-0000000038\n"
+                                                                + "      <-- KSTREAM-JOINTHIS-0000000035, KSTREAM-JOINOTHER-0000000036\n"
+                                                                + "    Sink: KSTREAM-SINK-0000000012 (topic: outputTopic_0)\n"
+                                                                + "      <-- KTABLE-TOSTREAM-0000000011\n"
+                                                                + "    Sink: KSTREAM-SINK-0000000038 (topic: joinedOutputTopic)\n"
+                                                                + "      <-- KSTREAM-MERGE-0000000037\n"
+                                                                + "\n"
+                                                                + "  Sub-topology: 2\n"
+                                                                + "    Source: KSTREAM-SOURCE-0000000017 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000013-repartition])\n"
+                                                                + "      --> KSTREAM-AGGREGATE-0000000014\n"
+                                                                + "    Processor: KSTREAM-AGGREGATE-0000000014 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000013])\n"
+                                                                + "      --> KTABLE-TOSTREAM-0000000018\n"
+                                                                + "      <-- KSTREAM-SOURCE-0000000017\n"
+                                                                + "    Processor: KTABLE-TOSTREAM-0000000018 (stores: [])\n"
+                                                                + "      --> KSTREAM-SINK-0000000019\n"
+                                                                + "      <-- KSTREAM-AGGREGATE-0000000014\n"
+                                                                + "    Sink: KSTREAM-SINK-0000000019 (topic: outputTopic_1)\n"
+                                                                + "      <-- KTABLE-TOSTREAM-0000000018\n"
+                                                                + "\n"
+                                                                + "  Sub-topology: 3\n"
+                                                                + "    Source: KSTREAM-SOURCE-0000000026 (topics: [KSTREAM-REDUCE-STATE-STORE-0000000022-repartition])\n"
+                                                                + "      --> KSTREAM-REDUCE-0000000023\n"
+                                                                + "    Processor: KSTREAM-REDUCE-0000000023 (stores: [KSTREAM-REDUCE-STATE-STORE-0000000022])\n"
+                                                                + "      --> KTABLE-TOSTREAM-0000000027\n"
+                                                                + "      <-- KSTREAM-SOURCE-0000000026\n"
+                                                                + "    Processor: KTABLE-TOSTREAM-0000000027 (stores: [])\n"
+                                                                + "      --> KSTREAM-SINK-0000000028\n"
+                                                                + "      <-- KSTREAM-REDUCE-0000000023\n"
+                                                                + "    Sink: KSTREAM-SINK-0000000028 (topic: outputTopic_2)\n"
+                                                                + "      <-- KTABLE-TOSTREAM-0000000027\n\n";
+
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java
new file mode 100644
index 0000000..200062e
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RepartitionWithMergeOptimizingIntegrationTest.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.StreamsTestUtils;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import kafka.utils.MockTime;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Verifies that two re-keyed streams which are merged and then counted twice need only one
+ * repartition topic with optimization enabled (two without), and produce the same output either way.
+ */
+@Category({IntegrationTest.class})
+public class RepartitionWithMergeOptimizingIntegrationTest {
+
+    private static final int NUM_BROKERS = 1;
+    private static final String INPUT_A_TOPIC = "inputA";
+    private static final String INPUT_B_TOPIC = "inputB";
+    private static final String COUNT_TOPIC = "outputTopic_0";
+    private static final String COUNT_STRING_TOPIC = "outputTopic_1";
+
+    private static final int ONE_REPARTITION_TOPIC = 1;
+    private static final int TWO_REPARTITION_TOPICS = 2;
+
+    private final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
+
+    private Properties streamsConfiguration;
+
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+    private final MockTime mockTime = CLUSTER.time;
+
+    @Before
+    public void setUp() throws Exception {
+        final Properties props = new Properties();
+        props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 1024 * 10);
+        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000);
+        props.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
+
+        streamsConfiguration = StreamsTestUtils.getStreamsConfig(
+            "maybe-optimized-with-merge-test-app",
+            CLUSTER.bootstrapServers(),
+            Serdes.String().getClass().getName(),
+            Serdes.String().getClass().getName(),
+            props);
+
+        CLUSTER.createTopics(COUNT_TOPIC,
+                             COUNT_STRING_TOPIC,
+                             INPUT_A_TOPIC,
+                             INPUT_B_TOPIC);
+
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        CLUSTER.deleteAllTopicsAndWait(30_000L);
+    }
+
+    @Test
+    public void shouldSendCorrectRecords_OPTIMIZED() throws Exception {
+        runIntegrationTest(StreamsConfig.OPTIMIZE,
+                           ONE_REPARTITION_TOPIC);
+    }
+
+    @Test
+    public void shouldSendCorrectResults_NO_OPTIMIZATION() throws Exception {
+        runIntegrationTest(StreamsConfig.NO_OPTIMIZATION,
+                           TWO_REPARTITION_TOPICS);
+    }
+
+    /**
+     * Builds the merge-then-count topology with the given optimization setting, checks its description
+     * and repartition-topic count, then runs it on the embedded cluster and checks both output topics.
+     */
+    private void runIntegrationTest(final String optimizationConfig,
+                                    final int expectedNumberRepartitionTopics) throws Exception {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<String, String> sourceAStream = builder.stream(INPUT_A_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
+
+        final KStream<String, String> sourceBStream = builder.stream(INPUT_B_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
+
+        final KStream<String, String> mappedAStream = sourceAStream.map((k, v) -> KeyValue.pair(v.split(":")[0], v));
+        final KStream<String, String> mappedBStream = sourceBStream.map((k, v) -> KeyValue.pair(v.split(":")[0], v));
+
+        final KStream<String, String> mergedStream = mappedAStream.merge(mappedBStream);
+
+        mergedStream.groupByKey().count().toStream().to(COUNT_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));
+        mergedStream.groupByKey().count().toStream().mapValues(v -> v.toString()).to(COUNT_STRING_TOPIC, Produced.with(Serdes.String(), Serdes.String()));
+
+        streamsConfiguration.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimizationConfig);
+
+        final Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
+
+        IntegrationTestUtils.produceKeyValuesSynchronously(INPUT_A_TOPIC, getKeyValues(), producerConfig, mockTime);
+        IntegrationTestUtils.produceKeyValuesSynchronously(INPUT_B_TOPIC, getKeyValues(), producerConfig, mockTime);
+
+        final Properties consumerConfig1 = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, LongDeserializer.class);
+        final Properties consumerConfig2 = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
+
+        final Topology topology = builder.build(streamsConfiguration);
+        final String topologyString = topology.describe().toString();
+
+        if (optimizationConfig.equals(StreamsConfig.OPTIMIZE)) {
+            assertEquals(EXPECTED_OPTIMIZED_TOPOLOGY, topologyString);
+        } else {
+            assertEquals(EXPECTED_UNOPTIMIZED_TOPOLOGY, topologyString);
+        }
+
+        // confirm the number of expected repartition topics
+        assertEquals(expectedNumberRepartitionTopics, getCountOfRepartitionTopicsFound(topologyString));
+
+        final KafkaStreams streams = new KafkaStreams(topology, streamsConfiguration);
+        try {
+            streams.start();
+            // each input topic holds three records per prefix A, B and C, so the merged counts reach 6
+            final List<KeyValue<String, Long>> expectedCountKeyValues = Arrays.asList(KeyValue.pair("A", 6L), KeyValue.pair("B", 6L), KeyValue.pair("C", 6L));
+            final List<KeyValue<String, Long>> receivedCountKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig1, COUNT_TOPIC, expectedCountKeyValues.size());
+
+            final List<KeyValue<String, String>> expectedStringCountKeyValues = Arrays.asList(KeyValue.pair("A", "6"), KeyValue.pair("B", "6"), KeyValue.pair("C", "6"));
+            final List<KeyValue<String, String>> receivedCountStringKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig2, COUNT_STRING_TOPIC, expectedStringCountKeyValues.size());
+
+            assertThat(receivedCountKeyValues, equalTo(expectedCountKeyValues));
+            assertThat(receivedCountStringKeyValues, equalTo(expectedStringCountKeyValues));
+        } finally {
+            // close even when an assertion fails so a running instance never outlives this test
+            streams.close(5, TimeUnit.SECONDS);
+        }
+    }
+
+    /**
+     * Counts the "Sink:" lines of a topology description that mention a "-repartition" topic.
+     */
+    private int getCountOfRepartitionTopicsFound(final String topologyString) {
+        final Matcher matcher = repartitionTopicPattern.matcher(topologyString);
+        int count = 0;
+        while (matcher.find()) {
+            count++;
+        }
+        return count;
+    }
+
+    /** Returns all nine combinations of keys X, Y, Z with values "A:foo", "B:foo" and "C:foo". */
+    private List<KeyValue<String, String>> getKeyValues() {
+        final List<KeyValue<String, String>> keyValueList = new ArrayList<>();
+        final String[] keys = new String[]{"X", "Y", "Z"};
+        final String[] values = new String[]{"A:foo", "B:foo", "C:foo"};
+        for (final String key : keys) {
+            for (final String value : values) {
+                keyValueList.add(KeyValue.pair(key, value));
+            }
+        }
+        return keyValueList;
+    }
+
+    private static final String EXPECTED_OPTIMIZED_TOPOLOGY = "Topologies:\n"
+                                                              + "   Sub-topology: 0\n"
+                                                              + "    Source: KSTREAM-SOURCE-0000000000 (topics: [inputA])\n"
+                                                              + "      --> KSTREAM-MAP-0000000002\n"
+                                                              + "    Source: KSTREAM-SOURCE-0000000001 (topics: [inputB])\n"
+                                                              + "      --> KSTREAM-MAP-0000000003\n"
+                                                              + "    Processor: KSTREAM-MAP-0000000002 (stores: [])\n"
+                                                              + "      --> KSTREAM-MERGE-0000000004\n"
+                                                              + "      <-- KSTREAM-SOURCE-0000000000\n"
+                                                              + "    Processor: KSTREAM-MAP-0000000003 (stores: [])\n"
+                                                              + "      --> KSTREAM-MERGE-0000000004\n"
+                                                              + "      <-- KSTREAM-SOURCE-0000000001\n"
+                                                              + "    Processor: KSTREAM-MERGE-0000000004 (stores: [])\n"
+                                                              + "      --> KSTREAM-FILTER-0000000021\n"
+                                                              + "      <-- KSTREAM-MAP-0000000002, KSTREAM-MAP-0000000003\n"
+                                                              + "    Processor: KSTREAM-FILTER-0000000021 (stores: [])\n"
+                                                              + "      --> KSTREAM-SINK-0000000020\n"
+                                                              + "      <-- KSTREAM-MERGE-0000000004\n"
+                                                              + "    Sink: KSTREAM-SINK-0000000020 (topic: KSTREAM-MERGE-0000000004-optimized-repartition)\n"
+                                                              + "      <-- KSTREAM-FILTER-0000000021\n"
+                                                              + "\n"
+                                                              + "  Sub-topology: 1\n"
+                                                              + "    Source: KSTREAM-SOURCE-0000000022 (topics: [KSTREAM-MERGE-0000000004-optimized-repartition])\n"
+                                                              + "      --> KSTREAM-AGGREGATE-0000000006, KSTREAM-AGGREGATE-0000000013\n"
+                                                              + "    Processor: KSTREAM-AGGREGATE-0000000013 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000012])\n"
+                                                              + "      --> KTABLE-TOSTREAM-0000000017\n"
+                                                              + "      <-- KSTREAM-SOURCE-0000000022\n"
+                                                              + "    Processor: KSTREAM-AGGREGATE-0000000006 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000005])\n"
+                                                              + "      --> KTABLE-TOSTREAM-0000000010\n"
+                                                              + "      <-- KSTREAM-SOURCE-0000000022\n"
+                                                              + "    Processor: KTABLE-TOSTREAM-0000000017 (stores: [])\n"
+                                                              + "      --> KSTREAM-MAPVALUES-0000000018\n"
+                                                              + "      <-- KSTREAM-AGGREGATE-0000000013\n"
+                                                              + "    Processor: KSTREAM-MAPVALUES-0000000018 (stores: [])\n"
+                                                              + "      --> KSTREAM-SINK-0000000019\n"
+                                                              + "      <-- KTABLE-TOSTREAM-0000000017\n"
+                                                              + "    Processor: KTABLE-TOSTREAM-0000000010 (stores: [])\n"
+                                                              + "      --> KSTREAM-SINK-0000000011\n"
+                                                              + "      <-- KSTREAM-AGGREGATE-0000000006\n"
+                                                              + "    Sink: KSTREAM-SINK-0000000011 (topic: outputTopic_0)\n"
+                                                              + "      <-- KTABLE-TOSTREAM-0000000010\n"
+                                                              + "    Sink: KSTREAM-SINK-0000000019 (topic: outputTopic_1)\n"
+                                                              + "      <-- KSTREAM-MAPVALUES-0000000018\n\n";
+
+    private static final String EXPECTED_UNOPTIMIZED_TOPOLOGY = "Topologies:\n"
+                                                                + "   Sub-topology: 0\n"
+                                                                + "    Source: KSTREAM-SOURCE-0000000000 (topics: [inputA])\n"
+                                                                + "      --> KSTREAM-MAP-0000000002\n"
+                                                                + "    Source: KSTREAM-SOURCE-0000000001 (topics: [inputB])\n"
+                                                                + "      --> KSTREAM-MAP-0000000003\n"
+                                                                + "    Processor: KSTREAM-MAP-0000000002 (stores: [])\n"
+                                                                + "      --> KSTREAM-MERGE-0000000004\n"
+                                                                + "      <-- KSTREAM-SOURCE-0000000000\n"
+                                                                + "    Processor: KSTREAM-MAP-0000000003 (stores: [])\n"
+                                                                + "      --> KSTREAM-MERGE-0000000004\n"
+                                                                + "      <-- KSTREAM-SOURCE-0000000001\n"
+                                                                + "    Processor: KSTREAM-MERGE-0000000004 (stores: [])\n"
+                                                                + "      --> KSTREAM-FILTER-0000000008, KSTREAM-FILTER-0000000015\n"
+                                                                + "      <-- KSTREAM-MAP-0000000002, KSTREAM-MAP-0000000003\n"
+                                                                + "    Processor: KSTREAM-FILTER-0000000008 (stores: [])\n"
+                                                                + "      --> KSTREAM-SINK-0000000007\n"
+                                                                + "      <-- KSTREAM-MERGE-0000000004\n"
+                                                                + "    Processor: KSTREAM-FILTER-0000000015 (stores: [])\n"
+                                                                + "      --> KSTREAM-SINK-0000000014\n"
+                                                                + "      <-- KSTREAM-MERGE-0000000004\n"
+                                                                + "    Sink: KSTREAM-SINK-0000000007 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition)\n"
+                                                                + "      <-- KSTREAM-FILTER-0000000008\n"
+                                                                + "    Sink: KSTREAM-SINK-0000000014 (topic: KSTREAM-AGGREGATE-STATE-STORE-0000000012-repartition)\n"
+                                                                + "      <-- KSTREAM-FILTER-0000000015\n"
+                                                                + "\n"
+                                                                + "  Sub-topology: 1\n"
+                                                                + "    Source: KSTREAM-SOURCE-0000000009 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000005-repartition])\n"
+                                                                + "      --> KSTREAM-AGGREGATE-0000000006\n"
+                                                                + "    Processor: KSTREAM-AGGREGATE-0000000006 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000005])\n"
+                                                                + "      --> KTABLE-TOSTREAM-0000000010\n"
+                                                                + "      <-- KSTREAM-SOURCE-0000000009\n"
+                                                                + "    Processor: KTABLE-TOSTREAM-0000000010 (stores: [])\n"
+                                                                + "      --> KSTREAM-SINK-0000000011\n"
+                                                                + "      <-- KSTREAM-AGGREGATE-0000000006\n"
+                                                                + "    Sink: KSTREAM-SINK-0000000011 (topic: outputTopic_0)\n"
+                                                                + "      <-- KTABLE-TOSTREAM-0000000010\n"
+                                                                + "\n"
+                                                                + "  Sub-topology: 2\n"
+                                                                + "    Source: KSTREAM-SOURCE-0000000016 (topics: [KSTREAM-AGGREGATE-STATE-STORE-0000000012-repartition])\n"
+                                                                + "      --> KSTREAM-AGGREGATE-0000000013\n"
+                                                                + "    Processor: KSTREAM-AGGREGATE-0000000013 (stores: [KSTREAM-AGGREGATE-STATE-STORE-0000000012])\n"
+                                                                + "      --> KTABLE-TOSTREAM-0000000017\n"
+                                                                + "      <-- KSTREAM-SOURCE-0000000016\n"
+                                                                + "    Processor: KTABLE-TOSTREAM-0000000017 (stores: [])\n"
+                                                                + "      --> KSTREAM-MAPVALUES-0000000018\n"
+                                                                + "      <-- KSTREAM-AGGREGATE-0000000013\n"
+                                                                + "    Processor: KSTREAM-MAPVALUES-0000000018 (stores: [])\n"
+                                                                + "      --> KSTREAM-SINK-0000000019\n"
+                                                                + "      <-- KTABLE-TOSTREAM-0000000017\n"
+                                                                + "    Sink: KSTREAM-SINK-0000000019 (topic: outputTopic_1)\n"
+                                                                + "      <-- KSTREAM-MAPVALUES-0000000018\n\n";
+
+}
\ No newline at end of file
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
index 2bddf41..d65f27e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/graph/StreamsGraphTest.java
@@ -18,15 +18,121 @@
 package org.apache.kafka.streams.kstream.internals.graph;
 
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.junit.Test;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 import static org.junit.Assert.assertEquals;
 
 public class StreamsGraphTest {
+    // Matches "Sink:" lines of a topology description that mention a "-repartition" topic.
+    private final Pattern repartitionTopicPattern = Pattern.compile("Sink: .*-repartition");
     
+
+
+    // Builds the topology in successive steps. Each build() should write only the graph nodes
+    // that an earlier build() has not processed yet, so every intermediate description is
+    // compared against the operators added up to that point.
+    @Test
+    public void shouldBeAbleToBuildTopologyIncrementally() {
+        final StreamsBuilder builder = new StreamsBuilder();
+
+        final KStream<String, String> leftStream = builder.stream("topic");
+        final KStream<String, String> rightStream = builder.stream("other-topic");
+        final ValueJoiner<String, String, String> concatenate = (leftValue, rightValue) -> leftValue + rightValue;
+
+        final KStream<String, String> joined = leftStream.join(rightStream, concatenate, JoinWindows.of(5000));
+
+        // first build: both sources and the windowed join
+        assertEquals(expectedJoinedTopology, builder.build().describe().toString());
+
+        final KStream<String, String> filtered = joined.filter((key, value) -> value.equals("foo"));
+        // second build: the filter has been added
+        assertEquals(expectedJoinedFilteredTopology, builder.build().describe().toString());
+
+        filtered.mapValues(value -> value + "some value").to("output-topic");
+        // third build: mapValues and the sink to "output-topic" have been added
+        assertEquals(expectedFullTopology, builder.build().describe().toString());
+
+    }
+
+    @Test
+    public void shouldNotOptimizeWithValueOrKeyChangingOperatorsAfterInitialKeyChange() {
+        final String optimizedDescription = getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.OPTIMIZE).describe().toString();
+        final String unoptimizedDescription = getTopologyWithChangingValuesAfterChangingKey(StreamsConfig.NO_OPTIMIZATION).describe().toString();
+
+        // optimization is expected to be a no-op here: both variants keep two repartition topics
+        assertEquals(optimizedDescription, unoptimizedDescription);
+        assertEquals(2, getCountOfRepartitionTopicsFound(optimizedDescription));
+        assertEquals(2, getCountOfRepartitionTopicsFound(unoptimizedDescription));
+    }
+
+    // through() already repartitions the re-keyed stream explicitly, so there is nothing to optimize
+    @Test
+    public void shouldNotOptimizeWhenAThroughOperationIsDone() {
+
+        final String optimizedDescription = getTopologyWithThroughOperation(StreamsConfig.OPTIMIZE).describe().toString();
+        final String unoptimizedDescription = getTopologyWithThroughOperation(StreamsConfig.NO_OPTIMIZATION).describe().toString();
+
+        // identical descriptions, and no internal "-repartition" sink in either of them
+        assertEquals(optimizedDescription, unoptimizedDescription);
+        assertEquals(0, getCountOfRepartitionTopicsFound(optimizedDescription));
+        assertEquals(0, getCountOfRepartitionTopicsFound(unoptimizedDescription));
+    }
+
+    // Re-keys "input" with selectKey(), then changes values on both branches (mapValues and
+    // flatMapValues) before grouping; built with the given TOPOLOGY_OPTIMIZATION setting.
+    private Topology getTopologyWithChangingValuesAfterChangingKey(final String optimizeConfig) {
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties properties = new Properties();
+        properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimizeConfig);
+
+        final KStream<String, String> inputStream = builder.stream("input");
+        final KStream<String, String> mappedKeyStream = inputStream.selectKey((k, v) -> k + v);
+
+        // Locale.ROOT keeps upper-casing independent of the JVM's default locale
+        mappedKeyStream.mapValues(v -> v.toUpperCase(Locale.ROOT)).groupByKey().count().toStream().to("output");
+        mappedKeyStream.flatMapValues(v -> Arrays.asList(v.split("\\s"))).groupByKey().windowedBy(TimeWindows.of(5000)).count().toStream().to("windowed-output");
+        return builder.build(properties);
+    }
+
+    private Topology getTopologyWithThroughOperation(final String optimizeConfig) {
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        final Properties properties = new Properties();
+        properties.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, optimizeConfig);
+
+        final KStream<String, String> inputStream = builder.stream("input");
+        final KStream<String, String> mappedKeyStream = inputStream.selectKey((k, v) -> k + v).through("through-topic");
+
+        mappedKeyStream.groupByKey().count().toStream().to("output");
+        mappedKeyStream.groupByKey().windowedBy(TimeWindows.of(5000)).count().toStream().to("windowed-output");
+
+        return builder.build(properties);
+
+    }
+    // Counts the "Sink:" lines of a topology description that mention a "-repartition" topic.
+    private int getCountOfRepartitionTopicsFound(final String topologyString) {
+        final List<String> repartitionSinks = new ArrayList<>();
+        final Matcher sinkMatcher = repartitionTopicPattern.matcher(topologyString);
+        while (sinkMatcher.find()) {
+            repartitionSinks.add(sinkMatcher.group());
+        }
+        return repartitionSinks.size();
+    }
+
     private String expectedJoinedTopology = "Topologies:\n"
                                             + "   Sub-topology: 0\n"
                                             + "    Source: KSTREAM-SOURCE-0000000000 (topics: [topic])\n"
@@ -104,31 +210,4 @@ public class StreamsGraphTest {
                                           + "    Sink: KSTREAM-SINK-0000000009 (topic: output-topic)\n"
                                           + "      <-- KSTREAM-MAPVALUES-0000000008\n\n";
 
-    // Test builds topology in succesive manner but only graph node not yet processed written to topology
-
-    @Test
-    public void shouldBeAbleToBuildTopologyIncrementally() {
-        final StreamsBuilder builder = new StreamsBuilder();
-
-        final KStream<String, String> stream = builder.stream("topic");
-        final KStream<String, String> streamII = builder.stream("other-topic");
-        final ValueJoiner<String, String, String> valueJoiner = (v, v2) -> v + v2;
-
-
-        final KStream<String, String> joinedStream = stream.join(streamII, valueJoiner, JoinWindows.of(5000));
-
-        // build step one
-        assertEquals(expectedJoinedTopology, builder.build().describe().toString());
-
-        final KStream<String, String> filteredJoinStream = joinedStream.filter((k, v) -> v.equals("foo"));
-        // build step two
-        assertEquals(expectedJoinedFilteredTopology, builder.build().describe().toString());
-
-        filteredJoinStream.mapValues(v -> v + "some value").to("output-topic");
-        // build step three
-        assertEquals(expectedFullTopology, builder.build().describe().toString());
-
-
-    }
-
 }


Mime
View raw message