kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch 2.0 updated: KAFKA-7536: Initialize TopologyTestDriver with non-null topic (#5923)
Date Tue, 20 Nov 2018 22:48:16 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new c6a9abe  KAFKA-7536: Initialize TopologyTestDriver with non-null topic (#5923)
c6a9abe is described below

commit c6a9abe3fad0cc92314ad272d9f7a2ee792bc2c8
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Tue Nov 20 14:39:12 2018 -0800

    KAFKA-7536: Initialize TopologyTestDriver with non-null topic (#5923)
    
    In TopologyTestDriver constructor set non-null topic; and in unit test intentionally turn
on caching to verify this case.
    
    Reviewers: Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
---
 .../processor/internals/AbstractProcessorContext.java   |  2 +-
 .../org/apache/kafka/streams/TopologyTestDriver.java    |  4 ++--
 .../apache/kafka/streams/TopologyTestDriverTest.java    | 17 +++++++++++++++++
 3 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
index 0c3fcf2..62e0936 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContext.java
@@ -33,7 +33,7 @@ import java.util.Objects;
 
 public abstract class AbstractProcessorContext implements InternalProcessorContext {
 
-    static final String NONEXIST_TOPIC = "__null_topic__";
+    public static final String NONEXIST_TOPIC = "__null_topic__";
     private final TaskId taskId;
     private final String applicationId;
     private final StreamsConfig config;
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 245a6fa..96904c2 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -316,7 +316,7 @@ public class TopologyTestDriver implements Closeable {
                 new LogContext()
             );
             globalStateTask.initialize();
-            globalProcessorContext.setRecordContext(new ProcessorRecordContext(0L, -1L, -1,
null, new RecordHeaders()));
+            globalProcessorContext.setRecordContext(new ProcessorRecordContext(0L, -1L, -1,
ProcessorContextImpl.NONEXIST_TOPIC, new RecordHeaders()));
         } else {
             globalStateManager = null;
             globalStateTask = null;
@@ -342,7 +342,7 @@ public class TopologyTestDriver implements Closeable {
             task.initializeStateStores();
             task.initializeTopology();
             context = (InternalProcessorContext) task.context();
-            context.setRecordContext(new ProcessorRecordContext(0L, -1L, -1, null, new RecordHeaders()));
+            context.setRecordContext(new ProcessorRecordContext(0L, -1L, -1, ProcessorContextImpl.NONEXIST_TOPIC,
new RecordHeaders()));
         } else {
             task = null;
             context = null;
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index d0d4ed1..8400e87 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -888,6 +888,23 @@ public class TopologyTestDriverTest {
     }
 
     @Test
+    public void shouldAllowPrePopulatingStatesStoresWithCachingEnabled() {
+        final Topology topology = new Topology();
+        topology.addSource("sourceProcessor", "input-topic");
+        topology.addProcessor("aggregator", new CustomMaxAggregatorSupplier(), "sourceProcessor");
+        topology.addStateStore(Stores.keyValueStoreBuilder(
+            Stores.inMemoryKeyValueStore("aggStore"),
+            Serdes.String(),
+            Serdes.Long()).withCachingEnabled(), // intentionally turn on caching to achieve
better test coverage
+            "aggregator");
+
+        testDriver = new TopologyTestDriver(topology, config);
+
+        store = testDriver.getKeyValueStore("aggStore");
+        store.put("a", 21L);
+    }
+
+    @Test
     public void shouldCleanUpPersistentStateStoresOnClose() {
         final Topology topology = new Topology();
         topology.addSource("sourceProcessor", "input-topic");


Mime
View raw message