kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6729: Reuse source topics for source KTable's materialized store's changelog (#5017)
Date Thu, 17 May 2018 18:28:50 GMT
This is an automated email from the ASF dual-hosted git repository.

guozhang pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1a324d7  KAFKA-6729: Reuse source topics for source KTable's materialized store's
changelog (#5017)
1a324d7 is described below

commit 1a324d784cfc53288730b7c1b5c1bde0685e4686
Author: Guozhang Wang <wangguoz@gmail.com>
AuthorDate: Thu May 17 11:28:45 2018 -0700

    KAFKA-6729: Reuse source topics for source KTable's materialized store's changelog (#5017)
    
    1. In InternalTopologyBuilder#topicGroups, which is used in StreamsPartitionAssignor, look up the book-kept storeToChangelogTopic map before creating a new internal changelog topic. This way, if a source KTable is created, its source topic stored in storeToChangelogTopic will be used.
    
    2. Added a unit test (confirmed that it fails without change 1).
    
    3. MINOR: removed TODOs that are related to the removed KStreamBuilder.
    
    4. MINOR: removed the TODO-marked util functions in StreamsBuilderTest and replaced their usages with TopologyWrapper.
    
    5. MINOR: removed StreamsBuilderTest#testFrom as it is already covered by TopologyTest#shouldNotAllowToAddSourcesWithSameName; in addition, it requires KStreamImpl.SOURCE_NAME, which should be a package-private field of KStreamImpl.
    
    Reviewers: John Roesler <john@confluent.io>, Bill Bejeck <bill@confluent.io>, Matthias J. Sax <matthias@confluent.io>
---
 .../streams/kstream/internals/KStreamImpl.java     |  3 +-
 .../streams/kstream/internals/KTableImpl.java      |  5 ++--
 .../internals/InternalTopologyBuilder.java         | 15 ++++++----
 .../apache/kafka/streams/StreamsBuilderTest.java   | 34 ++++++++--------------
 .../internals/KStreamGlobalKTableJoinTest.java     |  4 +--
 .../internals/KStreamGlobalKTableLeftJoinTest.java |  4 +--
 .../streams/kstream/internals/KStreamImplTest.java |  5 ++--
 .../kstream/internals/KStreamKStreamJoinTest.java  | 12 ++++----
 .../internals/KStreamKStreamLeftJoinTest.java      |  6 ++--
 .../kstream/internals/KStreamKTableJoinTest.java   |  4 +--
 .../internals/KStreamKTableLeftJoinTest.java       |  4 +--
 .../internals/KTableKTableInnerJoinTest.java       |  4 +--
 .../internals/KTableKTableLeftJoinTest.java        |  4 +--
 .../internals/KTableKTableOuterJoinTest.java       |  4 +--
 .../internals/StreamsMetadataStateTest.java        | 12 ++++----
 .../internals/StreamsPartitionAssignorTest.java    | 10 +++----
 .../org/apache/kafka/test/KStreamTestDriver.java   |  4 +--
 17 files changed, 62 insertions(+), 72 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index b8195a0..5331a95 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -53,8 +53,7 @@ import java.util.Set;
 
 public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K,
V> {
 
-    // TODO: change to package-private after removing KStreamBuilder
-    public static final String SOURCE_NAME = "KSTREAM-SOURCE-";
+    static final String SOURCE_NAME = "KSTREAM-SOURCE-";
 
     static final String SINK_NAME = "KSTREAM-SINK-";
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index 1c5ad4d..c1f0f7a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -45,10 +45,9 @@ import java.util.Set;
  */
 public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
V> {
 
-    // TODO: these two fields can be package-private after KStreamBuilder is removed
-    public static final String SOURCE_NAME = "KTABLE-SOURCE-";
+    static final String SOURCE_NAME = "KTABLE-SOURCE-";
 
-    public static final String STATE_STORE_NAME = "STATE-STORE-";
+    static final String STATE_STORE_NAME = "STATE-STORE-";
 
     private static final String FILTER_NAME = "KTABLE-FILTER-";
 
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
index 70437e9..575ac01 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
@@ -96,8 +96,7 @@ public class InternalTopologyBuilder {
     // are connected to these state stores
     private final Map<String, Set<Pattern>> stateStoreNameToSourceRegex = new
HashMap<>();
 
-    // map from state store names to this state store's corresponding changelog topic if
possible,
-    // this is used in the extended KStreamBuilder.
+    // map from state store names to this state store's corresponding changelog topic if
possible
     private final Map<String, String> storeToChangelogTopic = new HashMap<>();
 
     // all global topics
@@ -1013,12 +1012,16 @@ public class InternalTopologyBuilder {
                     }
                 }
 
-                // if the node is connected to a state, add to the state topics
+                // if the node is connected to a state store whose changelog topics are not
predefined, add to the changelog topics
                 for (final StateStoreFactory stateFactory : stateFactories.values()) {
                     if (stateFactory.loggingEnabled() && stateFactory.users().contains(node))
{
-                        final String name = ProcessorStateManager.storeChangelogTopic(applicationId,
stateFactory.name());
-                        final InternalTopicConfig internalTopicConfig = createChangelogTopicConfig(stateFactory,
name);
-                        stateChangelogTopics.put(name, internalTopicConfig);
+                        final String topicName = storeToChangelogTopic.containsKey(stateFactory.name())
?
+                                storeToChangelogTopic.get(stateFactory.name()) :
+                                ProcessorStateManager.storeChangelogTopic(applicationId,
stateFactory.name());
+                        if (!stateChangelogTopics.containsKey(topicName)) {
+                            final InternalTopicConfig internalTopicConfig = createChangelogTopicConfig(stateFactory,
topicName);
+                            stateChangelogTopics.put(topicName, internalTopicConfig);
+                        }
                     }
                 }
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index 7c2bfa6..0a1e6df 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -26,7 +26,6 @@ import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.internals.KStreamImpl;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -39,13 +38,11 @@ import org.apache.kafka.test.StreamsTestUtils;
 import org.junit.Test;
 
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Properties;
-import java.util.Set;
 
 import static org.hamcrest.CoreMatchers.equalTo;
 import static org.hamcrest.CoreMatchers.is;
@@ -58,13 +55,6 @@ public class StreamsBuilderTest {
     private final StreamsBuilder builder = new StreamsBuilder();
     private final Properties props = StreamsTestUtils.topologyTestConfig(Serdes.String(),
Serdes.String());
 
-    @Test(expected = TopologyException.class)
-    public void testFrom() {
-        builder.stream(Arrays.asList("topic-1", "topic-2"));
-
-        builder.build().addSource(KStreamImpl.SOURCE_NAME + "0000000000", "topic-3");
-    }
-
     @Test
     public void shouldAllowJoinUnmaterializedFilteredKTable() {
         final KTable<Bytes, String> filteredKTable = builder.<Bytes, String>table("table-topic").filter(MockPredicate.<Bytes,
String>allGoodPredicate());
@@ -192,7 +182,7 @@ public class StreamsBuilderTest {
     }
     
     @Test
-    public void testMerge() {
+    public void shouldMergeStreams() {
         final String topic1 = "topic-1";
         final String topic2 = "topic-2";
 
@@ -281,6 +271,16 @@ public class StreamsBuilderTest {
         assertFalse(stores.hasNext());
         assertFalse(subtopologies.hasNext());
     }
+
+    @Test
+    public void shouldReuseSourceTopicAsChangelogs() {
+        final String topic = "topic";
+        builder.table(topic, Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as("store"));
+
+        final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(builder.build());
+
+        assertThat(internalTopologyBuilder.topicGroups().get(0).stateChangelogTopics.keySet(),
equalTo(Collections.singleton("topic")));
+    }
     
     @Test(expected = TopologyException.class)
     public void shouldThrowExceptionWhenNoTopicPresent() {
@@ -291,14 +291,4 @@ public class StreamsBuilderTest {
     public void shouldThrowExceptionWhenTopicNamesAreNull() {
         builder.stream(Arrays.<String>asList(null, null));
     }
-
-    // TODO: these two static functions are added because some non-TopologyBuilder unit tests
need to access the internal topology builder,
-    //       which is usually a bad sign of design patterns between TopologyBuilder and StreamThread.
We need to consider getting rid of them later
-    public static InternalTopologyBuilder internalTopologyBuilder(final StreamsBuilder builder)
{
-        return builder.internalTopologyBuilder;
-    }
-
-    public static Collection<Set<String>> getCopartitionedGroups(final StreamsBuilder
builder) {
-        return builder.internalTopologyBuilder.copartitionGroups();
-    }
-}
\ No newline at end of file
+}
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
index c37e8a9..7a65c4a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableJoinTest.java
@@ -21,8 +21,8 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
@@ -112,7 +112,7 @@ public class KStreamGlobalKTableJoinTest {
 
     @Test
     public void shouldNotRequireCopartitioning() {
-        final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals("KStream-GlobalKTable joins do not need to be co-partitioned", 0, copartitionGroups.size());
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
index eb0775a..d6196c5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamGlobalKTableLeftJoinTest.java
@@ -21,8 +21,8 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
@@ -114,7 +114,7 @@ public class KStreamGlobalKTableLeftJoinTest {
 
     @Test
     public void shouldNotRequireCopartitioning() {
-        final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals("KStream-GlobalKTable joins do not need to be co-partitioned", 0, copartitionGroups.size());
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 463afb8..ebf3f36 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -22,7 +22,6 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.GlobalKTable;
@@ -174,7 +173,7 @@ public class KStreamImplTest {
             1 + // to
             2 + // through
             1, // process
-            StreamsBuilderTest.internalTopologyBuilder(builder).setApplicationId("X").build(null).processors().size());
+            TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").build(null).processors().size());
     }
 
     @Test
@@ -186,7 +185,7 @@ public class KStreamImplTest {
         stream1.to("topic-5");
         stream2.through("topic-6");
 
-        ProcessorTopology processorTopology = StreamsBuilderTest.internalTopologyBuilder(builder).setApplicationId("X").build(null);
+        ProcessorTopology processorTopology = TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("X").build(null);
         assertThat(processorTopology.source("topic-6").getTimestampExtractor(), instanceOf(FailOnInvalidTimestamp.class));
         assertEquals(processorTopology.source("topic-4").getTimestampExtractor(), null);
         assertEquals(processorTopology.source("topic-3").getTimestampExtractor(), null);
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index de3446c..59f0953 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -21,8 +21,8 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
@@ -105,7 +105,7 @@ public class KStreamKStreamJoinTest {
             Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
         joined.process(supplier);
 
-        final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
@@ -207,7 +207,7 @@ public class KStreamKStreamJoinTest {
             JoinWindows.of(100),
             Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
         joined.process(supplier);
-        final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
@@ -312,7 +312,7 @@ public class KStreamKStreamJoinTest {
             Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
         joined.process(supplier);
 
-        final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
@@ -535,7 +535,7 @@ public class KStreamKStreamJoinTest {
                 Serdes.String()));
         joined.process(supplier);
 
-        final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
@@ -644,7 +644,7 @@ public class KStreamKStreamJoinTest {
             Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
         joined.process(supplier);
 
-        final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 11c5c5b..8535a04 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -21,8 +21,8 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.Joined;
 import org.apache.kafka.streams.kstream.KStream;
@@ -69,7 +69,7 @@ public class KStreamKStreamLeftJoinTest {
                                   Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
         joined.process(supplier);
 
-        final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
@@ -155,7 +155,7 @@ public class KStreamKStreamLeftJoinTest {
                                   Joined.with(Serdes.Integer(), Serdes.String(), Serdes.String()));
         joined.process(supplier);
 
-        final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index 0ce27ab..55635fa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -21,8 +21,8 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
@@ -104,7 +104,7 @@ public class KStreamKTableJoinTest {
     @Test
     public void shouldRequireCopartitionedStreams() {
 
-        final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(streamTopic, tableTopic)), copartitionGroups.iterator().next());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index eedda07..98fc500 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -21,8 +21,8 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.test.ConsumerRecordFactory;
@@ -100,7 +100,7 @@ public class KStreamKTableLeftJoinTest {
     @Test
     public void shouldRequireCopartitionedStreams() {
 
-        final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(streamTopic, tableTopic)), copartitionGroups.iterator().next());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
index 7ed8b6a..2efdd85 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableInnerJoinTest.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -75,7 +75,7 @@ public class KTableKTableInnerJoinTest {
                             final int[] expectedKeys,
                             final MockProcessorSupplier<Integer, String> supplier,
                             final KTable<Integer, String> joined) {
-        final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index 51fd839..79e5f0e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
 import org.apache.kafka.streams.kstream.Materialized;
@@ -87,7 +87,7 @@ public class KTableKTableLeftJoinTest {
         final MockProcessorSupplier<Integer, String> supplier = new MockProcessorSupplier<>();
         joined.toStream().process(supplier);
 
-        final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index cf3321f..8cee72f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -21,7 +21,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.processor.MockProcessorContext;
 import org.apache.kafka.streams.processor.Processor;
@@ -83,7 +83,7 @@ public class KTableKTableOuterJoinTest {
         joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER);
         joined.toStream().process(supplier);
 
-        final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
+        final Collection<Set<String>> copartitionGroups = TopologyWrapper.getInternalTopologyBuilder(builder.build()).copartitionGroups();
 
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
index e9bb2a3..39f848f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -26,7 +26,7 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Predicate;
@@ -96,7 +96,7 @@ public class StreamsMetadataStateTest {
                             Consumed.with(null, null),
                             Materialized.<Object, Object, KeyValueStore<Bytes, byte[]>>as(globalTable));
 
-        StreamsBuilderTest.internalTopologyBuilder(builder).setApplicationId("appId");
+        TopologyWrapper.getInternalTopologyBuilder(builder.build()).setApplicationId("appId");
 
         topic1P0 = new TopicPartition("topic-one", 0);
         topic1P1 = new TopicPartition("topic-one", 1);
@@ -122,7 +122,7 @@ public class StreamsMetadataStateTest {
                 new PartitionInfo("topic-four", 0, null, null, null));
 
         cluster = new Cluster(null, Collections.<Node>emptyList(), partitionInfos,
Collections.<String>emptySet(), Collections.<String>emptySet());
-        discovery = new StreamsMetadataState(StreamsBuilderTest.internalTopologyBuilder(builder),
hostOne);
+        discovery = new StreamsMetadataState(TopologyWrapper.getInternalTopologyBuilder(builder.build()),
hostOne);
         discovery.onChange(hostToPartitions, cluster);
         partitioner = new StreamPartitioner<String, Object>() {
             @Override
@@ -134,7 +134,7 @@ public class StreamsMetadataStateTest {
 
     @Test
     public void shouldNotThrowNPEWhenOnChangeNotCalled() {
-        new StreamsMetadataState(StreamsBuilderTest.internalTopologyBuilder(builder), hostOne).getAllMetadataForStore("store");
+        new StreamsMetadataState(TopologyWrapper.getInternalTopologyBuilder(builder.build()),
hostOne).getAllMetadataForStore("store");
     }
 
     @Test
@@ -301,7 +301,7 @@ public class StreamsMetadataStateTest {
 
     @Test
     public void shouldGetAnyHostForGlobalStoreByKeyIfMyHostUnknown() {
-        final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(StreamsBuilderTest.internalTopologyBuilder(builder), StreamsMetadataState.UNKNOWN_HOST);
+        final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(TopologyWrapper.getInternalTopologyBuilder(builder.build()), StreamsMetadataState.UNKNOWN_HOST);
         streamsMetadataState.onChange(hostToPartitions, cluster);
         assertNotNull(streamsMetadataState.getMetadataWithKey(globalTable, "key", Serdes.String().serializer()));
     }
@@ -314,7 +314,7 @@ public class StreamsMetadataStateTest {
 
     @Test
     public void shouldGetAnyHostForGlobalStoreByKeyAndPartitionerIfMyHostUnknown() {
-        final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(StreamsBuilderTest.internalTopologyBuilder(builder), StreamsMetadataState.UNKNOWN_HOST);
+        final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(TopologyWrapper.getInternalTopologyBuilder(builder.build()), StreamsMetadataState.UNKNOWN_HOST);
         streamsMetadataState.onChange(hostToPartitions, cluster);
         assertNotNull(streamsMetadataState.getMetadataWithKey(globalTable, "key", partitioner));
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 9812158..cc507d6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -27,8 +27,8 @@ import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
@@ -727,7 +727,7 @@ public class StreamsPartitionAssignorTest {
     public void shouldGenerateTasksForAllCreatedPartitions() {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final InternalTopologyBuilder internalTopologyBuilder = StreamsBuilderTest.internalTopologyBuilder(builder);
+        final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(builder.build());
         internalTopologyBuilder.setApplicationId(applicationId);
 
         // KStream with 3 partitions
@@ -796,7 +796,7 @@ public class StreamsPartitionAssignorTest {
         expectedCreatedInternalTopics.put(applicationId + "-KTABLE-AGGREGATE-STATE-STORE-0000000006-repartition", 4);
         expectedCreatedInternalTopics.put(applicationId + "-KTABLE-AGGREGATE-STATE-STORE-0000000006-changelog", 4);
         expectedCreatedInternalTopics.put(applicationId + "-KSTREAM-MAP-0000000001-repartition", 4);
-        expectedCreatedInternalTopics.put(applicationId + "-topic3-STATE-STORE-0000000002-changelog", 4);
+        expectedCreatedInternalTopics.put("topic3", 4);     // the source topic is reused as changelog topics
 
         // check if all internal topics were created as expected
         assertThat(mockInternalTopicManager.readyTopics, equalTo(expectedCreatedInternalTopics));
@@ -906,7 +906,7 @@ public class StreamsPartitionAssignorTest {
     public void shouldNotLoopInfinitelyOnMissingMetadataAndShouldNotCreateRelatedTasks() {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final InternalTopologyBuilder internalTopologyBuilder = StreamsBuilderTest.internalTopologyBuilder(builder);
+        final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(builder.build());
         internalTopologyBuilder.setApplicationId(applicationId);
 
         KStream<Object, Object> stream1 = builder
@@ -1026,7 +1026,7 @@ public class StreamsPartitionAssignorTest {
     public void shouldNotAddStandbyTaskPartitionsToPartitionsForHost() {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final InternalTopologyBuilder internalTopologyBuilder = StreamsBuilderTest.internalTopologyBuilder(builder);
+        final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(builder.build());
         internalTopologyBuilder.setApplicationId(applicationId);
 
         builder.stream("topic1").groupByKey().count();
diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
index 033b68d..2c3461a 100644
--- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java
@@ -22,7 +22,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsBuilderTest;
+import org.apache.kafka.streams.TopologyWrapper;
 import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStore;
@@ -82,7 +82,7 @@ public class KStreamTestDriver extends ExternalResource {
                       final Serde<?> keySerde,
                       final Serde<?> valSerde,
                       final long cacheSize) {
-        final InternalTopologyBuilder internalTopologyBuilder = StreamsBuilderTest.internalTopologyBuilder(builder);
+        final InternalTopologyBuilder internalTopologyBuilder = TopologyWrapper.getInternalTopologyBuilder(builder.build());
 
         internalTopologyBuilder.setApplicationId("TestDriver");
         topology = internalTopologyBuilder.build(null);

-- 
To stop receiving notification emails like this one, please contact
guozhang@apache.org.

Mime
View raw message