kafka-commits mailing list archives

From: guozh...@apache.org
Subject: [2/3] kafka git commit: KAFKA-3561: Auto create through topic for KStream aggregation and join
Date: Thu, 16 Jun 2016 18:56:37 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
index 0c94084..1f9a473 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java
@@ -38,7 +38,7 @@ public interface PartitionGrouper {
      * expected to be processed together must be in the same group. DefaultPartitionGrouper implements this
      * interface. See {@link DefaultPartitionGrouper} for more information.
      *
-     * @param topicGroups The map from the {@link TopologyBuilder#topicGroups(String)} topic group} id to topics
+     * @param topicGroups The map from the {@link TopologyBuilder#topicGroups()} topic group} id to topics
      * @param metadata Metadata of the consuming cluster
      * @return a map of task ids to groups of partitions
      */
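
The javadoc fix above tracks an API change in this commit: TopologyBuilder#topicGroups() no longer takes the application id as an argument (the builder now carries it; see the TopologyBuilder diff below). For context, here is a minimal sketch of a custom grouper consuming that map, assuming the 0.10.x-era signature of Map<Integer, Set<String>> (topic group id to topics) plus cluster metadata; the class and its assignment policy are hypothetical and not part of this commit:

    import java.util.*;
    import org.apache.kafka.common.Cluster;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.streams.processor.PartitionGrouper;
    import org.apache.kafka.streams.processor.TaskId;

    public class SimplePartitionGrouper implements PartitionGrouper {
        @Override
        public Map<TaskId, Set<TopicPartition>> partitionGroups(
                Map<Integer, Set<String>> topicGroups, Cluster metadata) {
            Map<TaskId, Set<TopicPartition>> groups = new HashMap<>();
            for (Map.Entry<Integer, Set<String>> entry : topicGroups.entrySet()) {
                for (String topic : entry.getValue()) {
                    // one task per (topic group, partition number) pair
                    for (PartitionInfo info : metadata.partitionsForTopic(topic)) {
                        TaskId id = new TaskId(entry.getKey(), info.partition());
                        Set<TopicPartition> partitions = groups.get(id);
                        if (partitions == null) {
                            partitions = new HashSet<>();
                            groups.put(id, partitions);
                        }
                        partitions.add(new TopicPartition(topic, info.partition()));
                    }
                }
            }
            return groups;
        }
    }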

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 1743baf..19440e4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -68,6 +68,8 @@ public class TopologyBuilder {
     private final HashMap<String, Pattern> topicToPatterns = new HashMap<>();
     private final HashMap<String, String> nodeToSinkTopic = new HashMap<>();
     private SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
+    private String applicationId;
+
     private Map<Integer, Set<String>> nodeGroups = null;
     private Pattern topicPattern;
 
@@ -601,8 +603,8 @@ public class TopologyBuilder {
      *
      * @return groups of topic names
      */
-    public Map<Integer, TopicsInfo> topicGroups(String applicationId) {
-        Map<Integer, TopicsInfo> topicGroups = new HashMap<>();
+    public Map<Integer, TopicsInfo> topicGroups() {
+        Map<Integer, TopicsInfo> topicGroups = new LinkedHashMap<>();
 
 
         if (subscriptionUpdates.hasUpdates()) {
@@ -629,6 +631,12 @@ public class TopologyBuilder {
                     // if some of the topics are internal, add them to the internal topics
                     for (String topic : topics) {
                         if (this.internalTopicNames.contains(topic)) {
+                            if (applicationId == null) {
+                                throw new TopologyBuilderException("There are internal topics and"
+                                                                   + " applicationId hasn't been "
+                                                                   + "set. Call setApplicationId "
+                                                                   + "first");
+                            }
                             // prefix the internal topic name with the application id
                             String internalTopic = applicationId + "-" + topic;
                             internalSourceTopics.add(internalTopic);
@@ -681,7 +689,7 @@ public class TopologyBuilder {
     }
 
     private Map<Integer, Set<String>> makeNodeGroups() {
-        HashMap<Integer, Set<String>> nodeGroups = new HashMap<>();
+        HashMap<Integer, Set<String>> nodeGroups = new LinkedHashMap<>();
         HashMap<String, Set<String>> rootToNodeGroup = new HashMap<>();
 
         int nodeGroupId = 0;
@@ -739,13 +747,30 @@ public class TopologyBuilder {
             for (String node : nodeNames) {
                 String[] topics = nodeToSourceTopics.get(node);
                 if (topics != null)
-                    copartitionGroup.addAll(Arrays.asList(topics));
+                    copartitionGroup.addAll(convertInternalTopicNames(topics));
             }
             list.add(Collections.unmodifiableSet(copartitionGroup));
         }
         return Collections.unmodifiableList(list);
     }
 
+    private List<String> convertInternalTopicNames(String...topics) {
+        final List<String> topicNames = new ArrayList<>();
+        for (String topic : topics) {
+            if (internalTopicNames.contains(topic)) {
+                if (applicationId == null) {
+                    throw new TopologyBuilderException("there are internal topics "
+                                                       + "and applicationId hasn't been set. Call "
+                                                       + "setApplicationId first");
+                }
+                topicNames.add(applicationId + "-" + topic);
+            } else {
+                topicNames.add(topic);
+            }
+        }
+        return topicNames;
+    }
+
     /**
      * Build the topology for the specified topic group. This is called automatically when passing this builder into the
      * {@link org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig)} constructor.
@@ -814,10 +839,15 @@ public class TopologyBuilder {
      * Get the names of topics that are to be consumed by the source nodes created by this builder.
      * @return the unmodifiable set of topic names used by source nodes, which changes as new sources are added; never null
      */
-    public Set<String> sourceTopics(String applicationId) {
+    public Set<String> sourceTopics() {
         Set<String> topics = new HashSet<>();
         for (String topic : sourceTopicNames) {
             if (internalTopicNames.contains(topic)) {
+                if (applicationId == null) {
+                    throw new TopologyBuilderException("there are internal topics and "
+                                                       + "applicationId is null. Call "
+                                                       + "setApplicationId before sourceTopics");
+                }
                 topics.add(applicationId + "-" + topic);
             } else {
                 topics.add(topic);
@@ -849,4 +879,14 @@ public class TopologyBuilder {
     public void updateSubscriptions(SubscriptionUpdates subscriptionUpdates) {
         this.subscriptionUpdates = subscriptionUpdates;
     }
+
+    /**
+     * Set the applicationId. This is required before calling
+     * {@link #sourceTopics}, {@link #topicGroups} and {@link #copartitionSources}
+     * @param applicationId   the streams applicationId. Should be the same as set by
+     * {@link org.apache.kafka.streams.StreamsConfig#APPLICATION_ID_CONFIG}
+     */
+    public void setApplicationId(String applicationId) {
+        this.applicationId = applicationId;
+    }
 }
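
With the change above, the application id is set once on the builder instead of being threaded through sourceTopics(), topicGroups() and copartitionGroups(). A minimal usage sketch (topic and application names are made up for illustration):

    TopologyBuilder builder = new TopologyBuilder();
    builder.addSource("source", "input-topic");
    builder.setApplicationId("my-streams-app");   // must precede the calls below
    Set<String> sources = builder.sourceTopics(); // internal topics come back
                                                  // prefixed "my-streams-app-..."
    Map<Integer, TopologyBuilder.TopicsInfo> groups = builder.topicGroups();

If internal topics are registered but no application id has been set, these accessors now throw TopologyBuilderException rather than producing an unprefixed topic name.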

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
index eb731be..07f9a1f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java
@@ -81,7 +81,9 @@ public class RecordCollector {
             if (partitions != null)
                 partition = partitioner.partition(record.key(), record.value(), partitions.size());
         }
-        this.producer.send(new ProducerRecord<>(record.topic(), partition, keyBytes, valBytes), callback);
+        this.producer.send(new ProducerRecord<>(record.topic(), partition, record.timestamp(),
+                                                keyBytes,
+                                                valBytes), callback);
     }
 
     public void flush() {
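
The send above now forwards the record's own timestamp into the ProducerRecord instead of letting the producer assign one, so records forwarded through internal topics keep their original event time. In outline, the five-argument constructor used here is:

    // ProducerRecord(topic, partition, timestamp, key, value); a null timestamp
    // makes the producer fall back to the current wall-clock time.
    new ProducerRecord<>(record.topic(), partition, record.timestamp(), keyBytes, valBytes);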

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index adefab9..4b52511 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Utils;
@@ -259,7 +260,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         }
 
         streamThread.builder.updateSubscriptions(subscriptionUpdates);
-        this.topicGroups = streamThread.builder.topicGroups(streamThread.applicationId);
+        this.topicGroups = streamThread.builder.topicGroups();
 
         // ensure the co-partitioning topics within the group have the same number of partitions,
         // and enforce the number of partitions for those internal topics.
@@ -270,14 +271,15 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             sourceTopicGroups.put(entry.getKey(), entry.getValue().sourceTopics);
             internalSourceTopicGroups.put(entry.getKey(), entry.getValue().interSourceTopics);
         }
-        Collection<Set<String>> copartitionTopicGroups = streamThread.builder.copartitionGroups();
 
-        ensureCopartitioning(copartitionTopicGroups, internalSourceTopicGroups, metadata);
 
-        // for those internal source topics that do not have co-partition enforcement,
+        // for all internal source topics
         // set the number of partitions to the maximum of the depending sub-topologies source topics
+        Map<TopicPartition, PartitionInfo> internalPartitionInfos = new HashMap<>();
+        Set<String> allInternalTopicNames = new HashSet<>();
         for (Map.Entry<Integer, TopologyBuilder.TopicsInfo> entry : topicGroups.entrySet()) {
             Set<String> internalTopics = entry.getValue().interSourceTopics;
+            allInternalTopicNames.addAll(internalTopics);
             for (String internalTopic : internalTopics) {
                 Set<TaskId> tasks = internalSourceTopicToTaskIds.get(internalTopic);
 
@@ -288,20 +290,41 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
                         if (otherSinkTopics.contains(internalTopic)) {
                             for (String topic : other.getValue().sourceTopics) {
-                                List<PartitionInfo> infos = metadata.partitionsForTopic(topic);
-
-                                if (infos != null && infos.size() > numPartitions)
-                                    numPartitions = infos.size();
+                                Integer partitions = null;
+                                // It is possible the sourceTopic is another internal topic, i.e,
+                                // map().join().join(map())
+                                if (allInternalTopicNames.contains(topic)) {
+                                    Set<TaskId> taskIds = internalSourceTopicToTaskIds.get(topic);
+                                    if (taskIds != null) {
+                                        for (TaskId taskId : taskIds) {
+                                            partitions = taskId.partition;
+                                        }
+                                    }
+                                } else {
+                                    partitions = metadata.partitionCountForTopic(topic);
+                                }
+                                if (partitions != null && partitions > numPartitions) {
+                                    numPartitions = partitions;
+                                }
                             }
                         }
                     }
-
                     internalSourceTopicToTaskIds.put(internalTopic, Collections.singleton(new TaskId(entry.getKey(), numPartitions)));
+                    for (int partition = 0; partition < numPartitions; partition++) {
+                        internalPartitionInfos.put(new TopicPartition(internalTopic, partition),
+                                                   new PartitionInfo(internalTopic, partition, null, new Node[0], new Node[0]));
+                    }
                 }
             }
         }
 
-        Map<TopicPartition, PartitionInfo> internalPartitionInfos = prepareTopic(internalSourceTopicToTaskIds, false, false);
+
+        Collection<Set<String>> copartitionTopicGroups = streamThread.builder.copartitionGroups();
+        ensureCopartitioning(copartitionTopicGroups, internalSourceTopicGroups,
+                             metadata.withPartitions(internalPartitionInfos));
+
+
+        internalPartitionInfos = prepareTopic(internalSourceTopicToTaskIds, false, false);
         internalSourceTopicToTaskIds.clear();
 
         Cluster metadataWithInternalTopics = metadata;
@@ -469,10 +492,22 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             }
         }
 
+        if (numPartitions == -1) {
+            for (String topic : internalTopics) {
+                if (copartitionGroup.contains(topic)) {
+                    Integer partitions = metadata.partitionCountForTopic(topic);
+                    if (partitions != null && partitions > numPartitions) {
+                        numPartitions = partitions;
+                    }
+                }
+            }
+        }
         // enforce co-partitioning restrictions to internal topics reusing internalSourceTopicToTaskIds
         for (String topic : internalTopics) {
-            if (copartitionGroup.contains(topic))
-                internalSourceTopicToTaskIds.put(topic, Collections.singleton(new TaskId(-1, numPartitions)));
+            if (copartitionGroup.contains(topic)) {
+                internalSourceTopicToTaskIds
+                    .put(topic, Collections.singleton(new TaskId(-1, numPartitions)));
+            }
         }
     }
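
The rule this block applies: an internal (repartition) topic inherits the largest partition count among the source topics of the sub-topologies that write to it, which now also works when those sources are themselves internal topics (e.g. map().join().join(map())). Stated on its own, with a hypothetical upstreamSourceTopics collection:

    // Pick the largest known partition count among the upstream sources;
    // partitionCountForTopic returns null for topics missing from the metadata.
    int numPartitions = -1;
    for (String topic : upstreamSourceTopics) {
        Integer partitions = metadata.partitionCountForTopic(topic);
        if (partitions != null && partitions > numPartitions) {
            numPartitions = partitions;
        }
    }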
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 64127a8..d1ce40f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -161,7 +161,7 @@ public class StreamThread extends Thread {
         this.applicationId = applicationId;
         this.config = config;
         this.builder = builder;
-        this.sourceTopics = builder.sourceTopics(applicationId);
+        this.sourceTopics = builder.sourceTopics();
         this.topicPattern = builder.sourceTopicPattern();
         this.clientId = clientId;
         this.processId = processId;

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 809a238..b642b2a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -25,7 +25,6 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
@@ -135,12 +134,12 @@ public class InternalTopicIntegrationTest {
                 public Iterable<String> apply(String value) {
                     return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
                 }
-            }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
+            }).groupBy(new KeyValueMapper<String, String, String>() {
                 @Override
-                public KeyValue<String, String> apply(String key, String value) {
-                    return new KeyValue<String, String>(value, value);
+                public String apply(String key, String value) {
+                    return value;
                 }
-            }).countByKey("Counts").toStream();
+            }).count("Counts").toStream();
 
         wordCounts.to(stringSerde, longSerde, DEFAULT_OUTPUT_TOPIC);
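
The test change reflects the KGroupedStream API that accompanies this commit: the key-changing map() plus countByKey() becomes groupBy() followed by count(), and it is the grouping step that introduces the internal "through" topic the commit auto-creates. The migration, side by side:

    // before: stream.map(kvMapper).countByKey("Counts")
    // after:  stream.groupBy(keySelector).count("Counts")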
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
index 9e9d366..ea216f3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
@@ -205,12 +205,13 @@ public class JoinIntegrationTest {
                 }
             })
             // Compute the total per region by summing the individual click counts per region.
-            .reduceByKey(new Reducer<Long>() {
+            .groupByKey(stringSerde, longSerde)
+            .reduce(new Reducer<Long>() {
                 @Override
                 public Long apply(Long value1, Long value2) {
                     return value1 + value2;
                 }
-            }, stringSerde, longSerde, "ClicksPerRegionUnwindowed");
+            }, "ClicksPerRegionUnwindowed");
 
         // Write the (continuously updating) results to the output topic.
         clicksPerRegion.to(stringSerde, longSerde, OUTPUT_TOPIC);
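
The same migration on the reduce side: reduceByKey(reducer, keySerde, valueSerde, name) becomes an explicit grouping followed by reduce(reducer, name), with the serdes moving to the grouping step:

    // before: .reduceByKey(reducer, stringSerde, longSerde, "ClicksPerRegionUnwindowed")
    // after:
    .groupByKey(stringSerde, longSerde)
    .reduce(reducer, "ClicksPerRegionUnwindowed");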

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java
new file mode 100644
index 0000000..44e92f7
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KGroupedStreamIntegrationTest.java
@@ -0,0 +1,472 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at <p> http://www.apache.org/licenses/LICENSE-2.0 <p> Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+public class KGroupedStreamIntegrationTest {
+
+    @ClassRule
+    public static final EmbeddedSingleNodeKafkaCluster CLUSTER =
+        new EmbeddedSingleNodeKafkaCluster();
+    private static volatile int testNo = 0;
+    private KStreamBuilder builder;
+    private Properties streamsConfiguration;
+    private KafkaStreams kafkaStreams;
+    private String streamOneInput;
+    private String outputTopic;
+    private KGroupedStream<String, String> groupedStream;
+    private Reducer<String> reducer;
+    private Initializer<Integer> initializer;
+    private Aggregator<String, String, Integer> aggregator;
+    private KStream<Integer, String> stream;
+
+
+    @Before
+    public void before() {
+        testNo++;
+        builder = new KStreamBuilder();
+        createTopics();
+        streamsConfiguration = new Properties();
+        String applicationId = "kgrouped-stream-test-" +
+                       testNo;
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
+        streamsConfiguration
+            .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kgrouped-stream-test");
+
+        KeyValueMapper<Integer, String, String>
+            mapper =
+            new KeyValueMapper<Integer, String, String>() {
+                @Override
+                public String apply(Integer key, String value) {
+                    return value;
+                }
+            };
+        stream = builder.stream(Serdes.Integer(), Serdes.String(), streamOneInput);
+        groupedStream = stream
+            .groupBy(
+                mapper,
+                Serdes.String(),
+                Serdes.String());
+
+        reducer = new Reducer<String>() {
+            @Override
+            public String apply(String value1, String value2) {
+                return value1 + ":" + value2;
+            }
+        };
+        initializer = new Initializer<Integer>() {
+            @Override
+            public Integer apply() {
+                return 0;
+            }
+        };
+        aggregator = new Aggregator<String, String, Integer>() {
+            @Override
+            public Integer apply(String aggKey, String value, Integer aggregate) {
+                return aggregate + value.length();
+            }
+        };
+    }
+
+    @After
+    public void whenShuttingDown() throws IOException {
+        if (kafkaStreams != null) {
+            kafkaStreams.close();
+        }
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+    }
+
+
+    @Test
+    public void shouldReduce() throws Exception {
+        produceMessages(System.currentTimeMillis());
+        groupedStream
+            .reduce(reducer, "reduce-by-key")
+            .to(Serdes.String(), Serdes.String(), outputTopic);
+
+        startStreams();
+
+        produceMessages(System.currentTimeMillis());
+
+        List<KeyValue<String, String>> results = receiveMessages(
+            new StringDeserializer(),
+            new StringDeserializer()
+            , 10);
+
+        Collections.sort(results, new Comparator<KeyValue<String, String>>() {
+            @Override
+            public int compare(KeyValue<String, String> o1, KeyValue<String, String> o2) {
+                return KGroupedStreamIntegrationTest.compare(o1, o2);
+            }
+        });
+
+        assertThat(results, is(Arrays.asList(KeyValue.pair("A", "A"),
+                                             KeyValue.pair("A", "A:A"),
+                                             KeyValue.pair("B", "B"),
+                                             KeyValue.pair("B", "B:B"),
+                                             KeyValue.pair("C", "C"),
+                                             KeyValue.pair("C", "C:C"),
+                                             KeyValue.pair("D", "D"),
+                                             KeyValue.pair("D", "D:D"),
+                                             KeyValue.pair("E", "E"),
+                                             KeyValue.pair("E", "E:E"))));
+    }
+
+    @SuppressWarnings("unchecked")
+    private static <K extends Comparable, V extends Comparable> int compare(final KeyValue<K, V> o1,
+                                                                            final KeyValue<K, V> o2) {
+        final int keyComparison = o1.key.compareTo(o2.key);
+        if (keyComparison == 0) {
+            return o1.value.compareTo(o2.value);
+        }
+        return keyComparison;
+    }
+
+    @Test
+    public void shouldReduceWindowed() throws Exception {
+        long firstBatchTimestamp = System.currentTimeMillis() - 1000;
+        produceMessages(firstBatchTimestamp);
+        long secondBatchTimestamp = System.currentTimeMillis();
+        produceMessages(secondBatchTimestamp);
+        produceMessages(secondBatchTimestamp);
+
+        groupedStream
+            .reduce(reducer, TimeWindows.of("reduce-time-windows", 500L))
+            .toStream(new KeyValueMapper<Windowed<String>, String, String>() {
+                @Override
+                public String apply(Windowed<String> windowedKey, String value) {
+                    return windowedKey.key() + "@" + windowedKey.window().start();
+                }
+            })
+            .to(Serdes.String(), Serdes.String(), outputTopic);
+
+        startStreams();
+
+        List<KeyValue<String, String>> windowedOutput = receiveMessages(
+            new StringDeserializer(),
+            new StringDeserializer()
+            , 15);
+
+        Comparator<KeyValue<String, String>>
+            comparator =
+            new Comparator<KeyValue<String, String>>() {
+                @Override
+                public int compare(final KeyValue<String, String> o1,
+                                   final KeyValue<String, String> o2) {
+                    return KGroupedStreamIntegrationTest.compare(o1, o2);
+                }
+            };
+
+        Collections.sort(windowedOutput, comparator);
+        long firstBatchWindow = firstBatchTimestamp / 500 * 500;
+        long secondBatchWindow = secondBatchTimestamp / 500 * 500;
+
+        assertThat(windowedOutput, is(
+            Arrays.asList(
+                new KeyValue<>("A@" + firstBatchWindow, "A"),
+                new KeyValue<>("A@" + secondBatchWindow, "A"),
+                new KeyValue<>("A@" + secondBatchWindow, "A:A"),
+                new KeyValue<>("B@" + firstBatchWindow, "B"),
+                new KeyValue<>("B@" + secondBatchWindow, "B"),
+                new KeyValue<>("B@" + secondBatchWindow, "B:B"),
+                new KeyValue<>("C@" + firstBatchWindow, "C"),
+                new KeyValue<>("C@" + secondBatchWindow, "C"),
+                new KeyValue<>("C@" + secondBatchWindow, "C:C"),
+                new KeyValue<>("D@" + firstBatchWindow, "D"),
+                new KeyValue<>("D@" + secondBatchWindow, "D"),
+                new KeyValue<>("D@" + secondBatchWindow, "D:D"),
+                new KeyValue<>("E@" + firstBatchWindow, "E"),
+                new KeyValue<>("E@" + secondBatchWindow, "E"),
+                new KeyValue<>("E@" + secondBatchWindow, "E:E")
+            )
+        ));
+    }
+
+    @Test
+    public void shouldAggregate() throws Exception {
+        produceMessages(System.currentTimeMillis());
+        groupedStream.aggregate(
+            initializer,
+            aggregator,
+            Serdes.Integer(),
+            "aggregate-by-selected-key")
+            .to(Serdes.String(), Serdes.Integer(), outputTopic);
+
+        startStreams();
+
+        produceMessages(System.currentTimeMillis());
+
+        List<KeyValue<String, Integer>> results = receiveMessages(
+            new StringDeserializer(),
+            new IntegerDeserializer()
+            , 10);
+
+        Collections.sort(results, new Comparator<KeyValue<String, Integer>>() {
+            @Override
+            public int compare(KeyValue<String, Integer> o1, KeyValue<String, Integer> o2) {
+                return KGroupedStreamIntegrationTest.compare(o1, o2);
+            }
+        });
+
+        assertThat(results, is(Arrays.asList(
+            KeyValue.pair("A", 1),
+            KeyValue.pair("A", 2),
+            KeyValue.pair("B", 1),
+            KeyValue.pair("B", 2),
+            KeyValue.pair("C", 1),
+            KeyValue.pair("C", 2),
+            KeyValue.pair("D", 1),
+            KeyValue.pair("D", 2),
+            KeyValue.pair("E", 1),
+            KeyValue.pair("E", 2)
+        )));
+    }
+
+    @Test
+    public void shouldAggregateWindowed() throws Exception {
+        long firstTimestamp = System.currentTimeMillis() - 1000;
+        produceMessages(firstTimestamp);
+        long secondTimestamp = System.currentTimeMillis();
+        produceMessages(secondTimestamp);
+        produceMessages(secondTimestamp);
+
+        groupedStream.aggregate(
+            initializer,
+            aggregator,
+            TimeWindows.of("aggregate-by-key-windowed", 500L),
+            Serdes.Integer())
+            .toStream(new KeyValueMapper<Windowed<String>, Integer, String>() {
+                @Override
+                public String apply(Windowed<String> windowedKey, Integer value) {
+                    return windowedKey.key() + "@" + windowedKey.window().start();
+                }
+            })
+            .to(Serdes.String(), Serdes.Integer(), outputTopic);
+
+        startStreams();
+
+        List<KeyValue<String, Integer>> windowedMessages = receiveMessages(
+            new StringDeserializer(),
+            new IntegerDeserializer()
+            , 15);
+
+        Comparator<KeyValue<String, Integer>>
+            comparator =
+            new Comparator<KeyValue<String, Integer>>() {
+                @Override
+                public int compare(final KeyValue<String, Integer> o1,
+                                   final KeyValue<String, Integer> o2) {
+                    return KGroupedStreamIntegrationTest.compare(o1, o2);
+                }
+            };
+
+        Collections.sort(windowedMessages, comparator);
+
+        long firstWindow = firstTimestamp / 500 * 500;
+        long secondWindow = secondTimestamp / 500 * 500;
+
+        assertThat(windowedMessages, is(
+            Arrays.asList(
+                new KeyValue<>("A@" + firstWindow, 1),
+                new KeyValue<>("A@" + secondWindow, 1),
+                new KeyValue<>("A@" + secondWindow, 2),
+                new KeyValue<>("B@" + firstWindow, 1),
+                new KeyValue<>("B@" + secondWindow, 1),
+                new KeyValue<>("B@" + secondWindow, 2),
+                new KeyValue<>("C@" + firstWindow, 1),
+                new KeyValue<>("C@" + secondWindow, 1),
+                new KeyValue<>("C@" + secondWindow, 2),
+                new KeyValue<>("D@" + firstWindow, 1),
+                new KeyValue<>("D@" + secondWindow, 1),
+                new KeyValue<>("D@" + secondWindow, 2),
+                new KeyValue<>("E@" + firstWindow, 1),
+                new KeyValue<>("E@" + secondWindow, 1),
+                new KeyValue<>("E@" + secondWindow, 2)
+            )));
+    }
+
+    @Test
+    public void shouldCount() throws Exception {
+        produceMessages(System.currentTimeMillis());
+
+        groupedStream.count("count-by-key")
+            .to(Serdes.String(), Serdes.Long(), outputTopic);
+
+        startStreams();
+
+        produceMessages(System.currentTimeMillis());
+
+        List<KeyValue<String, Long>> results = receiveMessages(
+            new StringDeserializer(),
+            new LongDeserializer()
+            , 10);
+        Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
+            @Override
+            public int compare(KeyValue<String, Long> o1, KeyValue<String, Long> o2) {
+                return KGroupedStreamIntegrationTest.compare(o1, o2);
+            }
+        });
+
+        assertThat(results, is(Arrays.asList(
+            KeyValue.pair("A", 1L),
+            KeyValue.pair("A", 2L),
+            KeyValue.pair("B", 1L),
+            KeyValue.pair("B", 2L),
+            KeyValue.pair("C", 1L),
+            KeyValue.pair("C", 2L),
+            KeyValue.pair("D", 1L),
+            KeyValue.pair("D", 2L),
+            KeyValue.pair("E", 1L),
+            KeyValue.pair("E", 2L)
+        )));
+    }
+
+    @Test
+    public void shouldGroupByKey() throws Exception {
+        long timestamp = System.currentTimeMillis();
+        produceMessages(timestamp);
+        produceMessages(timestamp);
+
+        stream.groupByKey(Serdes.Integer(), Serdes.String())
+            .count(TimeWindows.of("count-windows", 500L))
+            .toStream(new KeyValueMapper<Windowed<Integer>, Long, String>() {
+                @Override
+                public String apply(final Windowed<Integer> windowedKey, final Long value) {
+                    return windowedKey.key() + "@" + windowedKey.window().start();
+                }
+            }).to(Serdes.String(), Serdes.Long(), outputTopic);
+
+        startStreams();
+
+        List<KeyValue<String, Long>> results = receiveMessages(
+            new StringDeserializer(),
+            new LongDeserializer()
+            , 10);
+        Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
+            @Override
+            public int compare(KeyValue<String, Long> o1, KeyValue<String, Long> o2) {
+                return KGroupedStreamIntegrationTest.compare(o1, o2);
+            }
+        });
+
+        long window = timestamp / 500 * 500;
+        assertThat(results, is(Arrays.asList(
+            KeyValue.pair("1@" + window, 1L),
+            KeyValue.pair("1@" + window, 2L),
+            KeyValue.pair("2@" + window, 1L),
+            KeyValue.pair("2@" + window, 2L),
+            KeyValue.pair("3@" + window, 1L),
+            KeyValue.pair("3@" + window, 2L),
+            KeyValue.pair("4@" + window, 1L),
+            KeyValue.pair("4@" + window, 2L),
+            KeyValue.pair("5@" + window, 1L),
+            KeyValue.pair("5@" + window, 2L)
+        )));
+
+    }
+
+
+    private void produceMessages(long timestamp)
+        throws ExecutionException, InterruptedException {
+        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+            streamOneInput,
+            Arrays.asList(
+                new KeyValue<>(1, "A"),
+                new KeyValue<>(2, "B"),
+                new KeyValue<>(3, "C"),
+                new KeyValue<>(4, "D"),
+                new KeyValue<>(5, "E")),
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerSerializer.class,
+                StringSerializer.class,
+                new Properties()),
+            timestamp);
+    }
+
+
+    private void createTopics() {
+        streamOneInput = "stream-one-" + testNo;
+        outputTopic = "output-" + testNo;
+        CLUSTER.createTopic(streamOneInput, 3, 1);
+        CLUSTER.createTopic(outputTopic);
+    }
+
+    private void startStreams() {
+        kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+        kafkaStreams.start();
+    }
+
+
+    private <K, V> List<KeyValue<K, V>> receiveMessages(final Deserializer<K>
+                                                            keyDeserializer,
+                                                        final Deserializer<V>
+                                                            valueDeserializer,
+                                                        final int numMessages)
+        throws InterruptedException {
+        final Properties consumerProperties = new Properties();
+        consumerProperties
+            .setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        consumerProperties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kgroupedstream-test-" +
+                                                                       testNo);
+        consumerProperties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerProperties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                                       keyDeserializer.getClass().getName());
+        consumerProperties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                                       valueDeserializer.getClass().getName());
+        return IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerProperties,
+                                                                        outputTopic,
+                                                                        numMessages, 60 * 1000);
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
new file mode 100644
index 0000000..221d349
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
@@ -0,0 +1,565 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at <p> http://www.apache.org/licenses/LICENSE-2.0 <p> Unless required by
+ * applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+public class KStreamRepartitionJoinTest {
+
+    @ClassRule
+    public static final EmbeddedSingleNodeKafkaCluster CLUSTER =
+        new EmbeddedSingleNodeKafkaCluster();
+
+    private static volatile int testNo = 0;
+
+    private KStreamBuilder builder;
+    private Properties streamsConfiguration;
+    private KStream<Long, Integer> streamOne;
+    private KStream<Integer, String> streamTwo;
+    private KStream<Integer, Integer> streamThree;
+    private KStream<Integer, String> streamFour;
+    private KTable<Integer, String> kTable;
+    private ValueJoiner<Integer, String, String> valueJoiner;
+    private KeyValueMapper<Long, Integer, KeyValue<Integer, Integer>>
+        keyMapper;
+
+    private final List<String>
+        expectedStreamOneTwoJoin = Arrays.asList("1:A", "2:B", "3:C", "4:D", "5:E");
+    private KafkaStreams kafkaStreams;
+    private String streamOneInput;
+    private String streamTwoInput;
+    private String streamFourInput;
+    private String tableInput;
+    private String outputTopic;
+    private String streamThreeInput;
+
+
+
+    @Before
+    public void before() {
+        testNo++;
+        String applicationId = "kstream-repartition-join-test" + testNo;
+        builder = new KStreamBuilder();
+        createTopics();
+        streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG,
+                                 applicationId);
+        streamsConfiguration
+            .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
+        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kstream-repartition-test");
+
+        streamOne = builder.stream(Serdes.Long(), Serdes.Integer(), streamOneInput);
+        streamTwo = builder.stream(Serdes.Integer(), Serdes.String(), streamTwoInput);
+        streamThree = builder.stream(Serdes.Integer(), Serdes.Integer(), streamThreeInput);
+        streamFour = builder.stream(Serdes.Integer(), Serdes.String(), streamFourInput);
+
+        kTable = builder.table(Serdes.Integer(), Serdes.String(), tableInput);
+
+        valueJoiner = new ValueJoiner<Integer, String, String>() {
+            @Override
+            public String apply(final Integer value1, final String value2) {
+                return value1 + ":" + value2;
+            }
+        };
+
+        keyMapper = new KeyValueMapper<Long, Integer, KeyValue<Integer, Integer>>() {
+            @Override
+            public KeyValue<Integer, Integer> apply(final Long key, final Integer value) {
+                return new KeyValue<>(value, value);
+            }
+        };
+    }
+
+    @After
+    public void whenShuttingDown() throws IOException {
+        if (kafkaStreams != null) {
+            kafkaStreams.close();
+        }
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+    }
+
+    @Test
+    public void shouldMapStreamOneAndJoin() throws ExecutionException, InterruptedException {
+        produceMessages();
+        doJoin(streamOne.map(keyMapper), streamTwo);
+        startStreams();
+        verifyCorrectOutput(expectedStreamOneTwoJoin);
+    }
+
+    @Test
+    public void shouldMapBothStreamsAndJoin() throws Exception {
+        produceMessages();
+
+        final KStream<Integer, Integer>
+            map1 =
+            streamOne.map(keyMapper);
+
+        final KStream<Integer, String> map2 = streamTwo.map(
+            new KeyValueMapper<Integer, String, KeyValue<Integer, String>>() {
+                @Override
+                public KeyValue<Integer, String> apply(Integer key,
+                                                       String value) {
+                    return new KeyValue<>(key, value);
+                }
+            });
+
+        doJoin(map1, map2);
+        startStreams();
+        verifyCorrectOutput(expectedStreamOneTwoJoin);
+
+    }
+
+    @Test
+    public void shouldMapMapJoin() throws Exception {
+        produceMessages();
+
+        final KStream<Integer, Integer> mapMapStream = streamOne.map(
+            new KeyValueMapper<Long, Integer, KeyValue<Long, Integer>>() {
+                @Override
+                public KeyValue<Long, Integer> apply(Long key, Integer value) {
+                    return new KeyValue<>(key + value, value);
+                }
+            }).map(keyMapper);
+
+        doJoin(mapMapStream, streamTwo);
+        startStreams();
+        verifyCorrectOutput(expectedStreamOneTwoJoin);
+    }
+
+
+    @Test
+    public void shouldSelectKeyAndJoin() throws ExecutionException, InterruptedException {
+        produceMessages();
+
+        final KStream<Integer, Integer>
+            keySelected =
+            streamOne.selectKey(new KeyValueMapper<Long, Integer, Integer>() {
+                @Override
+                public Integer apply(final Long key, final Integer value) {
+                    return value;
+                }
+            });
+
+        doJoin(keySelected, streamTwo);
+        startStreams();
+        verifyCorrectOutput(expectedStreamOneTwoJoin);
+    }
+
+
+    @Test
+    public void shouldFlatMapJoin() throws Exception {
+        produceMessages();
+
+        final KStream<Integer, Integer> flatMapped = streamOne.flatMap(
+            new KeyValueMapper<Long, Integer, Iterable<KeyValue<Integer, Integer>>>() {
+                @Override
+                public Iterable<KeyValue<Integer, Integer>> apply(Long key,
+                                                                  Integer value) {
+                    return Collections.singletonList(new KeyValue<>(value, value));
+                }
+            });
+
+        doJoin(flatMapped, streamTwo);
+        startStreams();
+        verifyCorrectOutput(expectedStreamOneTwoJoin);
+    }
+
+    @Test
+    public void shouldJoinTwoStreamsPartitionedTheSame() throws Exception {
+        produceMessages();
+        doJoin(streamThree, streamTwo);
+        startStreams();
+        verifyCorrectOutput(Arrays.asList("10:A", "20:B", "30:C", "40:D", "50:E"));
+    }
+
+    @Test
+    public void shouldJoinWithRhsStreamMapped() throws Exception {
+        produceMessages();
+
+        ValueJoiner<String, Integer, String> joiner = new ValueJoiner<String, Integer, String>() {
+            @Override
+            public String apply(String value1, Integer value2) {
+                return value1 + ":" + value2;
+            }
+        };
+        streamTwo
+            .join(streamOne.map(keyMapper),
+                  joiner,
+                  JoinWindows.of("the-join").within(60 * 1000),
+                  Serdes.Integer(),
+                  Serdes.String(),
+                  Serdes.Integer())
+            .to(Serdes.Integer(), Serdes.String(), outputTopic);
+
+        startStreams();
+        verifyCorrectOutput(Arrays.asList("A:1", "B:2", "C:3", "D:4", "E:5"));
+    }
+
+    @Test
+    public void shouldLeftJoinTwoStreamsPartitionedTheSame() throws Exception {
+        produceMessages();
+        doLeftJoin(streamThree, streamTwo);
+        startStreams();
+        verifyCorrectOutput(Arrays.asList("10:A", "20:B", "30:C", "40:D", "50:E"));
+    }
+
+    @Test
+    public void shouldMapStreamOneAndLeftJoin() throws ExecutionException, InterruptedException {
+        produceMessages();
+        doLeftJoin(streamOne.map(keyMapper), streamTwo);
+        startStreams();
+        verifyCorrectOutput(expectedStreamOneTwoJoin);
+    }
+
+    @Test
+    public void shouldMapBothStreamsAndLeftJoin() throws Exception {
+        produceMessages();
+
+        final KStream<Integer, Integer>
+            map1 =
+            streamOne.map(keyMapper);
+
+        final KStream<Integer, String> map2 = streamTwo.map(
+            new KeyValueMapper<Integer, String, KeyValue<Integer, String>>() {
+                @Override
+                public KeyValue<Integer, String> apply(Integer key,
+                                                       String value) {
+                    return new KeyValue<>(key, value);
+                }
+            });
+
+        doLeftJoin(map1, map2);
+        startStreams();
+
+        List<String> received = receiveMessages(new StringDeserializer(), 5);
+
+        if (!received.equals(expectedStreamOneTwoJoin)) {
+            produceToStreamOne();
+            verifyCorrectOutput(expectedStreamOneTwoJoin);
+        }
+
+    }
+
+    @Test
+    public void shouldLeftJoinWithRhsStreamMapped() throws Exception {
+        produceMessages();
+
+        ValueJoiner<String, Integer, String> joiner = new ValueJoiner<String, Integer, String>() {
+            @Override
+            public String apply(String value1, Integer value2) {
+                return value1 + ":" + value2;
+            }
+        };
+        streamTwo
+            .leftJoin(streamOne.map(keyMapper),
+                      joiner,
+                      JoinWindows.of("the-join").within(60 * 1000),
+                      Serdes.Integer(),
+                      null,
+                      Serdes.Integer())
+            .to(Serdes.Integer(), Serdes.String(), outputTopic);
+
+        startStreams();
+        List<String> received = receiveMessages(new StringDeserializer(), 5);
+
+        List<String> expectedMessages = Arrays.asList("A:1", "B:2", "C:3", "D:4", "E:5");
+        if (!received.equals(expectedMessages)) {
+            produceStreamTwoInputTo(streamTwoInput);
+            verifyCorrectOutput(expectedMessages);
+        }
+    }
+
+    @Test
+    public void shouldLeftJoinWithKTableAfterMap() throws Exception {
+        produceMessages();
+        streamOne.map(keyMapper)
+            .leftJoin(kTable, valueJoiner, Serdes.Integer(), Serdes.Integer())
+            .to(Serdes.Integer(), Serdes.String(), outputTopic);
+
+        startStreams();
+
+        List<String> received = receiveMessages(new StringDeserializer(), 5);
+        assertThat(received, is(expectedStreamOneTwoJoin));
+    }
+
+    @Test
+    public void shouldLeftJoinWithTableProducedFromGroupBy() throws Exception {
+        produceMessages();
+        KTable<Integer, String> aggTable =
+            streamOne.map(keyMapper)
+                .groupByKey(Serdes.Integer(), Serdes.Integer())
+                .aggregate(new Initializer<String>() {
+                    @Override
+                    public String apply() {
+                        return "";
+                    }
+                }, new Aggregator<Integer, Integer, String>() {
+                    @Override
+                    public String apply(final Integer aggKey, final Integer value,
+                                        final String aggregate) {
+                        return aggregate + ":" + value;
+                    }
+                }, Serdes.String(), "agg-by-key");
+
+        streamTwo.leftJoin(aggTable, new ValueJoiner<String, String, String>() {
+            @Override
+            public String apply(final String value1, final String value2) {
+                return value1 + "@" + value2;
+            }
+        }, Serdes.Integer(), Serdes.String())
+            .to(Serdes.Integer(), Serdes.String(), outputTopic);
+
+        startStreams();
+
+        receiveMessages(new StringDeserializer(), 5);
+        produceStreamTwoInputTo(streamTwoInput);
+        List<String> received = receiveMessages(new StringDeserializer(), 5);
+
+        assertThat(received, is(Arrays.asList("A@:1", "B@:2", "C@:3", "D@:4", "E@:5")));
+
+    }
+
+
+    @Test
+    public void shouldJoinTwoMappedStreamsOneThatHasBeenPreviouslyJoined() throws Exception {
+        produceMessages();
+
+        final KStream<Integer, Integer>
+            map1 =
+            streamOne.map(keyMapper);
+
+        final KeyValueMapper<Integer, String, KeyValue<Integer, String>>
+            kvMapper =
+            new KeyValueMapper<Integer, String, KeyValue<Integer, String>>() {
+                @Override
+                public KeyValue<Integer, String> apply(Integer key,
+                                                       String value) {
+                    return new KeyValue<>(key, value);
+                }
+            };
+
+        final KStream<Integer, String> map2 = streamTwo.map(kvMapper);
+
+        final KStream<Integer, String> join = map1.join(map2,
+                                                        valueJoiner,
+                                                        JoinWindows.of("the-join")
+                                                            .within(60 * 1000),
+                                                        Serdes.Integer(),
+                                                        Serdes.Integer(),
+                                                        Serdes.String());
+
+        ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
+            @Override
+            public String apply(final String value1, final String value2) {
+                return value1 + ":" + value2;
+            }
+        };
+        join.map(kvMapper)
+            .join(streamFour.map(kvMapper),
+                  joiner,
+                  JoinWindows.of("the-other-join").within(60 * 1000),
+                  Serdes.Integer(),
+                  Serdes.String(),
+                  Serdes.String())
+            .to(Serdes.Integer(), Serdes.String(), outputTopic);
+
+        startStreams();
+        verifyCorrectOutput(Arrays.asList("1:A:A", "2:B:B", "3:C:C", "4:D:D", "5:E:E"));
+    }
+
+    @Test
+    public void shouldFilterNullKeysWhenRepartitionedOnJoin() throws Exception {
+        produceMessages();
+        IntegrationTestUtils.produceKeyValuesSynchronously(
+            streamOneInput,
+            Collections.singleton(
+                new KeyValue<Long, Integer>(70L, null)),
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                LongSerializer.class,
+                IntegerSerializer.class,
+                new Properties()));
+
+        doJoin(streamOne.map(keyMapper), streamTwo);
+        startStreams();
+        verifyCorrectOutput(expectedStreamOneTwoJoin);
+    }
+
+    private void produceMessages()
+        throws ExecutionException, InterruptedException {
+        produceToStreamOne();
+        produceStreamTwoInputTo(streamTwoInput);
+        produceToStreamThree();
+        produceStreamTwoInputTo(tableInput);
+        produceStreamTwoInputTo(streamFourInput);
+    }
+
+    private void produceToStreamThree()
+        throws ExecutionException, InterruptedException {
+        IntegrationTestUtils.produceKeyValuesSynchronously(
+            streamThreeInput,
+            Arrays.asList(
+                new KeyValue<>(1, 10),
+                new KeyValue<>(2, 20),
+                new KeyValue<>(3, 30),
+                new KeyValue<>(4, 40),
+                new KeyValue<>(5, 50)),
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerSerializer.class,
+                IntegerSerializer.class,
+                new Properties()));
+    }
+
+    private void produceStreamTwoInputTo(final String streamTwoInput)
+        throws ExecutionException, InterruptedException {
+        IntegrationTestUtils.produceKeyValuesSynchronously(
+            streamTwoInput,
+            Arrays.asList(
+                new KeyValue<>(1, "A"),
+                new KeyValue<>(2, "B"),
+                new KeyValue<>(3, "C"),
+                new KeyValue<>(4, "D"),
+                new KeyValue<>(5, "E")),
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                IntegerSerializer.class,
+                StringSerializer.class,
+                new Properties()));
+    }
+
+    private void produceToStreamOne()
+        throws ExecutionException, InterruptedException {
+        IntegrationTestUtils.produceKeyValuesSynchronously(
+            streamOneInput,
+            Arrays.asList(
+                new KeyValue<>(10L, 1),
+                new KeyValue<>(5L, 2),
+                new KeyValue<>(12L, 3),
+                new KeyValue<>(15L, 4),
+                new KeyValue<>(20L, 5)),
+            TestUtils.producerConfig(
+                CLUSTER.bootstrapServers(),
+                LongSerializer.class,
+                IntegerSerializer.class,
+                new Properties()));
+    }
+
+    private void createTopics() {
+        streamOneInput = "stream-one-" + testNo;
+        streamTwoInput = "stream-two-" + testNo;
+        streamThreeInput = "stream-three-" + testNo;
+        streamFourInput = "stream-four-" + testNo;
+        tableInput = "table-stream-two-" + testNo;
+        outputTopic = "output-" + testNo;
+        CLUSTER.createTopic(streamOneInput);
+        CLUSTER.createTopic(streamTwoInput, 2, 1);
+        CLUSTER.createTopic(streamThreeInput, 2, 1);
+        CLUSTER.createTopic(streamFourInput);
+        CLUSTER.createTopic(tableInput, 2, 1);
+        CLUSTER.createTopic(outputTopic);
+    }
+
+    private void startStreams() {
+        kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
+        kafkaStreams.start();
+    }
+
+    private List<String> receiveMessages(final Deserializer<?> valueDeserializer,
+                                         final int numMessages) throws InterruptedException {
+
+        final Properties config = new Properties();
+
+        config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kstream-test-" + testNo);
+        config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+                           IntegerDeserializer.class.getName());
+        config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+                           valueDeserializer.getClass().getName());
+        List<String> received = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(
+            config, outputTopic, numMessages, 60 * 1000);
+        Collections.sort(received);
+        return received;
+    }
+
+    private void verifyCorrectOutput(List<String> expectedMessages) throws InterruptedException {
+        assertThat(receiveMessages(new StringDeserializer(), expectedMessages.size()),
+                   is(expectedMessages));
+    }
+
+    private void doJoin(KStream<Integer, Integer> lhs,
+                        KStream<Integer, String> rhs) {
+        lhs.join(rhs,
+                 valueJoiner,
+                 JoinWindows.of("the-join").within(60 * 1000),
+                 Serdes.Integer(),
+                 Serdes.Integer(),
+                 Serdes.String())
+            .to(Serdes.Integer(), Serdes.String(), outputTopic);
+    }
+
+    private void doLeftJoin(KStream<Integer, Integer> lhs,
+                            KStream<Integer, String> rhs) {
+        lhs.leftJoin(rhs,
+                     valueJoiner,
+                     JoinWindows.of("the-join").within(60 * 1000),
+                     Serdes.Integer(),
+                     Serdes.Integer(),
+                     Serdes.String())
+            .to(Serdes.Integer(), Serdes.String(), outputTopic);
+    }
+
+}

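A note on the join signature used throughout the new test above: the windowed
stream-stream join now takes the other stream's value serde explicitly, and with
this commit the repartition topic needed after a key-changing map is created
automatically instead of through an explicit intermediate topic. A minimal
standalone sketch, with imports as in the test file above (lhs-topic, rhs-topic,
output-topic and the window name are illustrative):

    KStreamBuilder builder = new KStreamBuilder();
    KStream<Integer, Integer> lhs = builder.stream(Serdes.Integer(), Serdes.Integer(), "lhs-topic");
    KStream<Integer, String> rhs = builder.stream(Serdes.Integer(), Serdes.String(), "rhs-topic");
    lhs.join(rhs,
             new ValueJoiner<Integer, String, String>() {
                 @Override
                 public String apply(final Integer value1, final String value2) {
                     return value1 + ":" + value2;   // combine both sides into one value
                 }
             },
             JoinWindows.of("sketch-join").within(60 * 1000),   // records at most a minute apart
             Serdes.Integer(),   // key serde
             Serdes.Integer(),   // this stream's value serde
             Serdes.String())    // other stream's value serde
        .to(Serdes.Integer(), Serdes.String(), "output-topic");
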
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java
index e00cd13..2966590 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java
@@ -36,6 +36,8 @@ import static org.hamcrest.CoreMatchers.equalTo;
 import static org.junit.Assert.assertThat;
 
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.List;
 import java.util.Locale;
 import java.util.Properties;
@@ -56,7 +58,7 @@ public class WordCountIntegrationTest {
 
     @BeforeClass
     public static void startKafkaCluster() throws Exception {
-        CLUSTER.createTopic(DEFAULT_INPUT_TOPIC);
+        CLUSTER.createTopic(DEFAULT_INPUT_TOPIC, 2, 1);
         CLUSTER.createTopic(DEFAULT_OUTPUT_TOPIC);
     }
 
@@ -65,9 +67,9 @@ public class WordCountIntegrationTest {
         List<String> inputValues = Arrays.asList("hello", "world", "world", "hello world");
         List<KeyValue<String, Long>> expectedWordCounts = Arrays.asList(
             new KeyValue<>("hello", 1L),
+            new KeyValue<>("hello", 2L),
             new KeyValue<>("world", 1L),
             new KeyValue<>("world", 2L),
-            new KeyValue<>("hello", 2L),
             new KeyValue<>("world", 3L)
         );
 
@@ -101,12 +103,12 @@ public class WordCountIntegrationTest {
                 public Iterable<String> apply(String value) {
                     return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
                 }
-            }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
+            }).groupBy(new KeyValueMapper<String, String, String>() {
                 @Override
-                public KeyValue<String, String> apply(String key, String value) {
-                    return new KeyValue<String, String>(value, value);
+                public String apply(final String key, final String value) {
+                    return value;
                 }
-            }).countByKey("Counts")
+            }).count("Counts")
             .toStream();
 
         wordCounts.to(stringSerde, longSerde, DEFAULT_OUTPUT_TOPIC);
@@ -139,6 +141,16 @@ public class WordCountIntegrationTest {
         consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
         List<KeyValue<String, Long>> actualWordCounts = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig,
             DEFAULT_OUTPUT_TOPIC, expectedWordCounts.size());
+        Collections.sort(actualWordCounts, new Comparator<KeyValue<String, Long>>() {
+            @Override
+            public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) {
+                int keyComparison = o1.key.compareTo(o2.key);
+                if (keyComparison == 0) {
+                    return o1.value.compareTo(o2.value);
+                }
+                return keyComparison;
+            }
+        });
         streams.close();
         assertThat(actualWordCounts, equalTo(expectedWordCounts));
     }

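The word-count change above replaces the map(...)/countByKey("Counts") pair with
the new grouped form. In isolation, assuming a String/String source stream and
imports as in the test (textLines is illustrative):

    KTable<String, Long> counts = textLines
        .flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(final String value) {
                return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
            }
        })
        .groupBy(new KeyValueMapper<String, String, String>() {
            @Override
            public String apply(final String key, final String value) {
                return value;   // re-key each record by the word itself
            }
        })
        .count("Counts");       // state store name, as in the test

Since groupBy changes the key, the count now runs behind an automatically
created repartition topic; with the input topic at two partitions, output order
is no longer deterministic, which is presumably why the test sorts the received
records before asserting.
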
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index c3f9089..83b431c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -134,10 +134,21 @@ public class IntegrationTestUtils {
     public static <K, V> void produceKeyValuesSynchronously(
         String topic, Collection<KeyValue<K, V>> records, Properties producerConfig)
         throws ExecutionException, InterruptedException {
+        produceKeyValuesSynchronouslyWithTimestamp(topic,
+                                                   records,
+                                                   producerConfig,
+                                                   null);
+    }
+
+    public static <K, V> void produceKeyValuesSynchronouslyWithTimestamp(String topic,
+                                                                         Collection<KeyValue<K, V>> records,
+                                                                         Properties producerConfig,
+                                                                         Long timestamp)
+        throws ExecutionException, InterruptedException {
         Producer<K, V> producer = new KafkaProducer<>(producerConfig);
         for (KeyValue<K, V> record : records) {
             Future<RecordMetadata> f = producer.send(
-                new ProducerRecord<>(topic, record.key, record.value));
+                new ProducerRecord<>(topic, null, timestamp, record.key, record.value));
             f.get();
         }
         producer.flush();
@@ -226,4 +237,5 @@ public class IntegrationTestUtils {
             Thread.sleep(Math.min(waitTime, 100L));
         }
     }
+
 }
\ No newline at end of file

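The new overload lets a test pin record timestamps; the old entry point now
simply delegates with a null timestamp, so existing call sites are unchanged.
A usage sketch (topic name, records and the timestamp value are illustrative):

    Properties producerConfig = TestUtils.producerConfig(
        CLUSTER.bootstrapServers(),
        IntegerSerializer.class,
        StringSerializer.class,
        new Properties());
    IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
        "input-topic",
        Arrays.asList(new KeyValue<>(1, "A"), new KeyValue<>(2, "B")),
        producerConfig,
        42L);   // applied to every record; null falls back to the default behaviour
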
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 65a4b54..1a608a7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -78,7 +78,8 @@ public class KStreamKStreamLeftJoinTest {
         processor = new MockProcessorSupplier<>();
         stream1 = builder.stream(intSerde, stringSerde, topic1);
         stream2 = builder.stream(intSerde, stringSerde, topic2);
-        joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").within(100), intSerde, stringSerde);
+        joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test")
+            .within(100), intSerde, stringSerde, stringSerde);
         joined.process(processor);
 
         Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
@@ -156,7 +157,8 @@ public class KStreamKStreamLeftJoinTest {
         processor = new MockProcessorSupplier<>();
         stream1 = builder.stream(intSerde, stringSerde, topic1);
         stream2 = builder.stream(intSerde, stringSerde, topic2);
-        joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test").within(100), intSerde, stringSerde);
+        joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of("test")
+            .within(100), intSerde, stringSerde, stringSerde);
         joined.process(processor);
 
         Collection<Set<String>> copartitionGroups = builder.copartitionGroups();

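Same signature change as in the stream-stream join tests: leftJoin now takes
the other stream's value serde as the final argument, so the updated call reads:

    joined = stream1.leftJoin(stream2,
                              MockValueJoiner.STRING_JOINER,
                              JoinWindows.of("test").within(100),
                              intSerde,       // key serde
                              stringSerde,    // this stream's value serde
                              stringSerde);   // other stream's value serde
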
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index 2c6108b..8bc9a77 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -17,14 +17,12 @@
 
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
 import org.apache.kafka.test.TestUtils;
@@ -144,19 +142,5 @@ public class KStreamKTableLeftJoinTest {
         processor.checkAndClearProcessResult("0:XX0+null", "1:XX1+null", "2:XX2+YY2", "3:XX3+YY3");
     }
 
-    @Test(expected = KafkaException.class)
-    public void testNotJoinable() {
-        KStreamBuilder builder = new KStreamBuilder();
-
-        KStream<Integer, String> stream;
-        KTable<Integer, String> table;
-        MockProcessorSupplier<Integer, String> processor;
-
-        processor = new MockProcessorSupplier<>();
-        stream = builder.stream(intSerde, stringSerde, topic1).map(MockKeyValueMapper.<Integer, String>NoOpKeyValueMapper());
-        table = builder.table(intSerde, stringSerde, topic2);
-
-        stream.leftJoin(table, MockValueJoiner.STRING_JOINER).process(processor);
-    }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index f4fe3a6..db533e4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -71,10 +71,12 @@ public class KStreamWindowAggregateTest {
 
             KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
             KTable<Windowed<String>, String> table2 =
-                stream1.aggregateByKey(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER,
-                    TimeWindows.of("topic1-Canonized", 10).advanceBy(5),
-                    strSerde,
-                    strSerde);
+                stream1.groupByKey(strSerde,
+                                   strSerde)
+                    .aggregate(MockInitializer.STRING_INIT,
+                               MockAggregator.STRING_ADDER,
+                               TimeWindows.of("topic1-Canonized", 10).advanceBy(5),
+                               strSerde);
 
             MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
             table2.toStream().process(proc2);
@@ -149,20 +151,22 @@ public class KStreamWindowAggregateTest {
 
             KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
             KTable<Windowed<String>, String> table1 =
-                stream1.aggregateByKey(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER,
-                    TimeWindows.of("topic1-Canonized", 10).advanceBy(5),
-                    strSerde,
-                    strSerde);
+                stream1.groupByKey(strSerde, strSerde)
+                    .aggregate(MockInitializer.STRING_INIT,
+                               MockAggregator.STRING_ADDER,
+                               TimeWindows.of("topic1-Canonized", 10).advanceBy(5),
+                               strSerde);
 
             MockProcessorSupplier<Windowed<String>, String> proc1 = new MockProcessorSupplier<>();
             table1.toStream().process(proc1);
 
             KStream<String, String> stream2 = builder.stream(strSerde, strSerde, topic2);
             KTable<Windowed<String>, String> table2 =
-                stream2.aggregateByKey(MockInitializer.STRING_INIT, MockAggregator.STRING_ADDER,
-                    TimeWindows.of("topic2-Canonized", 10).advanceBy(5),
-                    strSerde,
-                    strSerde);
+                stream2.groupByKey(strSerde, strSerde)
+                    .aggregate(MockInitializer.STRING_INIT,
+                               MockAggregator.STRING_ADDER,
+                               TimeWindows.of("topic2-Canonized", 10).advanceBy(5),
+                               strSerde);
 
             MockProcessorSupplier<Windowed<String>, String> proc2 = new MockProcessorSupplier<>();
             table2.toStream().process(proc2);

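Both hunks above are the same mechanical migration: aggregateByKey(initializer,
aggregator, windows, keySerde, valueSerde) becomes groupByKey(keySerde,
valueSerde).aggregate(initializer, aggregator, windows, aggSerde). The source
value serde now travels with groupByKey, and aggregate only needs the serde of
the aggregate type. A sketch of the windowed form with plain callbacks (the
stream, window name and sizes are illustrative):

    KTable<Windowed<String>, Long> lengths = stream
        .groupByKey(Serdes.String(), Serdes.String())
        .aggregate(
            new Initializer<Long>() {
                @Override
                public Long apply() {
                    return 0L;              // empty aggregate
                }
            },
            new Aggregator<String, String, Long>() {
                @Override
                public Long apply(final String key, final String value, final Long aggregate) {
                    return aggregate + value.length();   // accumulate value lengths
                }
            },
            TimeWindows.of("len-window", 10).advanceBy(5),   // hopping windows, as in the test
            Serdes.Long());
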
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
index 28acf09..107d832 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java
@@ -139,7 +139,7 @@ public class TopologyBuilderTest {
     @Test
     public void testSourceTopics() {
         final TopologyBuilder builder = new TopologyBuilder();
-
+        builder.setApplicationId("X");
         builder.addSource("source-1", "topic-1");
         builder.addSource("source-2", "topic-2");
         builder.addSource("source-3", "topic-3");
@@ -150,7 +150,7 @@ public class TopologyBuilderTest {
         expected.add("topic-2");
         expected.add("X-topic-3");
 
-        assertEquals(expected, builder.sourceTopics("X"));
+        assertEquals(expected, builder.sourceTopics());
     }
 
     @Test
@@ -259,7 +259,7 @@ public class TopologyBuilderTest {
 
         builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4");
 
-        Map<Integer, TopicsInfo> topicGroups = builder.topicGroups("X");
+        Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
 
         Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>();
         expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", "topic-2"), Collections.<String>emptySet(), Collections.<String>emptySet()));
@@ -277,7 +277,7 @@ public class TopologyBuilderTest {
     @Test
     public void testTopicGroupsByStateStore() {
         final TopologyBuilder builder = new TopologyBuilder();
-
+        builder.setApplicationId("X");
         builder.addSource("source-1", "topic-1", "topic-1x");
         builder.addSource("source-2", "topic-2");
         builder.addSource("source-3", "topic-3");
@@ -297,7 +297,7 @@ public class TopologyBuilderTest {
         builder.addStateStore(supplier);
         builder.connectProcessorAndStateStores("processor-5", "store-3");
 
-        Map<Integer, TopicsInfo> topicGroups = builder.topicGroups("X");
+        Map<Integer, TopicsInfo> topicGroups = builder.topicGroups();
 
         Map<Integer, TopicsInfo> expectedTopicGroups = new HashMap<>();
         expectedTopicGroups.put(0, new TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", "topic-2"), Collections.<String>emptySet(), mkSet(ProcessorStateManager.storeChangelogTopic("X", "store-1"))));

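The builder now carries the application id itself (see setApplicationId in the
tests above), so sourceTopics and topicGroups lose their String parameter, and
internal topic names come back prefixed with that id (X-topic-3 above). A
minimal sketch ("my-app" and the node names are illustrative):

    TopologyBuilder builder = new TopologyBuilder();
    builder.setApplicationId("my-app");       // used to prefix internal and changelog topic names
    builder.addSource("source", "input-topic");
    builder.addProcessor("processor", new MockProcessorSupplier(), "source");

    Set<String> sources = builder.sourceTopics();              // was sourceTopics("my-app")
    Map<Integer, TopicsInfo> groups = builder.topicGroups();   // was topicGroups("my-app")
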
http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 17bda54..4f7037c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -269,9 +269,9 @@ public class StreamPartitionAssignorTest {
     @Test
     public void testAssignWithStates() throws Exception {
         StreamsConfig config = new StreamsConfig(configProps());
-
+        String applicationId = "test";
         TopologyBuilder builder = new TopologyBuilder();
-
+        builder.setApplicationId(applicationId);
         builder.addSource("source1", "topic1");
         builder.addSource("source2", "topic2");
 
@@ -295,10 +295,11 @@ public class StreamPartitionAssignorTest {
         UUID uuid2 = UUID.randomUUID();
         String client1 = "client1";
 
-        StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), new SystemTime());
+
+        StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), applicationId, client1, uuid1, new Metrics(), new SystemTime());
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
-        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
+        partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1));
 
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
@@ -474,8 +475,9 @@ public class StreamPartitionAssignorTest {
     @Test
     public void testAssignWithInternalTopics() throws Exception {
         StreamsConfig config = new StreamsConfig(configProps());
-
+        String applicationId = "test";
         TopologyBuilder builder = new TopologyBuilder();
+        builder.setApplicationId(applicationId);
         builder.addInternalTopic("topicX");
         builder.addSource("source1", "topic1");
         builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
@@ -489,10 +491,11 @@ public class StreamPartitionAssignorTest {
         String client1 = "client1";
 
         MockClientSupplier clientSupplier = new MockClientSupplier();
-        StreamThread thread10 = new StreamThread(builder, config, clientSupplier, "test", client1, uuid1, new Metrics(), new SystemTime());
+
+        StreamThread thread10 = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), new SystemTime());
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
-        partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
+        partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1));
         MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(clientSupplier.restoreConsumer);
         partitionAssignor.setInternalTopicManager(internalTopicManager);
 
@@ -501,13 +504,55 @@ public class StreamPartitionAssignorTest {
         subscriptions.put("consumer10",
                 new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks).encode()));
 
-        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
+        partitionAssignor.assign(metadata, subscriptions);
 
         // check prepared internal topics
         assertEquals(1, internalTopicManager.readyTopics.size());
         assertEquals(allTasks.size(), (long) internalTopicManager.readyTopics.get("test-topicX"));
     }
 
+    @Test
+    public void testAssignWithInternalTopicWhoseSourceIsAnotherInternalTopic() throws Exception {
+        StreamsConfig config = new StreamsConfig(configProps());
+        String applicationId = "test";
+        TopologyBuilder builder = new TopologyBuilder();
+        builder.setApplicationId(applicationId);
+        builder.addInternalTopic("topicX");
+        builder.addSource("source1", "topic1");
+        builder.addProcessor("processor1", new MockProcessorSupplier(), "source1");
+        builder.addSink("sink1", "topicX", "processor1");
+        builder.addSource("source2", "topicX");
+        builder.addInternalTopic("topicZ");
+        builder.addProcessor("processor2", new MockProcessorSupplier(), "source2");
+        builder.addSink("sink2", "topicZ", "processor2");
+        builder.addSource("source3", "topicZ");
+        List<String> topics = Utils.mkList("topic1", "test-topicX", "test-topicZ");
+        Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2);
+
+        UUID uuid1 = UUID.randomUUID();
+        String client1 = "client1";
+
+        MockClientSupplier clientSupplier = new MockClientSupplier();
+
+        StreamThread thread10 = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), new SystemTime());
+
+        StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+        partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1));
+        MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(clientSupplier.restoreConsumer);
+        partitionAssignor.setInternalTopicManager(internalTopicManager);
+
+        Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+        Set<TaskId> emptyTasks = Collections.<TaskId>emptySet();
+        subscriptions.put("consumer10",
+                          new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks).encode()));
+
+        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
+
+        // check prepared internal topics
+        assertEquals(2, internalTopicManager.readyTopics.size());
+        assertEquals(allTasks.size(), (long) internalTopicManager.readyTopics.get("test-topicZ"));
+    }
+
     private class MockInternalTopicManager extends InternalTopicManager {
 
         public Map<String, Integer> readyTopics = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/7d9d1cb2/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
index fbe7754..1e1e3f4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
@@ -23,6 +23,7 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
@@ -106,7 +107,11 @@ public class SmokeTestClient extends SmokeTestUtil {
         data.process(SmokeTestUtil.<Integer>printProcessorSupplier("data"));
 
         // min
-        data.aggregateByKey(
+        KGroupedStream<String, Integer> groupedData = data.groupByKey(stringSerde, intSerde);
+
+        groupedData.aggregate(
                 new Initializer<Integer>() {
                     public Integer apply() {
                         return Integer.MAX_VALUE;
@@ -119,7 +124,6 @@ public class SmokeTestClient extends SmokeTestUtil {
                     }
                 },
                 UnlimitedWindows.of("uwin-min"),
-                stringSerde,
                 intSerde
         ).toStream().map(
                 new Unwindow<String, Integer>()
@@ -129,7 +133,7 @@ public class SmokeTestClient extends SmokeTestUtil {
         minTable.toStream().process(SmokeTestUtil.<Integer>printProcessorSupplier("min"));
 
         // max
-        data.aggregateByKey(
+        groupedData.aggregate(
                 new Initializer<Integer>() {
                     public Integer apply() {
                         return Integer.MIN_VALUE;
@@ -142,7 +146,6 @@ public class SmokeTestClient extends SmokeTestUtil {
                     }
                 },
                 UnlimitedWindows.of("uwin-max"),
-                stringSerde,
                 intSerde
         ).toStream().map(
                 new Unwindow<String, Integer>()
@@ -152,7 +155,7 @@ public class SmokeTestClient extends SmokeTestUtil {
         maxTable.toStream().process(SmokeTestUtil.<Integer>printProcessorSupplier("max"));
 
         // sum
-        data.aggregateByKey(
+        groupedData.aggregate(
                 new Initializer<Long>() {
                     public Long apply() {
                         return 0L;
@@ -165,7 +168,6 @@ public class SmokeTestClient extends SmokeTestUtil {
                     }
                 },
                 UnlimitedWindows.of("win-sum"),
-                stringSerde,
                 longSerde
         ).toStream().map(
                 new Unwindow<String, Long>()
@@ -176,10 +178,8 @@ public class SmokeTestClient extends SmokeTestUtil {
         sumTable.toStream().process(SmokeTestUtil.<Long>printProcessorSupplier("sum"));
 
         // cnt
-        data.countByKey(
-                UnlimitedWindows.of("uwin-cnt"),
-                stringSerde
-        ).toStream().map(
+        groupedData.count(UnlimitedWindows.of("uwin-cnt"))
+            .toStream().map(
                 new Unwindow<String, Long>()
         ).to(stringSerde, longSerde, "cnt");
 
@@ -206,10 +206,8 @@ public class SmokeTestClient extends SmokeTestUtil {
         ).to(stringSerde, doubleSerde, "avg");
 
         // windowed count
-        data.countByKey(
-                TimeWindows.of("tumbling-win-cnt", WINDOW_SIZE),
-                stringSerde
-        ).toStream().map(
+        groupedData.count(TimeWindows.of("tumbling-win-cnt", WINDOW_SIZE))
+            .toStream().map(
                 new KeyValueMapper<Windowed<String>, Long, KeyValue<String, Long>>() {
                     @Override
                     public KeyValue<String, Long> apply(Windowed<String> key, Long value) {


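The smoke test now builds a single KGroupedStream and reuses it for the min,
max, sum and count aggregations, instead of repeating the key and value serdes
in every countByKey/aggregateByKey call. The reuse pattern in isolation (window
names are illustrative; each windows object carries its store name, as the
uwin-cnt and tumbling-win-cnt names above show):

    KGroupedStream<String, Integer> grouped = data.groupByKey(stringSerde, intSerde);
    grouped.count(UnlimitedWindows.of("total-cnt"));             // running count
    grouped.count(TimeWindows.of("tumbling-cnt", WINDOW_SIZE));  // windowed count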