kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject [1/2] kafka git commit: KAFKA-5873; add materialized overloads to StreamsBuilder
Date Mon, 18 Sep 2017 14:53:49 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 52d7b6763 -> f2b74aa1c


http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
index cbf2b56..0bdd3a3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/GlobalKTableIntegrationTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
@@ -33,7 +34,9 @@ import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.test.IntegrationTest;
@@ -101,9 +104,13 @@ public class GlobalKTableIntegrationTest {
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
         streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 100);
-        globalTable = builder.globalTable(Serdes.Long(), Serdes.String(), null, globalOne, globalStore);
-        stream = builder.stream(inputStream, Consumed.with(Serdes.String(), Serdes.Long()));
-        table = builder.table(Serdes.String(), Serdes.Long(), inputTable, "table");
+        globalTable = builder.globalTable(globalOne, Consumed.with(Serdes.Long(), Serdes.String()),
+                                          Materialized.<Long, String, KeyValueStore<Bytes, byte[]>>as(globalStore)
+                                                  .withKeySerde(Serdes.Long())
+                                                  .withValueSerde(Serdes.String()));
+        final Consumed<String, Long> stringLongConsumed = Consumed.with(Serdes.String(), Serdes.Long());
+        stream = builder.stream(inputStream, stringLongConsumed);
+        table = builder.table(inputTable, stringLongConsumed);
         foreachAction = new ForeachAction<String, String>() {
             @Override
             public void apply(final String key, final String value) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
index 3a771c4..faa581b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
@@ -128,8 +128,8 @@ public class JoinIntegrationTest {
         CLUSTER.createTopics(INPUT_TOPIC_1, INPUT_TOPIC_2, OUTPUT_TOPIC);
 
         builder = new StreamsBuilder();
-        leftTable = builder.table(INPUT_TOPIC_1, "leftTable");
-        rightTable = builder.table(INPUT_TOPIC_2, "rightTable");
+        leftTable = builder.table(INPUT_TOPIC_1);
+        rightTable = builder.table(INPUT_TOPIC_2);
         leftStream = leftTable.toStream();
         rightStream = rightTable.toStream();
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
index a433667..8d4299b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
@@ -211,7 +211,8 @@ public class KStreamKTableJoinIntegrationTest {
         // subsequently processed in the `leftJoin`, the latest region update for "alice" is "europe"
         // (which overrides her previous region value of "asia").
         final KTable<String, String> userRegionsTable =
-            builder.table(stringSerde, stringSerde, userRegionsTopic, userRegionsStoreName);
+            builder.table(userRegionsTopic,
+                          Consumed.with(Serdes.String(), Serdes.String()));
 
 
         // Compute the number of clicks per region, e.g. "europe" -> 13L.

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
index 1b45711..a12ffac 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableJoinIntegrationTest.java
@@ -343,9 +343,9 @@ public class KTableKTableJoinIntegrationTest {
     private KafkaStreams prepareTopology(final JoinType joinType1, final JoinType joinType2, final String queryableName) {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final KTable<String, String> table1 = builder.table(TABLE_1, TABLE_1);
-        final KTable<String, String> table2 = builder.table(TABLE_2, TABLE_2);
-        final KTable<String, String> table3 = builder.table(TABLE_3, TABLE_3);
+        final KTable<String, String> table1 = builder.table(TABLE_1);
+        final KTable<String, String> table2 = builder.table(TABLE_2);
+        final KTable<String, String> table3 = builder.table(TABLE_3);
 
         Materialized<String, String, KeyValueStore<Bytes, byte[]>> materialized = null;
         if (queryableName != null) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
index 69c42fe..31b7222 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RestoreIntegrationTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.IntegerSerializer;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
@@ -106,7 +107,7 @@ public class RestoreIntegrationTest {
 
         createStateForRestoration();
 
-        builder.table(Serdes.Integer(), Serdes.Integer(), inputStream, "store")
+        builder.table(inputStream, Consumed.with(Serdes.Integer(), Serdes.Integer()))
                 .toStream()
                 .foreach(new ForeachAction<Integer, Integer>() {
                     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
index 0ee74b8..0f8109d 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/GlobalKTableJoinsTest.java
@@ -54,8 +54,9 @@ public class GlobalKTableJoinsTest {
     @Before
     public void setUp() {
         stateDir = TestUtils.tempDirectory();
-        global = builder.globalTable(Serdes.String(), Serdes.String(), null, globalTopic, "global-store");
-        stream = builder.stream(streamTopic, Consumed.with(Serdes.String(), Serdes.String()));
+        final Consumed<String, String> consumed = Consumed.with(Serdes.String(), Serdes.String());
+        global = builder.globalTable(globalTopic, consumed);
+        stream = builder.stream(streamTopic, consumed);
         keyValueMapper = new KeyValueMapper<String, String, String>() {
             @Override
             public String apply(final String key, final String value) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
index 1a2dc13..494e197 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java
@@ -16,17 +16,20 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.kstream.GlobalKTable;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.StateStore;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
 import org.apache.kafka.streams.processor.internals.ProcessorTopology;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.MockTimestampExtractor;
@@ -58,6 +61,8 @@ public class InternalStreamsBuilderTest {
 
     private KStreamTestDriver driver = null;
     private final ConsumedInternal<String, String> consumed = new ConsumedInternal<>();
+    private MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
+            = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("test-store"), false);
 
     @Before
     public void setUp() {
@@ -132,27 +137,30 @@ public class InternalStreamsBuilderTest {
     }
 
     @Test
-    public void shouldStillMaterializeSourceKTableIfStateNameNotSpecified() throws Exception {
-        KTable table1 = builder.table("topic1", consumed, "table1");
-        KTable table2 = builder.table("topic2", consumed, null);
+    public void shouldStillMaterializeSourceKTableIfMaterializedIsntQueryable() throws Exception {
+        KTable table1 = builder.table("topic2",
+                                      consumed,
+                                      new MaterializedInternal<>(
+                                              Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("topic2"),
+                                              false));
 
         final ProcessorTopology topology = builder.internalTopologyBuilder.build(null);
 
-        assertEquals(2, topology.stateStores().size());
-        assertEquals("table1", topology.stateStores().get(0).name());
+        assertEquals(1, topology.stateStores().size());
+        assertEquals("topic2", topology.stateStores().get(0).name());
 
-        final String internalStoreName = topology.stateStores().get(1).name();
-        assertTrue(internalStoreName.contains(KTableImpl.STATE_STORE_NAME));
-        assertEquals(2, topology.storeToChangelogTopic().size());
-        assertEquals("topic1", topology.storeToChangelogTopic().get("table1"));
-        assertEquals("topic2", topology.storeToChangelogTopic().get(internalStoreName));
-        assertEquals(table1.queryableStoreName(), "table1");
-        assertNull(table2.queryableStoreName());
+        assertEquals(1, topology.storeToChangelogTopic().size());
+        assertEquals("topic2", topology.storeToChangelogTopic().get("topic2"));
+        assertNull(table1.queryableStoreName());
     }
 
     @Test
     public void shouldBuildSimpleGlobalTableTopology() throws Exception {
-        builder.globalTable("table", consumed, "globalTable");
+        builder.globalTable("table",
+                            consumed,
+                            new MaterializedInternal<>(
+                                    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("globalTable"),
+                                    false));
 
         final ProcessorTopology topology = builder.internalTopologyBuilder.buildGlobalStateTopology();
         final List<StateStore> stateStores = topology.globalStateStores();
@@ -173,16 +181,14 @@ public class InternalStreamsBuilderTest {
 
     @Test
     public void shouldBuildGlobalTopologyWithAllGlobalTables() throws Exception {
-        builder.globalTable("table", consumed, "globalTable");
-        builder.globalTable("table2", consumed, "globalTable2");
-
-        doBuildGlobalTopologyWithAllGlobalTables();
-    }
-
-    @Test
-    public void shouldBuildGlobalTopologyWithAllGlobalTablesWithInternalStoreName() throws Exception {
-        builder.globalTable("table", consumed, null);
-        builder.globalTable("table2", consumed, null);
+        builder.globalTable("table",
+                            consumed,
+                            new MaterializedInternal<>(
+                                    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("global1")));
+        builder.globalTable("table2",
+                            consumed,
+                            new MaterializedInternal<>(
+                                    Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("global2")));
 
         doBuildGlobalTopologyWithAllGlobalTables();
     }
@@ -191,10 +197,19 @@ public class InternalStreamsBuilderTest {
     public void shouldAddGlobalTablesToEachGroup() throws Exception {
         final String one = "globalTable";
         final String two = "globalTable2";
-        final GlobalKTable<String, String> globalTable = builder.globalTable("table", consumed, one);
-        final GlobalKTable<String, String> globalTable2 = builder.globalTable("table2", consumed, two);
 
-        builder.table("not-global", consumed, "not-global");
+        final GlobalKTable<String, String> globalTable = builder.globalTable("table",
+                                                                             consumed,
+                                                                             new MaterializedInternal<>(
+                                                                                     Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(one)));
+        final GlobalKTable<String, String> globalTable2 = builder.globalTable("table2",
+                                                                              consumed,
+                                                                              new MaterializedInternal<>(
+                                                                                      Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(two)));
+
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
+                = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("not-global"), false);
+        builder.table("not-global", consumed, materialized);
 
         final KeyValueMapper<String, String, String> kvMapper = new KeyValueMapper<String, String, String>() {
             @Override
@@ -227,7 +242,9 @@ public class InternalStreamsBuilderTest {
     public void shouldMapStateStoresToCorrectSourceTopics() throws Exception {
         final KStream<String, String> playEvents = builder.stream(Collections.singleton("events"), consumed);
 
-        final KTable<String, String> table = builder.table("table-topic", consumed, "table-store");
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, byte[]>> materialized
+                = new MaterializedInternal<>(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("table-store"), false);
+        final KTable<String, String> table = builder.table("table-topic", consumed, materialized);
         assertEquals(Collections.singletonList("table-topic"), builder.internalTopologyBuilder.stateStoreNameToSourceTopics().get("table-store"));
 
         final KStream<String, String> mapped = playEvents.map(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper());
@@ -260,8 +277,7 @@ public class InternalStreamsBuilderTest {
     @Test
     public void shouldAddTableToEarliestAutoOffsetResetList() {
         final String topicName = "topic-1";
-        final String storeName = "test-store";
-        builder.table(topicName, new ConsumedInternal<>(Consumed.with(AutoOffsetReset.EARLIEST)), storeName);
+        builder.table(topicName, new ConsumedInternal<>(Consumed.<String, String>with(AutoOffsetReset.EARLIEST)), materialized);
 
         assertTrue(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topicName).matches());
         assertFalse(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topicName).matches());
@@ -270,9 +286,7 @@ public class InternalStreamsBuilderTest {
     @Test
     public void shouldAddTableToLatestAutoOffsetResetList() {
         final String topicName = "topic-1";
-        final String storeName = "test-store";
-
-        builder.table(topicName, new ConsumedInternal<>(Consumed.with(AutoOffsetReset.LATEST)), storeName);
+        builder.table(topicName, new ConsumedInternal<>(Consumed.<String, String>with(AutoOffsetReset.LATEST)), materialized);
 
         assertTrue(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topicName).matches());
         assertFalse(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topicName).matches());
@@ -281,9 +295,8 @@ public class InternalStreamsBuilderTest {
     @Test
     public void shouldNotAddTableToOffsetResetLists() {
         final String topicName = "topic-1";
-        final String storeName = "test-store";
 
-        builder.table(topicName, consumed, storeName);
+        builder.table(topicName, consumed, materialized);
 
         assertFalse(builder.internalTopologyBuilder.latestResetTopicsPattern().matcher(topicName).matches());
         assertFalse(builder.internalTopologyBuilder.earliestResetTopicsPattern().matcher(topicName).matches());
@@ -340,15 +353,15 @@ public class InternalStreamsBuilderTest {
 
     @Test
     public void ktableShouldHaveNullTimestampExtractorWhenNoneSupplied() throws Exception {
-        builder.table("topic", consumed, "store");
+        builder.table("topic", consumed, materialized);
         final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null);
         assertNull(processorTopology.source("topic").getTimestampExtractor());
     }
 
     @Test
     public void ktableShouldUseProvidedTimestampExtractor() throws Exception {
-        final ConsumedInternal consumed = new ConsumedInternal<>(Consumed.with(new MockTimestampExtractor()));
-        builder.table("topic", consumed, "store");
+        final ConsumedInternal<String, String> consumed = new ConsumedInternal<>(Consumed.<String, String>with(new MockTimestampExtractor()));
+        builder.table("topic", consumed, materialized);
         final ProcessorTopology processorTopology = builder.internalTopologyBuilder.build(null);
         assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class));
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
index ff9726e..705cf62 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.ForeachAction;
@@ -60,7 +61,7 @@ public class KGroupedTableImplTest {
 
     @Before
     public void before() {
-        groupedTable = builder.table(Serdes.String(), Serdes.String(), "blah", "blah")
+        groupedTable = builder.table("blah", Consumed.with(Serdes.String(), Serdes.String()))
                 .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper());
     }
 
@@ -157,7 +158,11 @@ public class KGroupedTableImplTest {
                 }
             };
 
-        final KTable<String, Integer> reduced = builder.table(Serdes.String(), Serdes.Double(), topic, "store")
+        final KTable<String, Integer> reduced = builder.table(topic,
+                                                              Consumed.with(Serdes.String(), Serdes.Double()),
+                                                              Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("store")
+                                                                      .withKeySerde(Serdes.String())
+                                                                      .withValueSerde(Serdes.Double()))
             .groupBy(intProjection)
             .reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR, "reduced");
 
@@ -175,7 +180,11 @@ public class KGroupedTableImplTest {
                 }
             };
 
-        final KTable<String, Integer> reduced = builder.table(Serdes.String(), Serdes.Double(), topic, "store")
+        final KTable<String, Integer> reduced = builder.table(topic,
+                                                              Consumed.with(Serdes.String(), Serdes.Double()),
+                                                              Materialized.<String, Double, KeyValueStore<Bytes, byte[]>>as("store")
+                                                                      .withKeySerde(Serdes.String())
+                                                                      .withValueSerde(Serdes.Double()))
             .groupBy(intProjection)
             .reduce(MockReducer.INTEGER_ADDER, MockReducer.INTEGER_SUBTRACTOR);
 
@@ -194,7 +203,7 @@ public class KGroupedTableImplTest {
                 }
             };
 
-        final KTable<String, Integer> reduced = builder.table(Serdes.String(), Serdes.Double(), topic, "store")
+        final KTable<String, Integer> reduced = builder.table(topic, Consumed.with(Serdes.String(), Serdes.Double()))
                 .groupBy(intProjection)
                 .reduce(MockReducer.INTEGER_ADDER,
                         MockReducer.INTEGER_SUBTRACTOR,
@@ -211,10 +220,10 @@ public class KGroupedTableImplTest {
     @SuppressWarnings("unchecked")
     @Test
     public void shouldCountAndMaterializeResults() {
-        final KTable<String, String> table = builder.table(Serdes.String(), Serdes.String(), topic, "store");
+        final KTable<String, String> table = builder.table(topic, Consumed.with(Serdes.String(), Serdes.String()));
         table.groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(),
-                         Serialized.with(Serdes.String(),
-                                         Serdes.String()))
+                      Serialized.with(Serdes.String(),
+                                      Serdes.String()))
                 .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count")
                                .withKeySerde(Serdes.String())
                                .withValueSerde(Serdes.Long()));
@@ -228,7 +237,7 @@ public class KGroupedTableImplTest {
     @SuppressWarnings("unchecked")
     @Test
     public void shouldAggregateAndMaterializeResults() {
-        final KTable<String, String> table = builder.table(Serdes.String(), Serdes.String(), topic, "store");
+        final KTable<String, String> table = builder.table(topic, Consumed.with(Serdes.String(), Serdes.String()));
         table.groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(),
                       Serialized.with(Serdes.String(),
                                       Serdes.String()))

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
index 5e8687f..be1d865 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
@@ -61,6 +61,7 @@ public class KStreamImplTest {
 
     final private Serde<String> stringSerde = Serdes.String();
     final private Serde<Integer> intSerde = Serdes.Integer();
+    private final Consumed<String, String> stringConsumed = Consumed.with(Serdes.String(), Serdes.String());
     private KStream<String, String> testStream;
     private StreamsBuilder builder;
     private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
@@ -361,7 +362,7 @@ public class KStreamImplTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullValueMapperOnTableJoin() {
-        testStream.leftJoin(builder.table(Serdes.String(), Serdes.String(), "topic", "store"), null);
+        testStream.leftJoin(builder.table("topic", stringConsumed), null);
     }
 
     @Test(expected = NullPointerException.class)
@@ -383,14 +384,14 @@ public class KStreamImplTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullMapperOnJoinWithGlobalTable() {
-        testStream.join(builder.globalTable(Serdes.String(), Serdes.String(), null, "global", "global"),
+        testStream.join(builder.globalTable("global", stringConsumed),
                         null,
                         MockValueJoiner.TOSTRING_JOINER);
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullJoinerOnJoinWithGlobalTable() {
-        testStream.join(builder.globalTable(Serdes.String(), Serdes.String(), null, "global", "global"),
+        testStream.join(builder.globalTable("global", stringConsumed),
                         MockKeyValueMapper.<String, String>SelectValueMapper(),
                         null);
     }
@@ -404,14 +405,14 @@ public class KStreamImplTest {
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullMapperOnLeftJoinWithGlobalTable() {
-        testStream.leftJoin(builder.globalTable(Serdes.String(), Serdes.String(), null, "global", "global"),
+        testStream.leftJoin(builder.globalTable("global", stringConsumed),
                         null,
                         MockValueJoiner.TOSTRING_JOINER);
     }
 
     @Test(expected = NullPointerException.class)
     public void shouldNotAllowNullJoinerOnLeftJoinWithGlobalTable() {
-        testStream.leftJoin(builder.globalTable(Serdes.String(), Serdes.String(), null, "global", "global"),
+        testStream.leftJoin(builder.globalTable("global", stringConsumed),
                         MockKeyValueMapper.<String, String>SelectValueMapper(),
                         null);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
index 745ab4e..39b318f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKStreamLeftJoinTest.java
@@ -87,7 +87,7 @@ public class KStreamKStreamLeftJoinTest {
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        driver.setUp(builder, stateDir);
+        driver.setUp(builder, stateDir, Serdes.Integer(), Serdes.String());
         driver.setTime(0L);
 
         // push two items to the primary stream. the other window is empty

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
index d206b02..d1226c2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
@@ -67,8 +67,9 @@ public class KStreamKTableJoinTest {
         final MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream = builder.stream(topic1, Consumed.with(intSerde, stringSerde));
-        table = builder.table(intSerde, stringSerde, topic2, "anyStoreName");
+        final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
+        stream = builder.stream(topic1, consumed);
+        table = builder.table(topic2, consumed);
         stream.join(table, MockValueJoiner.TOSTRING_JOINER).process(processor);
 
         final Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);
@@ -76,7 +77,7 @@ public class KStreamKTableJoinTest {
         assertEquals(1, copartitionGroups.size());
         assertEquals(new HashSet<>(Arrays.asList(topic1, topic2)), copartitionGroups.iterator().next());
 
-        driver.setUp(builder, stateDir);
+        driver.setUp(builder, stateDir, Serdes.Integer(), Serdes.String());
         driver.setTime(0L);
 
         // push two items to the primary stream. the other table is empty

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
index a79184e..ed835a8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableLeftJoinTest.java
@@ -68,8 +68,9 @@ public class KStreamKTableLeftJoinTest {
         MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        stream = builder.stream(topic1, Consumed.with(intSerde, stringSerde));
-        table = builder.table(intSerde, stringSerde, topic2, "anyStoreName");
+        Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
+        stream = builder.stream(topic1, consumed);
+        table = builder.table(topic2, consumed);
         stream.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).process(processor);
 
         Collection<Set<String>> copartitionGroups = StreamsBuilderTest.getCopartitionedGroups(builder);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
index accbb9c..0ae95dd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
@@ -50,6 +51,7 @@ import static org.junit.Assert.assertEquals;
 public class KTableAggregateTest {
 
     final private Serde<String> stringSerde = Serdes.String();
+    private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
     private final Serialized<String, String> stringSerialzied = Serialized.with(stringSerde, stringSerde);
 
     private File stateDir = null;
@@ -70,7 +72,7 @@ public class KTableAggregateTest {
         final String topic1 = "topic1";
         final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
 
-        KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
+        KTable<String, String> table1 = builder.table(topic1, consumed);
         KTable<String, String> table2 = table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper(),
                                                        stringSerialzied
         ).aggregate(MockInitializer.STRING_INIT,
@@ -81,7 +83,7 @@ public class KTableAggregateTest {
 
         table2.toStream().process(proc);
 
-        driver.setUp(builder, stateDir);
+        driver.setUp(builder, stateDir, Serdes.String(), Serdes.String());
 
         driver.process(topic1, "A", "1");
         driver.flushState();
@@ -118,7 +120,7 @@ public class KTableAggregateTest {
         final String topic1 = "topic1";
         final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
 
-        KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
+        KTable<String, String> table1 = builder.table(topic1, consumed);
         KTable<String, String> table2 = table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper(),
                                                        stringSerialzied
         ).aggregate(MockInitializer.STRING_INIT,
@@ -146,7 +148,7 @@ public class KTableAggregateTest {
         final String topic1 = "topic1";
         final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
 
-        KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
+        KTable<String, String> table1 = builder.table(topic1, consumed);
         KTable<String, String> table2 = table1.groupBy(new KeyValueMapper<String, String, KeyValue<String, String>>() {
             @Override
                 public KeyValue<String, String> apply(String key, String value) {
@@ -232,7 +234,7 @@ public class KTableAggregateTest {
         final String input = "count-test-input";
         final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();
 
-        builder.table(Serdes.String(), Serdes.String(), input, "anyStoreName")
+        builder.table(input, consumed)
                 .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), stringSerialzied)
                 .count("count")
                 .toStream()
@@ -247,7 +249,7 @@ public class KTableAggregateTest {
         final String input = "count-test-input";
         final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();
 
-        builder.table(Serdes.String(), Serdes.String(), input, "anyStoreName")
+        builder.table(input, consumed)
             .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), stringSerialzied)
             .count()
             .toStream()
@@ -262,7 +264,7 @@ public class KTableAggregateTest {
         final String input = "count-test-input";
         final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>();
 
-        builder.table(Serdes.String(), Serdes.String(), input, "anyStoreName")
+        builder.table(input, consumed)
             .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), stringSerialzied)
             .count("count")
             .toStream()
@@ -291,7 +293,7 @@ public class KTableAggregateTest {
         final String input = "count-test-input";
         final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>();
 
-        builder.table(Serdes.String(), Serdes.String(), input, "anyStoreName")
+        builder.table(input, consumed)
                 .groupBy(new KeyValueMapper<String, String, KeyValue<String, String>>() {
 
                     @Override
@@ -348,8 +350,8 @@ public class KTableAggregateTest {
         final String reduceTopic = "TestDriver-reducer-store-repartition";
         final Map<String, Long> reduceResults = new HashMap<>();
 
-        final KTable<String, String> one = builder.table(Serdes.String(), Serdes.String(), tableOne, tableOne);
-        final KTable<Long, String> two = builder.table(Serdes.Long(), Serdes.String(), tableTwo, tableTwo);
+        final KTable<String, String> one = builder.table(tableOne, consumed);
+        final KTable<Long, String> two = builder.table(tableTwo, Consumed.with(Serdes.Long(), Serdes.String()));
 
 
         final KTable<String, Long> reduce = two.groupBy(new KeyValueMapper<Long, String, KeyValue<String, Long>>() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
index a885edd..7986277 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
@@ -42,6 +43,7 @@ public class KTableFilterTest {
 
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
+    private final Consumed<String, Integer> consumed = Consumed.with(stringSerde, intSerde);
     @Rule
     public final KStreamTestDriver driver = new KStreamTestDriver();
     private File stateDir = null;
@@ -79,7 +81,7 @@ public class KTableFilterTest {
 
         final String topic1 = "topic1";
 
-        KTable<String, Integer> table1 = builder.table(stringSerde, intSerde, topic1, "anyStoreName");
+        KTable<String, Integer> table1 = builder.table(topic1, consumed);
 
         KTable<String, Integer> table2 = table1.filter(new Predicate<String, Integer>() {
             @Override
@@ -104,7 +106,7 @@ public class KTableFilterTest {
 
         final String topic1 = "topic1";
 
-        KTable<String, Integer> table1 = builder.table(stringSerde, intSerde, topic1, "anyStoreName");
+        KTable<String, Integer> table1 = builder.table(topic1, consumed);
 
         KTable<String, Integer> table2 = table1.filter(new Predicate<String, Integer>() {
             @Override
@@ -128,7 +130,7 @@ public class KTableFilterTest {
 
         final String topic1 = "topic1";
 
-        KTable<String, Integer> table1 = builder.table(stringSerde, intSerde, topic1, "anyStoreName");
+        KTable<String, Integer> table1 = builder.table(topic1, consumed);
 
         KTable<String, Integer> table2 = table1.filter(new Predicate<String, Integer>() {
             @Override
@@ -213,7 +215,7 @@ public class KTableFilterTest {
         String topic1 = "topic1";
 
         KTableImpl<String, Integer, Integer> table1 =
-                (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1, "anyStoreName");
+                (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
         KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
                 new Predicate<String, Integer>() {
                     @Override
@@ -239,7 +241,7 @@ public class KTableFilterTest {
         String topic1 = "topic1";
 
         KTableImpl<String, Integer, Integer> table1 =
-            (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1, "anyStoreName");
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
         KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
             new Predicate<String, Integer>() {
                 @Override
@@ -304,7 +306,7 @@ public class KTableFilterTest {
         String topic1 = "topic1";
 
         KTableImpl<String, Integer, Integer> table1 =
-                (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1, "anyStoreName");
+                (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
         KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
                 new Predicate<String, Integer>() {
                     @Override
@@ -323,7 +325,7 @@ public class KTableFilterTest {
         String topic1 = "topic1";
 
         KTableImpl<String, Integer, Integer> table1 =
-            (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1, "anyStoreName");
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
         KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
             new Predicate<String, Integer>() {
                 @Override
@@ -382,7 +384,7 @@ public class KTableFilterTest {
         String topic1 = "topic1";
 
         KTableImpl<String, Integer, Integer> table1 =
-                (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1, "anyStoreName");
+                (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
         KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
                 new Predicate<String, Integer>() {
                     @Override
@@ -401,7 +403,7 @@ public class KTableFilterTest {
         String topic1 = "topic1";
 
         KTableImpl<String, Integer, Integer> table1 =
-            (KTableImpl<String, Integer, Integer>) builder.table(stringSerde, intSerde, topic1, "anyStoreName");
+            (KTableImpl<String, Integer, Integer>) builder.table(topic1, consumed);
         KTableImpl<String, Integer, Integer> table2 = (KTableImpl<String, Integer, Integer>) table1.filter(
             new Predicate<String, Integer>() {
                 @Override
@@ -440,8 +442,9 @@ public class KTableFilterTest {
 
         String topic1 = "topic1";
 
+        final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
         KTableImpl<String, String, String> table1 =
-            (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
+            (KTableImpl<String, String, String>) builder.table(topic1, consumed);
         KTableImpl<String, String, String> table2 = (KTableImpl<String, String, String>) table1.filter(
             new Predicate<String, String>() {
                 @Override
@@ -461,8 +464,9 @@ public class KTableFilterTest {
 
         String topic1 = "topic1";
 
+        final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
         KTableImpl<String, String, String> table1 =
-            (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
+            (KTableImpl<String, String, String>) builder.table(topic1, consumed);
         KTableImpl<String, String, String> table2 = (KTableImpl<String, String, String>) table1.filter(
             new Predicate<String, String>() {
                 @Override
@@ -485,7 +489,7 @@ public class KTableFilterTest {
         };
 
         new StreamsBuilder()
-            .<Integer, String>table("empty", "emptyStore")
+            .<Integer, String>table("empty")
             .filter(numberKeyPredicate)
             .filterNot(numberKeyPredicate)
             .to("nirvana");

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
index 693aac8..23e0b59 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableForeachTest.java
@@ -18,10 +18,14 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.ForeachAction;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.test.KStreamTestDriver;
 import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
@@ -79,7 +83,11 @@ public class KTableForeachTest {
 
         // When
         StreamsBuilder builder = new StreamsBuilder();
-        KTable<Integer, String> table = builder.table(intSerde, stringSerde, topicName, "anyStoreName");
+        KTable<Integer, String> table = builder.table(topicName,
+                                                      Consumed.with(intSerde, stringSerde),
+                                                      new MaterializedInternal<>(Materialized.<Integer, String, KeyValueStore<Bytes, byte[]>>as(topicName)
+                                                                                         .withKeySerde(intSerde)
+                                                                                         .withValueSerde(stringSerde)));
         table.foreach(action);
 
         // Then
@@ -105,7 +113,7 @@ public class KTableForeachTest {
         };
 
         new StreamsBuilder()
-            .<Integer, String>table("emptyTopic", "emptyStore")
+            .<Integer, String>table("emptyTopic")
             .foreach(consume);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
index 6ca38b8..9d918e2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.errors.TopologyException;
 import org.apache.kafka.streams.kstream.KTable;
@@ -54,6 +55,7 @@ import static org.junit.Assert.assertTrue;
 public class KTableImplTest {
 
     final private Serde<String> stringSerde = Serdes.String();
+    private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
     @Rule
     public final KStreamTestDriver driver = new KStreamTestDriver();
     private File stateDir = null;
@@ -64,7 +66,7 @@ public class KTableImplTest {
     public void setUp() {
         stateDir = TestUtils.tempDirectory("kafka-test");
         builder = new StreamsBuilder();
-        table = builder.table("test", "test");
+        table = builder.table("test");
     }
 
     @Test
@@ -73,10 +75,9 @@ public class KTableImplTest {
 
         String topic1 = "topic1";
         String topic2 = "topic2";
-        String storeName1 = "storeName1";
         String storeName2 = "storeName2";
 
-        KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, storeName1);
+        KTable<String, String> table1 = builder.table(topic1, consumed);
 
         MockProcessorSupplier<String, String> proc1 = new MockProcessorSupplier<>();
         table1.toStream().process(proc1);
@@ -130,11 +131,10 @@ public class KTableImplTest {
 
         String topic1 = "topic1";
         String topic2 = "topic2";
-        String storeName1 = "storeName1";
         String storeName2 = "storeName2";
 
         KTableImpl<String, String, String> table1 =
-                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1);
+                (KTableImpl<String, String, String>) builder.table(topic1, consumed);
         KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
                 new ValueMapper<String, Integer>() {
                     @Override
@@ -256,14 +256,12 @@ public class KTableImplTest {
     public void testStateStoreLazyEval() {
         String topic1 = "topic1";
         String topic2 = "topic2";
-        String storeName1 = "storeName1";
-        String storeName2 = "storeName2";
 
         final StreamsBuilder builder = new StreamsBuilder();
 
         KTableImpl<String, String, String> table1 =
-                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1);
-        builder.table(stringSerde, stringSerde, topic2, storeName2);
+                (KTableImpl<String, String, String>) builder.table(topic1, consumed);
+        builder.table(topic2, consumed);
 
         KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues(
                 new ValueMapper<String, Integer>() {
@@ -291,15 +289,13 @@ public class KTableImplTest {
     public void testStateStore() {
         String topic1 = "topic1";
         String topic2 = "topic2";
-        String storeName1 = "storeName1";
-        String storeName2 = "storeName2";
 
         final StreamsBuilder builder = new StreamsBuilder();
 
         KTableImpl<String, String, String> table1 =
-                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1);
+                (KTableImpl<String, String, String>) builder.table(topic1, consumed);
         KTableImpl<String, String, String> table2 =
-                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic2, storeName2);
+                (KTableImpl<String, String, String>) builder.table(topic2, consumed);
 
         KTableImpl<String, String, Integer> table1Mapped = (KTableImpl<String, String, Integer>) table1.mapValues(
                 new ValueMapper<String, Integer>() {
@@ -338,7 +334,12 @@ public class KTableImplTest {
         final StreamsBuilder builder = new StreamsBuilder();
 
         KTableImpl<String, String, String> table1 =
-                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1);
+                (KTableImpl<String, String, String>) builder.table(topic1,
+                                                                   consumed,
+                                                                   Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(storeName1)
+                                                                           .withKeySerde(stringSerde)
+                                                                           .withValueSerde(stringSerde)
+                );
 
         table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper())
             .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER, "mock-result1");

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
index 124114b..aeb2418 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableJoinTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
@@ -50,6 +51,7 @@ public class KTableKTableJoinTest {
 
     final private Serde<Integer> intSerde = Serdes.Integer();
     final private Serde<String> stringSerde = Serdes.String();
+    private final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
     private File stateDir = null;
     @Rule
     public final KStreamTestDriver driver = new KStreamTestDriver();
@@ -165,8 +167,8 @@ public class KTableKTableJoinTest {
         final MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
-        table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
+        table1 = builder.table(topic1, consumed);
+        table2 = builder.table(topic2, consumed);
         joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
         joined.toStream().process(processor);
 
@@ -186,8 +188,8 @@ public class KTableKTableJoinTest {
         final MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
-        table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
+        table1 = builder.table(topic1, consumed);
+        table2 = builder.table(topic2, consumed);
         joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER, Serdes.String(), "anyQueryableName");
         joined.toStream().process(processor);
 
@@ -285,8 +287,8 @@ public class KTableKTableJoinTest {
         final KTable<Integer, String> joined;
         final MockProcessorSupplier<Integer, String> proc;
 
-        table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
-        table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
+        table1 = builder.table(topic1, consumed);
+        table2 = builder.table(topic2, consumed);
         joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
         proc = new MockProcessorSupplier<>();
         builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
@@ -306,8 +308,8 @@ public class KTableKTableJoinTest {
         final KTable<Integer, String> joined;
         final MockProcessorSupplier<Integer, String> proc;
 
-        table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
-        table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
+        table1 = builder.table(topic1, consumed);
+        table2 = builder.table(topic2, consumed);
         joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER, Serdes.String(), "anyQueryableName");
         proc = new MockProcessorSupplier<>();
         builder.build().addProcessor("proc", proc, ((KTableImpl<?, ?, ?>) joined).name);
@@ -327,8 +329,8 @@ public class KTableKTableJoinTest {
         final KTable<Integer, String> joined;
         final MockProcessorSupplier<Integer, String> proc;
 
-        table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
-        table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
+        table1 = builder.table(topic1, consumed);
+        table2 = builder.table(topic2, consumed);
         joined = table1.join(table2, MockValueJoiner.TOSTRING_JOINER);
 
         proc = new MockProcessorSupplier<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
index 9cdc782..ca0c81c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableLeftJoinTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
@@ -60,6 +61,7 @@ public class KTableKTableLeftJoinTest {
     private File stateDir = null;
     @Rule
     public final KStreamTestDriver driver = new KStreamTestDriver();
+    private final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
 
     @Before
     public void setUp() throws IOException {
@@ -72,8 +74,8 @@ public class KTableKTableLeftJoinTest {
 
         final int[] expectedKeys = new int[]{0, 1, 2, 3};
 
-        KTable<Integer, String> table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
-        KTable<Integer, String> table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
+        KTable<Integer, String> table1 = builder.table(topic1, consumed);
+        KTable<Integer, String> table2 = builder.table(topic2, consumed);
         KTable<Integer, String> joined = table1.leftJoin(table2, MockValueJoiner.TOSTRING_JOINER);
         MockProcessorSupplier<Integer, String> processor;
         processor = new MockProcessorSupplier<>();
@@ -171,8 +173,8 @@ public class KTableKTableLeftJoinTest {
         final KTable<Integer, String> joined;
         final MockProcessorSupplier<Integer, String> proc;
 
-        table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
-        table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
+        table1 = builder.table(topic1, consumed);
+        table2 = builder.table(topic2, consumed);
         joined = table1.leftJoin(table2, MockValueJoiner.TOSTRING_JOINER);
 
         proc = new MockProcessorSupplier<>();
@@ -252,8 +254,8 @@ public class KTableKTableLeftJoinTest {
         KTable<Integer, String> joined;
         MockProcessorSupplier<Integer, String> proc;
 
-        table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
-        table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
+        table1 = builder.table(topic1, consumed);
+        table2 = builder.table(topic2, consumed);
         joined = table1.leftJoin(table2, MockValueJoiner.TOSTRING_JOINER);
 
         ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();
@@ -341,7 +343,8 @@ public class KTableKTableLeftJoinTest {
         final String[] inputs = {agg, tableOne, tableTwo, tableThree, tableFour, tableFive, tableSix};
 
         final StreamsBuilder builder = new StreamsBuilder();
-        final KTable<Long, String> aggTable = builder.table(Serdes.Long(), Serdes.String(), agg, agg)
+        final Consumed<Long, String> consumed = Consumed.with(Serdes.Long(), Serdes.String());
+        final KTable<Long, String> aggTable = builder.table(agg, consumed)
                 .groupBy(new KeyValueMapper<Long, String, KeyValue<Long, String>>() {
                     @Override
                     public KeyValue<Long, String> apply(final Long key, final String value) {
@@ -349,12 +352,12 @@ public class KTableKTableLeftJoinTest {
                     }
                 }, Serialized.with(Serdes.Long(), Serdes.String())).reduce(MockReducer.STRING_ADDER, MockReducer.STRING_ADDER, "agg-store");
 
-        final KTable<Long, String> one = builder.table(Serdes.Long(), Serdes.String(), tableOne, tableOne);
-        final KTable<Long, String> two = builder.table(Serdes.Long(), Serdes.String(), tableTwo, tableTwo);
-        final KTable<Long, String> three = builder.table(Serdes.Long(), Serdes.String(), tableThree, tableThree);
-        final KTable<Long, String> four = builder.table(Serdes.Long(), Serdes.String(), tableFour, tableFour);
-        final KTable<Long, String> five = builder.table(Serdes.Long(), Serdes.String(), tableFive, tableFive);
-        final KTable<Long, String> six = builder.table(Serdes.Long(), Serdes.String(), tableSix, tableSix);
+        final KTable<Long, String> one = builder.table(tableOne, consumed);
+        final KTable<Long, String> two = builder.table(tableTwo, consumed);
+        final KTable<Long, String> three = builder.table(tableThree, consumed);
+        final KTable<Long, String> four = builder.table(tableFour, consumed);
+        final KTable<Long, String> five = builder.table(tableFive, consumed);
+        final KTable<Long, String> six = builder.table(tableSix, consumed);
 
         final ValueMapper<String, String> mapper = new ValueMapper<String, String>() {
             @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
index 368a3ea..d6ab613 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableKTableOuterJoinTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
@@ -54,6 +55,7 @@ public class KTableKTableOuterJoinTest {
     private File stateDir = null;
     @Rule
     public final KStreamTestDriver driver = new KStreamTestDriver();
+    private final Consumed<Integer, String> consumed = Consumed.with(intSerde, stringSerde);
 
     @Before
     public void setUp() throws IOException {
@@ -72,8 +74,8 @@ public class KTableKTableOuterJoinTest {
         MockProcessorSupplier<Integer, String> processor;
 
         processor = new MockProcessorSupplier<>();
-        table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
-        table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
+        table1 = builder.table(topic1, consumed);
+        table2 = builder.table(topic2, consumed);
         joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER);
         joined.toStream().process(processor);
 
@@ -176,8 +178,8 @@ public class KTableKTableOuterJoinTest {
         KTable<Integer, String> joined;
         MockProcessorSupplier<Integer, String> proc;
 
-        table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
-        table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
+        table1 = builder.table(topic1, consumed);
+        table2 = builder.table(topic2, consumed);
         joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER);
 
         proc = new MockProcessorSupplier<>();
@@ -264,8 +266,8 @@ public class KTableKTableOuterJoinTest {
         KTable<Integer, String> joined;
         MockProcessorSupplier<Integer, String> proc;
 
-        table1 = builder.table(intSerde, stringSerde, topic1, storeName1);
-        table2 = builder.table(intSerde, stringSerde, topic2, storeName2);
+        table1 = builder.table(topic1, consumed);
+        table2 = builder.table(topic2, consumed);
         joined = table1.outerJoin(table2, MockValueJoiner.TOSTRING_JOINER);
 
         ((KTableImpl<?, ?, ?>) joined).enableSendingOldValues();

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
index 756404f..81797cb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapKeysTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
@@ -57,7 +58,7 @@ public class KTableMapKeysTest {
 
         String topic1 = "topic_map_keys";
 
-        KTable<Integer, String> table1 = builder.table(integerSerde, stringSerde, topic1, "anyStoreName");
+        KTable<Integer, String> table1 = builder.table(topic1, Consumed.with(integerSerde, stringSerde));
 
         final Map<Integer, String> keyMap = new HashMap<>();
         keyMap.put(1, "ONE");

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
index 4bfaea6..5d92846 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableMapValuesTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
@@ -43,6 +44,7 @@ import static org.junit.Assert.assertTrue;
 public class KTableMapValuesTest {
 
     final private Serde<String> stringSerde = Serdes.String();
+    private final Consumed<String, String> consumed = Consumed.with(stringSerde, stringSerde);
     @Rule
     public final KStreamTestDriver driver = new KStreamTestDriver();
     private File stateDir = null;
@@ -69,7 +71,7 @@ public class KTableMapValuesTest {
 
         String topic1 = "topic1";
 
-        KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
+        KTable<String, String> table1 = builder.table(topic1, consumed);
         KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<CharSequence, Integer>() {
             @Override
             public Integer apply(CharSequence value) {
@@ -89,7 +91,7 @@ public class KTableMapValuesTest {
 
         String topic1 = "topic1";
 
-        KTable<String, String> table1 = builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
+        KTable<String, String> table1 = builder.table(topic1, consumed);
         KTable<String, Integer> table2 = table1.mapValues(new ValueMapper<CharSequence, Integer>() {
             @Override
             public Integer apply(CharSequence value) {
@@ -210,11 +212,10 @@ public class KTableMapValuesTest {
 
         String topic1 = "topic1";
         String topic2 = "topic2";
-        String storeName1 = "storeName1";
         String storeName2 = "storeName2";
 
         KTableImpl<String, String, String> table1 =
-                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1);
+                (KTableImpl<String, String, String>) builder.table(topic1, consumed);
         KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
                 new ValueMapper<String, Integer>() {
                     @Override
@@ -241,11 +242,10 @@ public class KTableMapValuesTest {
 
         String topic1 = "topic1";
         String topic2 = "topic2";
-        String storeName1 = "storeName1";
         String storeName2 = "storeName2";
 
         KTableImpl<String, String, String> table1 =
-            (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, storeName1);
+            (KTableImpl<String, String, String>) builder.table(topic1, consumed);
         KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
             new ValueMapper<String, Integer>() {
                 @Override
@@ -273,7 +273,7 @@ public class KTableMapValuesTest {
         String topic1 = "topic1";
 
         KTableImpl<String, String, String> table1 =
-                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
+                (KTableImpl<String, String, String>) builder.table(topic1, consumed);
         KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
                 new ValueMapper<String, Integer>() {
                     @Override
@@ -321,7 +321,7 @@ public class KTableMapValuesTest {
         String topic1 = "topic1";
 
         KTableImpl<String, String, String> table1 =
-                (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
+                (KTableImpl<String, String, String>) builder.table(topic1, consumed);
         KTableImpl<String, String, Integer> table2 = (KTableImpl<String, String, Integer>) table1.mapValues(
                 new ValueMapper<String, Integer>() {
                     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
index 3f8a6b2..35a3dbd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableSourceTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.test.KStreamTestDriver;
@@ -37,6 +38,7 @@ import static org.junit.Assert.assertTrue;
 public class KTableSourceTest {
 
     final private Serde<String> stringSerde = Serdes.String();
+    private final Consumed<String, String> stringConsumed = Consumed.with(stringSerde, stringSerde);
     final private Serde<Integer> intSerde = Serdes.Integer();
     @Rule
     public final KStreamTestDriver driver = new KStreamTestDriver();
@@ -53,7 +55,7 @@ public class KTableSourceTest {
 
         String topic1 = "topic1";
 
-        KTable<String, Integer> table1 = builder.table(stringSerde, intSerde, topic1, "anyStoreName");
+        KTable<String, Integer> table1 = builder.table(topic1, Consumed.with(stringSerde, intSerde));
 
         MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
         table1.toStream().process(proc1);
@@ -77,7 +79,7 @@ public class KTableSourceTest {
 
         String topic1 = "topic1";
 
-        KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
+        KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(topic1, stringConsumed);
 
         KTableValueGetterSupplier<String, String> getterSupplier1 = table1.valueGetterSupplier();
 
@@ -121,7 +123,7 @@ public class KTableSourceTest {
 
         String topic1 = "topic1";
 
-        KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
+        KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(topic1, stringConsumed);
 
         MockProcessorSupplier<String, Integer> proc1 = new MockProcessorSupplier<>();
 
@@ -159,7 +161,7 @@ public class KTableSourceTest {
 
         String topic1 = "topic1";
 
-        KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(stringSerde, stringSerde, topic1, "anyStoreName");
+        KTableImpl<String, String, String> table1 = (KTableImpl<String, String, String>) builder.table(topic1, stringConsumed);
 
         table1.enableSendingOldValues();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 3cbceeb..1e4afcd 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -45,6 +45,7 @@ import org.apache.kafka.streams.processor.Processor;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.ProcessorSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.test.TestUtils;
 
@@ -686,7 +687,7 @@ public class SimpleBenchmark {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final KStream<Long, byte[]> input1 = builder.stream(kStreamTopic);
-        final KTable<Long, byte[]> input2 = builder.table(kTableTopic, kTableTopic + "-store");
+        final KTable<Long, byte[]> input2 = builder.table(kTableTopic);
 
         input1.leftJoin(input2, VALUE_JOINER).foreach(new CountDownAction(latch));
 
@@ -697,8 +698,8 @@ public class SimpleBenchmark {
                                                             String kTableTopic2, final CountDownLatch latch) {
         final StreamsBuilder builder = new StreamsBuilder();
 
-        final KTable<Long, byte[]> input1 = builder.table(kTableTopic1, kTableTopic1 + "-store");
-        final KTable<Long, byte[]> input2 = builder.table(kTableTopic2, kTableTopic2 + "-store");
+        final KTable<Long, byte[]> input1 = builder.table(kTableTopic1);
+        final KTable<Long, byte[]> input2 = builder.table(kTableTopic2);
 
         input1.leftJoin(input2, VALUE_JOINER).foreach(new CountDownAction(latch));
 
@@ -725,10 +726,12 @@ public class SimpleBenchmark {
 
         StreamsBuilder builder = new StreamsBuilder();
 
+        final StoreBuilder<KeyValueStore<Integer, byte[]>> storeBuilder
+                = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore("store"), Serdes.Integer(), Serdes.ByteArray());
         if (enableCaching) {
-            builder.addStateStore(Stores.create("store").withIntegerKeys().withByteArrayValues().persistent().enableCaching().build());
+            builder.addStateStore(storeBuilder.withCachingEnabled());
         } else {
-            builder.addStateStore(Stores.create("store").withIntegerKeys().withByteArrayValues().persistent().build());
+            builder.addStateStore(storeBuilder);
         }
         KStream<Integer, byte[]> source = builder.stream(topic, Consumed.with(INTEGER_SERDE, BYTE_SERDE));
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
index 0a10f9f..d98fd7f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/YahooBenchmark.java
@@ -286,8 +286,7 @@ public class YahooBenchmark {
         final KStream<String, ProjectedEvent> kEvents = builder.stream(eventsTopic,
                                                                        Consumed.with(Serdes.String(),
                                                                                      Serdes.serdeFrom(projectedEventSerializer, projectedEventDeserializer)));
-        final KTable<String, String> kCampaigns = builder.table(Serdes.String(), Serdes.String(),
-            campaignsTopic, "campaign-state");
+        final KTable<String, String> kCampaigns = builder.table(campaignsTopic, Consumed.with(Serdes.String(), Serdes.String()));
 
 
         KStream<String, ProjectedEvent> filteredEvents = kEvents

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
index b22c488..f3dbb32 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -22,14 +22,18 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsBuilderTest;
 import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Predicate;
 import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.state.HostInfo;
+import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StreamsMetadata;
 import org.junit.Before;
 import org.junit.Test;
@@ -88,7 +92,9 @@ public class StreamsMetadataStateTest {
             }
         });
 
-        builder.globalTable("global-topic", "global-table");
+        builder.globalTable("global-topic",
+                            Consumed.with(null, null),
+                            Materialized.<Object, Object, KeyValueStore<Bytes, byte[]>>as(globalTable));
 
         StreamsBuilderTest.internalTopologyBuilder(builder).setApplicationId("appId");
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/f2b74aa1/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
index c4e108d..4b75702 100644
--- a/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -105,9 +105,9 @@ public class SmokeTestClient extends SmokeTestUtil {
         props.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE);
         props.put(ProducerConfig.ACKS_CONFIG, "all");
 
-
         StreamsBuilder builder = new StreamsBuilder();
-        KStream<String, Integer> source = builder.stream("data", Consumed.with(stringSerde, intSerde));
+        Consumed<String, Integer> stringIntConsumed = Consumed.with(stringSerde, intSerde);
+        KStream<String, Integer> source = builder.stream("data", stringIntConsumed);
         source.to(stringSerde, intSerde, "echo");
         KStream<String, Integer> data = source.filter(new Predicate<String, Integer>() {
             @Override
@@ -141,7 +141,7 @@ public class SmokeTestClient extends SmokeTestUtil {
                 new Unwindow<String, Integer>()
         ).to(stringSerde, intSerde, "min");
 
-        KTable<String, Integer> minTable = builder.table(stringSerde, intSerde, "min", "minStoreName");
+        KTable<String, Integer> minTable = builder.table("min", stringIntConsumed);
         minTable.toStream().process(SmokeTestUtil.printProcessorSupplier("min"));
 
         // max
@@ -163,7 +163,7 @@ public class SmokeTestClient extends SmokeTestUtil {
                 new Unwindow<String, Integer>()
         ).to(stringSerde, intSerde, "max");
 
-        KTable<String, Integer> maxTable = builder.table(stringSerde, intSerde, "max", "maxStoreName");
+        KTable<String, Integer> maxTable = builder.table("max", stringIntConsumed);
         maxTable.toStream().process(SmokeTestUtil.printProcessorSupplier("max"));
 
         // sum
@@ -186,7 +186,8 @@ public class SmokeTestClient extends SmokeTestUtil {
         ).to(stringSerde, longSerde, "sum");
 
 
-        KTable<String, Long> sumTable = builder.table(stringSerde, longSerde, "sum", "sumStoreName");
+        Consumed<String, Long> stringLongConsumed = Consumed.with(stringSerde, longSerde);
+        KTable<String, Long> sumTable = builder.table("sum", stringLongConsumed);
         sumTable.toStream().process(SmokeTestUtil.printProcessorSupplier("sum"));
 
         // cnt
@@ -195,7 +196,7 @@ public class SmokeTestClient extends SmokeTestUtil {
                 new Unwindow<String, Long>()
         ).to(stringSerde, longSerde, "cnt");
 
-        KTable<String, Long> cntTable = builder.table(stringSerde, longSerde, "cnt", "cntStoreName");
+        KTable<String, Long> cntTable = builder.table("cnt", stringLongConsumed);
         cntTable.toStream().process(SmokeTestUtil.printProcessorSupplier("cnt"));
 
         // dif


Mime
View raw message