kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: MINOR: Improve Join integration test coverage, PART I
Date Fri, 12 Jan 2018 23:41:08 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3e21e17  MINOR: Improve Join integration test coverage, PART I
3e21e17 is described below

commit 3e21e17a7d3613bfab8c34555c0f598a07cf0675
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Fri Jan 12 15:40:59 2018 -0800

    MINOR: Improve Join integration test coverage, PART I
    
    0. Rename `JoinIntegrationTest` to `StreamStreamJoinIntegrationTest`, which is only for KStream-KStream joins.
    1. Extract the `AbstractJoinIntegrationTest` which is going to be used for all the join integration test classes, parameterized with and without caching.
    2. Merge `KStreamRepartitionJoinTest.java` into `StreamStreamJoinIntegrationTest.java` with augmented stream-stream join.
    3. Add `TableTableJoinIntegrationTest` with detailed per-step expected results and remove `KTableKTableJoinIntegrationTest`.
    
    Findings of the integration test:
    
    1. Confirmed KAFKA-4309 with caching turned on.
    2. Found bug KAFKA-6398.
    3. Found bug KAFKA-6443.
    4. Found a bug in CachingKeyValueStore: on flush, we would invoke the flush listener before putting the record into the underlying store, so when the store is read by downstream processors during flushing it would produce incorrect results. Fixed the issue along with this PR.
    5. Consider a new optimization described in KAFKA-6286.
    
    Future work, including stream-table joins, will be done in other PRs.
    
    Author: Guozhang Wang <wangguoz@gmail.com>
    
    Reviewers: Damian Guy <damian.guy@gmail.com>, Bill Bejeck <bill@confluent.io>
    
    Closes #4331 from guozhangwang/KMinor-join-integration-tests
---
 .../state/internals/CachingKeyValueStore.java      |   6 +-
 .../integration/AbstractJoinIntegrationTest.java   | 292 +++++++++++
 .../streams/integration/JoinIntegrationTest.java   | 400 ---------------
 .../integration/KStreamRepartitionJoinTest.java    | 386 ---------------
 .../KTableKTableJoinIntegrationTest.java           | 399 ---------------
 .../StreamStreamJoinIntegrationTest.java           | 262 ++++++++++
 .../integration/TableTableJoinIntegrationTest.java | 535 +++++++++++++++++++++
 .../java/org/apache/kafka/test/MockMapper.java     |  12 +
 8 files changed, 1104 insertions(+), 1188 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 9fff8cc..f9ab3f1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -91,14 +91,14 @@ class CachingKeyValueStore<K, V> extends WrappedStateStore.AbstractStateStore im
         try {
             context.setRecordContext(entry.recordContext());
             if (flushListener != null) {
-
                 final V oldValue = sendOldValues ? serdes.valueFrom(underlying.get(entry.key())) : null;
+                underlying.put(entry.key(), entry.newValue());
                 flushListener.apply(serdes.keyFrom(entry.key().get()),
                                     serdes.valueFrom(entry.newValue()),
                                     oldValue);
-
+            } else {
+                underlying.put(entry.key(), entry.newValue());
             }
-            underlying.put(entry.key(), entry.newValue());
         } finally {
             context.setRecordContext(current);
         }
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
new file mode 100644
index 0000000..16d2611
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/AbstractJoinIntegrationTest.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestCondition;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.core.Is.is;
+
+/**
+ * Tests all available joins of Kafka Streams DSL.
+ */
+@Category({IntegrationTest.class})
+@RunWith(value = Parameterized.class)
+public abstract class AbstractJoinIntegrationTest {
+    @ClassRule
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
+
+    @Rule
+    public final TemporaryFolder testFolder = new TemporaryFolder(TestUtils.tempDirectory());
+
+    @Parameterized.Parameters(name = "caching enabled = {0}")
+    public static Collection<Object[]> data() {
+        List<Object[]> values = new ArrayList<>();
+        for (boolean cacheEnabled : Arrays.asList(true, false))
+            values.add(new Object[] {cacheEnabled});
+        return values;
+    }
+
+    static String appID;
+
+    private static final Long COMMIT_INTERVAL = 100L;
+    static final Properties STREAMS_CONFIG = new Properties();
+    static final String INPUT_TOPIC_RIGHT = "inputTopicRight";
+    static final String INPUT_TOPIC_LEFT = "inputTopicLeft";
+    static final String OUTPUT_TOPIC = "outputTopic";
+    private final long anyUniqueKey = 0L;
+
+    private final static Properties PRODUCER_CONFIG = new Properties();
+    private final static Properties RESULT_CONSUMER_CONFIG = new Properties();
+
+    private KafkaProducer<Long, String> producer;
+    private KafkaStreams streams;
+
+    StreamsBuilder builder;
+    int numRecordsExpected = 0;
+    AtomicBoolean finalResultReached = new AtomicBoolean(false);
+
+    private final List<Input<String>> input = Arrays.asList(
+            new Input<>(INPUT_TOPIC_LEFT, (String) null),
+            new Input<>(INPUT_TOPIC_RIGHT, (String) null),
+            new Input<>(INPUT_TOPIC_LEFT, "A"),
+            new Input<>(INPUT_TOPIC_RIGHT, "a"),
+            new Input<>(INPUT_TOPIC_LEFT, "B"),
+            new Input<>(INPUT_TOPIC_RIGHT, "b"),
+            new Input<>(INPUT_TOPIC_LEFT, (String) null),
+            new Input<>(INPUT_TOPIC_RIGHT, (String) null),
+            new Input<>(INPUT_TOPIC_LEFT, "C"),
+            new Input<>(INPUT_TOPIC_RIGHT, "c"),
+            new Input<>(INPUT_TOPIC_RIGHT, (String) null),
+            new Input<>(INPUT_TOPIC_LEFT, (String) null),
+            new Input<>(INPUT_TOPIC_RIGHT, (String) null),
+            new Input<>(INPUT_TOPIC_RIGHT, "d"),
+            new Input<>(INPUT_TOPIC_LEFT, "D")
+    );
+
+    final ValueJoiner<String, String, String> valueJoiner = new ValueJoiner<String, String, String>() {
+        @Override
+        public String apply(final String value1, final String value2) {
+            return value1 + "-" + value2;
+        }
+    };
+
+    final boolean cacheEnabled;
+
+    AbstractJoinIntegrationTest(boolean cacheEnabled) {
+        this.cacheEnabled = cacheEnabled;
+    }
+
+    @BeforeClass
+    public static void setupConfigsAndUtils() {
+        PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
+        PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0);
+        PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+        PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+        RESULT_CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        RESULT_CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, appID + "-result-consumer");
+        RESULT_CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        RESULT_CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+        STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
+        STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
+        STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+        STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL);
+    }
+
+    void prepareEnvironment() throws InterruptedException {
+        CLUSTER.createTopics(INPUT_TOPIC_LEFT, INPUT_TOPIC_RIGHT, OUTPUT_TOPIC);
+
+        if (!cacheEnabled)
+            STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
+
+        STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, testFolder.getRoot().getPath());
+
+        producer = new KafkaProducer<>(PRODUCER_CONFIG);
+    }
+
+    @After
+    public void cleanup() throws InterruptedException {
+        CLUSTER.deleteTopicsAndWait(120000, INPUT_TOPIC_LEFT, INPUT_TOPIC_RIGHT, OUTPUT_TOPIC);
+    }
+
+    private void checkResult(final String outputTopic, final List<String> expectedResult) throws InterruptedException {
+        final List<String> result = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(RESULT_CONSUMER_CONFIG, outputTopic, expectedResult.size(), 30 * 1000L);
+        assertThat(result, is(expectedResult));
+    }
+
+    private void checkResult(final String outputTopic, final String expectedFinalResult, final int expectedTotalNumRecords) throws InterruptedException {
+        final List<String> result = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(RESULT_CONSUMER_CONFIG, outputTopic, expectedTotalNumRecords, 30 * 1000L);
+        assertThat(result.get(result.size() - 1), is(expectedFinalResult));
+    }
+
+    /*
+     * Runs the actual test. Checks the result after each input record to ensure fixed processing order.
+     * If an input tuple does not trigger any result, "expectedResult" should contain a "null" entry
+     */
+    void runTest(final List<List<String>> expectedResult) throws Exception {
+        runTest(expectedResult, null);
+    }
+
+
+    /*
+     * Runs the actual test. Checks the result after each input record to ensure fixed processing order.
+     * If an input tuple does not trigger any result, "expectedResult" should contain a "null" entry
+     */
+    void runTest(final List<List<String>> expectedResult, final String storeName) throws Exception {
+        assert expectedResult.size() == input.size();
+
+        IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
+        streams = new KafkaStreams(builder.build(), new StreamsConfig(STREAMS_CONFIG));
+
+        String expectedFinalResult = null;
+
+        try {
+            streams.start();
+
+            long ts = System.currentTimeMillis();
+
+            final Iterator<List<String>> resultIterator = expectedResult.iterator();
+            for (final Input<String> singleInput : input) {
+                producer.send(new ProducerRecord<>(singleInput.topic, null, ++ts, singleInput.record.key, singleInput.record.value)).get();
+
+                List<String> expected = resultIterator.next();
+
+                if (expected != null) {
+                    checkResult(OUTPUT_TOPIC, expected);
+                    expectedFinalResult = expected.get(expected.size() - 1);
+                }
+            }
+
+            if (storeName != null) {
+                checkQueryableStore(storeName, expectedFinalResult);
+            }
+        } finally {
+            streams.close();
+        }
+    }
+
+    /*
+     * Runs the actual test. Checks the final result only after expected number of records have been consumed.
+     */
+    void runTest(final String expectedFinalResult) throws Exception {
+        runTest(expectedFinalResult, null);
+    }
+
+    /*
+     * Runs the actual test. Checks the final result only after expected number of records have been consumed.
+     */
+    void runTest(final String expectedFinalResult, final String storeName) throws Exception {
+        IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
+        streams = new KafkaStreams(builder.build(), new StreamsConfig(STREAMS_CONFIG));
+
+        try {
+            streams.start();
+
+            long ts = System.currentTimeMillis();
+
+            for (final Input<String> singleInput : input) {
+                producer.send(new ProducerRecord<>(singleInput.topic, null, ++ts, singleInput.record.key, singleInput.record.value)).get();
+            }
+
+            TestUtils.waitForCondition(new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    return finalResultReached.get();
+                }
+            }, "Never received expected final result.");
+
+            checkResult(OUTPUT_TOPIC, expectedFinalResult, numRecordsExpected);
+
+            if (storeName != null) {
+                checkQueryableStore(storeName, expectedFinalResult);
+            }
+        } finally {
+            streams.close();
+        }
+    }
+
+    /*
+     * Checks the embedded queryable state store snapshot
+     */
+    private void checkQueryableStore(final String queryableName, final String expectedFinalResult) {
+        final ReadOnlyKeyValueStore<Long, String> store = streams.store(queryableName, QueryableStoreTypes.<Long, String>keyValueStore());
+
+        final KeyValueIterator<Long, String> all = store.all();
+        final KeyValue<Long, String> onlyEntry = all.next();
+
+        try {
+            assertThat(onlyEntry.key, is(anyUniqueKey));
+            assertThat(onlyEntry.value, is(expectedFinalResult));
+            assertThat(all.hasNext(), is(false));
+        } finally {
+            all.close();
+        }
+    }
+
+    private final class Input<V> {
+        String topic;
+        KeyValue<Long, V> record;
+
+        Input(final String topic, final V value) {
+            this.topic = topic;
+            record = KeyValue.pair(anyUniqueKey, value);
+        }
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
deleted file mode 100644
index faa581b..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
+++ /dev/null
@@ -1,400 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.integration;
-
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-import org.apache.kafka.streams.kstream.JoinWindows;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.Is.is;
-
-/**
- * Tests all available joins of Kafka Streams DSL.
- */
-@Category({IntegrationTest.class})
-public class JoinIntegrationTest {
-    @ClassRule
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);
-
-    private static final String APP_ID = "join-integration-test";
-    private static final String INPUT_TOPIC_1 = "inputTopicLeft";
-    private static final String INPUT_TOPIC_2 = "inputTopicRight";
-    private static final String OUTPUT_TOPIC = "outputTopic";
-
-    private final static Properties PRODUCER_CONFIG = new Properties();
-    private final static Properties RESULT_CONSUMER_CONFIG = new Properties();
-    private final static Properties STREAMS_CONFIG = new Properties();
-
-    private StreamsBuilder builder;
-    private KStream<Long, String> leftStream;
-    private KStream<Long, String> rightStream;
-    private KTable<Long, String> leftTable;
-    private KTable<Long, String> rightTable;
-
-    private final List<Input<String>> input = Arrays.asList(
-        new Input<>(INPUT_TOPIC_1, (String) null),
-        new Input<>(INPUT_TOPIC_2, (String) null),
-        new Input<>(INPUT_TOPIC_1, "A"),
-        new Input<>(INPUT_TOPIC_2, "a"),
-        new Input<>(INPUT_TOPIC_1, "B"),
-        new Input<>(INPUT_TOPIC_2, "b"),
-        new Input<>(INPUT_TOPIC_1, (String) null),
-        new Input<>(INPUT_TOPIC_2, (String) null),
-        new Input<>(INPUT_TOPIC_1, "C"),
-        new Input<>(INPUT_TOPIC_2, "c"),
-        new Input<>(INPUT_TOPIC_2, (String) null),
-        new Input<>(INPUT_TOPIC_1, (String) null),
-        new Input<>(INPUT_TOPIC_2, (String) null),
-        new Input<>(INPUT_TOPIC_2, "d"),
-        new Input<>(INPUT_TOPIC_1, "D")
-    );
-
-    private final ValueJoiner<String, String, String> valueJoiner = new ValueJoiner<String, String, String>() {
-        @Override
-        public String apply(final String value1, final String value2) {
-            return value1 + "-" + value2;
-        }
-    };
-
-    @BeforeClass
-    public static void setupConfigsAndUtils() {
-        PRODUCER_CONFIG.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        PRODUCER_CONFIG.put(ProducerConfig.ACKS_CONFIG, "all");
-        PRODUCER_CONFIG.put(ProducerConfig.RETRIES_CONFIG, 0);
-        PRODUCER_CONFIG.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
-        PRODUCER_CONFIG.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-
-        RESULT_CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        RESULT_CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, APP_ID + "-result-consumer");
-        RESULT_CONSUMER_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        RESULT_CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
-        RESULT_CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-
-        STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        STREAMS_CONFIG.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        STREAMS_CONFIG.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-        STREAMS_CONFIG.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
-        STREAMS_CONFIG.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        STREAMS_CONFIG.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        STREAMS_CONFIG.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
-        STREAMS_CONFIG.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
-    }
-
-    @Before
-    public void prepareTopology() throws InterruptedException {
-        CLUSTER.createTopics(INPUT_TOPIC_1, INPUT_TOPIC_2, OUTPUT_TOPIC);
-
-        builder = new StreamsBuilder();
-        leftTable = builder.table(INPUT_TOPIC_1);
-        rightTable = builder.table(INPUT_TOPIC_2);
-        leftStream = leftTable.toStream();
-        rightStream = rightTable.toStream();
-    }
-
-    @After
-    public void cleanup() throws InterruptedException {
-        CLUSTER.deleteTopicsAndWait(120000, INPUT_TOPIC_1, INPUT_TOPIC_2, OUTPUT_TOPIC);
-    }
-
-    private void checkResult(final String outputTopic, final List<String> expectedResult) throws InterruptedException {
-        if (expectedResult != null) {
-            final List<String> result = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(RESULT_CONSUMER_CONFIG, outputTopic, expectedResult.size(), 30 * 1000L);
-            assertThat(result, is(expectedResult));
-        }
-    }
-
-    /*
-     * Runs the actual test. Checks the result after each input record to ensure fixed processing order.
-     * If an input tuple does not trigger any result, "expectedResult" should contain a "null" entry
-     */
-    private void runTest(final List<List<String>> expectedResult) throws Exception {
-        assert expectedResult.size() == input.size();
-
-        IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
-        final KafkaStreams streams = new KafkaStreams(builder.build(), STREAMS_CONFIG);
-        try {
-            streams.start();
-
-            long ts = System.currentTimeMillis();
-
-            final Iterator<List<String>> resultIterator = expectedResult.iterator();
-            for (final Input<String> singleInput : input) {
-                IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(singleInput.topic, Collections.singleton(singleInput.record), PRODUCER_CONFIG, ++ts);
-                checkResult(OUTPUT_TOPIC, resultIterator.next());
-            }
-        } finally {
-            streams.close();
-        }
-    }
-
-    @Test
-    public void testInnerKStreamKStream() throws Exception {
-        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-inner-KStream-KStream");
-
-        final List<List<String>> expectedResult = Arrays.asList(
-            null,
-            null,
-            null,
-            Collections.singletonList("A-a"),
-            Collections.singletonList("B-a"),
-            Arrays.asList("A-b", "B-b"),
-            null,
-            null,
-            Arrays.asList("C-a", "C-b"),
-            Arrays.asList("A-c", "B-c", "C-c"),
-            null,
-            null,
-            null,
-            Arrays.asList("A-d", "B-d", "C-d"),
-            Arrays.asList("D-a", "D-b", "D-c", "D-d")
-        );
-
-        leftStream.join(rightStream, valueJoiner, JoinWindows.of(10000)).to(OUTPUT_TOPIC);
-
-        runTest(expectedResult);
-    }
-
-    @Test
-    public void testLeftKStreamKStream() throws Exception {
-        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-left-KStream-KStream");
-
-        final List<List<String>> expectedResult = Arrays.asList(
-            null,
-            null,
-            Collections.singletonList("A-null"),
-            Collections.singletonList("A-a"),
-            Collections.singletonList("B-a"),
-            Arrays.asList("A-b", "B-b"),
-            null,
-            null,
-            Arrays.asList("C-a", "C-b"),
-            Arrays.asList("A-c", "B-c", "C-c"),
-            null,
-            null,
-            null,
-            Arrays.asList("A-d", "B-d", "C-d"),
-            Arrays.asList("D-a", "D-b", "D-c", "D-d")
-        );
-
-        leftStream.leftJoin(rightStream, valueJoiner, JoinWindows.of(10000)).to(OUTPUT_TOPIC);
-
-        runTest(expectedResult);
-    }
-
-    @Test
-    public void testOuterKStreamKStream() throws Exception {
-        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-outer-KStream-KStream");
-
-        final List<List<String>> expectedResult = Arrays.asList(
-            null,
-            null,
-            Collections.singletonList("A-null"),
-            Collections.singletonList("A-a"),
-            Collections.singletonList("B-a"),
-            Arrays.asList("A-b", "B-b"),
-            null,
-            null,
-            Arrays.asList("C-a", "C-b"),
-            Arrays.asList("A-c", "B-c", "C-c"),
-            null,
-            null,
-            null,
-            Arrays.asList("A-d", "B-d", "C-d"),
-            Arrays.asList("D-a", "D-b", "D-c", "D-d")
-        );
-
-        leftStream.outerJoin(rightStream, valueJoiner, JoinWindows.of(10000)).to(OUTPUT_TOPIC);
-
-        runTest(expectedResult);
-    }
-
-    @Test
-    public void testInnerKStreamKTable() throws Exception {
-        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-inner-KStream-KTable");
-
-        final List<List<String>> expectedResult = Arrays.asList(
-            null,
-            null,
-            null,
-            null,
-            Collections.singletonList("B-a"),
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            null,
-            Collections.singletonList("D-d")
-        );
-
-        leftStream.join(rightTable, valueJoiner).to(OUTPUT_TOPIC);
-
-        runTest(expectedResult);
-    }
-
-    @Test
-    public void testLeftKStreamKTable() throws Exception {
-        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-left-KStream-KTable");
-
-        final List<List<String>> expectedResult = Arrays.asList(
-            null,
-            null,
-            Collections.singletonList("A-null"),
-            null,
-            Collections.singletonList("B-a"),
-            null,
-            null,
-            null,
-            Collections.singletonList("C-null"),
-            null,
-            null,
-            null,
-            null,
-            null,
-            Collections.singletonList("D-d")
-        );
-
-        leftStream.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC);
-
-        runTest(expectedResult);
-    }
-
-    @Test
-    public void testInnerKTableKTable() throws Exception {
-        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-inner-KTable-KTable");
-
-        final List<List<String>> expectedResult = Arrays.asList(
-            null,
-            null,
-            null,
-            Collections.singletonList("A-a"),
-            Collections.singletonList("B-a"),
-            Collections.singletonList("B-b"),
-            Collections.singletonList((String) null),
-            null,
-            null,
-            Collections.singletonList("C-c"),
-            Collections.singletonList((String) null),
-            null,
-            null,
-            null,
-            Collections.singletonList("D-d")
-        );
-
-        leftTable.join(rightTable, valueJoiner).to(OUTPUT_TOPIC);
-
-        runTest(expectedResult);
-    }
-
-    @Test
-    public void testLeftKTableKTable() throws Exception {
-        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-left-KTable-KTable");
-
-        final List<List<String>> expectedResult = Arrays.asList(
-            null,
-            null,
-            Collections.singletonList("A-null"),
-            Collections.singletonList("A-a"),
-            Collections.singletonList("B-a"),
-            Collections.singletonList("B-b"),
-            Collections.singletonList((String) null),
-            null,
-            Collections.singletonList("C-null"),
-            Collections.singletonList("C-c"),
-            Collections.singletonList("C-null"),
-            Collections.singletonList((String) null),
-            null,
-            null,
-            Collections.singletonList("D-d")
-        );
-
-        leftTable.leftJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC);
-
-        runTest(expectedResult);
-    }
-
-    @Test
-    public void testOuterKTableKTable() throws Exception {
-        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + "-outer-KTable-KTable");
-
-        final List<List<String>> expectedResult = Arrays.asList(
-            null,
-            null,
-            Collections.singletonList("A-null"),
-            Collections.singletonList("A-a"),
-            Collections.singletonList("B-a"),
-            Collections.singletonList("B-b"),
-            Collections.singletonList("null-b"),
-            Collections.singletonList((String) null),
-            Collections.singletonList("C-null"),
-            Collections.singletonList("C-c"),
-            Collections.singletonList("C-null"),
-            Collections.singletonList((String) null),
-            null,
-            Collections.singletonList("null-d"),
-            Collections.singletonList("D-d")
-        );
-
-        leftTable.outerJoin(rightTable, valueJoiner).to(OUTPUT_TOPIC);
-
-        runTest(expectedResult);
-    }
-
-    private final class Input<V> {
-        String topic;
-        KeyValue<Long, V> record;
-
-        private final long anyUniqueKey = 0L;
-
-        Input(final String topic, final V value) {
-            this.topic = topic;
-            record = KeyValue.pair(anyUniqueKey, value);
-        }
-    }
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
deleted file mode 100644
index 32546de..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
+++ /dev/null
@@ -1,386 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.integration;
-
-import kafka.utils.MockTime;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.LongSerializer;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.Consumed;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-import org.apache.kafka.streams.kstream.JoinWindows;
-import org.apache.kafka.streams.kstream.Joined;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KeyValueMapper;
-import org.apache.kafka.streams.kstream.Predicate;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.MockMapper;
-import org.apache.kafka.test.MockValueJoiner;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.core.Is.is;
-
-@Category({IntegrationTest.class})
-public class KStreamRepartitionJoinTest {
-
-    private static final int NUM_BROKERS = 1;
-    private static final long COMMIT_INTERVAL_MS = 300L;
-
-    @ClassRule
-    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
-    public static final ValueJoiner<Object, Object, String> TOSTRING_JOINER = MockValueJoiner.instance(":");
-    private final MockTime mockTime = CLUSTER.time;
-    private static final long WINDOW_SIZE = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS);
-
-    private StreamsBuilder builder;
-    private Properties streamsConfiguration;
-    private KStream<Long, Integer> streamOne;
-    private KStream<Integer, String> streamTwo;
-    private KStream<Integer, String> streamFour;
-    private KeyValueMapper<Long, Integer, KeyValue<Integer, Integer>> keyMapper;
-
-    private final List<String>
-        expectedStreamOneTwoJoin = Arrays.asList("1:A", "2:B", "3:C", "4:D", "5:E");
-    private KafkaStreams kafkaStreams;
-    private String streamOneInput;
-    private String streamTwoInput;
-    private String streamFourInput;
-    private static volatile int testNo = 0;
-
-    @Before
-    public void before() throws InterruptedException {
-        testNo++;
-        String applicationId = "kstream-repartition-join-test-" + testNo;
-        builder = new StreamsBuilder();
-        createTopics();
-        streamsConfiguration = new Properties();
-        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
-        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, COMMIT_INTERVAL_MS);
-        streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-        streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
-        streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
-
-        streamOne = builder.stream(streamOneInput, Consumed.with(Serdes.Long(), Serdes.Integer()));
-        streamTwo = builder.stream(streamTwoInput, Consumed.with(Serdes.Integer(), Serdes.String()));
-        streamFour = builder.stream(streamFourInput, Consumed.with(Serdes.Integer(), Serdes.String()));
-
-        keyMapper = MockMapper.selectValueKeyValueMapper();
-    }
-
-    @After
-    public void whenShuttingDown() throws IOException {
-        if (kafkaStreams != null) {
-            kafkaStreams.close();
-        }
-        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
-    }
-
-    @Test
-    public void shouldCorrectlyRepartitionOnJoinOperationsWithZeroSizedCache() throws Exception {
-        verifyRepartitionOnJoinOperations(0);
-    }
-
-    @Test
-    public void shouldCorrectlyRepartitionOnJoinOperationsWithNonZeroSizedCache() throws Exception {
-        verifyRepartitionOnJoinOperations(10 * 1024 * 1024);
-    }
-
-    private void verifyRepartitionOnJoinOperations(final int cacheSizeBytes) throws Exception {
-        streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
-        produceMessages();
-        final ExpectedOutputOnTopic mapOne = mapStreamOneAndJoin();
-        final ExpectedOutputOnTopic mapBoth = mapBothStreamsAndJoin();
-        final ExpectedOutputOnTopic mapMapJoin = mapMapJoin();
-        final ExpectedOutputOnTopic selectKeyJoin = selectKeyAndJoin();
-        final ExpectedOutputOnTopic flatMapJoin = flatMapJoin();
-        final ExpectedOutputOnTopic mapRhs = joinMappedRhsStream();
-        final ExpectedOutputOnTopic mapJoinJoin = joinTwoMappedStreamsOneThatHasBeenPreviouslyJoined();
-        final ExpectedOutputOnTopic leftJoin = mapBothStreamsAndLeftJoin();
-
-        startStreams();
-
-        verifyCorrectOutput(mapOne);
-        verifyCorrectOutput(mapBoth);
-        verifyCorrectOutput(mapMapJoin);
-        verifyCorrectOutput(selectKeyJoin);
-        verifyCorrectOutput(flatMapJoin);
-        verifyCorrectOutput(mapRhs);
-        verifyCorrectOutput(mapJoinJoin);
-        verifyCorrectOutput(leftJoin);
-    }
-
-    private ExpectedOutputOnTopic mapStreamOneAndJoin() throws InterruptedException {
-        String mapOneStreamAndJoinOutput = "map-one-join-output-" + testNo;
-        doJoin(streamOne.map(keyMapper), streamTwo, mapOneStreamAndJoinOutput);
-        return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, mapOneStreamAndJoinOutput);
-    }
-
-    private ExpectedOutputOnTopic mapBothStreamsAndJoin() throws InterruptedException {
-        final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
-        final KStream<Integer, String> map2 = streamTwo.map(MockMapper.<Integer, String>noOpKeyValueMapper());
-
-        doJoin(map1, map2, "map-both-streams-and-join-" + testNo);
-        return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, "map-both-streams-and-join-" + testNo);
-    }
-
-    private ExpectedOutputOnTopic mapMapJoin() throws InterruptedException {
-        final KStream<Integer, Integer> mapMapStream = streamOne.map(
-            new KeyValueMapper<Long, Integer, KeyValue<Long, Integer>>() {
-                @Override
-                public KeyValue<Long, Integer> apply(final Long key, final Integer value) {
-                    if (value == null) {
-                        return new KeyValue<>(null, null);
-                    }
-                    return new KeyValue<>(key + value, value);
-                }
-            }).map(keyMapper);
-
-        final String outputTopic = "map-map-join-" + testNo;
-        doJoin(mapMapStream, streamTwo, outputTopic);
-        return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
-    }
-
-    private ExpectedOutputOnTopic selectKeyAndJoin() throws Exception {
-
-        final KStream<Integer, Integer> keySelected =
-            streamOne.selectKey(MockMapper.<Long, Integer>selectValueMapper());
-
-        final String outputTopic = "select-key-join-" + testNo;
-        doJoin(keySelected, streamTwo, outputTopic);
-        return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
-    }
-
-    private ExpectedOutputOnTopic flatMapJoin() throws InterruptedException {
-        final KStream<Integer, Integer> flatMapped = streamOne.flatMap(
-            new KeyValueMapper<Long, Integer, Iterable<KeyValue<Integer, Integer>>>() {
-                @Override
-                public Iterable<KeyValue<Integer, Integer>> apply(final Long key, final Integer value) {
-                    return Collections.singletonList(new KeyValue<>(value, value));
-                }
-            });
-
-        final String outputTopic = "flat-map-join-" + testNo;
-        doJoin(flatMapped, streamTwo, outputTopic);
-
-        return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
-    }
-
-    private ExpectedOutputOnTopic joinMappedRhsStream() throws InterruptedException {
-
-        final String output = "join-rhs-stream-mapped-" + testNo;
-        CLUSTER.createTopic(output);
-        streamTwo
-            .join(streamOne.map(keyMapper),
-                TOSTRING_JOINER,
-                getJoinWindow(),
-                Joined.with(Serdes.Integer(), Serdes.String(), Serdes.Integer()))
-            .to(Serdes.Integer(), Serdes.String(), output);
-
-        return new ExpectedOutputOnTopic(Arrays.asList("A:1", "B:2", "C:3", "D:4", "E:5"), output);
-    }
-
-    private ExpectedOutputOnTopic mapBothStreamsAndLeftJoin() throws InterruptedException {
-        final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
-
-        final KStream<Integer, String> map2 = streamTwo.map(MockMapper.<Integer, String>noOpKeyValueMapper());
-
-
-        final String outputTopic = "left-join-" + testNo;
-        CLUSTER.createTopic(outputTopic);
-        map1.leftJoin(map2,
-            TOSTRING_JOINER,
-            getJoinWindow(),
-            Joined.with(Serdes.Integer(), Serdes.Integer(), Serdes.String()))
-            .filterNot(new Predicate<Integer, String>() {
-                @Override
-                public boolean test(Integer key, String value) {
-                    // filter not left-only join results
-                    return value.substring(2).equals("null");
-                }
-            })
-            .to(Serdes.Integer(), Serdes.String(), outputTopic);
-
-        return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, outputTopic);
-    }
-
-    private ExpectedOutputOnTopic joinTwoMappedStreamsOneThatHasBeenPreviouslyJoined() throws InterruptedException {
-        final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
-
-        final KeyValueMapper<Integer, String, KeyValue<Integer, String>>
-            kvMapper = MockMapper.noOpKeyValueMapper();
-
-        final KStream<Integer, String> map2 = streamTwo.map(kvMapper);
-
-        final KStream<Integer, String> join = map1.join(map2,
-            TOSTRING_JOINER,
-            getJoinWindow(),
-            Joined.with(Serdes.Integer(), Serdes.Integer(), Serdes.String()));
-
-        final String topic = "map-join-join-" + testNo;
-        CLUSTER.createTopic(topic);
-        join.map(kvMapper)
-            .join(streamFour.map(kvMapper),
-                TOSTRING_JOINER,
-                getJoinWindow(),
-                Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()))
-            .to(Serdes.Integer(), Serdes.String(), topic);
-
-
-        return new ExpectedOutputOnTopic(Arrays.asList("1:A:A", "2:B:B", "3:C:C", "4:D:D", "5:E:E"), topic);
-    }
-
-    private JoinWindows getJoinWindow() {
-        return JoinWindows.of(WINDOW_SIZE).until(3 * WINDOW_SIZE);
-    }
-
-
-    private class ExpectedOutputOnTopic {
-        private final List<String> expectedOutput;
-        private final String outputTopic;
-
-        ExpectedOutputOnTopic(final List<String> expectedOutput, final String outputTopic) {
-            this.expectedOutput = expectedOutput;
-            this.outputTopic = outputTopic;
-        }
-    }
-
-    private void verifyCorrectOutput(final ExpectedOutputOnTopic expectedOutputOnTopic)
-        throws InterruptedException {
-        assertThat(receiveMessages(new StringDeserializer(),
-            expectedOutputOnTopic.expectedOutput.size(),
-            expectedOutputOnTopic.outputTopic),
-            is(expectedOutputOnTopic.expectedOutput));
-    }
-
-    private void produceMessages() throws Exception {
-        produceToStreamOne();
-        produceStreamTwoInputTo(streamTwoInput);
-        produceStreamTwoInputTo(streamFourInput);
-
-    }
-
-    private void produceStreamTwoInputTo(final String streamTwoInput) throws Exception {
-        IntegrationTestUtils.produceKeyValuesSynchronously(
-            streamTwoInput,
-            Arrays.asList(
-                new KeyValue<>(1, "A"),
-                new KeyValue<>(2, "B"),
-                new KeyValue<>(3, "C"),
-                new KeyValue<>(4, "D"),
-                new KeyValue<>(5, "E")),
-            TestUtils.producerConfig(
-                CLUSTER.bootstrapServers(),
-                IntegerSerializer.class,
-                StringSerializer.class,
-                new Properties()),
-            mockTime);
-    }
-
-    private void produceToStreamOne() throws Exception {
-        IntegrationTestUtils.produceKeyValuesSynchronously(
-            streamOneInput,
-            Arrays.asList(
-                new KeyValue<>(10L, 1),
-                new KeyValue<>(5L, 2),
-                new KeyValue<>(12L, 3),
-                new KeyValue<>(15L, 4),
-                new KeyValue<>(20L, 5),
-                new KeyValue<Long, Integer>(70L, null)), // nulls should be filtered
-            TestUtils.producerConfig(
-                CLUSTER.bootstrapServers(),
-                LongSerializer.class,
-                IntegerSerializer.class,
-                new Properties()),
-            mockTime);
-    }
-
-    private void createTopics() throws InterruptedException {
-        streamOneInput = "stream-one-" + testNo;
-        streamTwoInput = "stream-two-" + testNo;
-        streamFourInput = "stream-four-" + testNo;
-        CLUSTER.createTopic(streamOneInput, 2, 1);
-        CLUSTER.createTopic(streamTwoInput, 2, 1);
-        CLUSTER.createTopic(streamFourInput, 2, 1);
-    }
-
-
-    private void startStreams() {
-        kafkaStreams = new KafkaStreams(builder.build(), streamsConfiguration);
-        kafkaStreams.start();
-    }
-
-
-    private List<String> receiveMessages(final Deserializer<?> valueDeserializer,
-                                         final int numMessages, final String topic) throws InterruptedException {
-
-        final Properties config = new Properties();
-
-        config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "kstream-test");
-        config.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        config.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-            IntegerDeserializer.class.getName());
-        config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-            valueDeserializer.getClass().getName());
-        final List<String> received = IntegrationTestUtils.waitUntilMinValuesRecordsReceived(
-            config,
-            topic,
-            numMessages,
-            60 * 1000);
-        Collections.sort(received);
-
-        return received;
-    }
-
-    private void doJoin(final KStream<Integer, Integer> lhs,
-                        final KStream<Integer, String> rhs,
-                        final String outputTopic) throws InterruptedException {
-        CLUSTER.createTopic(outputTopic);
-        lhs.join(rhs,
-                 TOSTRING_JOINER,
-                 getJoinWindow(),
-                 Joined.with(Serdes.Integer(), Serdes.Integer(), Serdes.String()))
-            .to(Serdes.Integer(), Serdes.String(), outputTopic);
-    }
-
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
deleted file mode 100644
index a12ffac..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
+++ /dev/null
@@ -1,399 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.integration;
-
-import kafka.utils.MockTime;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Bytes;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
-import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
-import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.ValueJoiner;
-import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.KeyValueStore;
-import org.apache.kafka.streams.state.QueryableStoreTypes;
-import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
-import org.apache.kafka.test.IntegrationTest;
-import org.apache.kafka.test.TestUtils;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-
-import static org.hamcrest.CoreMatchers.equalTo;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-@Category({IntegrationTest.class})
-public class KTableKTableJoinIntegrationTest {
-    private final static int NUM_BROKERS = 1;
-
-    @ClassRule
-    public final static EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
-    private final static MockTime MOCK_TIME = CLUSTER.time;
-    private final static String TABLE_1 = "table1";
-    private final static String TABLE_2 = "table2";
-    private final static String TABLE_3 = "table3";
-    private final static String OUTPUT = "output-";
-    private static Properties streamsConfig;
-    private KafkaStreams streams;
-    private final static Properties CONSUMER_CONFIG = new Properties();
-
-    @BeforeClass
-    public static void beforeTest() throws Exception {
-        CLUSTER.createTopics(TABLE_1, TABLE_2, TABLE_3, OUTPUT);
-
-        streamsConfig = new Properties();
-        streamsConfig.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfig.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        streamsConfig.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
-        streamsConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsConfig.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
-        streamsConfig.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
-        streamsConfig.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
-        streamsConfig.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
-
-        final Properties producerConfig = new Properties();
-        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
-        producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
-        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-
-        final List<KeyValue<String, String>> table1 = Arrays.asList(
-            new KeyValue<>("a", "A1"),
-            new KeyValue<>("b", "B1")
-        );
-
-        final List<KeyValue<String, String>> table2 = Arrays.asList(
-            new KeyValue<>("b", "B2"),
-            new KeyValue<>("c", "C2")
-        );
-
-        final List<KeyValue<String, String>> table3 = Arrays.asList(
-            new KeyValue<>("a", "A3"),
-            new KeyValue<>("b", "B3"),
-            new KeyValue<>("c", "C3")
-        );
-
-        // put table 3 first, to make sure data is there when joining T1 with T2
-        IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_3, table3, producerConfig, MOCK_TIME);
-        IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_1, table1, producerConfig, MOCK_TIME);
-        IntegrationTestUtils.produceKeyValuesSynchronously(TABLE_2, table2, producerConfig, MOCK_TIME);
-
-        CONSUMER_CONFIG.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        CONSUMER_CONFIG.put(ConsumerConfig.GROUP_ID_CONFIG, "ktable-ktable-consumer");
-        CONSUMER_CONFIG.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-        CONSUMER_CONFIG.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-    }
-
-    @Before
-    public void before() throws IOException {
-        IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
-    }
-
-    @After
-    public void after() throws IOException {
-        if (streams != null) {
-            streams.close();
-            streams = null;
-        }
-        IntegrationTestUtils.purgeLocalStreamsState(streamsConfig);
-    }
-
-    private enum JoinType {
-        INNER, LEFT, OUTER
-    }
-
-
-    @Test
-    public void shouldInnerInnerJoin() throws InterruptedException {
-        verifyKTableKTableJoin(JoinType.INNER, JoinType.INNER, Collections.singletonList(new KeyValue<>("b", "B1-B2-B3")), false);
-    }
-
-    @Test
-    public void shouldInnerInnerJoinQueryable() throws InterruptedException {
-        verifyKTableKTableJoin(JoinType.INNER, JoinType.INNER, Collections.singletonList(new KeyValue<>("b", "B1-B2-B3")), true);
-    }
-
-    @Test
-    public void shouldInnerLeftJoin() throws InterruptedException {
-        verifyKTableKTableJoin(JoinType.INNER, JoinType.LEFT, Collections.singletonList(new KeyValue<>("b", "B1-B2-B3")), false);
-    }
-
-    @Test
-    public void shouldInnerLeftJoinQueryable() throws InterruptedException {
-        verifyKTableKTableJoin(JoinType.INNER, JoinType.LEFT, Collections.singletonList(new KeyValue<>("b", "B1-B2-B3")), true);
-    }
-
-    @Test
-    public void shouldInnerOuterJoin() throws InterruptedException {
-        verifyKTableKTableJoin(JoinType.INNER, JoinType.OUTER, Arrays.asList(
-                new KeyValue<>("a", "null-A3"),
-                new KeyValue<>("b", "null-B3"),
-                new KeyValue<>("c", "null-C3"),
-                new KeyValue<>("b", "B1-B2-B3")), false);
-    }
-
-    @Test
-    public void shouldInnerOuterJoinQueryable() throws InterruptedException {
-        verifyKTableKTableJoin(JoinType.INNER, JoinType.OUTER, Arrays.asList(
-            new KeyValue<>("a", "null-A3"),
-            new KeyValue<>("b", "null-B3"),
-            new KeyValue<>("c", "null-C3"),
-            new KeyValue<>("b", "B1-B2-B3")), true);
-    }
-
-    @Test
-    public void shouldLeftInnerJoin() throws InterruptedException {
-        verifyKTableKTableJoin(JoinType.LEFT, JoinType.INNER, Arrays.asList(
-                new KeyValue<>("a", "A1-null-A3"),
-                new KeyValue<>("b", "B1-null-B3"),
-                new KeyValue<>("b", "B1-B2-B3")), false);
-    }
-
-    @Test
-    public void shouldLeftInnerJoinQueryable() throws InterruptedException {
-        verifyKTableKTableJoin(JoinType.LEFT, JoinType.INNER, Arrays.asList(
-            new KeyValue<>("a", "A1-null-A3"),
-            new KeyValue<>("b", "B1-null-B3"),
-            new KeyValue<>("b", "B1-B2-B3")), true);
-    }
-
-    @Test
-    public void shouldLeftLeftJoin() throws InterruptedException {
-        verifyKTableKTableJoin(JoinType.LEFT, JoinType.LEFT, Arrays.asList(
-                new KeyValue<>("a", "A1-null-A3"),
-                new KeyValue<>("b", "B1-null-B3"),
-                new KeyValue<>("b", "B1-B2-B3")), false);
-    }
-
-    @Test
-    public void shouldLeftLeftJoinQueryable() throws InterruptedException {
-        verifyKTableKTableJoin(JoinType.LEFT, JoinType.LEFT, Arrays.asList(
-            new KeyValue<>("a", "A1-null-A3"),
-            new KeyValue<>("b", "B1-null-B3"),
-            new KeyValue<>("b", "B1-B2-B3")), true);
-    }
-
-    @Test
-    public void shouldLeftOuterJoin() throws InterruptedException {
-        verifyKTableKTableJoin(JoinType.LEFT, JoinType.OUTER, Arrays.asList(
-                new KeyValue<>("a", "null-A3"),
-                new KeyValue<>("b", "null-B3"),
-                new KeyValue<>("c", "null-C3"),
-                new KeyValue<>("a", "A1-null-A3"),
-                new KeyValue<>("b", "B1-null-B3"),
-                new KeyValue<>("b", "B1-B2-B3")), false);
-    }
-
-    @Test
-    public void shouldLeftOuterJoinQueryable() throws InterruptedException {
-        verifyKTableKTableJoin(JoinType.LEFT, JoinType.OUTER, Arrays.asList(
-            new KeyValue<>("a", "null-A3"),
-            new KeyValue<>("b", "null-B3"),
-            new KeyValue<>("c", "null-C3"),
-            new KeyValue<>("a", "A1-null-A3"),
-            new KeyValue<>("b", "B1-null-B3"),
-            new KeyValue<>("b", "B1-B2-B3")), true);
-    }
-
-    @Test
-    public void shouldOuterInnerJoin() throws InterruptedException {
-        verifyKTableKTableJoin(JoinType.OUTER, JoinType.INNER, Arrays.asList(
-                new KeyValue<>("a", "A1-null-A3"),
-                new KeyValue<>("b", "B1-null-B3"),
-                new KeyValue<>("b", "B1-B2-B3"),
-                new KeyValue<>("c", "null-C2-C3")), false);
-    }
-
-    @Test
-    public void shouldOuterInnerJoinQueryable() throws InterruptedException {
-        verifyKTableKTableJoin(JoinType.OUTER, JoinType.INNER, Arrays.asList(
-            new KeyValue<>("a", "A1-null-A3"),
-            new KeyValue<>("b", "B1-null-B3"),
-            new KeyValue<>("b", "B1-B2-B3"),
-            new KeyValue<>("c", "null-C2-C3")), true);
-    }
-
-    @Test
-    public void shouldOuterLeftJoin() throws InterruptedException {
-        verifyKTableKTableJoin(JoinType.OUTER, JoinType.LEFT,  Arrays.asList(
-                new KeyValue<>("a", "A1-null-A3"),
-                new KeyValue<>("b", "B1-null-B3"),
-                new KeyValue<>("b", "B1-B2-B3"),
-                new KeyValue<>("c", "null-C2-C3")), false);
-    }
-
-    @Test
-    public void shouldOuterLeftJoinQueryable() throws InterruptedException {
-        verifyKTableKTableJoin(JoinType.OUTER, JoinType.LEFT,  Arrays.asList(
-            new KeyValue<>("a", "A1-null-A3"),
-            new KeyValue<>("b", "B1-null-B3"),
-            new KeyValue<>("b", "B1-B2-B3"),
-            new KeyValue<>("c", "null-C2-C3")), true);
-    }
-
-    @Test
-    public void shouldOuterOuterJoin() throws InterruptedException {
-        verifyKTableKTableJoin(JoinType.OUTER, JoinType.OUTER, Arrays.asList(
-                new KeyValue<>("a", "null-A3"),
-                new KeyValue<>("b", "null-B3"),
-                new KeyValue<>("c", "null-C3"),
-                new KeyValue<>("a", "A1-null-A3"),
-                new KeyValue<>("b", "B1-null-B3"),
-                new KeyValue<>("b", "B1-B2-B3"),
-                new KeyValue<>("c", "null-C2-C3")), false);
-    }
-
-    @Test
-    public void shouldOuterOuterJoinQueryable() throws InterruptedException {
-        verifyKTableKTableJoin(JoinType.OUTER, JoinType.OUTER, Arrays.asList(
-            new KeyValue<>("a", "null-A3"),
-            new KeyValue<>("b", "null-B3"),
-            new KeyValue<>("c", "null-C3"),
-            new KeyValue<>("a", "A1-null-A3"),
-            new KeyValue<>("b", "B1-null-B3"),
-            new KeyValue<>("b", "B1-B2-B3"),
-            new KeyValue<>("c", "null-C2-C3")), true);
-    }
-
-
-    private void verifyKTableKTableJoin(final JoinType joinType1,
-                                        final JoinType joinType2,
-                                        final List<KeyValue<String, String>> expectedResult,
-                                        boolean verifyQueryableState) throws InterruptedException {
-        final String queryableName = verifyQueryableState ? joinType1 + "-" + joinType2 + "-ktable-ktable-join-query" : null;
-        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, joinType1 + "-" + joinType2 + "-ktable-ktable-join" + queryableName);
-
-        streams = prepareTopology(joinType1, joinType2, queryableName);
-        streams.start();
-
-        final List<KeyValue<String, String>> result = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
-                CONSUMER_CONFIG,
-                OUTPUT,
-                expectedResult.size());
-
-        assertThat(result, equalTo(expectedResult));
-
-        if (verifyQueryableState) {
-            verifyKTableKTableJoinQueryableState(joinType1, joinType2, expectedResult);
-        }
-    }
-
-    private void verifyKTableKTableJoinQueryableState(final JoinType joinType1,
-                                                      final JoinType joinType2,
-                                                      final List<KeyValue<String, String>> expectedResult) {
-        final String queryableName = joinType1 + "-" + joinType2 + "-ktable-ktable-join-query";
-        final ReadOnlyKeyValueStore<String, String> myJoinStore = streams.store(queryableName,
-            QueryableStoreTypes.<String, String>keyValueStore());
-
-        // store only keeps last set of values, not entire stream of value changes
-        final Map<String, String> expectedInStore = new HashMap<>();
-        for (KeyValue<String, String> expected : expectedResult) {
-            expectedInStore.put(expected.key, expected.value);
-        }
-
-        for (Map.Entry<String, String> expected : expectedInStore.entrySet()) {
-            assertEquals(expected.getValue(), myJoinStore.get(expected.getKey()));
-        }
-        final KeyValueIterator<String, String> all = myJoinStore.all();
-        while (all.hasNext()) {
-            KeyValue<String, String> storeEntry = all.next();
-            assertTrue(expectedResult.contains(storeEntry));
-        }
-        all.close();
-
-    }
-
-    private KafkaStreams prepareTopology(final JoinType joinType1, final JoinType joinType2, final String queryableName) {
-        final StreamsBuilder builder = new StreamsBuilder();
-
-        final KTable<String, String> table1 = builder.table(TABLE_1);
-        final KTable<String, String> table2 = builder.table(TABLE_2);
-        final KTable<String, String> table3 = builder.table(TABLE_3);
-
-        Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized = null;
-        if (queryableName != null) {
-            materialized = Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(queryableName)
-                    .withKeySerde(Serdes.String())
-                    .withValueSerde(Serdes.String())
-                    .withCachingDisabled();
-        }
-        join(join(table1, table2, joinType1, null /* no need to query intermediate result */), table3,
-            joinType2, materialized).to(OUTPUT);
-
-        return new KafkaStreams(builder.build(), new StreamsConfig(streamsConfig));
-    }
-
-    private KTable<String, String> join(final KTable<String, String> first,
-                                        final KTable<String, String> second,
-                                        final JoinType joinType,
-                                        final Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized) {
-        final ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
-            @Override
-            public String apply(final String value1, final String value2) {
-                return value1 + "-" + value2;
-            }
-        };
-
-
-        switch (joinType) {
-            case INNER:
-                if (materialized != null) {
-                    return first.join(second, joiner, materialized);
-                } else {
-                    return first.join(second, joiner);
-                }
-            case LEFT:
-                if (materialized != null) {
-                    return first.leftJoin(second, joiner, materialized);
-                } else {
-                    return first.leftJoin(second, joiner);
-                }
-            case OUTER:
-                if (materialized != null) {
-                    return first.outerJoin(second, joiner, materialized);
-                } else {
-                    return first.outerJoin(second, joiner);
-                }
-        }
-
-        throw new RuntimeException("Unknown join type.");
-    }
-
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
new file mode 100644
index 0000000..571dc05
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/StreamStreamJoinIntegrationTest.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.JoinWindows;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.MockMapper;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+
+/**
+ * Integration tests for the windowed KStream-KStream joins (inner, left, outer) of the
+ * Kafka Streams DSL. Each join flavour is exercised twice: once directly on the input
+ * streams and once after both sides went through key-changing operators
+ * ({@code map} / {@code flatMap} / {@code selectKey}) before the join.
+ *
+ * The class is parameterized on {@code cacheEnabled}, which is forwarded to
+ * {@link AbstractJoinIntegrationTest}.
+ */
+@Category({IntegrationTest.class})
+@RunWith(value = Parameterized.class)
+public class StreamStreamJoinIntegrationTest extends AbstractJoinIntegrationTest {
+    /** Size of the join window (in milliseconds) used by every join in this class. */
+    private static final long JOIN_WINDOW_MS = 10000L;
+
+    private KStream<Long, String> leftStream;
+    private KStream<Long, String> rightStream;
+
+    public StreamStreamJoinIntegrationTest(final boolean cacheEnabled) {
+        super(cacheEnabled);
+    }
+
+    /**
+     * Prepares the shared environment and creates a fresh builder with the two input streams.
+     */
+    @Before
+    public void prepareTopology() throws InterruptedException {
+        super.prepareEnvironment();
+
+        appID = "stream-stream-join-integration-test";
+
+        builder = new StreamsBuilder();
+        leftStream = builder.stream(INPUT_TOPIC_LEFT);
+        rightStream = builder.stream(INPUT_TOPIC_RIGHT);
+    }
+
+    /**
+     * Expected output of the inner join. A fresh list is built on every call so that tests
+     * never share state. NOTE(review): entries presumably line up one-to-one with the input
+     * records produced by the base class, {@code null} meaning "no output" -- confirm against
+     * {@code AbstractJoinIntegrationTest#runTest}.
+     */
+    private static List<List<String>> expectedInnerJoinResult() {
+        return Arrays.asList(
+            null,
+            null,
+            null,
+            Collections.singletonList("A-a"),
+            Collections.singletonList("B-a"),
+            Arrays.asList("A-b", "B-b"),
+            null,
+            null,
+            Arrays.asList("C-a", "C-b"),
+            Arrays.asList("A-c", "B-c", "C-c"),
+            null,
+            null,
+            null,
+            Arrays.asList("A-d", "B-d", "C-d"),
+            Arrays.asList("D-a", "D-b", "D-c", "D-d")
+        );
+    }
+
+    /**
+     * Expected output shared by the left and the outer join tests; for the input used here
+     * both join flavours are expected to produce the same records. A fresh list is built on
+     * every call.
+     */
+    private static List<List<String>> expectedLeftOrOuterJoinResult() {
+        return Arrays.asList(
+            null,
+            null,
+            Collections.singletonList("A-null"),
+            Collections.singletonList("A-a"),
+            Collections.singletonList("B-a"),
+            Arrays.asList("A-b", "B-b"),
+            null,
+            null,
+            Arrays.asList("C-a", "C-b"),
+            Arrays.asList("A-c", "B-c", "C-c"),
+            null,
+            null,
+            null,
+            Arrays.asList("A-d", "B-d", "C-d"),
+            Arrays.asList("D-a", "D-b", "D-c", "D-d")
+        );
+    }
+
+    /**
+     * Right input stream passed through {@code flatMap} and {@code selectKey} with no-op style
+     * mock mappers, used as the right side of the "repartitioned" join variants.
+     */
+    private KStream<Long, String> rekeyedRightStream() {
+        return rightStream.flatMap(MockMapper.<Long, String>noOpFlatKeyValueMapper())
+                          .selectKey(MockMapper.<Long, String>selectKeyKeyValueMapper());
+    }
+
+    @Test
+    public void testInner() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner");
+
+        leftStream.join(rightStream, valueJoiner, JoinWindows.of(JOIN_WINDOW_MS)).to(OUTPUT_TOPIC);
+
+        runTest(expectedInnerJoinResult());
+    }
+
+    @Test
+    public void testInnerRepartitioned() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner-repartitioned");
+
+        leftStream.map(MockMapper.<Long, String>noOpKeyValueMapper())
+                .join(rekeyedRightStream(), valueJoiner, JoinWindows.of(JOIN_WINDOW_MS))
+                .to(OUTPUT_TOPIC);
+
+        runTest(expectedInnerJoinResult());
+    }
+
+    @Test
+    public void testLeft() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left");
+
+        leftStream.leftJoin(rightStream, valueJoiner, JoinWindows.of(JOIN_WINDOW_MS)).to(OUTPUT_TOPIC);
+
+        runTest(expectedLeftOrOuterJoinResult());
+    }
+
+    @Test
+    public void testLeftRepartitioned() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left-repartitioned");
+
+        leftStream.map(MockMapper.<Long, String>noOpKeyValueMapper())
+                .leftJoin(rekeyedRightStream(), valueJoiner, JoinWindows.of(JOIN_WINDOW_MS))
+                .to(OUTPUT_TOPIC);
+
+        runTest(expectedLeftOrOuterJoinResult());
+    }
+
+    @Test
+    public void testOuter() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-outer");
+
+        leftStream.outerJoin(rightStream, valueJoiner, JoinWindows.of(JOIN_WINDOW_MS)).to(OUTPUT_TOPIC);
+
+        runTest(expectedLeftOrOuterJoinResult());
+    }
+
+    @Test
+    public void testOuterRepartitioned() throws Exception {
+        // Use a suffix distinct from testOuter() so the two tests never share an application id.
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-outer-repartitioned");
+
+        leftStream.map(MockMapper.<Long, String>noOpKeyValueMapper())
+                .outerJoin(rekeyedRightStream(), valueJoiner, JoinWindows.of(JOIN_WINDOW_MS))
+                .to(OUTPUT_TOPIC);
+
+        runTest(expectedLeftOrOuterJoinResult());
+    }
+
+    /**
+     * Joins the result of an inner join with the right stream a second time, so every
+     * expected value carries two joined right-side values.
+     */
+    @Test
+    public void testMultiInner() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-multi-inner");
+
+        final List<List<String>> expectedResult = Arrays.asList(
+            null,
+            null,
+            null,
+            Collections.singletonList("A-a-a"),
+            Collections.singletonList("B-a-a"),
+            Arrays.asList("A-b-a", "B-b-a", "A-a-b", "B-a-b", "A-b-b", "B-b-b"),
+            null,
+            null,
+            Arrays.asList("C-a-a", "C-a-b", "C-b-a", "C-b-b"),
+            Arrays.asList("A-c-a", "A-c-b", "B-c-a", "B-c-b", "C-c-a", "C-c-b", "A-a-c", "B-a-c",
+                    "A-b-c", "B-b-c", "C-a-c", "C-b-c", "A-c-c", "B-c-c", "C-c-c"),
+            null,
+            null,
+            null,
+            Arrays.asList("A-d-a", "A-d-b", "A-d-c", "B-d-a", "B-d-b", "B-d-c", "C-d-a", "C-d-b", "C-d-c",
+                    "A-a-d", "B-a-d", "A-b-d", "B-b-d", "C-a-d", "C-b-d", "A-c-d", "B-c-d", "C-c-d",
+                    "A-d-d", "B-d-d", "C-d-d"),
+            Arrays.asList("D-a-a", "D-a-b", "D-a-c", "D-a-d", "D-b-a", "D-b-b", "D-b-c", "D-b-d", "D-c-a",
+                    "D-c-b", "D-c-c", "D-c-d", "D-d-a", "D-d-b", "D-d-c", "D-d-d")
+        );
+
+        leftStream.join(rightStream, valueJoiner, JoinWindows.of(JOIN_WINDOW_MS))
+                .join(rightStream, valueJoiner, JoinWindows.of(JOIN_WINDOW_MS)).to(OUTPUT_TOPIC);
+
+        runTest(expectedResult);
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
new file mode 100644
index 0000000..f3eceb0
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/TableTableJoinIntegrationTest.java
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.ForeachAction;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Integration tests for KTable-KTable joins of the Kafka Streams DSL: the three single joins
+ * (inner, left, outer) and all nine two-step combinations in which the result of the first
+ * join is joined with the right table again.
+ *
+ * The class is parameterized on {@code cacheEnabled}. Without caching every test checks a
+ * detailed list of expected results; with caching only the final join result is checked.
+ */
+@Category({IntegrationTest.class})
+@RunWith(value = Parameterized.class)
+public class TableTableJoinIntegrationTest extends AbstractJoinIntegrationTest {
+    private static final String APP_ID = "table-table-join-integration-test";
+
+    private KTable<Long, String> leftTable;
+    private KTable<Long, String> rightTable;
+
+    public TableTableJoinIntegrationTest(final boolean cacheEnabled) {
+        super(cacheEnabled);
+    }
+
+    /**
+     * Prepares the shared environment and creates a fresh builder with the two input tables.
+     */
+    @Before
+    public void prepareTopology() throws InterruptedException {
+        super.prepareEnvironment();
+
+        appID = APP_ID;
+
+        builder = new StreamsBuilder();
+        leftTable = builder.table(INPUT_TOPIC_LEFT);
+        rightTable = builder.table(INPUT_TOPIC_RIGHT);
+    }
+
+    private final String expectedFinalJoinResult = "D-d";
+    private final String expectedFinalMultiJoinResult = "D-d-d";
+    // Derived from the constant rather than from appID: field initializers run at construction
+    // time, i.e. before prepareTopology() has assigned appID.
+    private final String storeName = APP_ID + "-store";
+
+    // Must be declared after storeName, which it reads during field initialization.
+    private final Materialized<Long, String, KeyValueStore<Bytes, byte[]>> materialized = Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as(storeName)
+            .withKeySerde(Serdes.Long())
+            .withValueSerde(Serdes.String())
+            .withCachingDisabled()
+            .withLoggingDisabled();
+
+    /**
+     * Peek action used in the cache-enabled runs: counts every record that passes by and
+     * flags {@code finalResultReached} once the expected final join value has been seen.
+     */
+    private final class CountingPeek implements ForeachAction<Long, String> {
+        private final String expected;
+
+        CountingPeek(final boolean multiJoin) {
+            this.expected = multiJoin ? expectedFinalMultiJoinResult : expectedFinalJoinResult;
+        }
+
+        @Override
+        public void apply(final Long key, final String value) {
+            numRecordsExpected++;
+            // compare from the non-null side: joins may emit null values (deletions)
+            if (expected.equals(value)) {
+                // The return value is deliberately ignored: it is possible that we will see
+                // multiple duplicates of the final result due to KAFKA-4309.
+                // TODO: should be revisited when KAFKA-4309 is fixed
+                finalResultReached.compareAndSet(false, true);
+            }
+        }
+    }
+
+    /**
+     * Writes the joined table to the output topic and runs the verification.
+     *
+     * With caching enabled only the final result is verified (a {@link CountingPeek} tracks
+     * the records on the way) -- presumably because the record cache may collapse
+     * intermediate updates. Without caching the detailed {@code expectedResult} is verified.
+     *
+     * @param joined         result table of the join under test, materialized as {@code storeName}
+     * @param multiJoin      whether {@code joined} is the result of two chained joins
+     * @param expectedResult expected results for the run without caching
+     */
+    private void verifyJoinResult(final KTable<Long, String> joined,
+                                  final boolean multiJoin,
+                                  final List<List<String>> expectedResult) throws Exception {
+        if (cacheEnabled) {
+            joined.toStream().peek(new CountingPeek(multiJoin)).to(OUTPUT_TOPIC);
+            runTest(multiJoin ? expectedFinalMultiJoinResult : expectedFinalJoinResult, storeName);
+        } else {
+            joined.toStream().to(OUTPUT_TOPIC);
+            runTest(expectedResult, storeName);
+        }
+    }
+
+    @Test
+    public void testInner() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner");
+
+        final List<List<String>> expectedResult = Arrays.asList(
+                null,
+                null,
+                null,
+                Collections.singletonList("A-a"),
+                Collections.singletonList("B-a"),
+                Collections.singletonList("B-b"),
+                Collections.singletonList((String) null),
+                null,
+                null,
+                Collections.singletonList("C-c"),
+                Collections.singletonList((String) null),
+                null,
+                null,
+                null,
+                Collections.singletonList("D-d")
+        );
+
+        verifyJoinResult(leftTable.join(rightTable, valueJoiner, materialized), false, expectedResult);
+    }
+
+    @Test
+    public void testLeft() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left");
+
+        final List<List<String>> expectedResult = Arrays.asList(
+                null,
+                null,
+                Collections.singletonList("A-null"),
+                Collections.singletonList("A-a"),
+                Collections.singletonList("B-a"),
+                Collections.singletonList("B-b"),
+                Collections.singletonList((String) null),
+                null,
+                Collections.singletonList("C-null"),
+                Collections.singletonList("C-c"),
+                Collections.singletonList("C-null"),
+                Collections.singletonList((String) null),
+                null,
+                null,
+                Collections.singletonList("D-d")
+        );
+
+        verifyJoinResult(leftTable.leftJoin(rightTable, valueJoiner, materialized), false, expectedResult);
+    }
+
+    @Test
+    public void testOuter() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-outer");
+
+        final List<List<String>> expectedResult = Arrays.asList(
+                null,
+                null,
+                Collections.singletonList("A-null"),
+                Collections.singletonList("A-a"),
+                Collections.singletonList("B-a"),
+                Collections.singletonList("B-b"),
+                Collections.singletonList("null-b"),
+                Collections.singletonList((String) null),
+                Collections.singletonList("C-null"),
+                Collections.singletonList("C-c"),
+                Collections.singletonList("C-null"),
+                Collections.singletonList((String) null),
+                null,
+                Collections.singletonList("null-d"),
+                Collections.singletonList("D-d")
+        );
+
+        verifyJoinResult(leftTable.outerJoin(rightTable, valueJoiner, materialized), false, expectedResult);
+    }
+
+    @Test
+    public void testInnerInner() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner-inner");
+
+        // FIXME: the duplicate below for all the multi-joins
+        //        are due to KAFKA-6443, should be updated once it is fixed.
+        final List<List<String>> expectedResult = Arrays.asList(
+                null,
+                null,
+                null,
+                Arrays.asList("A-a-a", "A-a-a"),
+                Collections.singletonList("B-a-a"),
+                Arrays.asList("B-b-b", "B-b-b"),
+                Collections.singletonList((String) null),
+                null,
+                null,
+                Arrays.asList("C-c-c", "C-c-c"),
+                null,
+                null,
+                null,
+                null,
+                Collections.singletonList("D-d-d")
+        );
+
+        verifyJoinResult(
+                leftTable.join(rightTable, valueJoiner)
+                        .join(rightTable, valueJoiner, materialized),
+                true,
+                expectedResult);
+    }
+
+    @Test
+    public void testInnerLeft() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner-left");
+
+        final List<List<String>> expectedResult = Arrays.asList(
+                null,
+                null,
+                null,
+                Arrays.asList("A-a-a", "A-a-a"),
+                Collections.singletonList("B-a-a"),
+                Arrays.asList("B-b-b", "B-b-b"),
+                Collections.singletonList((String) null),
+                null,
+                null,
+                Arrays.asList("C-c-c", "C-c-c"),
+                Collections.singletonList((String) null),
+                null,
+                null,
+                null,
+                Collections.singletonList("D-d-d")
+        );
+
+        verifyJoinResult(
+                leftTable.join(rightTable, valueJoiner)
+                        .leftJoin(rightTable, valueJoiner, materialized),
+                true,
+                expectedResult);
+    }
+
+    @Test
+    public void testInnerOuter() throws Exception {
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-inner-outer");
+
+        final List<List<String>> expectedResult = Arrays.asList(
+                null,
+                null,
+                null,
+                Arrays.asList("A-a-a", "A-a-a"),
+                Collections.singletonList("B-a-a"),
+                Arrays.asList("B-b-b", "B-b-b"),
+                Collections.singletonList("null-b"),
+                Collections.singletonList((String) null),
+                null,
+                Arrays.asList("C-c-c", "C-c-c"),
+                Arrays.asList((String) null, null),
+                null,
+                null,
+                null,
+                Arrays.asList("null-d", "D-d-d")
+        );
+
+        verifyJoinResult(
+                leftTable.join(rightTable, valueJoiner)
+                        .outerJoin(rightTable, valueJoiner, materialized),
+                true,
+                expectedResult);
+    }
+
+    @Test
+    public void testLeftInner() throws Exception {
+        // suffix must differ from testInnerInner() so the tests never share an application id
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left-inner");
+
+        final List<List<String>> expectedResult = Arrays.asList(
+                null,
+                null,
+                null,
+                Arrays.asList("A-a-a", "A-a-a"),
+                Collections.singletonList("B-a-a"),
+                Arrays.asList("B-b-b", "B-b-b"),
+                Collections.singletonList((String) null),
+                null,
+                null,
+                Arrays.asList("C-c-c", "C-c-c"),
+                Collections.singletonList((String) null),
+                null,
+                null,
+                null,
+                Collections.singletonList("D-d-d")
+        );
+
+        verifyJoinResult(
+                leftTable.leftJoin(rightTable, valueJoiner)
+                        .join(rightTable, valueJoiner, materialized),
+                true,
+                expectedResult);
+    }
+
+    @Test
+    public void testLeftLeft() throws Exception {
+        // suffix must differ from testInnerLeft() so the tests never share an application id
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left-left");
+
+        final List<List<String>> expectedResult = Arrays.asList(
+                null,
+                null,
+                null,
+                Arrays.asList("A-null-null", "A-a-a", "A-a-a"),
+                Collections.singletonList("B-a-a"),
+                Arrays.asList("B-b-b", "B-b-b"),
+                Collections.singletonList((String) null),
+                null,
+                null,
+                Arrays.asList("C-null-null", "C-c-c", "C-c-c"),
+                Arrays.asList("C-null-null", "C-null-null"),
+                Collections.singletonList((String) null),
+                null,
+                null,
+                Collections.singletonList("D-d-d")
+        );
+
+        verifyJoinResult(
+                leftTable.leftJoin(rightTable, valueJoiner)
+                        .leftJoin(rightTable, valueJoiner, materialized),
+                true,
+                expectedResult);
+    }
+
+    @Test
+    public void testLeftOuter() throws Exception {
+        // suffix must differ from testInnerOuter() so the tests never share an application id
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-left-outer");
+
+        final List<List<String>> expectedResult = Arrays.asList(
+                null,
+                null,
+                null,
+                Arrays.asList("A-null-null", "A-a-a", "A-a-a"),
+                Collections.singletonList("B-a-a"),
+                Arrays.asList("B-b-b", "B-b-b"),
+                Collections.singletonList("null-b"),
+                Collections.singletonList((String) null),
+                null,
+                Arrays.asList("C-null-null", "C-c-c", "C-c-c"),
+                Arrays.asList("C-null-null", "C-null-null"),
+                Collections.singletonList((String) null),
+                null,
+                null,
+                Arrays.asList("null-d", "D-d-d")
+        );
+
+        verifyJoinResult(
+                leftTable.leftJoin(rightTable, valueJoiner)
+                        .outerJoin(rightTable, valueJoiner, materialized),
+                true,
+                expectedResult);
+    }
+
+    @Test
+    public void testOuterInner() throws Exception {
+        // suffix must differ from testInnerInner() so the tests never share an application id
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-outer-inner");
+
+        final List<List<String>> expectedResult = Arrays.asList(
+                null,
+                null,
+                null,
+                Arrays.asList("A-a-a", "A-a-a"),
+                Collections.singletonList("B-a-a"),
+                Arrays.asList("B-b-b", "B-b-b"),
+                Collections.singletonList("null-b-b"),
+                null,
+                null,
+                Arrays.asList("C-c-c", "C-c-c"),
+                Collections.singletonList((String) null),
+                null,
+                null,
+                Arrays.asList("null-d-d", "null-d-d"),
+                Collections.singletonList("D-d-d")
+        );
+
+        verifyJoinResult(
+                leftTable.outerJoin(rightTable, valueJoiner)
+                        .join(rightTable, valueJoiner, materialized),
+                true,
+                expectedResult);
+    }
+
+    @Test
+    public void testOuterLeft() throws Exception {
+        // suffix must differ from testInnerLeft() so the tests never share an application id
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-outer-left");
+
+        final List<List<String>> expectedResult = Arrays.asList(
+                null,
+                null,
+                null,
+                Arrays.asList("A-null-null", "A-a-a", "A-a-a"),
+                Collections.singletonList("B-a-a"),
+                Arrays.asList("B-b-b", "B-b-b"),
+                Collections.singletonList("null-b-b"),
+                Collections.singletonList((String) null),
+                null,
+                Arrays.asList("C-null-null", "C-c-c", "C-c-c"),
+                Arrays.asList("C-null-null", "C-null-null"),
+                Collections.singletonList((String) null),
+                null,
+                Arrays.asList("null-d-d", "null-d-d"),
+                Collections.singletonList("D-d-d")
+        );
+
+        verifyJoinResult(
+                leftTable.outerJoin(rightTable, valueJoiner)
+                        .leftJoin(rightTable, valueJoiner, materialized),
+                true,
+                expectedResult);
+    }
+
+    @Test
+    public void testOuterOuter() throws Exception {
+        // suffix must differ from testInnerOuter() so the tests never share an application id
+        STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + "-outer-outer");
+
+        final List<List<String>> expectedResult = Arrays.asList(
+                null,
+                null,
+                null,
+                Arrays.asList("A-null-null", "A-a-a", "A-a-a"),
+                Collections.singletonList("B-a-a"),
+                Arrays.asList("B-b-b", "B-b-b"),
+                Collections.singletonList("null-b-b"),
+                Arrays.asList((String) null, null),
+                null,
+                Arrays.asList("C-null-null", "C-c-c", "C-c-c"),
+                Arrays.asList("C-null-null", "C-null-null"),
+                Collections.singletonList((String) null),
+                null,
+                null,
+                Arrays.asList("null-d-d", "null-d-d", "D-d-d")
+        );
+
+        verifyJoinResult(
+                leftTable.outerJoin(rightTable, valueJoiner)
+                        .outerJoin(rightTable, valueJoiner, materialized),
+                true,
+                expectedResult);
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/test/MockMapper.java b/streams/src/test/java/org/apache/kafka/test/MockMapper.java
index fec9522..5184199 100644
--- a/streams/src/test/java/org/apache/kafka/test/MockMapper.java
+++ b/streams/src/test/java/org/apache/kafka/test/MockMapper.java
@@ -20,6 +20,8 @@ import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.ValueMapper;
 
+import java.util.Collections;
+
 public class MockMapper {
 
     private static class NoOpKeyValueMapper<K, V> implements KeyValueMapper<K, V, KeyValue<K, V>> {
@@ -29,6 +31,13 @@ public class MockMapper {
         }
     }
 
+    /**
+     * Flat-map counterpart of {@link NoOpKeyValueMapper}: returns the unchanged key/value
+     * pair wrapped in a single-element list.
+     */
+    private static class NoOpFlatKeyValueMapper<K, V> implements KeyValueMapper<K, V, Iterable<KeyValue<K, V>>> {
+        @Override
+        public Iterable<KeyValue<K, V>> apply(final K key, final V value) {
+            final KeyValue<K, V> unchanged = KeyValue.pair(key, value);
+            return Collections.singletonList(unchanged);
+        }
+    }
+
     private static class SelectValueKeyValueMapper<K, V> implements KeyValueMapper<K, V, KeyValue<V, V>> {
         @Override
         public KeyValue<V, V> apply(K key, V value) {
@@ -61,6 +70,9 @@ public class MockMapper {
         return new SelectKeyMapper<>();
     }
 
+    /**
+     * Creates a mapper that emits every record unchanged as a single-element
+     * {@link Iterable}, e.g. for use with {@code flatMap}.
+     */
+    public static <K, V> KeyValueMapper<K, V, Iterable<KeyValue<K, V>>> noOpFlatKeyValueMapper() {
+        final KeyValueMapper<K, V, Iterable<KeyValue<K, V>>> flatMapper = new NoOpFlatKeyValueMapper<>();
+        return flatMapper;
+    }
 
     public static <K, V> KeyValueMapper<K, V, KeyValue<K, V>> noOpKeyValueMapper() {
         return new NoOpKeyValueMapper<>();

-- 
To stop receiving notification emails like this one, please contact
['"commits@kafka.apache.org" <commits@kafka.apache.org>'].

Mime
View raw message