kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/2] kafka git commit: KAFKA-3914: Global discovery of state stores
Date Wed, 10 Aug 2016 21:25:32 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk caa9bd0fc -> 68b5d014f


http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index 21de73a..9d261bb 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.utils.SystemTime;
 import org.apache.kafka.common.utils.Utils;
@@ -31,10 +32,12 @@ import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
 import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;
+import org.apache.kafka.streams.state.HostInfo;
 import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
+import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -85,6 +88,7 @@ public class StreamPartitionAssignorTest {
     private final TaskId task1 = new TaskId(0, 1);
     private final TaskId task2 = new TaskId(0, 2);
     private final TaskId task3 = new TaskId(0, 3);
+    private String userEndPoint = null;
 
     private Properties configProps() {
         return new Properties() {
@@ -115,7 +119,7 @@ public class StreamPartitionAssignorTest {
 
         String clientId = "client-id";
         UUID processId = UUID.randomUUID();
-        StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), "test", clientId, processId, new Metrics(), new SystemTime()) {
+        StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), "test", clientId, processId, new Metrics(), new SystemTime(), new StreamsMetadataState(builder)) {
             @Override
             public Set<TaskId> prevTasks() {
                 return prevTasks;
@@ -137,7 +141,7 @@ public class StreamPartitionAssignorTest {
         Set<TaskId> standbyTasks = new HashSet<>(cachedTasks);
         standbyTasks.removeAll(prevTasks);
 
-        SubscriptionInfo info = new SubscriptionInfo(processId, prevTasks, standbyTasks);
+        SubscriptionInfo info = new SubscriptionInfo(processId, prevTasks, standbyTasks, null);
         assertEquals(info.encode(), subscription.userData());
     }
 
@@ -163,18 +167,18 @@ public class StreamPartitionAssignorTest {
         UUID uuid2 = UUID.randomUUID();
         String client1 = "client1";
 
-        StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), new SystemTime());
+        StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
 
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10).encode()));
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, userEndPoint).encode()));
         subscriptions.put("consumer11",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11).encode()));
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11, userEndPoint).encode()));
         subscriptions.put("consumer20",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20).encode()));
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20, userEndPoint).encode()));
 
         Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
 
@@ -229,18 +233,18 @@ public class StreamPartitionAssignorTest {
         UUID uuid2 = UUID.randomUUID();
         String client1 = "client1";
 
-        StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), new SystemTime());
+        StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
 
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, Collections.<TaskId>emptySet()).encode()));
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, Collections.<TaskId>emptySet(), userEndPoint).encode()));
         subscriptions.put("consumer11",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, Collections.<TaskId>emptySet()).encode()));
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, Collections.<TaskId>emptySet(), userEndPoint).encode()));
         subscriptions.put("consumer20",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, Collections.<TaskId>emptySet()).encode()));
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, Collections.<TaskId>emptySet(), userEndPoint).encode()));
 
         Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
 
@@ -297,18 +301,18 @@ public class StreamPartitionAssignorTest {
         String client1 = "client1";
 
 
-        StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), applicationId, client1, uuid1, new Metrics(), new SystemTime());
+        StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), applicationId, client1, uuid1, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1));
 
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode()));
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), userEndPoint).encode()));
         subscriptions.put("consumer11",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode()));
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), userEndPoint).encode()));
         subscriptions.put("consumer20",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode()));
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet(), userEndPoint).encode()));
 
         Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
 
@@ -352,18 +356,18 @@ public class StreamPartitionAssignorTest {
         UUID uuid2 = UUID.randomUUID();
         String client1 = "client1";
 
-        StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), new SystemTime());
+        StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1));
 
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10).encode()));
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10, userEndPoint).encode()));
         subscriptions.put("consumer11",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11).encode()));
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11, userEndPoint).encode()));
         subscriptions.put("consumer20",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20).encode()));
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20, userEndPoint).encode()));
 
         Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
 
@@ -454,7 +458,7 @@ public class StreamPartitionAssignorTest {
         UUID uuid = UUID.randomUUID();
         String client1 = "client1";
 
-        StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid, new Metrics(), new SystemTime());
+        StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         partitionAssignor.configure(config.getConsumerConfigs(thread, "test", client1));
@@ -464,7 +468,7 @@ public class StreamPartitionAssignorTest {
         standbyTasks.put(task1, Utils.mkSet(new TopicPartition("t1", 0)));
         standbyTasks.put(task2, Utils.mkSet(new TopicPartition("t2", 0)));
 
-        AssignmentInfo info = new AssignmentInfo(activeTaskList, standbyTasks);
+        AssignmentInfo info = new AssignmentInfo(activeTaskList, standbyTasks, new HashMap<HostInfo, Set<TopicPartition>>());
         PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(Utils.mkList(t1p0, t2p3), info.encode());
         partitionAssignor.onAssignment(assignment);
 
@@ -493,7 +497,7 @@ public class StreamPartitionAssignorTest {
 
         MockClientSupplier clientSupplier = new MockClientSupplier();
 
-        StreamThread thread10 = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), new SystemTime());
+        StreamThread thread10 = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1));
@@ -503,7 +507,7 @@ public class StreamPartitionAssignorTest {
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         Set<TaskId> emptyTasks = Collections.<TaskId>emptySet();
         subscriptions.put("consumer10",
-                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks).encode()));
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode()));
 
         partitionAssignor.assign(metadata, subscriptions);
 
@@ -535,7 +539,7 @@ public class StreamPartitionAssignorTest {
 
         MockClientSupplier clientSupplier = new MockClientSupplier();
 
-        StreamThread thread10 = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), new SystemTime());
+        StreamThread thread10 = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
 
         StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1));
@@ -545,7 +549,7 @@ public class StreamPartitionAssignorTest {
         Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
         Set<TaskId> emptyTasks = Collections.<TaskId>emptySet();
         subscriptions.put("consumer10",
-                          new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks).encode()));
+                          new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode()));
 
         Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
 
@@ -554,6 +558,139 @@ public class StreamPartitionAssignorTest {
         assertEquals(allTasks.size(), (long) internalTopicManager.readyTopics.get("test-topicZ"));
     }
 
+    @Test
+    public void shouldAddUserDefinedEndPointToSubscription() throws Exception {
+        final Properties properties = configProps();
+        properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:8080");
+        final StreamsConfig config = new StreamsConfig(properties);
+        final TopologyBuilder builder = new TopologyBuilder();
+        final String applicationId = "application-id";
+        builder.setApplicationId(applicationId);
+        builder.addSource("source", "input");
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
+        builder.addSink("sink", "output", "processor");
+
+        final UUID uuid1 = UUID.randomUUID();
+        final String client1 = "client1";
+
+        final MockClientSupplier clientSupplier = new MockClientSupplier();
+
+        final StreamThread streamThread = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
+
+        final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+        partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1));
+        final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("input"));
+        final SubscriptionInfo subscriptionInfo = SubscriptionInfo.decode(subscription.userData());
+        assertEquals("localhost:8080", subscriptionInfo.userEndPoint);
+    }
+
+    @Test
+    public void shouldMapUserEndPointToTopicPartitions() throws Exception {
+        final Properties properties = configProps();
+        final String myEndPoint = "localhost:8080";
+        properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint);
+        final StreamsConfig config = new StreamsConfig(properties);
+        final TopologyBuilder builder = new TopologyBuilder();
+        final String applicationId = "application-id";
+        builder.setApplicationId(applicationId);
+        builder.addSource("source", "topic1");
+        builder.addProcessor("processor", new MockProcessorSupplier(), "source");
+        builder.addSink("sink", "output", "processor");
+
+        final List<String> topics = Utils.mkList("topic1");
+
+        final UUID uuid1 = UUID.randomUUID();
+        final String client1 = "client1";
+
+        final MockClientSupplier clientSupplier = new MockClientSupplier();
+
+        final StreamThread streamThread = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1, new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
+
+        final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+        partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1));
+
+        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+        final Set<TaskId> emptyTasks = Collections.<TaskId>emptySet();
+        subscriptions.put("consumer1",
+                new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, myEndPoint).encode()));
+
+        final Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
+        final PartitionAssignor.Assignment consumerAssignment = assignments.get("consumer1");
+        final AssignmentInfo assignmentInfo = AssignmentInfo.decode(consumerAssignment.userData());
+        final Set<TopicPartition> topicPartitions = assignmentInfo.partitionsByHostState.get(new HostInfo("localhost", 8080));
+        assertEquals(Utils.mkSet(new TopicPartition("topic1", 0),
+                new TopicPartition("topic1", 1),
+                new TopicPartition("topic1", 2)), topicPartitions);
+    }
+
+    @Test
+    public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() throws Exception {
+        final Properties properties = configProps();
+        final String myEndPoint = "localhost";
+        properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint);
+        final StreamsConfig config = new StreamsConfig(properties);
+        final UUID uuid1 = UUID.randomUUID();
+        final String client1 = "client1";
+        final TopologyBuilder builder = new TopologyBuilder();
+        final String applicationId = "application-id";
+        builder.setApplicationId(applicationId);
+
+        final MockClientSupplier clientSupplier = new MockClientSupplier();
+
+        final StreamThread streamThread = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1,
+                                                           new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
+
+        final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+
+        try {
+            partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1));
+            Assert.fail("expected to an exception due to invalid config");
+        } catch (ConfigException e) {
+            // pass
+        }
+    }
+
+    @Test
+    public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() throws Exception {
+        final Properties properties = configProps();
+        final String myEndPoint = "localhost:j87yhk";
+        properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint);
+        final StreamsConfig config = new StreamsConfig(properties);
+        final UUID uuid1 = UUID.randomUUID();
+        final String client1 = "client1";
+        final TopologyBuilder builder = new TopologyBuilder();
+        final String applicationId = "application-id";
+        builder.setApplicationId(applicationId);
+
+        final MockClientSupplier clientSupplier = new MockClientSupplier();
+
+        final StreamThread streamThread = new StreamThread(builder, config, clientSupplier, applicationId, client1, uuid1,
+                                                           new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
+
+        final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+
+        try {
+            partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1));
+            Assert.fail("expected to an exception due to invalid config");
+        } catch (ConfigException e) {
+            // pass
+        }
+    }
+
+    @Test
+    public void shouldExposeHostStateToTopicPartitionsOnAssignment() throws Exception {
+        final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+        List<TopicPartition> topic = Arrays.asList(new TopicPartition("topic", 0));
+        final Map<HostInfo, Set<TopicPartition>> hostState =
+                Collections.singletonMap(new HostInfo("localhost", 80),
+                        Collections.singleton(new TopicPartition("topic", 0)));
+        AssignmentInfo assignmentInfo = new AssignmentInfo(Collections.singletonList(new TaskId(0, 0)),
+                Collections.<TaskId, Set<TopicPartition>>emptyMap(),
+                hostState);
+        partitionAssignor.onAssignment(new PartitionAssignor.Assignment(topic, assignmentInfo.encode()));
+        assertEquals(hostState, partitionAssignor.getPartitionsByHostState());
+    }
+
     private class MockInternalTopicManager extends InternalTopicManager {
 
         public Map<String, Integer> readyTopics = new HashMap<>();

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index d1aaa07..3a90ce3 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -159,7 +159,7 @@ public class StreamThreadTest {
         builder.addSource("source3", "topic3");
         builder.addProcessor("processor", new MockProcessorSupplier(), "source2", "source3");
 
-        StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), applicationId, clientId, processId, new Metrics(), new SystemTime()) {
+        StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), applicationId, clientId, processId, new Metrics(), new SystemTime(), new StreamsMetadataState(builder)) {
             @Override
             protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) {
                 ProcessorTopology topology = builder.build("X", id.topicGroupId);
@@ -278,7 +278,7 @@ public class StreamThreadTest {
             TopologyBuilder builder = new TopologyBuilder();
             builder.addSource("source1", "topic1");
 
-            StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), applicationId, clientId,  processId, new Metrics(), mockTime) {
+            StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), applicationId, clientId,  processId, new Metrics(), mockTime, new StreamsMetadataState(builder)) {
                 @Override
                 public void maybeClean() {
                     super.maybeClean();
@@ -397,7 +397,7 @@ public class StreamThreadTest {
             TopologyBuilder builder = new TopologyBuilder();
             builder.addSource("source1", "topic1");
 
-            StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), applicationId, clientId,  processId, new Metrics(), mockTime) {
+            StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), applicationId, clientId,  processId, new Metrics(), mockTime, new StreamsMetadataState(builder)) {
                 @Override
                 public void maybeCommit() {
                     super.maybeCommit();
@@ -469,7 +469,7 @@ public class StreamThreadTest {
         StreamsConfig config = new StreamsConfig(configProps());
         MockClientSupplier clientSupplier = new MockClientSupplier();
         StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId,
-                                               clientId,  processId, new Metrics(), new MockTime());
+                                               clientId,  processId, new Metrics(), new MockTime(), new StreamsMetadataState(builder));
         assertSame(clientSupplier.producer, thread.producer);
         assertSame(clientSupplier.consumer, thread.consumer);
         assertSame(clientSupplier.restoreConsumer, thread.restoreConsumer);

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
new file mode 100644
index 0000000..d110277
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.Predicate;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.streams.processor.StreamPartitioner;
+import org.apache.kafka.streams.state.HostInfo;
+import org.apache.kafka.streams.state.StreamsMetadata;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class StreamsMetadataStateTest {
+
+    private StreamsMetadataState discovery;
+    private HostInfo hostOne;
+    private HostInfo hostTwo;
+    private HostInfo hostThree;
+    private TopicPartition topic1P0;
+    private TopicPartition topic2P0;
+    private TopicPartition topic3P0;
+    private Map<HostInfo, Set<TopicPartition>> hostToPartitions;
+    private KStreamBuilder builder;
+    private TopicPartition topic1P1;
+    private TopicPartition topic2P1;
+    private TopicPartition topic4P0;
+    private List<PartitionInfo> partitionInfos;
+    private Cluster cluster;
+
+    @Before
+    public void before() {
+        builder = new KStreamBuilder();
+        final KStream<Object, Object> one = builder.stream("topic-one");
+        one.groupByKey().count("table-one");
+
+        final KStream<Object, Object> two = builder.stream("topic-two");
+        two.groupByKey().count("table-two");
+
+        builder.stream("topic-three")
+                .groupByKey()
+                .count("table-three");
+
+        builder.merge(one, two).groupByKey().count("merged-table");
+
+        builder.stream("topic-four").mapValues(new ValueMapper<Object, Object>() {
+            @Override
+            public Object apply(final Object value) {
+                return value;
+            }
+        });
+
+        builder.setApplicationId("appId");
+
+        topic1P0 = new TopicPartition("topic-one", 0);
+        topic1P1 = new TopicPartition("topic-one", 1);
+        topic2P0 = new TopicPartition("topic-two", 0);
+        topic2P1 = new TopicPartition("topic-two", 1);
+        topic3P0 = new TopicPartition("topic-three", 0);
+        topic4P0 = new TopicPartition("topic-four", 0);
+
+        hostOne = new HostInfo("host-one", 8080);
+        hostTwo = new HostInfo("host-two", 9090);
+        hostThree = new HostInfo("host-three", 7070);
+        hostToPartitions = new HashMap<>();
+        hostToPartitions.put(hostOne, Utils.mkSet(topic1P0, topic2P1, topic4P0));
+        hostToPartitions.put(hostTwo, Utils.mkSet(topic2P0, topic1P1));
+        hostToPartitions.put(hostThree, Collections.singleton(topic3P0));
+
+        partitionInfos = Arrays.asList(
+                new PartitionInfo("topic-one", 0, null, null, null),
+                new PartitionInfo("topic-one", 1, null, null, null),
+                new PartitionInfo("topic-two", 0, null, null, null),
+                new PartitionInfo("topic-two", 1, null, null, null),
+                new PartitionInfo("topic-three", 0, null, null, null),
+                new PartitionInfo("topic-four", 0, null, null, null));
+
+        cluster = new Cluster(Collections.<Node>emptyList(), partitionInfos, Collections.<String>emptySet());
+        discovery = new StreamsMetadataState(builder);
+        discovery.onChange(hostToPartitions, cluster);
+    }
+
+    @Test
+    public void shouldGetAllStreamInstances() throws Exception {
+        final StreamsMetadata one = new StreamsMetadata(hostOne, Utils.mkSet("table-one", "table-two", "merged-table"),
+                Utils.mkSet(topic1P0, topic2P1, topic4P0));
+        final StreamsMetadata two = new StreamsMetadata(hostTwo, Utils.mkSet("table-two", "table-one", "merged-table"),
+                Utils.mkSet(topic2P0, topic1P1));
+        final StreamsMetadata three = new StreamsMetadata(hostThree, Collections.singleton("table-three"),
+                Collections.singleton(topic3P0));
+
+        Collection<StreamsMetadata> actual = discovery.getAllMetadata();
+        assertEquals(3, actual.size());
+        assertTrue("expected " + actual + " to contain " + one, actual.contains(one));
+        assertTrue("expected " + actual + " to contain " + two, actual.contains(two));
+        assertTrue("expected " + actual + " to contain " + three, actual.contains(three));
+    }
+
+    @Test
+    public void shouldGetAllStreamsInstancesWithNoStores() throws Exception {
+        builder.stream("topic-five").filter(new Predicate<Object, Object>() {
+            @Override
+            public boolean test(final Object key, final Object value) {
+                return true;
+            }
+        }).to("some-other-topic");
+
+        final TopicPartition tp5 = new TopicPartition("topic-five", 1);
+        final HostInfo hostFour = new HostInfo("host-four", 8080);
+        hostToPartitions.put(hostFour, Utils.mkSet(tp5));
+
+        discovery.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(tp5, new PartitionInfo("topic-five", 1, null, null, null))));
+
+        final StreamsMetadata expected = new StreamsMetadata(hostFour, Collections.<String>emptySet(),
+                Collections.singleton(tp5));
+        final Collection<StreamsMetadata> actual = discovery.getAllMetadata();
+        assertTrue("expected " + actual + " to contain " + expected, actual.contains(expected));
+    }
+
+    @Test
+    public void shouldGetInstancesForStoreName() throws Exception {
+        final StreamsMetadata one = new StreamsMetadata(hostOne, Utils.mkSet("table-one", "table-two", "merged-table"),
+                Utils.mkSet(topic1P0, topic2P1, topic4P0));
+        final StreamsMetadata two = new StreamsMetadata(hostTwo, Utils.mkSet("table-two", "table-one", "merged-table"),
+                Utils.mkSet(topic2P0, topic1P1));
+        final Collection<StreamsMetadata> actual = discovery.getAllMetadataForStore("table-one");
+        assertEquals(2, actual.size());
+        assertTrue("expected " + actual + " to contain " + one, actual.contains(one));
+        assertTrue("expected " + actual + " to contain " + two, actual.contains(two));
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowIfStoreNameIsNullOnGetAllInstancesWithStore() throws Exception {
+        discovery.getAllMetadataForStore(null);
+    }
+
+    @Test
+    public void shouldReturnEmptyCollectionOnGetAllInstancesWithStoreWhenStoreDoesntExist() throws Exception {
+        final Collection<StreamsMetadata> actual = discovery.getAllMetadataForStore("not-a-store");
+        assertTrue(actual.isEmpty());
+    }
+
+    @Test
+    public void shouldGetInstanceWithKey() throws Exception {
+        final TopicPartition tp4 = new TopicPartition("topic-three", 1);
+        hostToPartitions.put(hostTwo, Utils.mkSet(topic2P0, tp4));
+
+        discovery.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1, null, null, null))));
+
+        final StreamsMetadata expected = new StreamsMetadata(hostThree, Collections.singleton("table-three"),
+                Collections.singleton(topic3P0));
+
+        final StreamsMetadata actual = discovery.getMetadataWithKey("table-three", "the-key",
+                Serdes.String().serializer());
+
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void shouldGetInstanceWithKeyAndCustomPartitioner() throws Exception {
+        final TopicPartition tp4 = new TopicPartition("topic-three", 1);
+        hostToPartitions.put(hostTwo, Utils.mkSet(topic2P0, tp4));
+
+        discovery.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(tp4, new PartitionInfo("topic-three", 1, null, null, null))));
+
+        final StreamsMetadata expected = new StreamsMetadata(hostTwo, Utils.mkSet("table-two", "table-three", "merged-table"),
+                Utils.mkSet(topic2P0, tp4));
+
+        StreamsMetadata actual = discovery.getMetadataWithKey("table-three", "the-key", new StreamPartitioner<String, Object>() {
+            @Override
+            public Integer partition(final String key, final Object value, final int numPartitions) {
+                return 1;
+            }
+        });
+        assertEquals(expected, actual);
+    }
+
+    @Test
+    public void shouldGetInstanceWithKeyWithMergedStreams() throws Exception {
+        final TopicPartition topic2P2 = new TopicPartition("topic-two", 2);
+        hostToPartitions.put(hostTwo, Utils.mkSet(topic2P0, topic1P1, topic2P2));
+        discovery.onChange(hostToPartitions, cluster.withPartitions(Collections.singletonMap(topic2P2, new PartitionInfo("topic-two", 2, null, null, null))));
+
+        final StreamsMetadata expected = new StreamsMetadata(hostTwo, Utils.mkSet("table-two", "table-one", "merged-table"),
+                Utils.mkSet(topic2P0, topic1P1, topic2P2));
+
+        final StreamsMetadata actual = discovery.getMetadataWithKey("merged-table", "123", new StreamPartitioner<String, Object>() {
+            @Override
+            public Integer partition(final String key, final Object value, final int numPartitions) {
+                return 2;
+            }
+        });
+
+        assertEquals(expected, actual);
+
+    }
+
+    @Test
+    public void shouldReturnNullOnGetWithKeyWhenStoreDoesntExist() throws Exception {
+        final StreamsMetadata actual = discovery.getMetadataWithKey("not-a-store",
+                "key",
+                Serdes.String().serializer());
+        assertNull(actual);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowWhenKeyIsNull() throws Exception {
+        discovery.getMetadataWithKey("table-three", null, Serdes.String().serializer());
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowWhenSerializerIsNull() throws Exception {
+        discovery.getMetadataWithKey("table-three", "key", (Serializer) null);
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowIfStoreNameIsNull() throws Exception {
+        discovery.getMetadataWithKey(null, "key", Serdes.String().serializer());
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowIfStreamPartitionerIsNull() throws Exception {
+        discovery.getMetadataWithKey(null, "key", (StreamPartitioner) null);
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssginmentInfoTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssginmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssginmentInfoTest.java
deleted file mode 100644
index 14a7f9a..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssginmentInfoTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.processor.internals.assignment;
-
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.processor.TaskId;
-import org.junit.Test;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import static org.junit.Assert.assertEquals;
-
-public class AssginmentInfoTest {
-
-    @Test
-    public void testEncodeDecode() {
-        List<TaskId> activeTasks =
-                Arrays.asList(new TaskId(0, 0), new TaskId(0, 0), new TaskId(0, 1), new TaskId(1, 0));
-        Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
-
-        standbyTasks.put(new TaskId(1, 1), Utils.mkSet(new TopicPartition("t1", 1), new TopicPartition("t2", 1)));
-        standbyTasks.put(new TaskId(2, 0), Utils.mkSet(new TopicPartition("t3", 0), new TopicPartition("t3", 0)));
-
-        AssignmentInfo info = new AssignmentInfo(activeTasks, standbyTasks);
-        AssignmentInfo decoded = AssignmentInfo.decode(info.encode());
-
-        assertEquals(info, decoded);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
new file mode 100644
index 0000000..ce94a23
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals.assignment;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.state.HostInfo;
+import org.junit.Test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+
+public class AssignmentInfoTest {
+
+    @Test
+    public void testEncodeDecode() {
+        List<TaskId> activeTasks =
+                Arrays.asList(new TaskId(0, 0), new TaskId(0, 0), new TaskId(0, 1), new TaskId(1, 0));
+        Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
+
+        standbyTasks.put(new TaskId(1, 1), Utils.mkSet(new TopicPartition("t1", 1), new TopicPartition("t2", 1)));
+        standbyTasks.put(new TaskId(2, 0), Utils.mkSet(new TopicPartition("t3", 0), new TopicPartition("t3", 0)));
+
+        AssignmentInfo info = new AssignmentInfo(activeTasks, standbyTasks, new HashMap<HostInfo, Set<TopicPartition>>());
+        AssignmentInfo decoded = AssignmentInfo.decode(info.encode());
+
+        assertEquals(info, decoded);
+    }
+
+    @Test
+    public void shouldDecodePreviousVersion() throws Exception {
+        List<TaskId> activeTasks =
+                Arrays.asList(new TaskId(0, 0), new TaskId(0, 0), new TaskId(0, 1), new TaskId(1, 0));
+        Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
+
+        standbyTasks.put(new TaskId(1, 1), Utils.mkSet(new TopicPartition("t1", 1), new TopicPartition("t2", 1)));
+        standbyTasks.put(new TaskId(2, 0), Utils.mkSet(new TopicPartition("t3", 0), new TopicPartition("t3", 0)));
+        final AssignmentInfo oldVersion = new AssignmentInfo(1, activeTasks, standbyTasks, null);
+        final AssignmentInfo decoded = AssignmentInfo.decode(encodeV1(oldVersion));
+        assertEquals(oldVersion.activeTasks, decoded.activeTasks);
+        assertEquals(oldVersion.standbyTasks, decoded.standbyTasks);
+        assertEquals(0, decoded.partitionsByHostState.size()); // should be empty as wasn't in V1
+        assertEquals(2, decoded.version); // automatically upgraded to v2 on decode;
+    }
+
+
+    /**
+     * This is a clone of what the V1 encoding did. The encode method has changed for V2
+     * so it is impossible to test compatibility without having this
+     */
+    private ByteBuffer encodeV1(AssignmentInfo oldVersion) throws IOException {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        DataOutputStream out = new DataOutputStream(baos);
+        // Encode version
+        out.writeInt(oldVersion.version);
+        // Encode active tasks
+        out.writeInt(oldVersion.activeTasks.size());
+        for (TaskId id : oldVersion.activeTasks) {
+            id.writeTo(out);
+        }
+        // Encode standby tasks
+        out.writeInt(oldVersion.standbyTasks.size());
+        for (Map.Entry<TaskId, Set<TopicPartition>> entry : oldVersion.standbyTasks.entrySet()) {
+            TaskId id = entry.getKey();
+            id.writeTo(out);
+
+            Set<TopicPartition> partitions = entry.getValue();
+            out.writeInt(partitions.size());
+            for (TopicPartition partition : partitions) {
+                out.writeUTF(partition.topic());
+                out.writeInt(partition.partition());
+            }
+        }
+
+        out.flush();
+        out.close();
+
+        return ByteBuffer.wrap(baos.toByteArray());
+
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
index 3119bee..cf6d8c6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfoTest.java
@@ -20,12 +20,15 @@ package org.apache.kafka.streams.processor.internals.assignment;
 import org.apache.kafka.streams.processor.TaskId;
 import org.junit.Test;
 
+import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 import java.util.UUID;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 
 public class SubscriptionInfoTest {
 
@@ -38,10 +41,62 @@ public class SubscriptionInfoTest {
         Set<TaskId> standbyTasks =
                 new HashSet<>(Arrays.asList(new TaskId(1, 1), new TaskId(2, 0)));
 
-        SubscriptionInfo info = new SubscriptionInfo(processId, activeTasks, standbyTasks);
+        SubscriptionInfo info = new SubscriptionInfo(processId, activeTasks, standbyTasks, null);
         SubscriptionInfo decoded = SubscriptionInfo.decode(info.encode());
 
         assertEquals(info, decoded);
     }
 
+    @Test
+    public void shouldEncodeDecodeWithUserEndPoint() throws Exception {
+        SubscriptionInfo original = new SubscriptionInfo(UUID.randomUUID(),
+                Collections.singleton(new TaskId(0, 0)), Collections.<TaskId>emptySet(), "localhost:80");
+        SubscriptionInfo decoded = SubscriptionInfo.decode(original.encode());
+        assertEquals(original, decoded);
+    }
+
+    @Test
+    public void shouldBeBackwardCompatible() throws Exception {
+        UUID processId = UUID.randomUUID();
+
+        Set<TaskId> activeTasks =
+                new HashSet<>(Arrays.asList(new TaskId(0, 0), new TaskId(0, 1), new TaskId(1, 0)));
+        Set<TaskId> standbyTasks =
+                new HashSet<>(Arrays.asList(new TaskId(1, 1), new TaskId(2, 0)));
+
+        final ByteBuffer v1Encoding = encodePreviousVersion(processId, activeTasks, standbyTasks);
+        final SubscriptionInfo decode = SubscriptionInfo.decode(v1Encoding);
+        assertEquals(activeTasks, decode.prevTasks);
+        assertEquals(standbyTasks, decode.standbyTasks);
+        assertEquals(processId, decode.processId);
+        assertNull(decode.userEndPoint);
+
+    }
+
+
+    /**
+     * This is a clone of what the V1 encoding did. The encode method has changed for V2
+     * so it is impossible to test compatibility without having this
+     */
+    private ByteBuffer encodePreviousVersion(UUID processId,  Set<TaskId> prevTasks, Set<TaskId> standbyTasks) {
+        ByteBuffer buf = ByteBuffer.allocate(4 /* version */ + 16 /* process id */ + 4 + prevTasks.size() * 8 + 4 + standbyTasks.size() * 8);
+        // version
+        buf.putInt(1);
+        // encode client UUID
+        buf.putLong(processId.getMostSignificantBits());
+        buf.putLong(processId.getLeastSignificantBits());
+        // encode ids of previously running tasks
+        buf.putInt(prevTasks.size());
+        for (TaskId id : prevTasks) {
+            id.writeTo(buf);
+        }
+        // encode ids of cached tasks
+        buf.putInt(standbyTasks.size());
+        for (TaskId id : standbyTasks) {
+            id.writeTo(buf);
+        }
+        buf.rewind();
+
+        return buf;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/68b5d014/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
index a112a5a..0416e40 100644
--- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java
@@ -29,6 +29,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorTopology;
 import org.apache.kafka.streams.processor.internals.StateDirectory;
 import org.apache.kafka.streams.processor.internals.StreamTask;
 import org.apache.kafka.streams.processor.internals.StreamThread;
+import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.ReadOnlyWindowStore;
@@ -66,7 +67,7 @@ public class StreamThreadStateStoreProviderTest {
     public void before() throws IOException {
         final TopologyBuilder builder = new TopologyBuilder();
         builder.addSource("the-source", "the-source");
-        builder.addProcessor("the-processor", new MockProcessorSupplier());
+        builder.addProcessor("the-processor", new MockProcessorSupplier(), "the-source");
         builder.addStateStore(Stores.create("kv-store")
                                   .withStringKeys()
                                   .withStringValues().inMemory().build(), "the-processor");
@@ -106,7 +107,7 @@ public class StreamThreadStateStoreProviderTest {
         thread = new StreamThread(builder, streamsConfig, clientSupplier,
                                   applicationId,
                                   "clientId", UUID.randomUUID(), new Metrics(),
-                                  new SystemTime()) {
+                                  new SystemTime(), new StreamsMetadataState(builder)) {
             @Override
             public Map<TaskId, StreamTask> tasks() {
                 return tasks;


Mime
View raw message