kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject [1/2] kafka git commit: KAFKA-5832; add Consumed and change StreamBuilder to use it
Date Fri, 08 Sep 2017 07:21:54 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 27336192f -> d0ee6ed36


http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
index 4a356c7..9618033 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -100,9 +101,9 @@ public class KStreamRepartitionJoinTest {
         streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);
         streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
 
-        streamOne = builder.stream(Serdes.Long(), Serdes.Integer(), streamOneInput);
-        streamTwo = builder.stream(Serdes.Integer(), Serdes.String(), streamTwoInput);
-        streamFour = builder.stream(Serdes.Integer(), Serdes.String(), streamFourInput);
+        streamOne = builder.stream(streamOneInput, Consumed.with(Serdes.Long(), Serdes.Integer()));
+        streamTwo = builder.stream(streamTwoInput, Consumed.with(Serdes.Integer(), Serdes.String()));
+        streamFour = builder.stream(streamFourInput, Consumed.with(Serdes.Integer(), Serdes.String()));
 
         keyMapper = MockKeyValueMapper.SelectValueKeyValueMapper();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
index 2ae5cc2..92f351b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamsFineGrainedAutoResetIntegrationTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -186,9 +187,10 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
 
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final KStream<String, String> pattern1Stream = builder.stream(Topology.AutoOffsetReset.EARLIEST, Pattern.compile("topic-\\d" + topicSuffix));
-        final KStream<String, String> pattern2Stream = builder.stream(Topology.AutoOffsetReset.LATEST, Pattern.compile("topic-[A-D]" + topicSuffix));
-        final KStream<String, String> namedTopicsStream = builder.stream(topicY, topicZ);
+
+        final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("topic-\\d" + topicSuffix), Consumed.<String, String>with(Topology.AutoOffsetReset.EARLIEST));
+        final KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("topic-[A-D]" + topicSuffix), Consumed.<String, String>with(Topology.AutoOffsetReset.LATEST));
+        final KStream<String, String> namedTopicsStream = builder.stream(Arrays.asList(topicY, topicZ));
 
         pattern1Stream.to(stringSerde, stringSerde, outputTopic);
         pattern2Stream.to(stringSerde, stringSerde, outputTopic);
@@ -262,10 +264,9 @@ public class KStreamsFineGrainedAutoResetIntegrationTest {
     public void shouldThrowExceptionOverlappingTopic() throws  Exception {
         final StreamsBuilder builder = new StreamsBuilder();
         //NOTE this would realistically get caught when building topology, the test is for completeness
-        builder.stream(Topology.AutoOffsetReset.EARLIEST, Pattern.compile("topic-[A-D]_1"));
-
+        builder.stream(Pattern.compile("topic-[A-D]_1"), Consumed.with(Topology.AutoOffsetReset.EARLIEST));
         try {
-            builder.stream(Topology.AutoOffsetReset.LATEST, TOPIC_A_1, TOPIC_Z_1);
+            builder.stream(Arrays.asList(TOPIC_A_1, TOPIC_Z_1), Consumed.with(Topology.AutoOffsetReset.LATEST));
             fail("Should have thrown TopologyException");
         } catch (final TopologyException expected) {
             // do nothing

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 2a06ef4..0e9bc33 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KafkaStreamsTest;
 import org.apache.kafka.streams.KeyValue;
@@ -191,7 +192,7 @@ public class QueryableStateIntegrationTest {
     private KafkaStreams createCountStream(final String inputTopic, final String outputTopic, final Properties streamsConfiguration) {
         final StreamsBuilder builder = new StreamsBuilder();
         final Serde<String> stringSerde = Serdes.String();
-        final KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, inputTopic);
+        final KStream<String, String> textLines = builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde));
 
         final KGroupedStream<String, String> groupedByWord = textLines
             .flatMapValues(new ValueMapper<String, Iterable<String>>() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index d67588a..13d9f82 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -279,7 +279,7 @@ public class RegexSourceIntegrationTest {
 
         final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("topic-\\d"));
         final KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("topic-[A-D]"));
-        final KStream<String, String> namedTopicsStream = builder.stream(TOPIC_Y, TOPIC_Z);
+        final KStream<String, String> namedTopicsStream = builder.stream(Arrays.asList(TOPIC_Y, TOPIC_Z));
 
         pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
         pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
index 6b91b2b..8e1a8a1 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/AbstractStreamTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.processor.AbstractProcessor;
@@ -44,7 +45,7 @@ public class AbstractStreamTest {
         final MockProcessorSupplier<Integer, String> processor = new MockProcessorSupplier<>();
         final String topicName = "topic";
 
-        ExtendedKStream<Integer, String> stream = new ExtendedKStream<>(builder.stream(Serdes.Integer(), Serdes.String(), topicName));
+        ExtendedKStream<Integer, String> stream = new ExtendedKStream<>(builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String())));
 
         stream.randomFilter().process(processor);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
index 87e15be..87c7ece 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.GlobalKTable;
@@ -54,7 +55,7 @@ public class GlobalKTableJoinsTest {
     public void setUp() throws Exception {
         stateDir = TestUtils.tempDirectory();
         global = builder.globalTable(Serdes.String(), Serdes.String(), null, globalTopic, "global-store");
-        stream = builder.stream(Serdes.String(), Serdes.String(), streamTopic);
+        stream = builder.stream(streamTopic, Consumed.with(Serdes.String(), Serdes.String()));
         keyValueMapper = new KeyValueMapper<String, String, String>() {
             @Override
             public String apply(final String key, final String value) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index fc9a7fe..1a2dc13 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -16,9 +16,8 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
-import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
@@ -58,6 +57,7 @@ public class InternalStreamsBuilderTest {
     private final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder());
 
     private KStreamTestDriver driver = null;
+    private final ConsumedInternal<String, String> consumed = new ConsumedInternal<>();
 
     @Before
     public void setUp() {
@@ -103,9 +103,9 @@ public class InternalStreamsBuilderTest {
         final String topic1 = "topic-1";
         final String topic2 = "topic-2";
         final String topic3 = "topic-3";
-        final KStream<String, String> source1 = builder.stream(null, null, null, null, topic1);
-        final KStream<String, String> source2 = builder.stream(null, null, null, null, topic2);
-        final KStream<String, String> source3 = builder.stream(null, null, null, null, topic3);
+        final KStream<String, String> source1 = builder.stream(Collections.singleton(topic1), consumed);
+        final KStream<String, String> source2 = builder.stream(Collections.singleton(topic2), consumed);
+        final KStream<String, String> source3 = builder.stream(Collections.singleton(topic3), consumed);
         final KStream<String, String> processedSource1 =
                 source1.mapValues(new ValueMapper<String, String>() {
                     @Override
@@ -133,8 +133,8 @@ public class InternalStreamsBuilderTest {
 
     @Test
     public void shouldStillMaterializeSourceKTableIfStateNameNotSpecified() throws Exception {
-        KTable table1 = builder.table(null, null, null, null, "topic1", "table1");
-        KTable table2 = builder.table(null, null, null, null, "topic2", (String) null);
+        KTable table1 = builder.table("topic1", consumed, "table1");
+        KTable table2 = builder.table("topic2", consumed, null);
 
         final ProcessorTopology topology = builder.internalTopologyBuilder.build(null);
 
@@ -152,7 +152,7 @@ public class InternalStreamsBuilderTest {
 
     @Test
     public void shouldBuildSimpleGlobalTableTopology() throws Exception {
-        builder.globalTable(null, null, null, "table", "globalTable");
+        builder.globalTable("table", consumed, "globalTable");
 
         final ProcessorTopology topology = builder.internalTopologyBuilder.buildGlobalStateTopology();
         final List<StateStore> stateStores = topology.globalStateStores();
@@ -173,16 +173,16 @@ public class InternalStreamsBuilderTest {
 
     @Test
     public void shouldBuildGlobalTopologyWithAllGlobalTables() throws Exception {
-        builder.globalTable(null, null, null, "table", "globalTable");
-        builder.globalTable(null, null, null, "table2", "globalTable2");
+        builder.globalTable("table", consumed, "globalTable");
+        builder.globalTable("table2", consumed, "globalTable2");
 
         doBuildGlobalTopologyWithAllGlobalTables();
     }
 
     @Test
     public void shouldBuildGlobalTopologyWithAllGlobalTablesWithInternalStoreName() throws Exception {
-        builder.globalTable(null, null, null, "table", null);
-        builder.globalTable(null, null, null, "table2", null);
+        builder.globalTable("table", consumed, null);
+        builder.globalTable("table2", consumed, null);
 
         doBuildGlobalTopologyWithAllGlobalTables();
     }
@@ -191,10 +191,10 @@ public class InternalStreamsBuilderTest {
     public void shouldAddGlobalTablesToEachGroup() throws Exception {
         final String one = "globalTable";
         final String two = "globalTable2";
-        final GlobalKTable<String, String> globalTable = builder.globalTable(null, null, null, "table", one);
-        final GlobalKTable<String, String> globalTable2 = builder.globalTable(null, null, null, "table2", two);
+        final GlobalKTable<String, String> globalTable = builder.globalTable("table", consumed, one);
+        final GlobalKTable<String, String> globalTable2 = builder.globalTable("table2", consumed, two);
 
-        builder.table(null, null, null, null, "not-global", "not-global");
+        builder.table("not-global", consumed, "not-global");
 
         final KeyValueMapper<String, String, String> kvMapper = new KeyValueMapper<String, String, String>() {
             @Override
@@ -203,9 +203,9 @@ public class InternalStreamsBuilderTest {
             }
         };
 
-        final KStream<String, String> stream = builder.stream(null, null, null, null, "t1");
+        final KStream<String, String> stream = builder.stream(Collections.singleton("t1"), consumed);
         stream.leftJoin(globalTable, kvMapper, MockValueJoiner.TOSTRING_JOINER);
-        final KStream<String, String> stream2 = builder.stream(null, null, null, null, "t2");
+        final KStream<String, String> stream2 = builder.stream(Collections.singleton("t2"), consumed);
         stream2.leftJoin(globalTable2, kvMapper, MockValueJoiner.TOSTRING_JOINER);
 
         final Map<Integer, Set<String>> nodeGroups = builder.internalTopologyBuilder.nodeGroups();
@@ -225,9 +225,9 @@ public class InternalStreamsBuilderTest {
 
     @Test
     public void shouldMapStateStoresToCorrectSourceTopics() throws Exception {
-        final KStream<String, String> playEvents = builder.stream(null, null, null, null, "events");
+        final KStream<String, String> playEvents = builder.stream(Collections.singleton("events"), consumed);
 
-        final KTable<String, String> table = builder.table(null, null, null, null, "table-topic", "table-store");
+        final KTable<String, String> table = builder.table("table-topic", consumed, "table-store");
         assertEquals(Collections.singletonList("table-topic"), builder.internalTopologyBuilder.stateStoreNameToSourceTopics().get("table-store"));
 
         final KStream<String, String> mapped = playEvents.map(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper());
@@ -239,8 +239,8 @@ public class InternalStreamsBuilderTest {
     @Test
     public void shouldAddTopicToEarliestAutoOffsetResetList() {
         final String topicName = "topic-1";
-        
-        builder.stream(AutoOffsetReset.EARLIEST, null, null, null, topicName);
+        final ConsumedInternal consumed = new ConsumedInternal<>(Consumed.with(AutoOffsetReset.EARLIEST));
+        builder.stream(Collections.singleton(topicName), consumed);
 
         assertTrue(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topicName).matches());
         assertFalse(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topicName).matches());
@@ -250,7 +250,8 @@ public class InternalStreamsBuilderTest {
     public void shouldAddTopicToLatestAutoOffsetResetList() {
         final String topicName = "topic-1";
 
-        builder.stream(AutoOffsetReset.LATEST, null, null, null, topicName);
+        final ConsumedInternal consumed = new ConsumedInternal<>(Consumed.with(AutoOffsetReset.LATEST));
+        builder.stream(Collections.singleton(topicName), consumed);
 
         assertTrue(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topicName).matches());
         assertFalse(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topicName).matches());
@@ -260,8 +261,7 @@ public class InternalStreamsBuilderTest {
     public void shouldAddTableToEarliestAutoOffsetResetList() {
         final String topicName = "topic-1";
         final String storeName = "test-store";
-
-        builder.table(AutoOffsetReset.EARLIEST, null, null, null, topicName, storeName);
+        builder.table(topicName, new ConsumedInternal<>(Consumed.with(AutoOffsetReset.EARLIEST)), storeName);
 
         assertTrue(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topicName).matches());
         assertFalse(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topicName).matches());
@@ -272,7 +272,7 @@ public class InternalStreamsBuilderTest {
         final String topicName = "topic-1";
         final String storeName = "test-store";
 
-        builder.table(AutoOffsetReset.LATEST, null, null, null, topicName, storeName);
+        builder.table(topicName, new ConsumedInternal<>(Consumed.with(AutoOffsetReset.LATEST)), storeName);
 
         assertTrue(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topicName).matches());
         assertFalse(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topicName).matches());
@@ -282,9 +282,8 @@ public class InternalStreamsBuilderTest {
     public void shouldNotAddTableToOffsetResetLists() {
         final String topicName = "topic-1";
         final String storeName = "test-store";
-        final Serde<String> stringSerde = Serdes.String();
 
-        builder.table(null, null, stringSerde, stringSerde, topicName, storeName);
+        builder.table(topicName, consumed, storeName);
 
         assertFalse(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topicName).matches());
         assertFalse(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topicName).matches());
@@ -295,7 +294,7 @@ public class InternalStreamsBuilderTest {
         final Pattern topicPattern = Pattern.compile("topic-\\d");
         final String topic = "topic-5";
 
-        builder.stream(null, null, null, null, topicPattern);
+        builder.stream(topicPattern, consumed);
 
         assertFalse(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topic).matches());
         assertFalse(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topic).matches());
@@ -307,7 +306,7 @@ public class InternalStreamsBuilderTest {
         final Pattern topicPattern = Pattern.compile("topic-\\d+");
         final String topicTwo = "topic-500000";
 
-        builder.stream(AutoOffsetReset.EARLIEST, null, null, null,  topicPattern);
+        builder.stream(topicPattern, new ConsumedInternal<>(Consumed.with(AutoOffsetReset.EARLIEST)));
 
         assertTrue(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topicTwo).matches());
         assertFalse(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topicTwo).matches());
@@ -318,7 +317,7 @@ public class InternalStreamsBuilderTest {
         final Pattern topicPattern = Pattern.compile("topic-\\d+");
         final String topicTwo = "topic-1000000";
 
-        builder.stream(AutoOffsetReset.LATEST, null, null, null, topicPattern);
+        builder.stream(topicPattern, new ConsumedInternal<>(Consumed.with(AutoOffsetReset.LATEST)));
 
         assertTrue(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topicTwo).matches());
         assertFalse(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topicTwo).matches());
@@ -326,28 +325,30 @@ public class InternalStreamsBuilderTest {
 
     @Test
     public void shouldHaveNullTimestampExtractorWhenNoneSupplied() throws Exception {
-        builder.stream(null, null, null, null, "topic");
+        builder.stream(Collections.singleton("topic"), consumed);
         final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null);
         assertNull(processorTopology.source("topic").getTimestampExtractor());
     }
 
     @Test
     public void shouldUseProvidedTimestampExtractor() throws Exception {
-        builder.stream(null, new MockTimestampExtractor(), null, null, "topic");
+        final ConsumedInternal consumed = new ConsumedInternal<>(Consumed.with(new MockTimestampExtractor()));
+        builder.stream(Collections.singleton("topic"), consumed);
         final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null);
         assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
     }
 
     @Test
     public void ktableShouldHaveNullTimestampExtractorWhenNoneSupplied() throws Exception {
-        builder.table(null, null, null, null, "topic", "store");
+        builder.table("topic", consumed, "store");
         final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null);
         assertNull(processorTopology.source("topic").getTimestampExtractor());
     }
 
     @Test
     public void ktableShouldUseProvidedTimestampExtractor() throws Exception {
-        builder.table(null, new MockTimestampExtractor(), null, null, "topic", "store");
+        final ConsumedInternal consumed = new ConsumedInternal<>(Consumed.with(new MockTimestampExtractor()));
+        builder.table("topic", consumed, "store");
         final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null);
         assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index ba7296d..849bb00 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.Aggregator;
@@ -68,7 +69,7 @@ public class KGroupedStreamImplTest {
 
     @Before
     public void before() {
-        final KStream<String, String> stream = builder.stream(Serdes.String(), Serdes.String(), TOPIC);
+        final KStream<String, String> stream = builder.stream(TOPIC, Consumed.with(Serdes.String(), Serdes.String()));
         groupedStream = stream.groupByKey(Serialized.with(Serdes.String(), Serdes.String()));
     }
 
@@ -87,17 +88,20 @@ public class KGroupedStreamImplTest {
         groupedStream.reduce(MockReducer.STRING_ADDER, INVALID_STORE_NAME);
     }
 
+    @SuppressWarnings("deprecation")
     @Test(expected = NullPointerException.class)
     public void shouldNotHaveNullStoreSupplierOnReduce() throws Exception {
         groupedStream.reduce(MockReducer.STRING_ADDER, (StateStoreSupplier<KeyValueStore>) null);
     }
 
+    @SuppressWarnings("deprecation")
     @Test(expected = NullPointerException.class)
     public void shouldNotHaveNullStoreSupplierOnCount() throws Exception {
         groupedStream.count((StateStoreSupplier<KeyValueStore>) null);
     }
 
 
+    @SuppressWarnings("deprecation")
     @Test(expected = NullPointerException.class)
     public void shouldNotHaveNullStoreSupplierOnWindowedCount() throws Exception {
         groupedStream.count(TimeWindows.of(10), (StateStoreSupplier<WindowStore>) null);
@@ -168,6 +172,7 @@ public class KGroupedStreamImplTest {
         groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10), Serdes.String(), INVALID_STORE_NAME);
     }
 
+    @SuppressWarnings("deprecation")
     @Test(expected = NullPointerException.class)
     public void shouldNotHaveNullStoreSupplierOnWindowedAggregate() throws Exception {
         groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, TimeWindows.of(10), (StateStoreSupplier<WindowStore>) null);
@@ -383,6 +388,7 @@ public class KGroupedStreamImplTest {
         groupedStream.reduce(MockReducer.STRING_ADDER, SessionWindows.with(10), INVALID_STORE_NAME);
     }
 
+    @SuppressWarnings("deprecation")
     @Test(expected = NullPointerException.class)
     public void shouldNotAcceptNullStateStoreSupplierWhenReducingSessionWindows() throws Exception {
         groupedStream.reduce(MockReducer.STRING_ADDER, SessionWindows.with(10), (StateStoreSupplier<SessionStore>) null);
@@ -448,6 +454,7 @@ public class KGroupedStreamImplTest {
         }, SessionWindows.with(10), Serdes.String(), INVALID_STORE_NAME);
     }
 
+    @SuppressWarnings("deprecation")
     @Test(expected = NullPointerException.class)
     public void shouldNotAcceptNullStateStoreSupplierNameWhenAggregatingSessionWindows() throws Exception {
         groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, new Merger<String, String>() {
@@ -473,6 +480,7 @@ public class KGroupedStreamImplTest {
         groupedStream.count(SessionWindows.with(90), INVALID_STORE_NAME);
     }
 
+    @SuppressWarnings("deprecation")
     @Test(expected = NullPointerException.class)
     public void shouldNotAcceptNullStateStoreSupplierWhenCountingSessionWindows() throws Exception {
         groupedStream.count(SessionWindows.with(90), (StateStoreSupplier<SessionStore>) null);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
index f43d138..702631a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamBranchTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Predicate;
@@ -66,7 +67,7 @@ public class KStreamBranchTest {
         KStream<Integer, String>[] branches;
         MockProcessorSupplier<Integer, String>[] processors;
 
-        stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
+        stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
         branches = stream.branch(isEven, isMultipleOfThree, isOdd);
 
         assertEquals(3, branches.length);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
index cc85dc5..f1a6152 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFilterTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Predicate;
@@ -50,7 +51,7 @@ public class KStreamFilterTest {
         MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
+        stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
         stream.filter(isMultipleOfThree).process(processor);
 
         driver.setUp(builder);
@@ -70,7 +71,7 @@ public class KStreamFilterTest {
         MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
+        stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
         stream.filterNot(isMultipleOfThree).process(processor);
 
         driver.setUp(builder);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
index c2a01c2..59dad47 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KStream;
@@ -59,7 +60,7 @@ public class KStreamFlatMapTest {
         MockProcessorSupplier<String, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream = builder.stream(Serdes.Integer(), Serdes.String(), topicName);
+        stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.String()));
         stream.flatMap(mapper).process(processor);
 
         driver.setUp(builder);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
index ccfec98..59ab0ff 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamFlatMapValuesTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.ValueMapper;
@@ -57,7 +58,7 @@ public class KStreamFlatMapValuesTest {
         MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream = builder.stream(Serdes.Integer(), Serdes.Integer(), topicName);
+        stream = builder.stream(topicName, Consumed.with(Serdes.Integer(), Serdes.Integer()));
         stream.flatMapValues(mapper).process(processor);
 
         driver.setUp(builder);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
index 4a46005..e8854fc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamForeachTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.ForeachAction;
@@ -70,7 +71,7 @@ public class KStreamForeachTest {
 
         // When
         StreamsBuilder builder = new StreamsBuilder();
-        KStream<Integer, String> stream = builder.stream(intSerde, stringSerde, topicName);
+        KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(intSerde, stringSerde));
         stream.foreach(action);
 
         // Then

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index ca454f1..4ae1ea4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
@@ -44,6 +45,7 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
 
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.concurrent.TimeUnit;
 
@@ -60,6 +62,7 @@ public class KStreamImplTest {
     final private Serde<Integer> intSerde = Serdes.Integer();
     private KStream<String, String> testStream;
     private StreamsBuilder builder;
+    private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
 
     @Rule
     public final KStreamTestDriver driver = new KStreamTestDriver();
@@ -74,9 +77,9 @@ public class KStreamImplTest {
     public void testNumProcesses() {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        KStream<String, String> source1 = builder.stream(stringSerde, stringSerde, "topic-1", "topic-2");
+        KStream<String, String> source1 = builder.stream(Arrays.asList("topic-1", "topic-2"), consumed);
 
-        KStream<String, String> source2 = builder.stream(stringSerde, stringSerde, "topic-3", "topic-4");
+        KStream<String, String> source2 = builder.stream(Arrays.asList("topic-3", "topic-4"), consumed);
 
         KStream<String, String> stream1 =
             source1.filter(new Predicate<String, String>() {
@@ -171,8 +174,9 @@ public class KStreamImplTest {
     @Test
     public void shouldUseRecordMetadataTimestampExtractorWithThrough() {
         final StreamsBuilder builder = new StreamsBuilder();
-        KStream<String, String> stream1 = builder.stream(stringSerde, stringSerde, "topic-1", "topic-2");
-        KStream<String, String> stream2 = builder.stream(stringSerde, stringSerde, "topic-3", "topic-4");
+        final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
+        KStream<String, String> stream1 = builder.stream(Arrays.asList("topic-1", "topic-2"), consumed);
+        KStream<String, String> stream2 = builder.stream(Arrays.asList("topic-3", "topic-4"), consumed);
 
         stream1.to("topic-5");
         stream2.through("topic-6");
@@ -189,7 +193,7 @@ public class KStreamImplTest {
     public void shouldSendDataThroughTopicUsingProduced() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String input = "topic";
-        final KStream<String, String> stream = builder.stream(stringSerde, stringSerde, input);
+        final KStream<String, String> stream = builder.stream(input, consumed);
         final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
         stream.through("through-topic", Produced.with(stringSerde, stringSerde)).process(processorSupplier);
 
@@ -202,10 +206,10 @@ public class KStreamImplTest {
     public void shouldSendDataToTopicUsingProduced() {
         final StreamsBuilder builder = new StreamsBuilder();
         final String input = "topic";
-        final KStream<String, String> stream = builder.stream(stringSerde, stringSerde, input);
+        final KStream<String, String> stream = builder.stream(input, consumed);
         final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>();
         stream.to("to-topic", Produced.with(stringSerde, stringSerde));
-        builder.stream(stringSerde, stringSerde, "to-topic").process(processorSupplier);
+        builder.stream("to-topic", consumed).process(processorSupplier);
 
         driver.setUp(builder);
         driver.process(input, "e", "f");
@@ -249,7 +253,8 @@ public class KStreamImplTest {
     @Test
     public void testToWithNullValueSerdeDoesntNPE() {
         final StreamsBuilder builder = new StreamsBuilder();
-        final KStream<String, String> inputStream = builder.stream(stringSerde, stringSerde, "input");
+        final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
+        final KStream<String, String> inputStream = builder.stream(Collections.singleton("input"), consumed);
         inputStream.to(stringSerde, null, "output");
     }
 
@@ -424,7 +429,7 @@ public class KStreamImplTest {
 
     @Test
     public void shouldThrowNullPointerOnLeftJoinWithTableWhenJoinedIsNull() {
-        final KTable<String, String> table = builder.table(Serdes.String(), Serdes.String(), "blah");
+        final KTable<String, String> table = builder.table("blah", consumed);
         try {
             testStream.leftJoin(table,
                                 MockValueJoiner.TOSTRING_JOINER,
@@ -437,7 +442,7 @@ public class KStreamImplTest {
 
     @Test
     public void shouldThrowNullPointerOnJoinWithTableWhenJoinedIsNull() {
-        final KTable<String, String> table = builder.table(Serdes.String(), Serdes.String(), "blah");
+        final KTable<String, String> table = builder.table("blah", consumed);
         try {
             testStream.join(table,
                             MockValueJoiner.TOSTRING_JOINER,

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
index ab7ca53..efb9f12 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamJoinTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.kstream.JoinWindows;
@@ -53,6 +54,7 @@ public class KStreamKStreamJoinTest {
     @Rule
     public final KStreamTestDriver driver = new KStreamTestDriver();
     private File stateDir = null;
+    private final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
 
     @Before
     public void setUp() throws IOException {
@@ -71,8 +73,8 @@ public class KStreamKStreamJoinTest {
         MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream1 = builder.stream(intSerde, stringSerde, topic1);
-        stream2 = builder.stream(intSerde, stringSerde, topic2);
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
         joined = stream1.join(stream2,
                               MockValueJoiner.TOSTRING_JOINER,
                               JoinWindows.of(100),
@@ -172,14 +174,14 @@ public class KStreamKStreamJoinTest {
         MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream1 = builder.stream(intSerde, stringSerde, topic1);
-        stream2 = builder.stream(intSerde, stringSerde, topic2);
+
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
         joined = stream1.outerJoin(stream2,
                                    MockValueJoiner.TOSTRING_JOINER,
                                    JoinWindows.of(100),
                                    Joined.with(intSerde, stringSerde, stringSerde));
         joined.process(processor);
-
         Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
 
         assertEquals(1, copartitionGroups.size());
@@ -275,8 +277,8 @@ public class KStreamKStreamJoinTest {
         MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream1 = builder.stream(intSerde, stringSerde, topic1);
-        stream2 = builder.stream(intSerde, stringSerde, topic2);
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
 
         joined = stream1.join(stream2,
                               MockValueJoiner.TOSTRING_JOINER,
@@ -505,8 +507,8 @@ public class KStreamKStreamJoinTest {
         MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream1 = builder.stream(intSerde, stringSerde, topic1);
-        stream2 = builder.stream(intSerde, stringSerde, topic2);
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
 
         joined = stream1.join(stream2,
                               MockValueJoiner.TOSTRING_JOINER,
@@ -619,8 +621,8 @@ public class KStreamKStreamJoinTest {
         MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream1 = builder.stream(intSerde, stringSerde, topic1);
-        stream2 = builder.stream(intSerde, stringSerde, topic2);
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
 
         joined = stream1.join(stream2,
                               MockValueJoiner.TOSTRING_JOINER,

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 7d8297c..c56956d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.kstream.JoinWindows;
@@ -52,6 +53,7 @@ public class KStreamKStreamLeftJoinTest {
     @Rule
     public final KStreamTestDriver driver = new KStreamTestDriver();
     private File stateDir = null;
+    private final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
 
 
     @Before
@@ -71,8 +73,8 @@ public class KStreamKStreamLeftJoinTest {
         final MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream1 = builder.stream(intSerde, stringSerde, topic1);
-        stream2 = builder.stream(intSerde, stringSerde, topic2);
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
 
         joined = stream1.leftJoin(stream2,
                                   MockValueJoiner.TOSTRING_JOINER,
@@ -162,8 +164,8 @@ public class KStreamKStreamLeftJoinTest {
         final MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream1 = builder.stream(intSerde, stringSerde, topic1);
-        stream2 = builder.stream(intSerde, stringSerde, topic2);
+        stream1 = builder.stream(topic1, consumed);
+        stream2 = builder.stream(topic2, consumed);
 
         joined = stream1.leftJoin(stream2,
                                   MockValueJoiner.TOSTRING_JOINER,

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index 8f10923..524cf42 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.kstream.KStream;
@@ -66,7 +67,7 @@ public class KStreamKTableJoinTest {
         final MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream = builder.stream(intSerde, stringSerde, topic1);
+        stream = builder.stream(topic1, Consumed.with(intSerde, stringSerde));
         table = builder.table(intSerde, stringSerde, topic2, "anyStoreName");
         stream.join(table, MockValueJoiner.TOSTRING_JOINER).process(processor);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index df1f4a3..9e9fddc 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.kstream.KStream;
@@ -67,7 +68,7 @@ public class KStreamKTableLeftJoinTest {
         MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream = builder.stream(intSerde, stringSerde, topic1);
+        stream = builder.stream(topic1, Consumed.with(intSerde, stringSerde));
         table = builder.table(intSerde, stringSerde, topic2, "anyStoreName");
         stream.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).process(processor);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
index 1d4dbb5..eed7d7d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KStream;
@@ -52,7 +53,7 @@ public class KStreamMapTest {
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-        KStream<Integer, String> stream = builder.stream(intSerde, stringSerde, topicName);
+        KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(intSerde, stringSerde));
         MockProcessorSupplier<String, Integer> processor;
 
         processor = new MockProcessorSupplier<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
index 9d46a50..e4bf23e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamMapValuesTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.ValueMapper;
@@ -53,7 +54,7 @@ public class KStreamMapValuesTest {
 
         KStream<Integer, String> stream;
         MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
-        stream = builder.stream(intSerde, stringSerde, topicName);
+        stream = builder.stream(topicName, Consumed.with(intSerde, stringSerde));
         stream.mapValues(mapper).process(processor);
 
         driver.setUp(builder);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
index 9c78c31..215b0c3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamPeekTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.ForeachAction;
@@ -44,7 +45,7 @@ public class KStreamPeekTest {
     @Test
     public void shouldObserveStreamElements() {
         final StreamsBuilder builder = new StreamsBuilder();
-        final KStream<Integer, String> stream = builder.stream(intSerd, stringSerd, topicName);
+        final KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(intSerd, stringSerd));
         final List<KeyValue<Integer, String>> peekObserved = new ArrayList<>(), streamObserved = new ArrayList<>();
         stream.peek(collect(peekObserved)).foreach(collect(streamObserved));
 
@@ -63,7 +64,7 @@ public class KStreamPeekTest {
     @Test
     public void shouldNotAllowNullAction() {
         final StreamsBuilder builder = new StreamsBuilder();
-        final KStream<Integer, String> stream = builder.stream(intSerd, stringSerd, topicName);
+        final KStream<Integer, String> stream = builder.stream(topicName, Consumed.with(intSerd, stringSerd));
         try {
             stream.peek(null);
             fail("expected null action to throw NPE");

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
index 45c47fe..f4f340f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSelectKeyTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KStream;
@@ -61,7 +62,7 @@ public class KStreamSelectKeyTest {
         final String[] expected = new String[]{"ONE:1", "TWO:2", "THREE:3"};
         final int[] expectedValues = new int[]{1, 2, 3};
 
-        KStream<String, Integer>  stream = builder.stream(stringSerde, integerSerde, topicName);
+        KStream<String, Integer>  stream = builder.stream(topicName, Consumed.with(stringSerde, integerSerde));
 
         MockProcessorSupplier<String, Integer> processor = new MockProcessorSupplier<>();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
index d191891..df3ceaf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KStream;
@@ -75,7 +76,7 @@ public class KStreamTransformTest {
         final int[] expectedKeys = {1, 10, 100, 1000};
 
         MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
-        KStream<Integer, Integer> stream = builder.stream(intSerde, intSerde, topicName);
+        KStream<Integer, Integer> stream = builder.stream(topicName, Consumed.with(intSerde, intSerde));
         stream.transform(transformerSupplier).process(processor);
 
         driver.setUp(builder);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
index 701a3d7..16f121e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamTransformValuesTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.kstream.KStream;
@@ -78,7 +79,7 @@ public class KStreamTransformValuesTest {
 
         KStream<Integer, Integer> stream;
         MockProcessorSupplier<Integer, Integer> processor = new MockProcessorSupplier<>();
-        stream = builder.stream(intSerde, intSerde, topicName);
+        stream = builder.stream(topicName, Consumed.with(intSerde, intSerde));
         stream.transformValues(valueTransformerSupplier).process(processor);
 
         driver.setUp(builder);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index f8dfdb3..b39ac36 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
@@ -59,7 +60,7 @@ public class KStreamWindowAggregateTest {
         final StreamsBuilder builder = new StreamsBuilder();
         String topic1 = "topic1";
 
-        KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
+        KStream<String, String> stream1 = builder.stream(topic1, Consumed.with(strSerde, strSerde));
         KTable<Windowed<String>, String> table2 =
             stream1.groupByKey(Serialized.with(strSerde, strSerde))
                 .aggregate(MockInitializer.STRING_INIT,
@@ -151,7 +152,7 @@ public class KStreamWindowAggregateTest {
         String topic1 = "topic1";
         String topic2 = "topic2";
 
-        KStream<String, String> stream1 = builder.stream(strSerde, strSerde, topic1);
+        KStream<String, String> stream1 = builder.stream(topic1, Consumed.with(strSerde, strSerde));
         KTable<Windowed<String>, String> table1 =
             stream1.groupByKey(Serialized.with(strSerde, strSerde))
                 .aggregate(MockInitializer.STRING_INIT,
@@ -162,7 +163,7 @@ public class KStreamWindowAggregateTest {
         MockProcessorSupplier<Windowed<String>, String> proc1 = new MockProcessorSupplier<>();
         table1.toStream().process(proc1);
 
-        KStream<String, String> stream2 = builder.stream(strSerde, strSerde, topic2);
+        KStream<String, String> stream2 = builder.stream(topic2, Consumed.with(strSerde, strSerde));
         KTable<Windowed<String>, String> table2 =
             stream2.groupByKey(Serialized.with(strSerde, strSerde))
                 .aggregate(MockInitializer.STRING_INIT,

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 7453cc2..9fb8480 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -31,6 +31,7 @@ import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
@@ -586,7 +587,7 @@ public class SimpleBenchmark {
 
         StreamsBuilder builder = new StreamsBuilder();
 
-        KStream<Integer, byte[]> source = builder.stream(INTEGER_SERDE, BYTE_SERDE, topic);
+        KStream<Integer, byte[]> source = builder.stream(topic, Consumed.with(INTEGER_SERDE, BYTE_SERDE));
 
         source.process(new ProcessorSupplier<Integer, byte[]>() {
             @Override
@@ -625,7 +626,7 @@ public class SimpleBenchmark {
 
         StreamsBuilder builder = new StreamsBuilder();
 
-        KStream<Integer, byte[]> source = builder.stream(INTEGER_SERDE, BYTE_SERDE, topic);
+        KStream<Integer, byte[]> source = builder.stream(topic, Consumed.with(INTEGER_SERDE, BYTE_SERDE));
 
         source.to(INTEGER_SERDE, BYTE_SERDE, SINK_TOPIC);
         source.process(new ProcessorSupplier<Integer, byte[]>() {
@@ -729,7 +730,7 @@ public class SimpleBenchmark {
         } else {
             builder.addStateStore(Stores.create("store").withIntegerKeys().withByteArrayValues().persistent().build());
         }
-        KStream<Integer, byte[]> source = builder.stream(INTEGER_SERDE, BYTE_SERDE, topic);
+        KStream<Integer, byte[]> source = builder.stream(topic, Consumed.with(INTEGER_SERDE, BYTE_SERDE));
 
         source.process(new ProcessorSupplier<Integer, byte[]>() {
             @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
index 6d9cf98..d2d0681 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.ForeachAction;
@@ -282,8 +283,9 @@ public class YahooBenchmark {
         projectedEventDeserializer.configure(serdeProps, false);
 
         final StreamsBuilder builder = new StreamsBuilder();
-        final KStream<String, ProjectedEvent> kEvents = builder.stream(Serdes.String(),
-            Serdes.serdeFrom(projectedEventSerializer, projectedEventDeserializer), eventsTopic);
+        final KStream<String, ProjectedEvent> kEvents = builder.stream(eventsTopic,
+                                                                       Consumed.with(Serdes.String(),
+                                                                                     Serdes.serdeFrom(projectedEventSerializer, projectedEventDeserializer)));
         final KTable<String, String> kCampaigns = builder.table(Serdes.String(), Serdes.String(),
             campaignsTopic, "campaign-state");
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index b9455e7..881da2a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -32,6 +32,7 @@ import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.errors.ProcessorStateException;
+import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
 import org.apache.kafka.streams.processor.StateStore;
@@ -333,7 +334,7 @@ public class StandbyTaskTest {
         restoreStateConsumer.updatePartitions(changelogName, Utils.mkList(
                 new PartitionInfo(changelogName, 0, Node.noNode(), new Node[0], new Node[0])));
         final InternalStreamsBuilder builder = new InternalStreamsBuilder(new InternalTopologyBuilder());
-        builder.stream(null, null, null, null, "topic").groupByKey().count("my-store");
+        builder.stream(Collections.singleton("topic"), new ConsumedInternal<>()).groupByKey().count("my-store");
 
         final StreamsConfig config = createConfig(baseDir);
         final InternalTopologyBuilder internalTopologyBuilder = InternalStreamsBuilderTest.internalTopologyBuilder(builder);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index b82c1ac..a7923f8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest;
 import org.apache.kafka.streams.processor.TaskId;
@@ -86,6 +87,7 @@ public class StreamThreadTest {
     private final String stateDir = TestUtils.tempDirectory().getPath();
     private final StateDirectory stateDirectory  = new StateDirectory("applicationId", stateDir, mockTime);
     private StreamsMetadataState streamsMetadataState;
+    private final ConsumedInternal<Object, Object> consumed = new ConsumedInternal<>();
 
     @Before
     public void setUp() throws Exception {
@@ -743,8 +745,8 @@ public class StreamThreadTest {
 
     @Test
     public void shouldCloseSuspendedTasksThatAreNoLongerAssignedToThisStreamThreadBeforeCreatingNewTasks() throws Exception {
-        internalStreamsBuilder.stream(null, null, null, null, "t1").groupByKey().count("count-one");
-        internalStreamsBuilder.stream(null, null, null, null, "t2").groupByKey().count("count-two");
+        internalStreamsBuilder.stream(Collections.singleton("t1"), consumed).groupByKey().count("count-one");
+        internalStreamsBuilder.stream(Collections.singleton("t2"), consumed).groupByKey().count("count-two");
 
         final StreamThread thread = createStreamThread(clientId, config, false);
         final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
@@ -1018,7 +1020,7 @@ public class StreamThreadTest {
 
     @Test
     public void shouldReturnStandbyTaskMetadataWhileRunningState() throws InterruptedException {
-        internalStreamsBuilder.stream(null, null, null, null, "t1").groupByKey().count("count-one");
+        internalStreamsBuilder.stream(Collections.singleton("t1"), consumed).groupByKey().count("count-one");
 
         final StreamThread thread = createStreamThread(clientId, config, false);
         final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
@@ -1070,7 +1072,7 @@ public class StreamThreadTest {
 
     @Test
     public void shouldAlwaysReturnEmptyTasksMetadataWhileRebalancingStateAndTasksNotRunning() throws InterruptedException {
-        internalStreamsBuilder.stream(null, null, null, null, "t1").groupByKey().count("count-one");
+        internalStreamsBuilder.stream(Collections.singleton("t1"), consumed).groupByKey().count("count-one");
 
         final StreamThread thread = createStreamThread(clientId, config, false);
         final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/test/java/org/apache/kafka/streams/tests/ShutdownDeadlockTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/ShutdownDeadlockTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/ShutdownDeadlockTest.java
index 614ab4d..7372cc3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/ShutdownDeadlockTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/ShutdownDeadlockTest.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
@@ -45,7 +46,7 @@ public class ShutdownDeadlockTest {
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "shouldNotDeadlock");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         final StreamsBuilder builder = new StreamsBuilder();
-        final KStream<String, String> source = builder.stream(Serdes.String(), Serdes.String(), topic);
+        final KStream<String, String> source = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()));
 
         source.foreach(new ForeachAction<String, String>() {
             @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index 50c33e7..c4e108d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.tests;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
@@ -106,7 +107,7 @@ public class SmokeTestClient extends SmokeTestUtil {
 
 
         StreamsBuilder builder = new StreamsBuilder();
-        KStream<String, Integer> source = builder.stream(stringSerde, intSerde, "data");
+        KStream<String, Integer> source = builder.stream("data", Consumed.with(stringSerde, intSerde));
         source.to(stringSerde, intSerde, "echo");
         KStream<String, Integer> data = source.filter(new Predicate<String, Integer>() {
             @Override


Mime
View raw message