Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 25BA7200C13 for ; Mon, 6 Feb 2017 20:37:54 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 24562160B56; Mon, 6 Feb 2017 19:37:54 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 473B3160B53 for ; Mon, 6 Feb 2017 20:37:53 +0100 (CET) Received: (qmail 28878 invoked by uid 500); 6 Feb 2017 19:37:52 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 28868 invoked by uid 99); 6 Feb 2017 19:37:52 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 06 Feb 2017 19:37:52 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 52086DFC15; Mon, 6 Feb 2017 19:37:52 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: guozhang@apache.org To: commits@kafka.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: kafka git commit: KAFKA-4461: Added support to ProcessorTopologyTestDriver for internal topics Date: Mon, 6 Feb 2017 19:37:52 +0000 (UTC) archived-at: Mon, 06 Feb 2017 19:37:54 -0000 Repository: kafka Updated Branches: refs/heads/trunk 7de22453b -> 1f8a2ad2e KAFKA-4461: Added support to ProcessorTopologyTestDriver for internal topics This resolves an issue in driving tests using the ProcessorTopologyTestDriver when `groupBy()` is invoked downstream of a processor that flags repartitioning. Ticket: https://issues.apache.org/jira/browse/KAFKA-4461 Discussion: http://search-hadoop.com/m/Kafka/uyzND1wbKeY1Q8nH1 dguy guozhangwang The contribution is my original work and I license the work to the project under the project's open source license. Author: Adrian McCague Reviewers: Damian Guy, Guozhang Wang Closes #2499 from amccague/KAFKA-4461_ProcessorTopologyTestDriver_map_groupbykey Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/1f8a2ad2 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/1f8a2ad2 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/1f8a2ad2 Branch: refs/heads/trunk Commit: 1f8a2ad2edc5d9de4dc3a3311a0650f25d2b9114 Parents: 7de2245 Author: Adrian McCague Authored: Mon Feb 6 11:37:48 2017 -0800 Committer: Guozhang Wang Committed: Mon Feb 6 11:37:48 2017 -0800 ---------------------------------------------------------------------- .../internals/ProcessorTopologyTest.java | 19 +++++++++++++++++++ .../kafka/test/ProcessorTopologyTestDriver.java | 15 +++++++++++++-- 2 files changed, 32 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/1f8a2ad2/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index f35a2b5..a0b2b8e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -61,6 +61,7 @@ public class ProcessorTopologyTest { private static final String INPUT_TOPIC_2 = "input-topic-2"; private static final String OUTPUT_TOPIC_1 = "output-topic-1"; private static final String OUTPUT_TOPIC_2 = "output-topic-2"; + private static final String THROUGH_TOPIC_1 = "through-topic-1"; private static long timestamp = 1000L; private final TopologyBuilder builder = new TopologyBuilder(); @@ -235,6 +236,17 @@ public class ProcessorTopologyTest { } @Test + public void testDrivingInternalRepartitioningTopology() { + driver = new ProcessorTopologyTestDriver(config, createInternalRepartitioningTopology()); + driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER); + driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER); + driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER); + assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1"); + assertNextOutputRecord(OUTPUT_TOPIC_1, "key2", "value2"); + assertNextOutputRecord(OUTPUT_TOPIC_1, "key3", "value3"); + } + + @Test public void shouldCreateStringWithSourceAndTopics() throws Exception { builder.addSource("source", "topic1", "topic2"); final ProcessorTopology topology = builder.build(null); @@ -337,6 +349,13 @@ public class ProcessorTopologyTest { .addSink("counts", OUTPUT_TOPIC_1, "processor"); } + private TopologyBuilder createInternalRepartitioningTopology() { + return builder.addSource("source", INPUT_TOPIC_1) + .addInternalTopic(THROUGH_TOPIC_1) + .addSink("sink0", THROUGH_TOPIC_1, "source") + .addSource("source1", THROUGH_TOPIC_1) + .addSink("sink1", OUTPUT_TOPIC_1, "source1"); + } private TopologyBuilder createSimpleMultiSourceTopology(int partition) { return builder.addSource("source-1", STRING_DESERIALIZER, STRING_DESERIALIZER, INPUT_TOPIC_1) http://git-wip-us.apache.org/repos/asf/kafka/blob/1f8a2ad2/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java index 277f5f5..b50ff34 100644 --- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java @@ -20,10 +20,12 @@ import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; +import java.util.Set; import java.util.concurrent.atomic.AtomicLong; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.MockConsumer; @@ -150,6 +152,7 @@ public class ProcessorTopologyTestDriver { private final Map partitionsByTopic = new HashMap<>(); private final Map offsetsByTopicPartition = new HashMap<>(); private final Map>> outputRecordsByTopic = new HashMap<>(); + private final Set internalTopics = new HashSet<>(); private final ProcessorTopology globalTopology; private final Map globalPartitionsByTopic = new HashMap<>(); private StreamTask task; @@ -176,6 +179,11 @@ public class ProcessorTopologyTestDriver { }; restoreStateConsumer = createRestoreConsumer(id, storeNames); + // Identify internal topics for forwarding in process ... + for (TopologyBuilder.TopicsInfo topicsInfo : builder.topicGroups().values()) { + internalTopics.addAll(topicsInfo.repartitionSourceTopics.keySet()); + } + // Set up all of the topic+partition information and subscribe the consumer to each ... for (String topic : topology.sourceTopics()) { TopicPartition tp = new TopicPartition(topic, 1); @@ -183,8 +191,6 @@ public class ProcessorTopologyTestDriver { offsetsByTopicPartition.put(tp, new AtomicLong()); } - - consumer.assign(offsetsByTopicPartition.keySet()); final StateDirectory stateDirectory = new StateDirectory(applicationId, TestUtils.tempDirectory().getPath(), Time.SYSTEM); @@ -250,6 +256,11 @@ public class ProcessorTopologyTestDriver { outputRecordsByTopic.put(record.topic(), outputRecords); } outputRecords.add(record); + + // Forward back into the topology if the produced record is to an internal topic ... + if (internalTopics.contains(record.topic())) { + process(record.topic(), record.key(), record.value()); + } } } else { final TopicPartition global = globalPartitionsByTopic.get(topicName);