From commits-return-9385-archive-asf-public=cust-asf.ponee.io@kafka.apache.org Tue Apr 17 18:41:36 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id C96AE180649 for ; Tue, 17 Apr 2018 18:41:35 +0200 (CEST) Received: (qmail 73289 invoked by uid 500); 17 Apr 2018 16:41:34 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 73280 invoked by uid 99); 17 Apr 2018 16:41:34 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 17 Apr 2018 16:41:34 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 4456982F75; Tue, 17 Apr 2018 16:41:34 +0000 (UTC) Date: Tue, 17 Apr 2018 16:41:34 +0000 To: "commits@kafka.apache.org" Subject: [kafka] branch 1.1 updated: KAFKA-6742: TopologyTestDriver error when dealing with stores from GlobalKTable MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <152398329373.23735.11393507018280896392@gitbox.apache.org> From: guozhang@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: kafka X-Git-Refname: refs/heads/1.1 X-Git-Reftype: branch X-Git-Oldrev: d305027e1039540bd0987a044ea6c5ec1d469736 X-Git-Newrev: 24355c538b1f11f882f32c4ce8873453f966f8c0 X-Git-Rev: 24355c538b1f11f882f32c4ce8873453f966f8c0 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. 
guozhang pushed a commit to branch 1.1 in repository https://gitbox.apache.org/repos/asf/kafka.git The following commit(s) were added to refs/heads/1.1 by this push: new 24355c5 KAFKA-6742: TopologyTestDriver error when dealing with stores from GlobalKTable 24355c5 is described below commit 24355c538b1f11f882f32c4ce8873453f966f8c0 Author: Valentino Proietti AuthorDate: Tue Apr 17 09:39:33 2018 -0700 KAFKA-6742: TopologyTestDriver error when dealing with stores from GlobalKTable guozhangwang While TopologyTestDriver works well with stores created from KTable it does not work with stores from GlobalKTable. Moreover, for my testing purposes (and I think it can be useful to others), I need to get access to the MockProducer inside TopologyTestDriver. I have added 4 new tests to TopologyTestDriverTest, two for stores from KTable and two for stores from GlobalKTable. While I was changing the TopologyTestDriver I've also made it implement java.io.Closeable. Author: Valentino Proietti Reviewers: Bill Bejeck , Matthias J. 
Sax , John Roesler , Guozhang Wang Closes #4823 from Vale68/KAFKA-6742 minor renaming (cherry picked from commit 01eddce01f94887d5f15d32d8050e8a526599fcc) Signed-off-by: Guozhang Wang --- .../apache/kafka/streams/TopologyTestDriver.java | 43 +++++++++++++++------- .../kafka/streams/TopologyTestDriverTest.java | 22 +++++++++++ 2 files changed, 52 insertions(+), 13 deletions(-) diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index abcc99d..e814fec 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -41,6 +41,7 @@ import org.apache.kafka.streams.processor.StateRestoreListener; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl; +import org.apache.kafka.streams.processor.internals.GlobalStateManager; import org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl; import org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; @@ -59,6 +60,7 @@ import org.apache.kafka.streams.state.internals.ThreadCache; import org.apache.kafka.streams.test.ConsumerRecordFactory; import org.apache.kafka.streams.test.OutputVerifier; +import java.io.Closeable; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; @@ -162,18 +164,20 @@ import java.util.concurrent.atomic.AtomicLong; * @see OutputVerifier */ @InterfaceStability.Evolving -public class TopologyTestDriver { +public class TopologyTestDriver implements Closeable { private final Time mockTime; private final InternalTopologyBuilder internalTopologyBuilder; private final static int PARTITION_ID = 0; private final 
static TaskId TASK_ID = new TaskId(0, PARTITION_ID); - private StreamTask task; - private GlobalStateUpdateTask globalStateTask; + private final StreamTask task; + private final GlobalStateUpdateTask globalStateTask; + private final GlobalStateManager globalStateManager; private final StateDirectory stateDirectory; private final ProcessorTopology processorTopology; + private final MockProducer producer; private final Set internalTopics = new HashSet<>(); @@ -264,7 +268,7 @@ public class TopologyTestDriver { consumer.updateEndOffsets(Collections.singletonMap(partition, 0L)); } - final GlobalStateManagerImpl stateManager = new GlobalStateManagerImpl( + globalStateManager = new GlobalStateManagerImpl( new LogContext("mock "), globalTopology, consumer, @@ -273,16 +277,19 @@ public class TopologyTestDriver { streamsConfig); final GlobalProcessorContextImpl globalProcessorContext - = new GlobalProcessorContextImpl(streamsConfig, stateManager, streamsMetrics, cache); - stateManager.setGlobalProcessorContext(globalProcessorContext); + = new GlobalProcessorContextImpl(streamsConfig, globalStateManager, streamsMetrics, cache); + globalStateManager.setGlobalProcessorContext(globalProcessorContext); globalStateTask = new GlobalStateUpdateTask( globalTopology, globalProcessorContext, - stateManager, + globalStateManager, new LogAndContinueExceptionHandler(), new LogContext()); globalStateTask.initialize(); + } else { + globalStateManager = null; + globalStateTask = null; } if (!partitionsByTopic.isEmpty()) { @@ -303,6 +310,8 @@ public class TopologyTestDriver { producer); task.initializeStateStores(); task.initializeTopology(); + } else { + task = null; } } @@ -372,7 +381,8 @@ public class TopologyTestDriver { // Forward back into the topology if the produced record is to an internal or a source topic ... 
final String outputTopicName = record.topic(); - if (internalTopics.contains(outputTopicName) || processorTopology.sourceTopics().contains(outputTopicName)) { + if (internalTopics.contains(outputTopicName) || processorTopology.sourceTopics().contains(outputTopicName) + || globalPartitionsByTopic.containsKey(outputTopicName)) { final byte[] serializedKey = record.key(); final byte[] serializedValue = record.value(); @@ -410,8 +420,10 @@ public class TopologyTestDriver { */ public void advanceWallClockTime(final long advanceMs) { mockTime.sleep(advanceMs); - task.maybePunctuateSystemTime(); - task.commit(); + if (task != null) { + task.maybePunctuateSystemTime(); + task.commit(); + } captureOutputRecords(); } @@ -450,7 +462,7 @@ public class TopologyTestDriver { final V value = valueDeserializer.deserialize(record.topic(), record.value()); return new ProducerRecord<>(record.topic(), record.partition(), record.timestamp(), key, value); } - + /** * Get all {@link StateStore StateStores} from the topology. * The stores can be a "regular" or global stores. @@ -467,7 +479,7 @@ public class TopologyTestDriver { public Map getAllStateStores() { final Map allStores = new HashMap<>(); for (final String storeName : internalTopologyBuilder.allStateStoreName()) { - allStores.put(storeName, ((ProcessorContextImpl) task.context()).getStateMgr().getStore(storeName)); + allStores.put(storeName, getStateStore(storeName)); } return allStores; } @@ -487,7 +499,12 @@ public class TopologyTestDriver { * @see #getSessionStore(String) */ public StateStore getStateStore(final String name) { - return ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name); + StateStore stateStore = task == null ? 
null : + ((ProcessorContextImpl) task.context()).getStateMgr().getStore(name); + if (stateStore == null && globalStateManager != null) { + stateStore = globalStateManager.getGlobalStore(name); + } + return stateStore; } /** diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java index b74a754..d757f33 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java @@ -27,6 +27,7 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.SystemTime; +import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; @@ -584,6 +585,10 @@ public class TopologyTestDriverTest { public void shouldPopulateGlobalStore() { testDriver = new TopologyTestDriver(setupGlobalStoreTopology(SOURCE_TOPIC_1), config); + final KeyValueStore globalStore = testDriver.getKeyValueStore(SOURCE_TOPIC_1 + "-globalStore"); + Assert.assertNotNull(globalStore); + Assert.assertNotNull(testDriver.getAllStateStores().get(SOURCE_TOPIC_1 + "-globalStore")); + testDriver.pipeInput(consumerRecord1); final List processedRecords = mockProcessors.get(0).processedRecords; @@ -897,4 +902,21 @@ public class TopologyTestDriverTest { ); } } + + @Test + public void shouldFeedStoreFromGlobalKTable() { + final StreamsBuilder builder = new StreamsBuilder(); + builder.globalTable("topic", + Consumed.with(Serdes.String(), Serdes.String()), + Materialized.>as("globalStore")); + try (final TopologyTestDriver testDriver = new 
TopologyTestDriver(builder.build(), config)) { + final KeyValueStore globalStore = testDriver.getKeyValueStore("globalStore"); + Assert.assertNotNull(globalStore); + Assert.assertNotNull(testDriver.getAllStateStores().get("globalStore")); + final ConsumerRecordFactory recordFactory = new ConsumerRecordFactory<>(new StringSerializer(), new StringSerializer()); + testDriver.pipeInput(recordFactory.create("topic", "k1", "value1")); + // we expect to have both in the global store, the one from pipeInput and the one from the producer + Assert.assertEquals("value1", globalStore.get("k1")); + } + } } -- To stop receiving notification emails like this one, please contact guozhang@apache.org.