kafka-commits mailing list archives

From: damian...@apache.org
Subject: [2/2] kafka git commit: KAFKA-5832; add Consumed and change StreamsBuilder to use it
Date: Fri, 08 Sep 2017 07:21:55 GMT
KAFKA-5832; add Consumed and change StreamsBuilder to use it

Added the `Consumed` class.
Updated `StreamsBuilder#stream`, `StreamsBuilder#table`, and `StreamsBuilder#globalTable` to use it.
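
For reference, a minimal before/after sketch of the reworked entry point (the topic name and serdes below are illustrative, not taken from this patch):

    // assumes imports: org.apache.kafka.streams.{StreamsBuilder, Consumed},
    // org.apache.kafka.streams.kstream.KStream, org.apache.kafka.common.serialization.Serdes
    StreamsBuilder builder = new StreamsBuilder();

    // Old style (overloads removed by this patch): serdes and topic passed positionally
    // KStream<String, Long> counts = builder.stream(Serdes.String(), Serdes.Long(), "counts-topic");

    // New style: the topic comes first and all optional parameters are grouped in Consumed
    KStream<String, Long> counts = builder.stream(
        "counts-topic",
        Consumed.with(Serdes.String(), Serdes.Long()));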

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #3784 from dguy/kip-182-stream-builder


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d0ee6ed3
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d0ee6ed3
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d0ee6ed3

Branch: refs/heads/trunk
Commit: d0ee6ed36baf702fa24dac8ae31f45fc27324d89
Parents: 2733619
Author: Damian Guy <damian.guy@gmail.com>
Authored: Fri Sep 8 08:21:48 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Fri Sep 8 08:21:48 2017 +0100

----------------------------------------------------------------------
 docs/streams/developer-guide.html               |   7 +-
 .../examples/pageview/PageViewTypedDemo.java    |   3 +-
 .../examples/pageview/PageViewUntypedDemo.java  |   3 +-
 .../java/org/apache/kafka/streams/Consumed.java | 158 +++++++
 .../apache/kafka/streams/StreamsBuilder.java    | 431 +++++--------------
 .../apache/kafka/streams/kstream/KStream.java   |   8 +-
 .../kstream/internals/ConsumedInternal.java     |  56 +++
 .../internals/InternalStreamsBuilder.java       |  91 ++--
 .../streams/kstream/internals/KStreamImpl.java  |   7 +-
 .../streams/kstream/internals/KTableImpl.java   |   4 +-
 .../apache/kafka/streams/KafkaStreamsTest.java  |   2 +-
 .../kafka/streams/StreamsBuilderTest.java       |   9 +-
 .../GlobalKTableIntegrationTest.java            |   3 +-
 .../KStreamAggregationDedupIntegrationTest.java |   3 +-
 .../KStreamAggregationIntegrationTest.java      |  15 +-
 .../KStreamKTableJoinIntegrationTest.java       |   3 +-
 .../integration/KStreamRepartitionJoinTest.java |   7 +-
 ...eamsFineGrainedAutoResetIntegrationTest.java |  13 +-
 .../QueryableStateIntegrationTest.java          |   3 +-
 .../integration/RegexSourceIntegrationTest.java |   2 +-
 .../kstream/internals/AbstractStreamTest.java   |   3 +-
 .../internals/GlobalKTableJoinsTest.java        |   3 +-
 .../internals/InternalStreamsBuilderTest.java   |  69 +--
 .../internals/KGroupedStreamImplTest.java       |  10 +-
 .../kstream/internals/KStreamBranchTest.java    |   3 +-
 .../kstream/internals/KStreamFilterTest.java    |   5 +-
 .../kstream/internals/KStreamFlatMapTest.java   |   3 +-
 .../internals/KStreamFlatMapValuesTest.java     |   3 +-
 .../kstream/internals/KStreamForeachTest.java   |   3 +-
 .../kstream/internals/KStreamImplTest.java      |  25 +-
 .../internals/KStreamKStreamJoinTest.java       |  24 +-
 .../internals/KStreamKStreamLeftJoinTest.java   |  10 +-
 .../internals/KStreamKTableJoinTest.java        |   3 +-
 .../internals/KStreamKTableLeftJoinTest.java    |   3 +-
 .../kstream/internals/KStreamMapTest.java       |   3 +-
 .../kstream/internals/KStreamMapValuesTest.java |   3 +-
 .../kstream/internals/KStreamPeekTest.java      |   5 +-
 .../kstream/internals/KStreamSelectKeyTest.java |   3 +-
 .../kstream/internals/KStreamTransformTest.java |   3 +-
 .../internals/KStreamTransformValuesTest.java   |   3 +-
 .../internals/KStreamWindowAggregateTest.java   |   7 +-
 .../kafka/streams/perf/SimpleBenchmark.java     |   7 +-
 .../kafka/streams/perf/YahooBenchmark.java      |   6 +-
 .../processor/internals/StandbyTaskTest.java    |   3 +-
 .../processor/internals/StreamThreadTest.java   |  10 +-
 .../streams/tests/ShutdownDeadlockTest.java     |   3 +-
 .../kafka/streams/tests/SmokeTestClient.java    |   3 +-
 47 files changed, 553 insertions(+), 501 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/docs/streams/developer-guide.html
----------------------------------------------------------------------
diff --git a/docs/streams/developer-guide.html b/docs/streams/developer-guide.html
index a140b46..b8d3ae4 100644
--- a/docs/streams/developer-guide.html
+++ b/docs/streams/developer-guide.html
@@ -547,9 +547,8 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
                     StreamsBuilder builder = new StreamsBuilder();
 
                     KStream&lt;String, Long&gt; wordCounts = builder.stream(
-                        Serdes.String(), /* key serde */
-                        Serdes.Long(),   /* value serde */
-                        "word-counts-input-topic" /* input topic */);
+                        "word-counts-input-topic" /* input topic */,
+                        Consumed.with(Serdes.String(), Serdes.Long())); // define key and value serdes
                 </pre>
                 When to provide serdes explicitly:
                 <ul>
@@ -2427,7 +2426,7 @@ Note that in the <code>WordCountProcessor</code> implementation, users need to r
           StreamsConfig config = new StreamsConfig(props);
           StreamsBuilder builder = new StreamsBuilder();
 
-          KStream&lt;String, String&gt; textLines = builder.stream(stringSerde, stringSerde, "word-count-input");
+          KStream&lt;String, String&gt; textLines = builder.stream("word-count-input", Consumed.with(stringSerde, stringSerde));
 
           KGroupedStream&lt;String, String&gt; groupedByWord = textLines
               .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
index 676f8cc..72f9be8 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -21,6 +21,7 @@ import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -142,7 +143,7 @@ public class PageViewTypedDemo {
         pageViewByRegionDeserializer.configure(serdeProps, false);
         final Serde<PageViewByRegion> pageViewByRegionSerde = Serdes.serdeFrom(pageViewByRegionSerializer, pageViewByRegionDeserializer);
 
-        KStream<String, PageView> views = builder.stream(Serdes.String(), pageViewSerde, "streams-pageview-input");
+        KStream<String, PageView> views = builder.stream("streams-pageview-input", Consumed.with(Serdes.String(), pageViewSerde));
 
         KTable<String, UserProfile> users = builder.table(Serdes.String(), userProfileSerde,
             "streams-userprofile-input", "streams-userprofile-store-name");

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
index 5b87937..e8787af 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.connect.json.JsonDeserializer;
 import org.apache.kafka.connect.json.JsonSerializer;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -72,7 +73,7 @@ public class PageViewUntypedDemo {
         final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
         final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
 
-        KStream<String, JsonNode> views = builder.stream(Serdes.String(), jsonSerde, "streams-pageview-input");
+        KStream<String, JsonNode> views = builder.stream("streams-pageview-input", Consumed.with(Serdes.String(), jsonSerde));
 
         KTable<String, JsonNode> users = builder.table(Serdes.String(), jsonSerde,
             "streams-userprofile-input", "streams-userprofile-store-name");

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/main/java/org/apache/kafka/streams/Consumed.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/Consumed.java b/streams/src/main/java/org/apache/kafka/streams/Consumed.java
new file mode 100644
index 0000000..fb42b99
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/Consumed.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.kstream.GlobalKTable;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+
+/**
+ * The {@code Consumed} class is used to define the optional parameters when using {@link StreamsBuilder} to
+ * build instances of {@link KStream}, {@link KTable}, and {@link GlobalKTable}.
+ *
+ * @param <K> key type
+ * @param <V> value type
+ */
+public class Consumed<K, V> {
+
+    protected Serde<K> keySerde;
+    protected Serde<V> valueSerde;
+    protected TimestampExtractor timestampExtractor;
+    protected Topology.AutoOffsetReset resetPolicy;
+
+    private Consumed(final Serde<K> keySerde,
+                     final Serde<V> valueSerde,
+                     final TimestampExtractor timestampExtractor,
+                     final Topology.AutoOffsetReset resetPolicy) {
+
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
+        this.timestampExtractor = timestampExtractor;
+        this.resetPolicy = resetPolicy;
+    }
+
+    /**
+     * Create an instance of {@link Consumed} from an existing instance.
+     * @param consumed  the instance of {@link Consumed} to copy
+     */
+    public Consumed(final Consumed<K, V> consumed) {
+        this(consumed.keySerde, consumed.valueSerde, consumed.timestampExtractor, consumed.resetPolicy);
+    }
+
+    /**
+     * Create an instance of {@link Consumed} with the supplied arguments. {@code null} values are acceptable.
+     *
+     * @param keySerde           the key serde. If {@code null} the default key serde from config will be used
+     * @param valueSerde         the value serde. If {@code null} the default value serde from config will be used
+     * @param timestampExtractor the timestamp extractor to be used. If {@code null} the default timestamp extractor from config will be used
+     * @param resetPolicy        the offset reset policy to be used. If {@code null} the default reset policy from config will be used
+     * @param <K>                key type
+     * @param <V>                value type
+     * @return a new instance of {@link Consumed}
+     */
+    public static <K, V> Consumed<K, V> with(final Serde<K> keySerde,
+                                             final Serde<V> valueSerde,
+                                             final TimestampExtractor timestampExtractor,
+                                             final Topology.AutoOffsetReset resetPolicy) {
+        return new Consumed<>(keySerde, valueSerde, timestampExtractor, resetPolicy);
+
+    }
+
+    /**
+     * Create an instance of {@link Consumed} with key and value {@link Serde}s.
+     *
+     * @param keySerde   the key serde. If {@code null} the default key serde from config will be used
+     * @param valueSerde the value serde. If {@code null} the default value serde from config will be used
+     * @param <K>        key type
+     * @param <V>        value type
+     * @return a new instance of {@link Consumed}
+     */
+    public static <K, V> Consumed<K, V> with(final Serde<K> keySerde,
+                                             final Serde<V> valueSerde) {
+        return new Consumed<>(keySerde, valueSerde, null, null);
+    }
+
+    /**
+     * Create an instance of {@link Consumed} with a {@link TimestampExtractor}.
+     *
+     * @param timestampExtractor the timestamp extractor to be used. If {@code null} the default timestamp extractor from config will be used
+     * @param <K>                key type
+     * @param <V>                value type
+     * @return a new instance of {@link Consumed}
+     */
+    public static <K, V> Consumed<K, V> with(final TimestampExtractor timestampExtractor) {
+        return new Consumed<>(null, null, timestampExtractor, null);
+    }
+
+    /**
+     * Create an instance of {@link Consumed} with a {@link Topology.AutoOffsetReset}.
+     *
+     * @param resetPolicy the offset reset policy to be used. If {@code null} the default reset policy from config will be used
+     * @param <K>         key type
+     * @param <V>         value type
+     * @return a new instance of {@link Consumed}
+     */
+    public static <K, V> Consumed<K, V> with(final Topology.AutoOffsetReset resetPolicy) {
+        return new Consumed<>(null, null, null, resetPolicy);
+    }
+
+    /**
+     * Configure the instance of {@link Consumed} with a key {@link Serde}.
+     *
+     * @param keySerde the key serde. If {@code null} the default key serde from config will be used
+     * @return this
+     */
+    public Consumed<K, V> withKeySerde(final Serde<K> keySerde) {
+        this.keySerde = keySerde;
+        return this;
+    }
+
+    /**
+     * Configure the instance of {@link Consumed} with a value {@link Serde}.
+     *
+     * @param valueSerde the value serde. If {@code null} the default value serde from config will be used
+     * @return this
+     */
+    public Consumed<K, V> withValueSerde(final Serde<V> valueSerde) {
+        this.valueSerde = valueSerde;
+        return this;
+    }
+
+    /**
+     * Configure the instance of {@link Consumed} with a {@link TimestampExtractor}.
+     *
+     * @param timestampExtractor the timestamp extractor to be used. If {@code null} the default timestamp extractor from config will be used
+     * @return this
+     */
+    public Consumed<K, V> withTimestampExtractor(final TimestampExtractor timestampExtractor) {
+        this.timestampExtractor = timestampExtractor;
+        return this;
+    }
+
+    /**
+     * Configure the instance of {@link Consumed} with a {@link Topology.AutoOffsetReset}.
+     *
+     * @param resetPolicy the offset reset policy to be used. If {@code null} the default reset policy from config will be used
+     * @return this
+     */
+    public Consumed<K, V> withOffsetResetPolicy(final Topology.AutoOffsetReset resetPolicy) {
+        this.resetPolicy = resetPolicy;
+        return this;
+    }
+}
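
In use, the static factories and fluent setters compose; a hedged sketch (the WallclockTimestampExtractor and topic name are illustrative choices, not part of this change):

    // assumes imports: org.apache.kafka.streams.{StreamsBuilder, Consumed, Topology},
    // org.apache.kafka.streams.kstream.KStream, org.apache.kafka.common.serialization.Serdes,
    // org.apache.kafka.streams.processor.WallclockTimestampExtractor

    // Build Consumed from the serdes, then layer on the remaining optional parameters
    Consumed<String, Long> consumed = Consumed.with(Serdes.String(), Serdes.Long())
        .withTimestampExtractor(new WallclockTimestampExtractor())      // hypothetical extractor choice
        .withOffsetResetPolicy(Topology.AutoOffsetReset.EARLIEST);

    KStream<String, Long> stream = new StreamsBuilder().stream("input-topic", consumed);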

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index 5fc6a55..a26822a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -24,6 +24,7 @@ import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KGroupedTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
 import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.processor.StateStore;
@@ -35,6 +36,8 @@ import org.apache.kafka.streams.processor.internals.SourceNode;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreType;
 
+import java.util.Collection;
+import java.util.Collections;
 import java.util.regex.Pattern;
 
 /**
@@ -66,153 +69,53 @@ public class StreamsBuilder {
      * If this is not the case it is the user's responsibility to repartition the data before any key based operation
      * (like aggregation or join) is applied to the returned {@link KStream}.
      *
-     * @param topics the topic names; must contain at least one topic name
+     * @param topic the topic name; cannot be {@code null}
      * @return a {@link KStream} for the specified topics
      */
-    public synchronized <K, V> KStream<K, V> stream(final String... topics) {
-        return internalStreamsBuilder.stream(null, null, null, null, topics);
+    public synchronized <K, V> KStream<K, V> stream(final String topic) {
+        return stream(Collections.singleton(topic));
     }
 
     /**
      * Create a {@link KStream} from the specified topics.
-     * The default {@link TimestampExtractor} and default key and value deserializers as specified in the
-     * {@link StreamsConfig config} are used.
-     * <p>
-     * If multiple topics are specified there is no ordering guarantee for records from different topics.
+     * The {@code "auto.offset.reset"} strategy, {@link TimestampExtractor}, and key and value deserializers
+     * are defined by the options in {@link Consumed}.
      * <p>
      * Note that the specified input topics must be partitioned by key.
      * If this is not the case it is the user's responsibility to repartition the data before any key based operation
      * (like aggregation or join) is applied to the returned {@link KStream}.
      *
-     * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topics if no valid committed
-     *                    offsets are available
-     * @param topics      the topic names; must contain at least one topic name
+     * @param topic         the topic name; cannot be {@code null}
+     * @param consumed      the instance of {@link Consumed} used to define optional parameters
      * @return a {@link KStream} for the specified topics
      */
-    public synchronized <K, V> KStream<K, V> stream(final Topology.AutoOffsetReset offsetReset,
-                                                    final String... topics) {
-        return internalStreamsBuilder.stream(offsetReset, null, null, null, topics);
+    public synchronized <K, V> KStream<K, V> stream(final String topic,
+                                                    final Consumed<K, V> consumed) {
+        return stream(Collections.singleton(topic), consumed);
     }
 
     /**
-     * Create a {@link KStream} from the specified topic pattern.
+     * Create a {@link KStream} from the specified topics.
      * The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and default key and value
      * deserializers as specified in the {@link StreamsConfig config} are used.
      * <p>
-     * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
-     * them and there is no ordering guarantee between records from different topics.
-     * <p>
-     * Note that the specified input topics must be partitioned by key.
-     * If this is not the case it is the user's responsibility to repartition the data before any key based operation
-     * (like aggregation or join) is applied to the returned {@link KStream}.
-     *
-     * @param topicPattern the pattern to match for topic names
-     * @return a {@link KStream} for topics matching the regex pattern.
-     */
-    public synchronized <K, V> KStream<K, V> stream(final Pattern topicPattern) {
-        return internalStreamsBuilder.stream(null, null,  null, null, topicPattern);
-    }
-
-    /**
-     * Create a {@link KStream} from the specified topic pattern.
-     * The default {@link TimestampExtractor} and default key and value deserializers as specified in the
-     * {@link StreamsConfig config} are used.
-     * <p>
-     * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
-     * them and there is no ordering guarantee between records from different topics.
-     * <p>
-     * Note that the specified input topics must be partitioned by key.
-     * If this is not the case it is the user's responsibility to repartition the data before any key based operation
-     * (like aggregation or join) is applied to the returned {@link KStream}.
-     *
-     * @param offsetReset  the {@code "auto.offset.reset"} policy to use for the matched topics if no valid committed
-     *                     offsets are available
-     * @param topicPattern the pattern to match for topic names
-     * @return a {@link KStream} for topics matching the regex pattern.
-     */
-    public synchronized <K, V> KStream<K, V> stream(final Topology.AutoOffsetReset offsetReset,
-                                                    final Pattern topicPattern) {
-        return internalStreamsBuilder.stream(offsetReset, null, null, null, topicPattern);
-    }
-
-    /**
-     * Create a {@link KStream} from the specified topics.
-     * The default {@code "auto.offset.reset"} strategy and default {@link TimestampExtractor} as specified in the
-     * {@link StreamsConfig config} are used.
-     * <p>
-     * If multiple topics are specified there is no ordering guarantee for records from different topics.
-     * <p>
-     * Note that the specified input topics must be partitioned by key.
-     * If this is not the case it is the user's responsibility to repartition the data before any key based operation
-     * (like aggregation or join) is applied to the returned {@link KStream}.
-     *
-     * @param keySerde   key serde used to read this source {@link KStream},
-     *                   if not specified the default serde defined in the configs will be used
-     * @param valueSerde value serde used to read this source {@link KStream},
-     *                   if not specified the default serde defined in the configs will be used
-     * @param topics     the topic names; must contain at least one topic name
-     * @return a {@link KStream} for the specified topics
-     */
-    public synchronized <K, V> KStream<K, V> stream(final Serde<K> keySerde,
-                                                    final Serde<V> valueSerde,
-                                                    final String... topics) {
-        return internalStreamsBuilder.stream(null, null, keySerde, valueSerde, topics);
-    }
-
-    /**
-     * Create a {@link KStream} from the specified topics.
-     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
-     * <p>
-     * If multiple topics are specified there is no ordering guarantee for records from different topics.
-     * <p>
-     * Note that the specified input topics must be partitioned by key.
-     * If this is not the case it is the user's responsibility to repartition the data before any key based operation
-     * (like aggregation or join) is applied to the returned {@link KStream}.
-     *
-     * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topics if no valid committed
-     *                    offsets are available
-     * @param keySerde    key serde used to read this source {@link KStream},
-     *                    if not specified the default serde defined in the configs will be used
-     * @param valueSerde  value serde used to read this source {@link KStream},
-     *                    if not specified the default serde defined in the configs will be used
-     * @param topics      the topic names; must contain at least one topic name
-     * @return a {@link KStream} for the specified topics
-     */
-    public synchronized <K, V> KStream<K, V> stream(final Topology.AutoOffsetReset offsetReset,
-                                                    final Serde<K> keySerde,
-                                                    final Serde<V> valueSerde,
-                                                    final String... topics) {
-        return internalStreamsBuilder.stream(offsetReset, null, keySerde, valueSerde, topics);
-    }
-
-    /**
-     * Create a {@link KStream} from the specified topics.
-     * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
-     * <p>
      * If multiple topics are specified there is no ordering guarantee for records from different topics.
      * <p>
      * Note that the specified input topics must be partitioned by key.
      * If this is not the case it is the user's responsibility to repartition the data before any key based operation
      * (like aggregation or join) is applied to the returned {@link KStream}.
      *
-     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KStream},
-     *                           if not specified the default extractor defined in the configs will be used
-     * @param keySerde           key serde used to read this source {@link KStream}, if not specified the default
-     *                           serde defined in the configs will be used
-     * @param valueSerde         value serde used to read this source {@link KStream},
-     *                           if not specified the default serde defined in the configs will be used
-     * @param topics             the topic names; must contain at least one topic name
+     * @param topics the topic names; must contain at least one topic name
      * @return a {@link KStream} for the specified topics
      */
-    public synchronized <K, V> KStream<K, V> stream(final TimestampExtractor timestampExtractor,
-                                                    final Serde<K> keySerde,
-                                                    final Serde<V> valueSerde,
-                                                    final String... topics) {
-        return internalStreamsBuilder.stream(null, timestampExtractor, keySerde, valueSerde, topics);
+    public synchronized <K, V> KStream<K, V> stream(final Collection<String> topics) {
+        return stream(topics, Consumed.<K, V>with(null, null, null, null));
     }
 
     /**
      * Create a {@link KStream} from the specified topics.
+     * The {@code "auto.offset.reset"} strategy, {@link TimestampExtractor}, and key and value deserializers
+     * are defined by the options in {@link Consumed}.
      * <p>
      * If multiple topics are specified there is no ordering guarantee for records from different topics.
      * <p>
@@ -220,53 +123,20 @@ public class StreamsBuilder {
      * If this is not the case it is the user's responsibility to repartition the data before any key based operation
      * (like aggregation or join) is applied to the returned {@link KStream}.
      *
-     * @param offsetReset        the {@code "auto.offset.reset"} policy to use for the specified topics
-     *                           if no valid committed offsets are available
-     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KStream},
-     *                           if not specified the default extractor defined in the configs will be used
-     * @param keySerde           key serde used to read this source {@link KStream},
-     *                           if not specified the default serde defined in the configs will be used
-     * @param valueSerde         value serde used to read this source {@link KStream},
-     *                           if not specified the default serde defined in the configs will be used
-     * @param topics             the topic names; must contain at least one topic name
+     * @param topics the topic names; must contain at least one topic name
+     * @param consumed      the instance of {@link Consumed} used to define optional parameters
      * @return a {@link KStream} for the specified topics
      */
-    public synchronized <K, V> KStream<K, V> stream(final Topology.AutoOffsetReset offsetReset,
-                                                    final TimestampExtractor timestampExtractor,
-                                                    final Serde<K> keySerde,
-                                                    final Serde<V> valueSerde,
-                                                    final String... topics) {
-        return internalStreamsBuilder.stream(offsetReset, timestampExtractor, keySerde, valueSerde, topics);
+    public synchronized <K, V> KStream<K, V> stream(final Collection<String> topics,
+                                                    final Consumed<K, V> consumed) {
+        return internalStreamsBuilder.stream(topics, new ConsumedInternal<>(consumed));
     }
 
-    /**
-     * Create a {@link KStream} from the specified topic pattern.
-     * The default {@code "auto.offset.reset"} strategy and default {@link TimestampExtractor}
-     * as specified in the {@link StreamsConfig config} are used.
-     * <p>
-     * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
-     * them and there is no ordering guarantee between records from different topics.
-     * <p>
-     * Note that the specified input topics must be partitioned by key.
-     * If this is not the case it is the user's responsibility to repartition the data before any key based operation
-     * (like aggregation or join) is applied to the returned {@link KStream}.
-     *
-     * @param keySerde     key serde used to read this source {@link KStream},
-     *                     if not specified the default serde defined in the configs will be used
-     * @param valueSerde   value serde used to read this source {@link KStream},
-     *                     if not specified the default serde defined in the configs will be used
-     * @param topicPattern the pattern to match for topic names
-     * @return a {@link KStream} for topics matching the regex pattern.
-     */
-    public synchronized <K, V> KStream<K, V> stream(final Serde<K> keySerde,
-                                                    final Serde<V> valueSerde,
-                                                    final Pattern topicPattern) {
-        return internalStreamsBuilder.stream(null, null, keySerde, valueSerde, topicPattern);
-    }
 
     /**
      * Create a {@link KStream} from the specified topic pattern.
-     * The default {@link TimestampExtractor} as specified in the {@link StreamsConfig config} is used.
+     * The default {@code "auto.offset.reset"} strategy, default {@link TimestampExtractor}, and default key and value
+     * deserializers as specified in the {@link StreamsConfig config} are used.
      * <p>
      * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
      * them and there is no ordering guarantee between records from different topics.
@@ -275,51 +145,17 @@ public class StreamsBuilder {
      * If this is not the case it is the user's responsibility to repartition the data before any key based operation
      * (like aggregation or join) is applied to the returned {@link KStream}.
      *
-     * @param offsetReset  the {@code "auto.offset.reset"} policy to use for the matched topics if no valid committed
-     *                     offsets are available
-     * @param keySerde     key serde used to read this source {@link KStream},
-     *                     if not specified the default serde defined in the configs will be used
-     * @param valueSerde   value serde used to read this source {@link KStream},
-     *                     if not specified the default serde defined in the configs will be used
      * @param topicPattern the pattern to match for topic names
      * @return a {@link KStream} for topics matching the regex pattern.
      */
-    public synchronized <K, V> KStream<K, V> stream(final Topology.AutoOffsetReset offsetReset,
-                                                    final Serde<K> keySerde,
-                                                    final Serde<V> valueSerde,
-                                                    final Pattern topicPattern) {
-        return internalStreamsBuilder.stream(offsetReset, null, keySerde, valueSerde, topicPattern);
-    }
-
-    /**
-     * Create a {@link KStream} from the specified topic pattern.
-     * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
-     * <p>
-     * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
-     * them and there is no ordering guarantee between records from different topics.
-     * <p>
-     * Note that the specified input topics must be partitioned by key.
-     * If this is not the case it is the user's responsibility to repartition the data before any key based operation
-     * (like aggregation or join) is applied to the returned {@link KStream}.
-     *
-     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KStream},
-     *                           if not specified the default extractor defined in the configs will be used
-     * @param keySerde           key serde used to read this source {@link KStream},
-     *                           if not specified the default serde defined in the configs will be used
-     * @param valueSerde         value serde used to read this source {@link KStream},
-     *                           if not specified the default serde defined in the configs will be used
-     * @param topicPattern       the pattern to match for topic names
-     * @return a {@link KStream} for topics matching the regex pattern.
-     */
-    public synchronized <K, V> KStream<K, V> stream(final TimestampExtractor timestampExtractor,
-                                                    final Serde<K> keySerde,
-                                                    final Serde<V> valueSerde,
-                                                    final Pattern topicPattern) {
-        return internalStreamsBuilder.stream(null, timestampExtractor, keySerde, valueSerde, topicPattern);
+    public synchronized <K, V> KStream<K, V> stream(final Pattern topicPattern) {
+        return stream(topicPattern, Consumed.<K, V>with(null, null));
     }
 
     /**
      * Create a {@link KStream} from the specified topic pattern.
+     * The {@code "auto.offset.reset"} strategy, {@link TimestampExtractor}, and key and value deserializers
+     * are defined by the options in {@link Consumed}.
      * <p>
      * If multiple topics are matched by the specified pattern, the created {@link KStream} will read data from all of
      * them and there is no ordering guarantee between records from different topics.
@@ -328,23 +164,13 @@ public class StreamsBuilder {
      * If this is not the case it is the user's responsibility to repartition the data before any key based operation
      * (like aggregation or join) is applied to the returned {@link KStream}.
      *
-     * @param offsetReset        the {@code "auto.offset.reset"} policy to use for the matched topics if no valid
-     *                           committed  offsets are available
-     * @param timestampExtractor the stateless timestamp extractor used for this source {@link KStream},
-     *                           if not specified the default extractor defined in the configs will be used
-     * @param keySerde           key serde used to read this source {@link KStream},
-     *                           if not specified the default serde defined in the configs will be used
-     * @param valueSerde         value serde used to read this source {@link KStream},
-     *                           if not specified the default serde defined in the configs will be used
-     * @param topicPattern       the pattern to match for topic names
+     * @param topicPattern  the pattern to match for topic names
+     * @param consumed      the instance of {@link Consumed} used to define optional parameters
      * @return a {@link KStream} for topics matching the regex pattern.
      */
-    public synchronized <K, V> KStream<K, V> stream(final Topology.AutoOffsetReset offsetReset,
-                                                    final TimestampExtractor timestampExtractor,
-                                                    final Serde<K> keySerde,
-                                                    final Serde<V> valueSerde,
-                                                    final Pattern topicPattern) {
-        return internalStreamsBuilder.stream(offsetReset, timestampExtractor, keySerde, valueSerde, topicPattern);
+    public synchronized <K, V> KStream<K, V> stream(final Pattern topicPattern,
+                                                    final Consumed<K, V> consumed) {
+        return internalStreamsBuilder.stream(topicPattern, new ConsumedInternal<>(consumed));
     }
 
     /**
@@ -378,7 +204,7 @@ public class StreamsBuilder {
      */
     public synchronized <K, V> KTable<K, V> table(final String topic,
                                                   final String queryableStoreName) {
-        return internalStreamsBuilder.table(null, null,  null, null, topic, queryableStoreName);
+        return internalStreamsBuilder.table(topic, new ConsumedInternal<K, V>(), queryableStoreName);
     }
 
     /**
@@ -433,7 +259,30 @@ public class StreamsBuilder {
      * @return a {@link KTable} for the specified topic
      */
     public synchronized <K, V> KTable<K, V> table(final String topic) {
-        return internalStreamsBuilder.table(null, null, null, null, topic, (String) null);
+        return internalStreamsBuilder.table(topic, new ConsumedInternal<K, V>(), null);
+    }
+
+    /**
+     * Create a {@link KTable} for the specified topic.
+     * The {@code "auto.offset.reset"} strategy, {@link TimestampExtractor}, and key and value deserializers
+     * are defined by the options in {@link Consumed}.
+     * Input {@link KeyValue records} with {@code null} key will be dropped.
+     * <p>
+     * Note that the specified input topics must be partitioned by key.
+     * If this is not the case the returned {@link KTable} will be corrupted.
+     * <p>
+     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
+     * store name. Note that the store name may not be queryable through Interactive Queries.
+     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     *
+     * @param topic     the topic name; cannot be {@code null}
+     * @param consumed  the instance of {@link Consumed} used to define optional parameters
+     * @return a {@link KTable} for the specified topic
+     */
+    public synchronized <K, V> KTable<K, V> table(final String topic,
+                                                  final Consumed<K, V> consumed) {
+        return internalStreamsBuilder.table(topic, new ConsumedInternal<>(consumed), null);
     }
 
     /**
@@ -464,13 +313,13 @@ public class StreamsBuilder {
      *                           offsets are available
      * @param topic              the topic name; cannot be {@code null}
      * @param queryableStoreName the state store name; if {@code null} this is the equivalent of
-     *                           {@link #table(Topology.AutoOffsetReset, String)}
+     *                           {@link #table(String, Consumed)}
      * @return a {@link KTable} for the specified topic
      */
     public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
                                                   final String topic,
                                                   final String queryableStoreName) {
-        return internalStreamsBuilder.table(offsetReset, null, null, null, topic, queryableStoreName);
+        return internalStreamsBuilder.table(topic, new ConsumedInternal<>(Consumed.<K, V>with(offsetReset)), queryableStoreName);
     }
 
     /**
@@ -512,29 +361,6 @@ public class StreamsBuilder {
 
     /**
      * Create a {@link KTable} for the specified topic.
-     * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
-     * Input {@link KeyValue records} with {@code null} key will be dropped.
-     * <p>
-     * Note that the specified input topics must be partitioned by key.
-     * If this is not the case the returned {@link KTable} will be corrupted.
-     * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
-     * store name. Note that that store name may not be queriable through Interactive Queries.
-     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
-     *                    offsets are available
-     * @param topic       the topic name; cannot be {@code null}
-     * @return a {@link KTable} for the specified topic
-     */
-    public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
-                                                  final String topic) {
-        return internalStreamsBuilder.table(offsetReset, null, null, null, topic, (String) null);
-    }
-
-    /**
-     * Create a {@link KTable} for the specified topic.
      * The default {@code "auto.offset.reset"} strategy and default key and value deserializers
      * as specified in the {@link StreamsConfig config} are used.
      * Input {@link KeyValue} pairs with {@code null} key will be dropped.
@@ -567,7 +393,7 @@ public class StreamsBuilder {
     public synchronized <K, V> KTable<K, V> table(final TimestampExtractor timestampExtractor,
                                                   final String topic,
                                                   final String queryableStoreName) {
-        return internalStreamsBuilder.table(null, timestampExtractor, null, null, topic, queryableStoreName);
+        return internalStreamsBuilder.table(topic, new ConsumedInternal<>(Consumed.<K, V>with(timestampExtractor)), queryableStoreName);
     }
 
     /**
@@ -606,7 +432,8 @@ public class StreamsBuilder {
                                                   final TimestampExtractor timestampExtractor,
                                                   final String topic,
                                                   final String queryableStoreName) {
-        return internalStreamsBuilder.table(offsetReset, timestampExtractor, null, null, topic, queryableStoreName);
+        final Consumed<K, V> consumed = Consumed.<K, V>with(offsetReset).withTimestampExtractor(timestampExtractor);
+        return internalStreamsBuilder.table(topic, new ConsumedInternal<>(consumed), queryableStoreName);
     }
 
     /**
@@ -646,7 +473,9 @@ public class StreamsBuilder {
                                                   final Serde<V> valueSerde,
                                                   final String topic,
                                                   final String queryableStoreName) {
-        return internalStreamsBuilder.table(null, null, keySerde, valueSerde, topic, queryableStoreName);
+        return internalStreamsBuilder.table(topic,
+                                            new ConsumedInternal<>(keySerde, valueSerde, null, null),
+                                            queryableStoreName);
     }
 
     /**
@@ -691,32 +520,6 @@ public class StreamsBuilder {
 
     /**
      * Create a {@link KTable} for the specified topic.
-     * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
-     * Input {@link KeyValue records} with {@code null} key will be dropped.
-     * <p>
-     * Note that the specified input topics must be partitioned by key.
-     * If this is not the case the returned {@link KTable} will be corrupted.
-     * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
-     * store name. Note that that store name may not be queriable through Interactive Queries.
-     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * @param keySerde   key serde used to send key-value pairs,
-     *                   if not specified the default key serde defined in the configuration will be used
-     * @param valueSerde value serde used to send key-value pairs,
-     *                   if not specified the default value serde defined in the configuration will be used
-     * @param topic      the topic name; cannot be {@code null}
-     * @return a {@link KTable} for the specified topic
-     */
-    public synchronized <K, V> KTable<K, V> table(final Serde<K> keySerde,
-                                                  final Serde<V> valueSerde,
-                                                  final String topic) {
-        return internalStreamsBuilder.table(null, null, keySerde, valueSerde, topic, (String) null);
-    }
-
-    /**
-     * Create a {@link KTable} for the specified topic.
      * Input {@link KeyValue records} with {@code null} key will be dropped.
      * <p>
      * Note that the specified input topics must be partitioned by key.
@@ -753,7 +556,8 @@ public class StreamsBuilder {
                                                   final Serde<V> valueSerde,
                                                   final String topic,
                                                   final String queryableStoreName) {
-        return internalStreamsBuilder.table(offsetReset, null, keySerde, valueSerde, topic, queryableStoreName);
+        final ConsumedInternal<K, V> consumed = new ConsumedInternal<>(keySerde, valueSerde, null, offsetReset);
+        return internalStreamsBuilder.table(topic, consumed, queryableStoreName);
     }
 
     /**
@@ -795,7 +599,8 @@ public class StreamsBuilder {
                                                   final Serde<V> valueSerde,
                                                   final String topic,
                                                   final String queryableStoreName) {
-        return internalStreamsBuilder.table(null, timestampExtractor, keySerde, valueSerde, topic, queryableStoreName);
+        final ConsumedInternal<K, V> consumed = new ConsumedInternal<>(keySerde, valueSerde, timestampExtractor, null);
+        return internalStreamsBuilder.table(topic, consumed, queryableStoreName);
     }
 
     /**
@@ -839,36 +644,9 @@ public class StreamsBuilder {
                                                   final Serde<V> valueSerde,
                                                   final String topic,
                                                   final String queryableStoreName) {
-        return internalStreamsBuilder.table(offsetReset, timestampExtractor, keySerde, valueSerde, topic, queryableStoreName);
-    }
-
-    /**
-     * Create a {@link KTable} for the specified topic.
-     * The default {@code "auto.offset.reset"} strategy as specified in the {@link StreamsConfig config} is used.
-     * Input {@link KeyValue records} with {@code null} key will be dropped.
-     * <p>
-     * Note that the specified input topics must be partitioned by key.
-     * If this is not the case the returned {@link KTable} will be corrupted.
-     * <p>
-     * The resulting {@link KTable} will be materialized in a local {@link KeyValueStore} with an internal
-     * store name. Note that that store name may not be queriable through Interactive Queries.
-     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * @param offsetReset the {@code "auto.offset.reset"} policy to use for the specified topic if no valid committed
-     *                    offsets are available
-     * @param keySerde    key serde used to send key-value pairs,
-     *                    if not specified the default key serde defined in the configuration will be used
-     * @param valueSerde  value serde used to send key-value pairs,
-     *                    if not specified the default value serde defined in the configuration will be used
-     * @param topic       the topic name; cannot be {@code null}
-     * @return a {@link KTable} for the specified topic
-     */
-    public synchronized <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
-                                                  final Serde<K> keySerde,
-                                                  final Serde<V> valueSerde,
-                                                  final String topic) {
-        return internalStreamsBuilder.table(offsetReset, null, keySerde, valueSerde, topic, (String) null);
+        return internalStreamsBuilder.table(topic,
+                                            new ConsumedInternal<>(keySerde, valueSerde, timestampExtractor, offsetReset),
+                                            queryableStoreName);
     }
 
     /**
@@ -942,7 +720,28 @@ public class StreamsBuilder {
      */
     public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,
                                                               final String queryableStoreName) {
-        return internalStreamsBuilder.globalTable(null, null, null,  topic, queryableStoreName);
+        return internalStreamsBuilder.globalTable(topic, new ConsumedInternal<K, V>(), queryableStoreName);
+    }
+
+    /**
+     * Create a {@link GlobalKTable} for the specified topic.
+     * Input {@link KeyValue records} with {@code null} key will be dropped.
+     * <p>
+     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with an internal
+     * store name. Note that the store name may not be queryable through Interactive Queries.
+     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
+     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
+     * <p>
+     * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
+     * regardless of the specified value in {@link StreamsConfig}.
+     *
+     * @param topic the topic name; cannot be {@code null}
+     * @param consumed  the instance of {@link Consumed} used to define optional parameters
+     * @return a {@link GlobalKTable} for the specified topic
+     */
+    public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic,
+                                                              final Consumed<K, V> consumed) {
+        return internalStreamsBuilder.globalTable(topic, new ConsumedInternal<>(consumed), null);
     }
 
     /**
@@ -962,7 +761,7 @@ public class StreamsBuilder {
      * @return a {@link GlobalKTable} for the specified topic
      */
     public synchronized <K, V> GlobalKTable<K, V> globalTable(final String topic) {
-        return internalStreamsBuilder.globalTable(null, null, null, topic, null);
+        return internalStreamsBuilder.globalTable(topic, new ConsumedInternal<K, V>(), null);
     }
 
     /**
@@ -1002,7 +801,9 @@ public class StreamsBuilder {
                                                               final TimestampExtractor timestampExtractor,
                                                               final String topic,
                                                               final String queryableStoreName) {
-        return internalStreamsBuilder.globalTable(keySerde, valueSerde, timestampExtractor, topic, queryableStoreName);
+        return internalStreamsBuilder.globalTable(topic,
+                                                  new ConsumedInternal<>(keySerde, valueSerde, timestampExtractor, null),
+                                                  queryableStoreName);
     }
 
     /**
@@ -1074,33 +875,9 @@ public class StreamsBuilder {
                                                               final Serde<V> valueSerde,
                                                               final String topic,
                                                               final String queryableStoreName) {
-        return internalStreamsBuilder.globalTable(keySerde, valueSerde, null, topic, queryableStoreName);
-    }
-
-    /**
-     * Create a {@link GlobalKTable} for the specified topic.
-     * The default key and value deserializers as specified in the {@link StreamsConfig config} are used.
-     * Input {@link KeyValue records} with {@code null} key will be dropped.
-     * <p>
-     * The resulting {@link GlobalKTable} will be materialized in a local {@link KeyValueStore} with an internal
-     * store name. Note that that store name may not be queriable through Interactive Queries.
-     * No internal changelog topic is created since the original input topic can be used for recovery (cf.
-     * methods of {@link KGroupedStream} and {@link KGroupedTable} that return a {@link KTable}).
-     * <p>
-     * Note that {@link GlobalKTable} always applies {@code "auto.offset.reset"} strategy {@code "earliest"}
-     * regardless of the specified value in {@link StreamsConfig}.
-     *
-     * @param keySerde   key serde used to send key-value pairs,
-     *                   if not specified the default key serde defined in the configuration will be used
-     * @param valueSerde value serde used to send key-value pairs,
-     *                   if not specified the default value serde defined in the configuration will be used
-     * @param topic      the topic name; cannot be {@code null}
-     * @return a {@link GlobalKTable} for the specified topic
-     */
-    public synchronized <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
-                                                              final Serde<V> valueSerde,
-                                                              final String topic) {
-        return internalStreamsBuilder.globalTable(keySerde, valueSerde, null, topic, null);
+        return internalStreamsBuilder.globalTable(topic,
+                                                  new ConsumedInternal<>(Consumed.with(keySerde, valueSerde)),
+                                                  queryableStoreName);
     }
 
     /**

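Editor's note: a minimal sketch of how the reworked globalTable overloads above are meant to be called. The topic names and serdes are illustrative, not part of the patch, and the (topic, Consumed) overload is assumed from the ConsumedInternal wrapping shown at the top of this hunk.

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.Consumed;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.GlobalKTable;

    public class GlobalTableSketch {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();

            // Default serdes from StreamsConfig; the backing store gets an internal name.
            final GlobalKTable<String, String> withDefaults =
                builder.globalTable("illustrative-topic");

            // Explicit serdes supplied through the new Consumed container.
            final GlobalKTable<String, Long> typed =
                builder.globalTable("illustrative-counts", Consumed.with(Serdes.String(), Serdes.Long()));
        }
    }
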
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 5a36cde..c1e5b87 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -37,7 +37,7 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
  * For example a user X might buy two items I1 and I2, and thus there might be two records {@code <K:I1>, <K:I2>}
  * in the stream.
  * <p>
- * A {@code KStream} is either {@link StreamsBuilder#stream(String...) defined from one or multiple Kafka topics} that
+ * A {@code KStream} is either {@link StreamsBuilder#stream(String) defined from one or multiple Kafka topics} that
  * are consumed message by message or the result of a {@code KStream} transformation.
  * A {@link KTable} can also be {@link KTable#toStream() converted} into a {@code KStream}.
  * <p>
@@ -52,7 +52,7 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
  * @param <V> Type of values
  * @see KTable
  * @see KGroupedStream
- * @see StreamsBuilder#stream(String...)
+ * @see StreamsBuilder#stream(String)
  */
 @InterfaceStability.Evolving
 public interface KStream<K, V> {
@@ -654,7 +654,7 @@ public interface KStream<K, V> {
      * started).
      * <p>
      * This is equivalent to calling {@link #to(String) #to(someTopicName)} and
-     * {@link org.apache.kafka.streams.StreamsBuilder#stream(String...)
+     * {@link StreamsBuilder#stream(String)
      * StreamsBuilder#stream(someTopicName)}.
      *
      * @param topic the topic name
@@ -669,7 +669,7 @@ public interface KStream<K, V> {
      * started).
      * <p>
      * This is equivalent to calling {@link #to(StreamPartitioner, String) #to(StreamPartitioner, someTopicName)} and
-     * {@link StreamsBuilder#stream(String...) StreamsBuilder#stream(someTopicName)}.
+     * {@link StreamsBuilder#stream(String) StreamsBuilder#stream(someTopicName)}.
      *
      * @param partitioner the function used to determine how records are distributed among partitions of the topic,
      *                    if not specified producer's {@link DefaultPartitioner} will be used

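Editor's note: the javadoc tweaks above only retarget the cross-reference from stream(String...) to stream(String), but the equivalence they describe is easy to miss in diff form. A hedged sketch with hypothetical topic names:

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.Consumed;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;

    public class ThroughEquivalenceSketch {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            final KStream<String, String> source =
                builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));

            // through() writes to the topic and immediately re-reads it ...
            final KStream<String, String> viaThrough = source.through("repartition-topic");

            // ... which the javadoc describes as equivalent to to() followed by stream().
            source.to("repartition-topic");
            final KStream<String, String> reRead = builder.stream("repartition-topic");
        }
    }
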
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java
new file mode 100644
index 0000000..5e8f74b
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ConsumedInternal.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.Consumed;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+
+public class ConsumedInternal<K, V> extends Consumed<K, V> {
+    public ConsumedInternal(final Consumed<K, V> consumed) {
+        super(consumed);
+    }
+
+
+    public ConsumedInternal(final Serde<K> keySerde,
+                            final Serde<V> valSerde,
+                            final TimestampExtractor timestampExtractor,
+                            final Topology.AutoOffsetReset offsetReset) {
+        this(Consumed.with(keySerde, valSerde, timestampExtractor, offsetReset));
+    }
+
+    public ConsumedInternal() {
+        this(Consumed.<K, V>with(null, null));
+    }
+
+    public Serde<K> keySerde() {
+        return keySerde;
+    }
+
+    public Serde<V> valueSerde() {
+        return valueSerde;
+    }
+
+    public TimestampExtractor timestampExtractor() {
+        return timestampExtractor;
+    }
+
+    public Topology.AutoOffsetReset offsetResetPolicy() {
+        return resetPolicy;
+    }
+}

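Editor's note: ConsumedInternal exists purely so the DSL internals can read back the protected fields of Consumed. A rough usage sketch (serdes illustrative, null checks mirroring the builder code below):

    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.Serde;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.Consumed;
    import org.apache.kafka.streams.kstream.internals.ConsumedInternal;

    public class ConsumedInternalSketch {
        public static void main(final String[] args) {
            // Wrap the user-facing Consumed so the builder can get at its serdes,
            // timestamp extractor and offset-reset policy via plain getters.
            final ConsumedInternal<String, Long> consumed =
                new ConsumedInternal<>(Consumed.with(Serdes.String(), Serdes.Long()));

            // Null serdes mean "fall back to the defaults in StreamsConfig",
            // hence the null check before asking for a deserializer.
            final Serde<String> keySerde = consumed.keySerde();
            final Deserializer<String> keyDeserializer =
                keySerde == null ? null : keySerde.deserializer();
        }
    }
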
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
index bcb68f0..4da9906 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilder.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.internals.RocksDBKeyValueStoreSupplier;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -44,45 +45,45 @@ public class InternalStreamsBuilder {
         this.internalTopologyBuilder = internalTopologyBuilder;
     }
 
-    public <K, V> KStream<K, V> stream(final Topology.AutoOffsetReset offsetReset,
-                                                                  final TimestampExtractor timestampExtractor,
-                                                                  final Serde<K> keySerde,
-                                                                  final Serde<V> valSerde,
-                                                                  final String... topics) {
+    public <K, V> KStream<K, V> stream(final Collection<String> topics,
+                                       final ConsumedInternal<K, V> consumed) {
         final String name = newName(KStreamImpl.SOURCE_NAME);
 
-        internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topics);
+        internalTopologyBuilder.addSource(consumed.offsetResetPolicy(),
+                                          name,
+                                          consumed.timestampExtractor(),
+                                          consumed.keySerde() == null ? null : consumed.keySerde().deserializer(),
+                                          consumed.valueSerde() == null ? null : consumed.valueSerde().deserializer(),
+                                          topics.toArray(new String[topics.size()]));
 
         return new KStreamImpl<>(this, name, Collections.singleton(name), false);
     }
 
-    public <K, V> KStream<K, V> stream(final Topology.AutoOffsetReset offsetReset,
-                                       final TimestampExtractor timestampExtractor,
-                                       final Serde<K> keySerde,
-                                       final Serde<V> valSerde,
-                                       final Pattern topicPattern) {
+    public <K, V> KStream<K, V> stream(final Pattern topicPattern, final ConsumedInternal<K, V> consumed) {
         final String name = newName(KStreamImpl.SOURCE_NAME);
 
-        internalTopologyBuilder.addSource(offsetReset, name, timestampExtractor, keySerde == null ? null : keySerde.deserializer(), valSerde == null ? null : valSerde.deserializer(), topicPattern);
+        internalTopologyBuilder.addSource(consumed.offsetResetPolicy(),
+                                          name,
+                                          consumed.timestampExtractor(),
+                                          consumed.keySerde() == null ? null : consumed.keySerde().deserializer(),
+                                          consumed.valueSerde() == null ? null : consumed.valueSerde().deserializer(),
+                                          topicPattern);
 
         return new KStreamImpl<>(this, name, Collections.singleton(name), false);
     }
 
     @SuppressWarnings("unchecked")
-    public <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
-                                     final TimestampExtractor timestampExtractor,
-                                     final Serde<K> keySerde,
-                                     final Serde<V> valSerde,
-                                     final String topic,
+    public <K, V> KTable<K, V> table(final String topic,
+                                     final ConsumedInternal<K, V> consumed,
                                      final String queryableStoreName) {
         final String internalStoreName = queryableStoreName != null ? queryableStoreName : newStoreName(KTableImpl.SOURCE_NAME);
         final StateStoreSupplier storeSupplier = new RocksDBKeyValueStoreSupplier<>(internalStoreName,
-            keySerde,
-            valSerde,
+            consumed.keySerde(),
+            consumed.valueSerde(),
             false,
             Collections.<String, String>emptyMap(),
             true);
-        return doTable(offsetReset, keySerde, valSerde, timestampExtractor, topic, storeSupplier, queryableStoreName != null);
+        return doTable(consumed, topic, storeSupplier, queryableStoreName != null);
     }
 
     public <K, V> KTable<K, V> table(final Topology.AutoOffsetReset offsetReset,
@@ -92,13 +93,10 @@ public class InternalStreamsBuilder {
                                      final String topic,
                                      final StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
-        return doTable(offsetReset, keySerde, valSerde, timestampExtractor, topic, storeSupplier, true);
+        return doTable(new ConsumedInternal<>(keySerde, valSerde, timestampExtractor, offsetReset), topic, storeSupplier, true);
     }
 
-    private <K, V> KTable<K, V> doTable(final Topology.AutoOffsetReset offsetReset,
-                                        final Serde<K> keySerde,
-                                        final Serde<V> valSerde,
-                                        final TimestampExtractor timestampExtractor,
+    private <K, V> KTable<K, V> doTable(final ConsumedInternal<K, V> consumed,
                                         final String topic,
                                         final StateStoreSupplier<KeyValueStore> storeSupplier,
                                         final boolean isQueryable) {
@@ -106,13 +104,16 @@ public class InternalStreamsBuilder {
         final String name = newName(KTableImpl.SOURCE_NAME);
         final ProcessorSupplier<K, V> processorSupplier = new KTableSource<>(storeSupplier.name());
 
-        internalTopologyBuilder.addSource(offsetReset, source, timestampExtractor, keySerde == null ? null : keySerde.deserializer(),
-            valSerde == null ? null : valSerde.deserializer(),
-            topic);
+        internalTopologyBuilder.addSource(consumed.offsetResetPolicy(),
+                                          source,
+                                          consumed.timestampExtractor(),
+                                          consumed.keySerde() == null ? null : consumed.keySerde().deserializer(),
+                                          consumed.valueSerde() == null ? null : consumed.valueSerde().deserializer(),
+                                          topic);
         internalTopologyBuilder.addProcessor(name, processorSupplier, source);
 
         final KTableImpl<K, ?, V> kTable = new KTableImpl<>(this, name, processorSupplier,
-            keySerde, valSerde, Collections.singleton(source), storeSupplier.name(), isQueryable);
+            consumed.keySerde(), consumed.valueSerde(), Collections.singleton(source), storeSupplier.name(), isQueryable);
 
         internalTopologyBuilder.addStateStore(storeSupplier, name);
         internalTopologyBuilder.connectSourceStoreAndTopic(storeSupplier.name(), topic);
@@ -120,15 +121,13 @@ public class InternalStreamsBuilder {
         return kTable;
     }
 
-    public <K, V> GlobalKTable<K, V> globalTable(final Serde<K> keySerde,
-                                                 final Serde<V> valSerde,
-                                                 final TimestampExtractor timestampExtractor,
-                                                 final String topic,
+    public <K, V> GlobalKTable<K, V> globalTable(final String topic,
+                                                 final ConsumedInternal<K, V> consumed,
                                                  final String queryableStoreName) {
         final String internalStoreName = queryableStoreName != null ? queryableStoreName : newStoreName(KTableImpl.SOURCE_NAME);
-        return doGlobalTable(keySerde, valSerde, timestampExtractor, topic, new RocksDBKeyValueStoreSupplier<>(internalStoreName,
-            keySerde,
-            valSerde,
+        return doGlobalTable(consumed, topic, new RocksDBKeyValueStoreSupplier<>(internalStoreName,
+            consumed.keySerde(),
+            consumed.valueSerde(),
             false,
             Collections.<String, String>emptyMap(),
             true));
@@ -138,13 +137,11 @@ public class InternalStreamsBuilder {
                                                  final Serde<V> valSerde,
                                                  final String topic,
                                                  final StateStoreSupplier<KeyValueStore> storeSupplier) {
-        return doGlobalTable(keySerde, valSerde, null, topic, storeSupplier);
+        return doGlobalTable(new ConsumedInternal<>(keySerde, valSerde, null, null), topic, storeSupplier);
     }
 
     @SuppressWarnings("unchecked")
-    private <K, V> GlobalKTable<K, V> doGlobalTable(final Serde<K> keySerde,
-                                                    final Serde<V> valSerde,
-                                                    final TimestampExtractor timestampExtractor,
+    private <K, V> GlobalKTable<K, V> doGlobalTable(final ConsumedInternal<K, V> consumed,
                                                     final String topic,
                                                     final StateStoreSupplier<KeyValueStore> storeSupplier) {
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
@@ -153,10 +150,17 @@ public class InternalStreamsBuilder {
         final KTableSource<K, V> tableSource = new KTableSource<>(storeSupplier.name());
 
 
-        final Deserializer<K> keyDeserializer = keySerde == null ? null : keySerde.deserializer();
-        final Deserializer<V> valueDeserializer = valSerde == null ? null : valSerde.deserializer();
+        final Deserializer<K> keyDeserializer = consumed.keySerde() == null ? null : consumed.keySerde().deserializer();
+        final Deserializer<V> valueDeserializer = consumed.valueSerde() == null ? null : consumed.valueSerde().deserializer();
 
-        internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer, valueDeserializer, topic, processorName, tableSource);
+        internalTopologyBuilder.addGlobalStore(storeSupplier,
+                                               sourceName,
+                                               consumed.timestampExtractor(),
+                                               keyDeserializer,
+                                               valueDeserializer,
+                                               topic,
+                                               processorName,
+                                               tableSource);
         return new GlobalKTableImpl(new KTableSourceValueGetterSupplier<>(storeSupplier.name()));
     }
 
@@ -199,5 +203,4 @@ public class InternalStreamsBuilder {
         internalTopologyBuilder.addGlobalStore(storeSupplier, sourceName, timestampExtractor, keyDeserializer,
             valueDeserializer, topic, processorName, stateUpdateSupplier);
     }
-
 }

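Editor's note: the hunks above collapse the long serde/extractor/offset-reset parameter lists into a single ConsumedInternal argument. Roughly, the public facade is now expected to wrap and delegate along these lines; this is a sketch of the wiring, not a copy of the patch, and the facade class and method shape are assumptions.

    import java.util.Collection;
    import org.apache.kafka.streams.Consumed;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.internals.ConsumedInternal;
    import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilder;

    class FacadeSketch {
        private final InternalStreamsBuilder internalStreamsBuilder;

        FacadeSketch(final InternalStreamsBuilder internalStreamsBuilder) {
            this.internalStreamsBuilder = internalStreamsBuilder;
        }

        // The public overload wraps the user-facing Consumed exactly once and hands
        // it, together with the source topics, to the internal builder.
        <K, V> KStream<K, V> stream(final Collection<String> topics, final Consumed<K, V> consumed) {
            return internalStreamsBuilder.stream(topics, new ConsumedInternal<>(consumed));
        }
    }
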
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 8aa7c58..93fe5b3 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -380,14 +380,17 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
     public KStream<K, V> through(final Serde<K> keySerde,
                                  final Serde<V> valSerde,
                                  final StreamPartitioner<? super K, ? super V> partitioner, String topic) {
-
         return through(topic, Produced.with(keySerde, valSerde, partitioner));
     }
 
     @Override
     public KStream<K, V> through(final String topic, final Produced<K, V> produced) {
         to(topic, produced);
-        return builder.stream(null, new FailOnInvalidTimestamp(), produced.keySerde(), produced.valueSerde(), topic);
+        return builder.stream(Collections.singleton(topic),
+                              new ConsumedInternal<>(produced.keySerde(),
+                                            produced.valueSerde(),
+                                            new FailOnInvalidTimestamp(),
+                                            null));
     }
 
     @Override

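Editor's note: after this change, through(topic, Produced) re-reads the intermediate topic with a ConsumedInternal built from the Produced serdes plus a FailOnInvalidTimestamp extractor. From the caller's perspective (topic names and serdes illustrative):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.Consumed;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.Produced;

    public class ThroughProducedSketch {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            final KStream<String, Long> counts =
                builder.stream("counts-input", Consumed.with(Serdes.String(), Serdes.Long()));

            // The serdes given here are reused on the read side of the intermediate topic.
            final KStream<String, Long> repartitioned =
                counts.through("counts-repartition", Produced.with(Serdes.String(), Serdes.Long()));
        }
    }
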
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index aed7fde..0e06944 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -340,7 +340,9 @@ public class KTableImpl<K, S, V> extends AbstractStream<K> implements KTable<K,
 
         to(keySerde, valSerde, partitioner, topic);
 
-        return builder.table(null, new FailOnInvalidTimestamp(), keySerde, valSerde, topic, internalStoreName);
+        return builder.table(topic,
+                             new ConsumedInternal<>(keySerde, valSerde, new FailOnInvalidTimestamp(), null),
+                             internalStoreName);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index f465654..5e5f6c9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -374,7 +374,7 @@ public class KafkaStreamsTest {
             final String topic = "input";
             CLUSTER.createTopic(topic);
 
-            builder.stream(Serdes.String(), Serdes.String(), topic)
+            builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
                     .foreach(new ForeachAction<String, String>() {
                         @Override
                         public void apply(final String key, final String value) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
index e4ee7c2..dedd157 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.streams;
 
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.KStream;
@@ -27,7 +26,9 @@ import org.apache.kafka.test.MockProcessorSupplier;
 import org.junit.Rule;
 import org.junit.Test;
 
+import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
@@ -41,7 +42,7 @@ public class StreamsBuilderTest {
 
     @Test(expected = TopologyException.class)
     public void testFrom() {
-        builder.stream("topic-1", "topic-2");
+        builder.stream(Arrays.asList("topic-1", "topic-2"));
 
         builder.build().addSource(KStreamImpl.SOURCE_NAME + "0000000000", "topic-3");
     }
@@ -109,12 +110,12 @@ public class StreamsBuilderTest {
 
     @Test(expected = TopologyException.class)
     public void shouldThrowExceptionWhenNoTopicPresent() throws Exception {
-        builder.stream();
+        builder.stream(Collections.<String>emptyList());
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldThrowExceptionWhenTopicNamesAreNull() throws Exception {
-        builder.stream(Serdes.String(), Serdes.String(), null, null);
+        builder.stream(Arrays.<String>asList(null, null));
     }
 
     // TODO: these two static functions are added because some non-TopologyBuilder unit tests need to access the internal topology builder,

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index ad0f1c9..7aef5c9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -101,7 +102,7 @@ public class GlobalKTableIntegrationTest {
         streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
         globalTable = builder.globalTable(Serdes.Long(), Serdes.String(), null, globalOne, globalStore);
-        stream = builder.stream(Serdes.String(), Serdes.Long(), inputStream);
+        stream = builder.stream(inputStream, Consumed.with(Serdes.String(), Serdes.Long()));
         table = builder.table(Serdes.String(), Serdes.Long(), inputTable, "table");
         foreachAction = new ForeachAction<String, String>() {
             @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index e070624..5740779 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -24,6 +24,7 @@ import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -100,7 +101,7 @@ public class KStreamAggregationDedupIntegrationTest {
         streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
 
         KeyValueMapper<Integer, String, String> mapper = MockKeyValueMapper.<Integer, String>SelectValueMapper();
-        stream = builder.stream(Serdes.Integer(), Serdes.String(), streamOneInput);
+        stream = builder.stream(streamOneInput, Consumed.with(Serdes.Integer(), Serdes.String()));
         groupedStream = stream
             .groupBy(
                 mapper,

http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index b1e4237..3cd8b10 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.common.serialization.LongDeserializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -37,6 +38,7 @@ import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Produced;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.Serialized;
 import org.apache.kafka.streams.kstream.SessionWindows;
@@ -111,12 +113,11 @@ public class KStreamAggregationIntegrationTest {
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
 
         final KeyValueMapper<Integer, String, String> mapper = MockKeyValueMapper.SelectValueMapper();
-        stream = builder.stream(Serdes.Integer(), Serdes.String(), streamOneInput);
+        stream = builder.stream(streamOneInput, Consumed.with(Serdes.Integer(), Serdes.String()));
         groupedStream = stream
             .groupBy(
                 mapper,
-                Serdes.String(),
-                Serdes.String());
+                Serialized.with(Serdes.String(), Serdes.String()));
 
         reducer = new Reducer<String>() {
             @Override
@@ -208,7 +209,7 @@ public class KStreamAggregationIntegrationTest {
                     return windowedKey.key() + "@" + windowedKey.window().start();
                 }
             })
-            .to(Serdes.String(), Serdes.String(), outputTopic);
+            .to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
 
         startStreams();
 
@@ -513,7 +514,8 @@ public class KStreamAggregationIntegrationTest {
 
         final Map<Windowed<String>, Long> results = new HashMap<>();
         final CountDownLatch latch = new CountDownLatch(11);
-        builder.stream(Serdes.String(), Serdes.String(), userSessionsStream)
+
+        builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
                 .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
                 .count(SessionWindows.with(sessionGap).until(maintainMillis), "UserSessionsStore")
                 .toStream()
@@ -536,6 +538,7 @@ public class KStreamAggregationIntegrationTest {
         assertThat(results.get(new Windowed<>("penny", new SessionWindow(t3, t3))), equalTo(1L));
     }
 
+    @SuppressWarnings("deprecation")
     @Test
     public void shouldReduceSessionWindows() throws Exception {
         final long sessionGap = 1000L; // something to do with time
@@ -600,7 +603,7 @@ public class KStreamAggregationIntegrationTest {
         final Map<Windowed<String>, String> results = new HashMap<>();
         final CountDownLatch latch = new CountDownLatch(11);
         final String userSessionsStore = "UserSessionsStore";
-        builder.stream(Serdes.String(), Serdes.String(), userSessionsStream)
+        builder.stream(userSessionsStream, Consumed.with(Serdes.String(), Serdes.String()))
                 .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
                 .reduce(new Reducer<String>() {
                     @Override

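Editor's note: the test churn above is mostly mechanical, but it doubles as a migration guide: serde pairs move out of positional parameters and into the Consumed, Serialized and Produced holders. A condensed sketch of the resulting shape (topic and store names illustrative):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.Consumed;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Produced;
    import org.apache.kafka.streams.kstream.Serialized;

    public class MigrationSketch {
        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();

            builder.stream("events", Consumed.with(Serdes.String(), Serdes.String()))
                   // Serialized replaces passing the grouping serdes as extra arguments.
                   .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
                   .count("events-count-store")
                   .toStream()
                   // Produced replaces the serde-first to() overloads.
                   .to("events-count", Produced.with(Serdes.String(), Serdes.Long()));
        }
    }
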
http://git-wip-us.apache.org/repos/asf/kafka/blob/d0ee6ed3/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
index bc99d64..8dc22ce 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -197,7 +198,7 @@ public class KStreamKTableJoinIntegrationTest {
         //
         // Because this is a KStream ("record stream"), multiple records for the same user will be
         // considered as separate click-count events, each of which will be added to the total count.
-        final KStream<String, Long> userClicksStream = builder.stream(stringSerde, longSerde, userClicksTopic);
+        final KStream<String, Long> userClicksStream = builder.stream(userClicksTopic, Consumed.with(Serdes.String(), Serdes.Long()));
         // This KTable contains information such as "alice" -> "europe".
         //
         // Because this is a KTable ("changelog stream"), only the latest value (here: region) for a

