kafka-commits mailing list archives

From guozh...@apache.org
Subject [1/2] kafka git commit: KAFKA-3016: phase-2. stream join implementations
Date Wed, 06 Jan 2016 22:34:45 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk a788c65f0 -> 5aad4999d
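For orientation, here is a minimal sketch of the left-join API that the new test below exercises. Class names, the JoinWindowSpec builder, and the argument order are taken from the diff itself; the serde declarations mirror the test's fields, and the imports are the ones listed in the test file. This is an illustrative reading of the tests, not the commit's own summary.

    IntegerSerializer keySerializer = new IntegerSerializer();
    StringSerializer valSerializer = new StringSerializer();
    IntegerDeserializer keyDeserializer = new IntegerDeserializer();
    StringDeserializer valDeserializer = new StringDeserializer();

    // two co-partitioned input streams keyed by Integer
    KStreamBuilder builder = new KStreamBuilder();
    KStream<Integer, String> left = builder.from(keyDeserializer, valDeserializer, "topic1");
    KStream<Integer, String> right = builder.from(keyDeserializer, valDeserializer, "topic2");

    // joiner that concatenates the two sides; the right value is null when unmatched
    ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
        @Override
        public String apply(String value1, String value2) {
            return value1 + "+" + value2;
        }
    };

    // left join over a symmetric 100 ms window: every left record is emitted once per
    // matching right record, or once with null if no right record falls in the window
    KStream<Integer, String> joined = left.leftJoin(right, joiner,
            JoinWindowSpec.of("test").within(100),
            keySerializer, valSerializer, keyDeserializer, valDeserializer);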


http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
new file mode 100644
index 0000000..dbb5515
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -0,0 +1,289 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.JoinWindowSpec;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.test.KStreamTestDriver;
+import org.apache.kafka.test.MockProcessorSupplier;
+import org.junit.Test;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+public class KStreamKStreamLeftJoinTest {
+
+    private String topic1 = "topic1";
+    private String topic2 = "topic2";
+
+    private IntegerSerializer keySerializer = new IntegerSerializer();
+    private StringSerializer valSerializer = new StringSerializer();
+    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
+    private StringDeserializer valDeserializer = new StringDeserializer();
+
+    private ValueJoiner<String, String, String> joiner = new ValueJoiner<String, String, String>() {
+        @Override
+        public String apply(String value1, String value2) {
+            return value1 + "+" + value2;
+        }
+    };
+
+    @Test
+    public void testLeftJoin() throws Exception {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+
+            KStreamBuilder builder = new KStreamBuilder();
+
+            final int[] expectedKeys = new int[]{0, 1, 2, 3};
+
+            KStream<Integer, String> stream1;
+            KStream<Integer, String> stream2;
+            KStream<Integer, String> joined;
+            MockProcessorSupplier<Integer, String> processor;
+
+            processor = new MockProcessorSupplier<>();
+            stream1 = builder.from(keyDeserializer, valDeserializer, topic1);
+            stream2 = builder.from(keyDeserializer, valDeserializer, topic2);
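+            // left join over a symmetric 100 ms window: a record from stream1 at time t
+            // joins every stream2 record with the same key stamped within [t - 100, t + 100];
+            // when no such record exists, the stream1 record is joined with null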
+            joined = stream1.leftJoin(stream2, joiner, JoinWindowSpec.of("test").within(100),
+                    keySerializer, valSerializer, keyDeserializer, valDeserializer);
+            joined.process(processor);
+
+            Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+
+            assertEquals(1, copartitionGroups.size());
+            assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
+            driver.setTime(0L);
+
+            // push two items to the primary stream. the other window is empty: this should produce two items.
+            // w = {}
+            // --> w = {}
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:X0+null", "1:X1+null");
+
+            // push two items to the other stream. this should produce no items.
+            // w = {}
+            // --> w = { 0:Y0, 1:Y1 }
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult();
+
+            // push all four items to the primary stream. this should produce four items.
+            // w = { 0:Y0, 1:Y1 }
+            // --> w = { 0:Y0, 1:Y1 }
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null");
+
+            // push all four items to the other stream. this should produce no items.
+            // w = { 0:Y0, 1:Y1 }
+            // --> w = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult();
+
+            // push all four items to the primary stream. this should produce six items.
+            // w = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+            // --> w = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 }
+
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3");
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+    @Test
+    public void testWindowing() throws Exception {
+        File baseDir = Files.createTempDirectory("test").toFile();
+        try {
+
+            long time = 0L;
+
+            KStreamBuilder builder = new KStreamBuilder();
+
+            final int[] expectedKeys = new int[]{0, 1, 2, 3};
+
+            KStream<Integer, String> stream1;
+            KStream<Integer, String> stream2;
+            KStream<Integer, String> joined;
+            MockProcessorSupplier<Integer, String> processor;
+
+            processor = new MockProcessorSupplier<>();
+            stream1 = builder.from(keyDeserializer, valDeserializer, topic1);
+            stream2 = builder.from(keyDeserializer, valDeserializer, topic2);
+            joined = stream1.leftJoin(stream2, joiner, JoinWindowSpec.of("test").within(100),
+                    keySerializer, valSerializer, keyDeserializer, valDeserializer);
+            joined.process(processor);
+
+            Collection<Set<String>> copartitionGroups = builder.copartitionGroups();
+
+            assertEquals(1, copartitionGroups.size());
+            assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
+
+            KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir);
+            driver.setTime(time);
+
+            // push two items to the primary stream. the other window is empty. this should produce two items.
+            // w = {}
+            // --> w = {}
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:X0+null", "1:X1+null");
+
+            // push two items to the other stream. this should produce no items.
+            // w = {}
+            // --> w = { 0:Y0, 1:Y1 }
+
+            for (int i = 0; i < 2; i++) {
+                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult();
+
+            // advance time to logically clear the window
+            time = 1000L;
+
+            // push all items to the other stream. this should produce no items.
+            // w = {}
+            // --> w = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.setTime(time + i);
+                driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult();
+
+            // gradually expire items in window.
+            // w = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 }
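+            // the other-stream records carry timestamps 1000..1003; with a 100 ms window,
+            // primary records sent at time 1100 still match all of them (1000 >= 1100 - 100),
+            // and each subsequent one-millisecond tick expires one more record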
+
+            time = 1000L + 100L;
+            driver.setTime(time);
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+
+            driver.setTime(++time);
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:XX0+null", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+
+            driver.setTime(++time);
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+Y2", "3:XX3+Y3");
+
+            driver.setTime(++time);
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+Y3");
+
+            driver.setTime(++time);
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
+
+            // go back to the time before expiration
+
+            time = 1000L - 100L - 1L;
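+            // at time 899 even the earliest other-stream record (timestamp 1000) lies
+            // beyond t + 100, so nothing matches; each subsequent tick brings one more
+            // record into range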
+            driver.setTime(time);
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null");
+
+            driver.setTime(++time);
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:XX0+Y0", "1:XX1+null", "2:XX2+null", "3:XX3+null");
+
+            driver.setTime(++time);
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+null", "3:XX3+null");
+
+            driver.setTime(++time);
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+null");
+
+            driver.setTime(++time);
+            for (int i = 0; i < expectedKeys.length; i++) {
+                driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]);
+            }
+
+            processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3");
+
+        } finally {
+            Utils.delete(baseDir);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedTest.java
deleted file mode 100644
index c3dc7e0..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.kstream.internals;
-
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.KStreamBuilder;
-import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.kstream.WindowSupplier;
-import org.apache.kafka.test.KStreamTestDriver;
-import org.apache.kafka.test.UnlimitedWindowDef;
-import org.junit.Test;
-
-import java.util.Iterator;
-
-import static org.junit.Assert.assertEquals;
-
-public class KStreamWindowedTest {
-
-    private String topicName = "topic";
-    private String windowName = "MyWindow";
-
-    private IntegerDeserializer keyDeserializer = new IntegerDeserializer();
-    private StringDeserializer valDeserializer = new StringDeserializer();
-
-    @Test
-    public void testWindowedStream() {
-        KStreamBuilder builder = new KStreamBuilder();
-
-        final int[] expectedKeys = new int[]{0, 1, 2, 3};
-
-        KStream<Integer, String> stream;
-        WindowSupplier<Integer, String> windowSupplier;
-
-        windowSupplier = new UnlimitedWindowDef<>(windowName);
-        stream = builder.from(keyDeserializer, valDeserializer, topicName);
-        stream.with(windowSupplier);
-
-        KStreamTestDriver driver = new KStreamTestDriver(builder);
-        Window<Integer, String> window = (Window<Integer, String>) driver.getStateStore(windowName);
-        driver.setTime(0L);
-
-        // two items in the window
-
-        for (int i = 0; i < 2; i++) {
-            driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]);
-        }
-
-        assertEquals(1, countItem(window.find(0, 0L)));
-        assertEquals(1, countItem(window.find(1, 0L)));
-        assertEquals(0, countItem(window.find(2, 0L)));
-        assertEquals(0, countItem(window.find(3, 0L)));
-
-        // previous two items + all items, thus two are duplicates, in the window
-
-        for (int i = 0; i < expectedKeys.length; i++) {
-            driver.process(topicName, expectedKeys[i], "Y" + expectedKeys[i]);
-        }
-
-        assertEquals(2, countItem(window.find(0, 0L)));
-        assertEquals(2, countItem(window.find(1, 0L)));
-        assertEquals(1, countItem(window.find(2, 0L)));
-        assertEquals(1, countItem(window.find(3, 0L)));
-    }
-
-
-    private <T> int countItem(Iterator<T> iter) {
-        int i = 0;
-        while (iter.hasNext()) {
-            i++;
-            iter.next();
-        }
-        return i;
-    }
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/test/java/org/apache/kafka/test/UnlimitedWindowDef.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/test/UnlimitedWindowDef.java b/streams/src/test/java/org/apache/kafka/test/UnlimitedWindowDef.java
deleted file mode 100644
index 3da1ca7..0000000
--- a/streams/src/test/java/org/apache/kafka/test/UnlimitedWindowDef.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.test;
-
-import org.apache.kafka.streams.kstream.Window;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.kstream.KeyValue;
-import org.apache.kafka.streams.kstream.WindowSupplier;
-import org.apache.kafka.streams.kstream.internals.FilteredIterator;
-import org.apache.kafka.streams.processor.internals.Stamped;
-
-import java.util.Iterator;
-import java.util.LinkedList;
-
-public class UnlimitedWindowDef<K, V> implements WindowSupplier<K, V> {
-
-    private final String name;
-
-    public UnlimitedWindowDef(String name) {
-        this.name = name;
-    }
-
-    public String name() {
-        return name;
-    }
-
-    public Window<K, V> get() {
-        return new UnlimitedWindow();
-    }
-
-    public class UnlimitedWindow implements Window<K, V> {
-
-        private final LinkedList<Stamped<KeyValue<K, V>>> list = new LinkedList<>();
-
-        @Override
-        public void init(ProcessorContext context) {
-            context.register(this, true, null);
-        }
-
-        @Override
-        public Iterator<V> find(final K key, long timestamp) {
-            return find(key, Long.MIN_VALUE, timestamp);
-        }
-
-        @Override
-        public Iterator<V> findAfter(final K key, long timestamp) {
-            return find(key, timestamp, Long.MAX_VALUE);
-        }
-
-        @Override
-        public Iterator<V> findBefore(final K key, long timestamp) {
-            return find(key, Long.MIN_VALUE, Long.MAX_VALUE);
-        }
-
-        private Iterator<V> find(final K key, final long startTime, final long endTime) {
-            return new FilteredIterator<V, Stamped<KeyValue<K, V>>>(list.iterator()) {
-                protected V filter(Stamped<KeyValue<K, V>> item) {
-                    if (item.value.key.equals(key) && startTime <= item.timestamp && item.timestamp <= endTime)
-                        return item.value.value;
-                    else
-                        return null;
-                }
-            };
-        }
-
-        @Override
-        public void put(K key, V value, long timestamp) {
-            list.add(new Stamped<>(KeyValue.pair(key, value), timestamp));
-        }
-
-        @Override
-        public String name() {
-            return name;
-        }
-
-        @Override
-        public void flush() {
-        }
-
-        @Override
-        public void close() {
-        }
-
-        @Override
-        public boolean persistent() {
-            return false;
-        }
-    }
-}

