kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/2] kafka git commit: KAFKA-4001: Improve Kafka Streams Join Semantics (KIP-77)
Date Thu, 20 Oct 2016 20:06:29 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 24067e407 -> 62c0972ef


http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 80512d9..58090fd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -72,10 +72,10 @@ public class KStreamKStreamLeftJoinTest {
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-        KStream<Integer, String> stream1;
-        KStream<Integer, String> stream2;
-        KStream<Integer, String> joined;
-        MockProcessorSupplier<Integer, String> processor;
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+        final KStream<Integer, String> joined;
+        final MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
         stream1 = builder.stream(intSerde, stringSerde, topic1);
@@ -84,7 +84,7 @@ public class KStreamKStreamLeftJoinTest {
         joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of(100),
intSerde, stringSerde, stringSerde);
         joined.process(processor);
 
-        Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+        final Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
@@ -93,8 +93,10 @@ public class KStreamKStreamLeftJoinTest {
         driver.setTime(0L);
 
         // push two items to the primary stream. the other window is empty
-        // w {}
-        // --> w = {}
+        // w1 {}
+        // w2 {}
+        // --> w1 = { 0:X0, 1:X1 }
+        // --> w2 = {}
 
         for (int i = 0; i < 2; i++) {
             driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
@@ -103,38 +105,47 @@ public class KStreamKStreamLeftJoinTest {
         processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
 
         // push two items to the other stream. this should produce two items.
-        // w {}
-        // --> w = { 0:Y0, 1:Y1 }
+        // w1 = { 0:X0, 1:X1 }
+        // w2 {}
+        // --> w1 = { 0:X0, 1:X1 }
+        // --> w2 = { 0:Y0, 1:Y1 }
 
         for (int i = 0; i < 2; i++) {
             driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
         }
         driver.flushState();
-        processor.checkAndClearProcessResult();
 
-        // push all four items to the primary stream. this should produce four items.
-        // w = { 0:Y0, 1:Y1 }
-        // --> w = { 0:Y0, 1:Y1 }
+        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
 
-        for (int i = 0; i < expectedKeys.length; i++) {
+        // push three items to the primary stream. this should produce four items.
+        // w1 = { 0:X0, 1:X1 }
+        // w2 = { 0:Y0, 1:Y1 }
+        // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
+        // --> w2 = { 0:Y0, 1:Y1 }
+
+        for (int i = 0; i < 3; i++) {
             driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
         }
         driver.flushState();
-        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
+        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:X2+null");
 
-        // push all items to the other stream. this should produce no items.
-        // w = { 0:Y0, 1:Y1 }
-        // --> w = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+        // push all items to the other stream. this should produce 5 items
+        // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
+        // w2 = { 0:Y0, 1:Y1 }
+        // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
+        // --> w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3}
 
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
         }
         driver.flushState();
-        processor.checkAndClearProcessResult();
+        processor.checkAndClearProcessResult("0:X0+YY0", "0:X0+YY0", "1:X1+YY1", "1:X1+YY1",
"2:X2+YY2");
 
-        // push all four items to the primary stream. this should produce four items.
-        // w = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3
-        // --> w = { 0:Y0, 1:Y1, 0:YY0, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+        // push all four items to the primary stream. this should produce six items.
+        // w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2 }
+        // w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3}
+        // --> w1 = { 0:X0, 1:X1, 0:X0, 1:X1, 2:X2, 0:XX0, 1:XX1, 2:XX2, 3:XX3 }
+        // --> w2 = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3}
 
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
@@ -151,10 +162,10 @@ public class KStreamKStreamLeftJoinTest {
 
         long time = 0L;
 
-        KStream<Integer, String> stream1;
-        KStream<Integer, String> stream2;
-        KStream<Integer, String> joined;
-        MockProcessorSupplier<Integer, String> processor;
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+        final KStream<Integer, String> joined;
+        final MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
         stream1 = builder.stream(intSerde, stringSerde, topic1);
@@ -163,7 +174,7 @@ public class KStreamKStreamLeftJoinTest {
         joined = stream1.leftJoin(stream2, MockValueJoiner.STRING_JOINER, JoinWindows.of(100),
intSerde, stringSerde, stringSerde);
         joined.process(processor);
 
-        Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+        final Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
@@ -171,8 +182,10 @@ public class KStreamKStreamLeftJoinTest {
         driver = new KStreamTestDriver(builder, stateDir);
 
         // push two items to the primary stream. the other window is empty. this should produce
two items
-        // w = {}
-        // --> w = {}
+        // w1 = {}
+        // w2 = {}
+        // --> w1 = { 0:X0, 1:X1 }
+        // --> w2 = {}
 
         setRecordContext(time, topic1);
         for (int i = 0; i < 2; i++) {
@@ -182,23 +195,27 @@ public class KStreamKStreamLeftJoinTest {
         processor.checkAndClearProcessResult("0:X0+null", "1:X1+null");
 
         // push two items to the other stream. this should produce no items.
-        // w = {}
-        // --> w = { 0:Y0, 1:Y1 }
+        // w1 = { 0:X0, 1:X1 }
+        // w2 = {}
+        // --> w1 = { 0:X0, 1:X1 }
+        // --> w2 = { 0:Y0, 1:Y1 }
 
         setRecordContext(time, topic2);
         for (int i = 0; i < 2; i++) {
             driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
         }
         driver.flushState();
-        processor.checkAndClearProcessResult();
+        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
 
         // clear logically
         time = 1000L;
         setRecordContext(time, topic2);
 
         // push all items to the other stream. this should produce no items.
-        // w = {}
-        // --> w = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
+        // w1 = {}
+        // w2 = {}
+        // --> w1 = {}
+        // --> w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
         for (int i = 0; i < expectedKeys.length; i++) {
             setRecordContext(time + i, topic2);
             driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
@@ -206,8 +223,11 @@ public class KStreamKStreamLeftJoinTest {
         driver.flushState();
         processor.checkAndClearProcessResult();
 
-        // gradually expire items in window.
-        // w = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
+        // gradually expire items in window 2.
+        // w1 = {}
+        // w2 = {}
+        // --> w1 = {}
+        // --> w2 = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
 
         time = 1000L + 100L;
         setRecordContext(time, topic1);

http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
new file mode 100644
index 0000000..742e852
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.apache.kafka.test.MockValueJoiner;
+import org.apache.kafka.test.TestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+public class KStreamKTableJoinTest {
+
+    final private String topic1 = "topic1";
+    final private String topic2 = "topic2";
+
+    final private Serde<Integer> intSerde = Serdes.Integer();
+    final private Serde<String> stringSerde = Serdes.String();
+
+    private KStreamTestDriver driver = null;
+    private File stateDir = null;
+
+    @After
+    public void tearDown() {
+        if (driver != null) {
+            driver.close();
+        }
+        driver = null;
+    }
+
+    @Before
+    public void setUp() throws IOException {
+        stateDir = TestUtils.tempDirectory("kafka-test");
+    }
+
+    @Test
+    public void testJoin() throws Exception {
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        final int[] expectedKeys = new int[]{0, 1, 2, 3};
+
+        final KStream<Integer, String> stream;
+        final KTable<Integer, String> table;
+        final MockProcessorSupplier<Integer, String> processor;
+
+        processor = new MockProcessorSupplier<>();
+        stream = builder.stream(intSerde, stringSerde, topic1);
+        table = builder.table(intSerde, stringSerde, topic2, "anyStoreName");
+        stream.join(table, MockValueJoiner.STRING_JOINER).process(processor);
+
+        final Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+
+        assertEquals(1, copartitionGroups.size());
+        assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+
+        driver = new KStreamTestDriver(builder, stateDir);
+        driver.setTime(0L);
+
+        // push two items to the primary stream. the other table is empty
+
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
+
+        processor.checkAndClearProcessResult();
+
+        // push two items to the other stream. this should not produce any item.
+
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+        }
+
+        processor.checkAndClearProcessResult();
+
+        // push all four items to the primary stream. this should produce two items.
+
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
+
+        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
+
+        // push all items to the other stream. this should not produce any item
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+        }
+
+        processor.checkAndClearProcessResult();
+
+        // push all four items to the primary stream. this should produce four items.
+
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+        }
+
+        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
+
+        // push two items with null to the other stream as deletes. this should not produce
any item.
+
+        for (int i = 0; i < 2; i++) {
+            driver.process(topic2, expectedKeys[i], null);
+        }
+
+        processor.checkAndClearProcessResult();
+
+        // push all four items to the primary stream. this should produce two items.
+
+        for (int i = 0; i < expectedKeys.length; i++) {
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+        }
+
+        processor.checkAndClearProcessResult("2:XX2+YY2", "3:XX3+YY3");
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 37aac0c..2175dd5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -228,13 +228,7 @@ public class KStreamWindowAggregateTest {
                     "[A@0]:0+1+1"
             );
             proc2.checkAndClearProcessResult();
-            proc3.checkAndClearProcessResult(
-                    "[A@0]:null",
-                    "[B@0]:null",
-                    "[C@0]:null",
-                    "[D@0]:null",
-                    "[A@0]:null"
-            );
+            proc3.checkAndClearProcessResult();
 
             setRecordContext(5, topic1);
             driver.process(topic1, "A", "1");
@@ -260,13 +254,7 @@ public class KStreamWindowAggregateTest {
                     "[C@0]:0+3+3", "[C@5]:0+3"
             );
             proc2.checkAndClearProcessResult();
-            proc3.checkAndClearProcessResult(
-                    "[A@0]:null", "[A@5]:null",
-                    "[B@0]:null", "[B@5]:null",
-                    "[D@0]:null", "[D@5]:null",
-                    "[B@0]:null", "[B@5]:null",
-                    "[C@0]:null", "[C@5]:null"
-            );
+            proc3.checkAndClearProcessResult();
 
             setRecordContext(0, topic1);
             driver.process(topic2, "A", "a");

http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
index ba10668..1bd8600 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,9 +19,9 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
@@ -74,10 +74,10 @@ public class KTableKTableJoinTest {
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-        KTable<Integer, String> table1;
-        KTable<Integer, String> table2;
-        KTable<Integer, String> joined;
-        MockProcessorSupplier<Integer, String> processor;
+        final KTable<Integer, String> table1;
+        final KTable<Integer, String> table2;
+        final KTable<Integer, String> joined;
+        final MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
         table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
@@ -85,17 +85,17 @@ public class KTableKTableJoinTest {
         joined = table1.join(table2, MockValueJoiner.STRING_JOINER);
         joined.toStream().process(processor);
 
-        Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+        final Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        KTableValueGetterSupplier<Integer, String> getterSupplier = ((KTableImpl<Integer,
String, String>) joined).valueGetterSupplier();
+        final KTableValueGetterSupplier<Integer, String> getterSupplier = ((KTableImpl<Integer,
String, String>) joined).valueGetterSupplier();
 
         driver = new KStreamTestDriver(builder, stateDir);
         driver.setTime(0L);
 
-        KTableValueGetter<Integer, String> getter = getterSupplier.get();
+        final KTableValueGetter<Integer, String> getter = getterSupplier.get();
         getter.init(driver.context());
 
         // push two items to the primary stream. the other table is empty
@@ -105,8 +105,7 @@ public class KTableKTableJoinTest {
         }
         driver.flushState();
 
-        processor.checkAndClearProcessResult("0:null", "1:null");
-        checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, null), kv(3, null));
+        processor.checkAndClearProcessResult();
 
         // push two items to the other stream. this should produce two items.
 
@@ -116,17 +115,17 @@ public class KTableKTableJoinTest {
         driver.flushState();
 
         processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1");
-        checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
+        checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"));
 
-        // push all four items to the primary stream. this should produce four items.
+        // push all four items to the primary stream. this should produce two items.
 
         for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
         driver.flushState();
 
-        processor.checkAndClearProcessResult("0:X0+Y0", "1:X1+Y1", "2:null", "3:null");
-        checkJoinedValues(getter, kv(0, "X0+Y0"), kv(1, "X1+Y1"), kv(2, null), kv(3, null));
+        processor.checkAndClearProcessResult("0:XX0+Y0", "1:XX1+Y1");
+        checkJoinedValues(getter, kv(0, "XX0+Y0"), kv(1, "XX1+Y1"));
 
         // push all items to the other stream. this should produce four items.
         for (int i = 0; i < expectedKeys.length; i++) {
@@ -134,8 +133,8 @@ public class KTableKTableJoinTest {
         }
         driver.flushState();
 
-        processor.checkAndClearProcessResult("0:X0+YY0", "1:X1+YY1", "2:X2+YY2", "3:X3+YY3");
-        checkJoinedValues(getter, kv(0, "X0+YY0"), kv(1, "X1+YY1"), kv(2, "X2+YY2"), kv(3,
"X3+YY3"));
+        processor.checkAndClearProcessResult("0:XX0+YY0", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
+        checkJoinedValues(getter, kv(0, "XX0+YY0"), kv(1, "XX1+YY1"), kv(2, "XX2+YY2"), kv(3,
"XX3+YY3"));
 
         // push all four items to the primary stream. this should produce four items.
 
@@ -155,17 +154,17 @@ public class KTableKTableJoinTest {
         driver.flushState();
 
         processor.checkAndClearProcessResult("0:null", "1:null");
-        checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, "X2+YY2"), kv(3, "X3+YY3"));
+        checkJoinedValues(getter, kv(0, null), kv(1, null));
 
-        // push all four items to the primary stream. this should produce four items.
+        // push all four items to the primary stream. this should produce two items.
 
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
         driver.flushState();
 
-        processor.checkAndClearProcessResult("0:null", "1:null", "2:XX2+YY2", "3:XX3+YY3");
-        checkJoinedValues(getter, kv(0, null), kv(1, null), kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
+        processor.checkAndClearProcessResult("2:XX2+YY2", "3:XX3+YY3");
+        checkJoinedValues(getter, kv(2, "XX2+YY2"), kv(3, "XX3+YY3"));
     }
 
     @Test
@@ -174,10 +173,10 @@ public class KTableKTableJoinTest {
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-        KTable<Integer, String> table1;
-        KTable<Integer, String> table2;
-        KTable<Integer, String> joined;
-        MockProcessorSupplier<Integer, String> proc;
+        final KTable<Integer, String> table1;
+        final KTable<Integer, String> table2;
+        final KTable<Integer, String> joined;
+        final MockProcessorSupplier<Integer, String> proc;
 
         table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
         table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
@@ -200,7 +199,7 @@ public class KTableKTableJoinTest {
         }
         driver.flushState();
 
-        proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)");
+        proc.checkAndClearProcessResult();
 
         // push two items to the other stream. this should produce two items.
 
@@ -211,21 +210,21 @@ public class KTableKTableJoinTest {
 
         proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
 
-        // push all four items to the primary stream. this should produce four items.
+        // push all four items to the primary stream. this should produce two items.
 
         for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
         driver.flushState();
 
-        proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)", "2:(null<-null)",
"3:(null<-null)");
+        proc.checkAndClearProcessResult("0:(XX0+Y0<-null)", "1:(XX1+Y1<-null)");
 
         // push all items to the other stream. this should produce four items.
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
         }
         driver.flushState();
-        proc.checkAndClearProcessResult("0:(X0+YY0<-null)", "1:(X1+YY1<-null)", "2:(X2+YY2<-null)",
"3:(X3+YY3<-null)");
+        proc.checkAndClearProcessResult("0:(XX0+YY0<-null)", "1:(XX1+YY1<-null)", "2:(XX2+YY2<-null)",
"3:(XX3+YY3<-null)");
 
         // push all four items to the primary stream. this should produce four items.
 
@@ -243,13 +242,13 @@ public class KTableKTableJoinTest {
         driver.flushState();
         proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)");
 
-        // push all four items to the primary stream. this should produce four items.
+        // push all four items to the primary stream. this should produce two items.
 
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
         driver.flushState();
-        proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-null)",
"3:(XX3+YY3<-null)");
+        proc.checkAndClearProcessResult("2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
     }
 
     @Test
@@ -258,10 +257,10 @@ public class KTableKTableJoinTest {
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-        KTable<Integer, String> table1;
-        KTable<Integer, String> table2;
-        KTable<Integer, String> joined;
-        MockProcessorSupplier<Integer, String> proc;
+        final KTable<Integer, String> table1;
+        final KTable<Integer, String> table2;
+        final KTable<Integer, String> joined;
+        final MockProcessorSupplier<Integer, String> proc;
 
         table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
         table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
@@ -285,7 +284,7 @@ public class KTableKTableJoinTest {
             driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
         }
         driver.flushState();
-        proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)");
+        proc.checkAndClearProcessResult();
 
         // push two items to the other stream. this should produce two items.
 
@@ -295,20 +294,20 @@ public class KTableKTableJoinTest {
         driver.flushState();
         proc.checkAndClearProcessResult("0:(X0+Y0<-null)", "1:(X1+Y1<-null)");
 
-        // push all four items to the primary stream. this should produce four items.
+        // push all four items to the primary stream. this should produce two items.
 
         for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
         driver.flushState();
-        proc.checkAndClearProcessResult("0:(X0+Y0<-X0+Y0)", "1:(X1+Y1<-X1+Y1)", "2:(null<-null)",
"3:(null<-null)");
+        proc.checkAndClearProcessResult("0:(XX0+Y0<-X0+Y0)", "1:(XX1+Y1<-X1+Y1)");
 
         // push all items to the other stream. this should produce four items.
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
         }
         driver.flushState();
-        proc.checkAndClearProcessResult("0:(X0+YY0<-X0+Y0)", "1:(X1+YY1<-X1+Y1)", "2:(X2+YY2<-null)",
"3:(X3+YY3<-null)");
+        proc.checkAndClearProcessResult("0:(XX0+YY0<-XX0+Y0)", "1:(XX1+YY1<-XX1+Y1)",
"2:(XX2+YY2<-null)", "3:(XX3+YY3<-null)");
 
         // push all four items to the primary stream. this should produce four items.
 
@@ -316,7 +315,7 @@ public class KTableKTableJoinTest {
             driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
         }
         driver.flushState();
-        proc.checkAndClearProcessResult("0:(X0+YY0<-X0+YY0)", "1:(X1+YY1<-X1+YY1)",
"2:(X2+YY2<-X2+YY2)", "3:(X3+YY3<-X3+YY3)");
+        proc.checkAndClearProcessResult("0:(X0+YY0<-XX0+YY0)", "1:(X1+YY1<-XX1+YY1)",
"2:(X2+YY2<-XX2+YY2)", "3:(X3+YY3<-XX3+YY3)");
 
         // push two items with null to the other stream as deletes. this should produce two
item.
 
@@ -326,13 +325,13 @@ public class KTableKTableJoinTest {
         driver.flushState();
         proc.checkAndClearProcessResult("0:(null<-X0+YY0)", "1:(null<-X1+YY1)");
 
-        // push all four items to the primary stream. this should produce four items.
+        // push all four items to the primary stream. this should produce two items.
 
         for (int i = 0; i < expectedKeys.length; i++) {
             driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
         }
         driver.flushState();
-        proc.checkAndClearProcessResult("0:(null<-null)", "1:(null<-null)", "2:(XX2+YY2<-X2+YY2)",
"3:(XX3+YY3<-X3+YY3)");
+        proc.checkAndClearProcessResult("2:(XX2+YY2<-X2+YY2)", "3:(XX3+YY3<-X3+YY3)");
     }
 
     private KeyValue<Integer, String> kv(Integer key, String value) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index 5f84678..816979a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,9 +19,9 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
@@ -164,10 +164,10 @@ public class KTableKTableLeftJoinTest {
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-        KTable<Integer, String> table1;
-        KTable<Integer, String> table2;
-        KTable<Integer, String> joined;
-        MockProcessorSupplier<Integer, String> proc;
+        final KTable<Integer, String> table1;
+        final KTable<Integer, String> table2;
+        final KTable<Integer, String> joined;
+        final MockProcessorSupplier<Integer, String> proc;
 
         table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
         table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
@@ -179,7 +179,7 @@ public class KTableKTableLeftJoinTest {
         driver = new KStreamTestDriver(builder, stateDir);
         driver.setTime(0L);
 
-        assertFalse(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
+        assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
         assertFalse(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
         assertFalse(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/62c0972e/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index a6249bc..8d1c70a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -19,9 +19,9 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.KTable;
-import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockValueJoiner;
@@ -189,8 +189,8 @@ public class KTableKTableOuterJoinTest {
 
         driver = new KStreamTestDriver(builder, stateDir);
 
-        assertFalse(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
-        assertFalse(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
+        assertTrue(((KTableImpl<?, ?, ?>) table1).sendingOldValueEnabled());
+        assertTrue(((KTableImpl<?, ?, ?>) table2).sendingOldValueEnabled());
         assertFalse(((KTableImpl<?, ?, ?>) joined).sendingOldValueEnabled());
 
         // push two items to the primary stream. the other table is empty


Mime
View raw message