Return-Path: X-Original-To: apmail-kafka-commits-archive@www.apache.org Delivered-To: apmail-kafka-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 109BF18883 for ; Wed, 6 Jan 2016 22:34:46 +0000 (UTC) Received: (qmail 68303 invoked by uid 500); 6 Jan 2016 22:34:46 -0000 Delivered-To: apmail-kafka-commits-archive@kafka.apache.org Received: (qmail 68270 invoked by uid 500); 6 Jan 2016 22:34:45 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 68261 invoked by uid 99); 6 Jan 2016 22:34:45 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 06 Jan 2016 22:34:45 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id C7900DFDC7; Wed, 6 Jan 2016 22:34:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: guozhang@apache.org To: commits@kafka.apache.org Date: Wed, 06 Jan 2016 22:34:45 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/2] kafka git commit: KAFKA-3016: phase-2. 
stream join implementations Repository: kafka Updated Branches: refs/heads/trunk a788c65f0 -> 5aad4999d http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java new file mode 100644 index 0000000..dbb5515 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java @@ -0,0 +1,289 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.kstream.JoinWindowSpec; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.ValueJoiner; +import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.MockProcessorSupplier; +import org.junit.Test; + +import java.io.File; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +import static org.junit.Assert.assertEquals; + +public class KStreamKStreamLeftJoinTest { + + private String topic1 = "topic1"; + private String topic2 = "topic2"; + + private IntegerSerializer keySerializer = new IntegerSerializer(); + private StringSerializer valSerializer = new StringSerializer(); + private IntegerDeserializer keyDeserializer = new IntegerDeserializer(); + private StringDeserializer valDeserializer = new StringDeserializer(); + + private ValueJoiner joiner = new ValueJoiner() { + @Override + public String apply(String value1, String value2) { + return value1 + "+" + value2; + } + }; + + @Test + public void testLeftJoin() throws Exception { + File baseDir = Files.createTempDirectory("test").toFile(); + try { + + KStreamBuilder builder = new KStreamBuilder(); + + final int[] expectedKeys = new int[]{0, 1, 2, 3}; + + KStream stream1; + KStream stream2; + KStream joined; + MockProcessorSupplier processor; + + processor = new MockProcessorSupplier<>(); + stream1 = builder.from(keyDeserializer, valDeserializer, topic1); + stream2 = builder.from(keyDeserializer, valDeserializer, topic2); + 
+ joined = stream1.leftJoin(stream2, joiner, JoinWindowSpec.of("test").within(100), + keySerializer, valSerializer, keyDeserializer, valDeserializer); + joined.process(processor); + + Collection> copartitionGroups = builder.copartitionGroups(); + + assertEquals(1, copartitionGroups.size()); + assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); + + KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir); + driver.setTime(0L); + + // push two items to the primary stream. the other window is empty. this should produce two items. + // w = {} + // --> w = {} + + for (int i = 0; i < 2; i++) { + driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); + } + + processor.checkAndClearResult("0:X0+null", "1:X1+null"); + + // push two items to the other stream. this should produce no items. + // w = {} + // --> w = { 0:Y0, 1:Y1 } + + for (int i = 0; i < 2; i++) { + driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); + } + + processor.checkAndClearResult(); + + // push all four items to the primary stream. this should produce four items. + // w = { 0:Y0, 1:Y1 } + // --> w = { 0:Y0, 1:Y1 } + + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); + } + + processor.checkAndClearResult("0:X0+Y0", "1:X1+Y1", "2:X2+null", "3:X3+null"); + + // push all items to the other stream. this should produce no items. + // w = { 0:Y0, 1:Y1 } + // --> w = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 } + + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic2, expectedKeys[i], "YY" + expectedKeys[i]); + } + + processor.checkAndClearResult(); + + // push all four items to the primary stream. this should produce six items. 
+ // w = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 } + // --> w = { 0:Y0, 1:Y1, 0:YY0, 1:YY1, 2:YY2, 3:YY3 } + + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } + + processor.checkAndClearResult("0:XX0+Y0", "0:XX0+YY0", "1:XX1+Y1", "1:XX1+YY1", "2:XX2+YY2", "3:XX3+YY3"); + + } finally { + Utils.delete(baseDir); + } + } + + @Test + public void testWindowing() throws Exception { + File baseDir = Files.createTempDirectory("test").toFile(); + try { + + long time = 0L; + + KStreamBuilder builder = new KStreamBuilder(); + + final int[] expectedKeys = new int[]{0, 1, 2, 3}; + + KStream stream1; + KStream stream2; + KStream joined; + MockProcessorSupplier processor; + + processor = new MockProcessorSupplier<>(); + stream1 = builder.from(keyDeserializer, valDeserializer, topic1); + stream2 = builder.from(keyDeserializer, valDeserializer, topic2); + joined = stream1.leftJoin(stream2, joiner, JoinWindowSpec.of("test").within(100), + keySerializer, valSerializer, keyDeserializer, valDeserializer); + joined.process(processor); + + Collection> copartitionGroups = builder.copartitionGroups(); + + assertEquals(1, copartitionGroups.size()); + assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next()); + + KStreamTestDriver driver = new KStreamTestDriver(builder, baseDir); + driver.setTime(time); + + // push two items to the primary stream. the other window is empty. this should produce two items + // w = {} + // --> w = {} + + for (int i = 0; i < 2; i++) { + driver.process(topic1, expectedKeys[i], "X" + expectedKeys[i]); + } + + processor.checkAndClearResult("0:X0+null", "1:X1+null"); + + // push two items to the other stream. this should produce no items. 
+ // w = {} + // --> w = { 0:Y0, 1:Y1 } + + for (int i = 0; i < 2; i++) { + driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); + } + + processor.checkAndClearResult(); + + // clear logically + time = 1000L; + + // push all items to the other stream. this should produce no items. + // w = {} + // --> w = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 } + for (int i = 0; i < expectedKeys.length; i++) { + driver.setTime(time + i); + driver.process(topic2, expectedKeys[i], "Y" + expectedKeys[i]); + } + + processor.checkAndClearResult(); + + // gradually expire items in window. + // w = { 0:Y0, 1:Y1, 2:Y2, 3:Y3 } + + time = 1000L + 100L; + driver.setTime(time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } + + processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3"); + + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } + + processor.checkAndClearResult("0:XX0+null", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3"); + + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } + + processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+Y2", "3:XX3+Y3"); + + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } + + processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+Y3"); + + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } + + processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null"); + + // go back to the time before expiration + + time = 1000L - 100L - 1L; + driver.setTime(time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], 
"XX" + expectedKeys[i]); + } + + processor.checkAndClearResult("0:XX0+null", "1:XX1+null", "2:XX2+null", "3:XX3+null"); + + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } + + processor.checkAndClearResult("0:XX0+Y0", "1:XX1+null", "2:XX2+null", "3:XX3+null"); + + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } + + processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+null", "3:XX3+null"); + + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } + + processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+null"); + + driver.setTime(++time); + for (int i = 0; i < expectedKeys.length; i++) { + driver.process(topic1, expectedKeys[i], "XX" + expectedKeys[i]); + } + + processor.checkAndClearResult("0:XX0+Y0", "1:XX1+Y1", "2:XX2+Y2", "3:XX3+Y3"); + + } finally { + Utils.delete(baseDir); + } + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedTest.java deleted file mode 100644 index c3dc7e0..0000000 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowedTest.java +++ /dev/null @@ -1,91 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.common.serialization.IntegerDeserializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KStreamBuilder; -import org.apache.kafka.streams.kstream.Window; -import org.apache.kafka.streams.kstream.WindowSupplier; -import org.apache.kafka.test.KStreamTestDriver; -import org.apache.kafka.test.UnlimitedWindowDef; -import org.junit.Test; - -import java.util.Iterator; - -import static org.junit.Assert.assertEquals; - -public class KStreamWindowedTest { - - private String topicName = "topic"; - private String windowName = "MyWindow"; - - private IntegerDeserializer keyDeserializer = new IntegerDeserializer(); - private StringDeserializer valDeserializer = new StringDeserializer(); - - @Test - public void testWindowedStream() { - KStreamBuilder builder = new KStreamBuilder(); - - final int[] expectedKeys = new int[]{0, 1, 2, 3}; - - KStream stream; - WindowSupplier windowSupplier; - - windowSupplier = new UnlimitedWindowDef<>(windowName); - stream = builder.from(keyDeserializer, valDeserializer, topicName); - stream.with(windowSupplier); - - KStreamTestDriver driver = new KStreamTestDriver(builder); - Window window = (Window) driver.getStateStore(windowName); - driver.setTime(0L); - - // two items in the 
window - - for (int i = 0; i < 2; i++) { - driver.process(topicName, expectedKeys[i], "V" + expectedKeys[i]); - } - - assertEquals(1, countItem(window.find(0, 0L))); - assertEquals(1, countItem(window.find(1, 0L))); - assertEquals(0, countItem(window.find(2, 0L))); - assertEquals(0, countItem(window.find(3, 0L))); - - // previous two items + all items, thus two are duplicates, in the window - - for (int i = 0; i < expectedKeys.length; i++) { - driver.process(topicName, expectedKeys[i], "Y" + expectedKeys[i]); - } - - assertEquals(2, countItem(window.find(0, 0L))); - assertEquals(2, countItem(window.find(1, 0L))); - assertEquals(1, countItem(window.find(2, 0L))); - assertEquals(1, countItem(window.find(3, 0L))); - } - - - private int countItem(Iterator iter) { - int i = 0; - while (iter.hasNext()) { - i++; - iter.next(); - } - return i; - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/5aad4999/streams/src/test/java/org/apache/kafka/test/UnlimitedWindowDef.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/UnlimitedWindowDef.java b/streams/src/test/java/org/apache/kafka/test/UnlimitedWindowDef.java deleted file mode 100644 index 3da1ca7..0000000 --- a/streams/src/test/java/org/apache/kafka/test/UnlimitedWindowDef.java +++ /dev/null @@ -1,104 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.test; - -import org.apache.kafka.streams.kstream.Window; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.kstream.KeyValue; -import org.apache.kafka.streams.kstream.WindowSupplier; -import org.apache.kafka.streams.kstream.internals.FilteredIterator; -import org.apache.kafka.streams.processor.internals.Stamped; - -import java.util.Iterator; -import java.util.LinkedList; - -public class UnlimitedWindowDef implements WindowSupplier { - - private final String name; - - public UnlimitedWindowDef(String name) { - this.name = name; - } - - public String name() { - return name; - } - - public Window get() { - return new UnlimitedWindow(); - } - - public class UnlimitedWindow implements Window { - - private final LinkedList>> list = new LinkedList<>(); - - @Override - public void init(ProcessorContext context) { - context.register(this, true, null); - } - - @Override - public Iterator find(final K key, long timestamp) { - return find(key, Long.MIN_VALUE, timestamp); - } - - @Override - public Iterator findAfter(final K key, long timestamp) { - return find(key, timestamp, Long.MAX_VALUE); - } - - @Override - public Iterator findBefore(final K key, long timestamp) { - return find(key, Long.MIN_VALUE, Long.MAX_VALUE); - } - - private Iterator find(final K key, final long startTime, final long endTime) { - return new FilteredIterator>>(list.iterator()) { - protected V filter(Stamped> item) { - if (item.value.key.equals(key) && startTime <= item.timestamp && item.timestamp <= endTime) - 
return item.value.value; - else - return null; - } - }; - } - - @Override - public void put(K key, V value, long timestamp) { - list.add(new Stamped<>(KeyValue.pair(key, value), timestamp)); - } - - @Override - public String name() { - return name; - } - - @Override - public void flush() { - } - - @Override - public void close() { - } - - @Override - public boolean persistent() { - return false; - } - } -}