kafka-jira mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (KAFKA-6054) ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1
Date Tue, 27 Mar 2018 04:44:01 GMT

    [ https://issues.apache.org/jira/browse/KAFKA-6054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16415012#comment-16415012 ] 

ASF GitHub Bot commented on KAFKA-6054:
---------------------------------------

mjsax closed pull request #4746: KAFKA-6054: Fix upgrade path from Kafka Streams v0.10.0
URL: https://github.com/apache/kafka/pull/4746

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:

diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index 1f5140b10c8..77123ff8093 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -73,28 +73,48 @@ do
   fi
 done
 
-for file in "$base_dir"/clients/build/libs/kafka-clients*.jar;
-do
-  if should_include_file "$file"; then
-    CLASSPATH="$CLASSPATH":"$file"
-  fi
-done
+if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
+  clients_lib_dir=$(dirname $0)/../clients/build/libs
+  streams_lib_dir=$(dirname $0)/../streams/build/libs
+  rocksdb_lib_dir=$(dirname $0)/../streams/build/dependant-libs-${SCALA_VERSION}
+else
+  clients_lib_dir=/opt/kafka-$UPGRADE_KAFKA_STREAMS_TEST_VERSION/libs
+  streams_lib_dir=$clients_lib_dir
+  rocksdb_lib_dir=$streams_lib_dir
+fi
+
 
-for file in "$base_dir"/streams/build/libs/kafka-streams*.jar;
+for file in "$clients_lib_dir"/kafka-clients*.jar;
 do
   if should_include_file "$file"; then
     CLASSPATH="$CLASSPATH":"$file"
   fi
 done
 
-for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
+for file in "$streams_lib_dir"/kafka-streams*.jar;
 do
   if should_include_file "$file"; then
     CLASSPATH="$CLASSPATH":"$file"
   fi
 done
 
-for file in "$base_dir"/streams/build/dependant-libs-${SCALA_VERSION}/rocksdb*.jar;
+if [ -z "$UPGRADE_KAFKA_STREAMS_TEST_VERSION" ]; then
+  for file in "$base_dir"/streams/examples/build/libs/kafka-streams-examples*.jar;
+  do
+    if should_include_file "$file"; then
+      CLASSPATH="$CLASSPATH":"$file"
+    fi
+  done
+else
+  for file in "$base_dir"/streams/upgrade-system-tests-0100/build/libs/kafka-streams-upgrade-system-tests*.jar;
+  do
+    if should_include_file "$file"; then
+      CLASSPATH="$CLASSPATH":"$file"
+    fi
+  done
+fi
+
+for file in "$rocksdb_lib_dir"/rocksdb*.jar;
 do
   CLASSPATH="$CLASSPATH":"$file"
 done
diff --git a/build.gradle b/build.gradle
index d221d965c5e..2a540211018 100644
--- a/build.gradle
+++ b/build.gradle
@@ -776,6 +776,19 @@ project(':streams:examples') {
   }
 }
 
+project(':streams:upgrade-system-tests-0100') {
+  archivesBaseName = "kafka-streams-upgrade-system-tests-0100"
+
+  dependencies {
+    testCompile libs.kafkaStreams_0100
+  }
+
+  systemTestLibs {
+    dependsOn testJar
+  }
+}
+
+
 project(':log4j-appender') {
   archivesBaseName = "kafka-log4j-appender"
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index 93b92bb52a3..dbbb9127199 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -918,7 +918,7 @@ public void onFailure(RuntimeException e) {
                 log.error("Unexpected interrupt received in heartbeat thread for group {}", groupId, e);
                 this.failed.set(new RuntimeException(e));
             } catch (RuntimeException e) {
-                log.error("Heartbeat thread for group {} failed due to unexpected error" , groupId, e);
+                log.error("Heartbeat thread for group {} failed due to unexpected error", groupId, e);
                 this.failed.set(e);
             }
         }
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index 212d701aaaa..74887483354 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -316,7 +316,7 @@ public int hashCode() {
             Field f = this.schema.get(i);
             if (f.type() instanceof ArrayOf) {
                 if (this.get(f) != null) {
-                    Object[] arrayObject = (Object []) this.get(f);
+                    Object[] arrayObject = (Object[]) this.get(f);
                     for (Object arrayItem: arrayObject)
                         result = prime * result + arrayItem.hashCode();
                 }
diff --git a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
index 8e0b8dbfa3d..b80dfccf3d9 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/authenticator/SaslClientCallbackHandler.java
@@ -17,7 +17,9 @@
  */
 package org.apache.kafka.common.security.authenticator;
 
-import java.util.Map;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.network.Mode;
+import org.apache.kafka.common.security.auth.AuthCallbackHandler;
 
 import javax.security.auth.Subject;
 import javax.security.auth.callback.Callback;
@@ -26,10 +28,7 @@
 import javax.security.auth.callback.UnsupportedCallbackException;
 import javax.security.sasl.AuthorizeCallback;
 import javax.security.sasl.RealmCallback;
-
-import org.apache.kafka.common.config.SaslConfigs;
-import org.apache.kafka.common.network.Mode;
-import org.apache.kafka.common.security.auth.AuthCallbackHandler;
+import java.util.Map;
 
 /**
  * Callback handler for Sasl clients. The callbacks required for the SASL mechanism
@@ -59,7 +58,7 @@ public void handle(Callback[] callbacks) throws UnsupportedCallbackException {
                     nc.setName(nc.getDefaultName());
             } else if (callback instanceof PasswordCallback) {
                 if (!isKerberos && subject != null && !subject.getPrivateCredentials(String.class).isEmpty()) {
-                    char [] password = subject.getPrivateCredentials(String.class).iterator().next().toCharArray();
+                    char[] password = subject.getPrivateCredentials(String.class).iterator().next().toCharArray();
                     ((PasswordCallback) callback).setPassword(password);
                 } else {
                     String errorMessage = "Could not login: the client is being asked for a password, but the Kafka" +
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
index a844bb08917..5186d05d492 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerRecordTest.java
@@ -26,24 +26,24 @@
 
     @Test
     public void testEqualsAndHashCode() {
-        ProducerRecord<String, Integer> producerRecord = new ProducerRecord<>("test", 1 , "key", 1);
+        ProducerRecord<String, Integer> producerRecord = new ProducerRecord<>("test", 1, "key", 1);
         assertEquals(producerRecord, producerRecord);
         assertEquals(producerRecord.hashCode(), producerRecord.hashCode());
 
-        ProducerRecord<String, Integer> equalRecord = new ProducerRecord<>("test", 1 , "key", 1);
+        ProducerRecord<String, Integer> equalRecord = new ProducerRecord<>("test", 1, "key", 1);
         assertEquals(producerRecord, equalRecord);
         assertEquals(producerRecord.hashCode(), equalRecord.hashCode());
 
-        ProducerRecord<String, Integer> topicMisMatch = new ProducerRecord<>("test-1", 1 , "key", 1);
+        ProducerRecord<String, Integer> topicMisMatch = new ProducerRecord<>("test-1", 1, "key", 1);
         assertFalse(producerRecord.equals(topicMisMatch));
 
-        ProducerRecord<String, Integer> partitionMismatch = new ProducerRecord<>("test", 2 , "key", 1);
+        ProducerRecord<String, Integer> partitionMismatch = new ProducerRecord<>("test", 2, "key", 1);
         assertFalse(producerRecord.equals(partitionMismatch));
 
-        ProducerRecord<String, Integer> keyMisMatch = new ProducerRecord<>("test", 1 , "key-1", 1);
+        ProducerRecord<String, Integer> keyMisMatch = new ProducerRecord<>("test", 1, "key-1", 1);
         assertFalse(producerRecord.equals(keyMisMatch));
 
-        ProducerRecord<String, Integer> valueMisMatch = new ProducerRecord<>("test", 1 , "key", 2);
+        ProducerRecord<String, Integer> valueMisMatch = new ProducerRecord<>("test", 1, "key", 2);
         assertFalse(producerRecord.equals(valueMisMatch));
 
         ProducerRecord<String, Integer> nullFieldsRecord = new ProducerRecord<>("topic", null, null, null, null);
diff --git a/docs/upgrade.html b/docs/upgrade.html
index e6b9747d0f9..faa96c1aca6 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -55,6 +55,23 @@ <h5><a id="upgrade_10_1_breaking" href="#upgrade_10_1_breaking">Potential breaki
 
 <h5><a id="upgrade_1010_streams" href="#upgrade_1010_streams">Streams API changes in 0.10.1.0</a></h5>
 <ul>
+    <li> Upgrading from 0.10.0.x to 0.10.1.2 requires two rolling bounces with config <code>upgrade.from="0.10.0"</code> set for first upgrade phase
+         (cf. <a href="https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade">KIP-268</a>).
+         As an alternative, an offline upgrade is also possible.
+        <ul>
+            <li> prepare your application instances for a rolling bounce and make sure that config <code>upgrade.from</code> is set to <code>"0.10.0"</code> for new version 0.10.1.2 </li>
+            <li> bounce each instance of your application once </li>
+            <li> prepare your newly deployed 0.10.1.2 application instances for a second round of rolling bounces; make sure to remove the value for config <code>upgrade.from</code> </li>
+            <li> bounce each instance of your application once more to complete the upgrade </li>
+        </ul>
+    </li>
+    <li> Upgrading from 0.10.0.x to 0.10.1.0 or 0.10.1.1 requires an offline upgrade (rolling bounce upgrade is not supported)
+        <ul>
+            <li> stop all old (0.10.0.x) application instances </li>
+            <li> update your code and swap old code and jar file with new code and new jar file </li>
+            <li> restart all new (0.10.1.0 or 0.10.1.1) application instances </li>
+        </ul>
+    </li>
     <li> Stream grouping and aggregation split into two methods:
         <ul>
             <li> old: KStream #aggregateByKey(), #reduceByKey(), and #countByKey() </li>
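For illustration, a minimal sketch of the first rolling bounce described in the
upgrade notes above, assuming a trivial KStreamBuilder topology and placeholder
application id and bootstrap servers:

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class FirstBounceExample {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");         // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
            // First rolling bounce: new 0.10.1.2 instances keep sending
            // version-1 subscription metadata so 0.10.0.x members can decode it.
            props.put(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_0100);

            final KafkaStreams streams = new KafkaStreams(new KStreamBuilder(), props);
            streams.start();
            // For the second bounce, remove upgrade.from and restart each
            // instance once more to complete the upgrade.
        }
    }
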
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index 2ff459f1520..07944a9e42d 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -31,6 +31,7 @@ versions += [
   jackson: "2.6.3",
   jetty: "9.2.22.v20170606",
   jersey: "2.22.2",
+  kafka_0100: "0.10.0.1",
   log4j: "1.2.17",
   jopt: "4.9",
   junit: "4.12",
@@ -91,6 +92,7 @@ libs += [
   junit: "junit:junit:$versions.junit",
   log4j: "log4j:log4j:$versions.log4j",
   joptSimple: "net.sf.jopt-simple:jopt-simple:$versions.jopt",
+  kafkaStreams_0100: "org.apache.kafka:kafka-streams:$versions.kafka_0100",
   lz4: "net.jpountz.lz4:lz4:$versions.lz4",
   metrics: "com.yammer.metrics:metrics-core:$versions.metrics",
   powermock: "org.powermock:powermock-module-junit4:$versions.powermock",
diff --git a/gradle/rat.gradle b/gradle/rat.gradle
index d62b3722a4f..a51876c23ea 100644
--- a/gradle/rat.gradle
+++ b/gradle/rat.gradle
@@ -84,9 +84,15 @@ class RatTask extends DefaultTask {
     if (!reportDir.exists()) {
       reportDir.mkdirs()
     }
-    generateXmlReport(reportDir)
-    printUnknownFiles()
-    generateHtmlReport()
+    def origEncoding = System.getProperty("file.encoding")
+    try {
+      System.setProperty("file.encoding", "UTF-8") //affects the output of the ant rat task
+      generateXmlReport(reportDir)
+      printUnknownFiles()
+      generateHtmlReport()
+    } finally {
+      System.setProperty("file.encoding", origEncoding)
+    }
   }
 }
 
@@ -109,7 +115,7 @@ class RatPlugin implements Plugin<Project> {
       mavenCentral()
     }
     project.dependencies {
-      rat 'org.apache.rat:apache-rat-tasks:0.11'
+      rat 'org.apache.rat:apache-rat-tasks:0.12'
     }
   }
 }
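The save-and-restore pattern used in the rat task above translates directly to
plain Java; a small sketch (the null check is an addition here, since
System.setProperty rejects null values):

    public class WithFileEncoding {
        public static void main(final String[] args) {
            final String origEncoding = System.getProperty("file.encoding");
            try {
                // Override the property only for the duration of the task.
                System.setProperty("file.encoding", "UTF-8");
                System.out.println(System.getProperty("file.encoding"));
            } finally {
                // Restore the previous value; clear it if it was unset,
                // because System.setProperty(key, null) throws NullPointerException.
                if (origEncoding != null) {
                    System.setProperty("file.encoding", origEncoding);
                } else {
                    System.clearProperty("file.encoding");
                }
            }
        }
    }
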
diff --git a/jenkins.sh b/jenkins.sh
new file mode 100755
index 00000000000..c21eb1d8500
--- /dev/null
+++ b/jenkins.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This script is used for verifying changes in Jenkins. In order to provide faster feedback, the tasks are ordered so
+# that faster tasks are executed in every module before slower tasks (if possible). For example, the unit tests for all
+# the modules are executed before the integration tests.
+./gradlew clean compileJava compileScala compileTestJava compileTestScala checkstyleMain checkstyleTest test rat --no-daemon -PxmlFindBugsReport=true -PtestLoggingEvents=started,passed,skipped,failed "$@"
diff --git a/settings.gradle b/settings.gradle
index d430c2fd919..f3a1b81ba65 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -13,5 +13,5 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:examples', 'log4j-appender',
-        'connect:api', 'connect:runtime', 'connect:json', 'connect:file'
+include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:examples', 'streams:upgrade-system-tests-0100',
+        'log4j-appender', 'connect:api', 'connect:runtime', 'connect:json', 'connect:file'
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 5ba438376f6..e33efefc38f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -39,6 +39,7 @@
 import java.util.Set;
 
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
+import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
 
 /**
  * Configuration for Kafka Streams. Documentation for these configurations can be found in the <a
@@ -54,6 +55,16 @@
     // Prefix used to isolate producer configs from consumer configs.
     public static final String PRODUCER_PREFIX = "producer.";
 
+    /**
+     * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 0.10.0.x}.
+     */
+    public static final String UPGRADE_FROM_0100 = "0.10.0";
+
+    /** {@code upgrade.from} */
+    public static final String UPGRADE_FROM_CONFIG = "upgrade.from";
+    public static final String UPGRADE_FROM_DOC = "Allows upgrading from version 0.10.0 to version 0.10.1 (or newer) in a backward compatible way. " +
+        "Default is null. Accepted values are \"" + UPGRADE_FROM_0100 + "\" (for upgrading from 0.10.0.x).";
+
     /** <code>state.dir</code> */
     public static final String STATE_DIR_CONFIG = "state.dir";
     private static final String STATE_DIR_DOC = "Directory location for state store.";
@@ -257,14 +268,19 @@
                                         10 * 1024 * 1024L,
                                         atLeast(0),
                                         Importance.LOW,
-                                        CACHE_MAX_BYTES_BUFFERING_DOC);
+                                        CACHE_MAX_BYTES_BUFFERING_DOC)
+                                .define(UPGRADE_FROM_CONFIG,
+                                        ConfigDef.Type.STRING,
+                                        null,
+                                        in(null, UPGRADE_FROM_0100),
+                                        ConfigDef.Importance.LOW,
+                                        UPGRADE_FROM_DOC);
     }
 
     // this is the list of configs for underlying clients
     // that streams prefer different default values
     private static final Map<String, Object> PRODUCER_DEFAULT_OVERRIDES;
-    static
-    {
+    static {
         Map<String, Object> tempProducerDefaultOverrides = new HashMap<>();
         tempProducerDefaultOverrides.put(ProducerConfig.LINGER_MS_CONFIG, "100");
 
@@ -272,8 +288,7 @@
     }
 
     private static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES;
-    static
-    {
+    static {
         Map<String, Object> tempConsumerDefaultOverrides = new HashMap<>();
         tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
         tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
@@ -342,6 +357,7 @@ public StreamsConfig(Map<?, ?> props) {
         consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-consumer");
 
         // add configs required for stream partition assignor
+        consumerProps.put(UPGRADE_FROM_CONFIG, getString(UPGRADE_FROM_CONFIG));
         consumerProps.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
         consumerProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
         consumerProps.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
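Because the new upgrade.from parameter is defined with
ValidString.in(null, UPGRADE_FROM_0100), an unsupported value should fail fast
when the config object is constructed; a hypothetical sketch:

    import java.util.Properties;
    import org.apache.kafka.common.config.ConfigException;
    import org.apache.kafka.streams.StreamsConfig;

    public class UpgradeFromValidation {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "app");            // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
            props.put(StreamsConfig.UPGRADE_FROM_CONFIG, "0.9.0");            // not an accepted value
            try {
                new StreamsConfig(props);
            } catch (final ConfigException expected) {
                // ValidString rejects anything outside {null, "0.10.0"}.
                System.out.println(expected.getMessage());
            }
        }
    }
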
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
index 81f1f63078f..bfca83a39b4 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java
@@ -676,7 +676,7 @@ private void connectProcessorAndStateStore(String processorName, String stateSto
         }
     }
 
-    private Set<String> findSourceTopicsForProcessorParents(String [] parents) {
+    private Set<String> findSourceTopicsForProcessorParents(String[] parents) {
         final Set<String> sourceTopics = new HashSet<>();
         for (String parent : parents) {
             NodeFactory nodeFactory = nodeFactories.get(parent);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index dcba5437bf5..e6c407fdbae 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
@@ -66,7 +65,7 @@
         public final TaskId taskId;
         public final TopicPartition partition;
 
-        public AssignedPartition(TaskId taskId, TopicPartition partition) {
+        AssignedPartition(final TaskId taskId, final TopicPartition partition) {
             this.taskId = taskId;
             this.partition = partition;
         }
@@ -92,6 +91,7 @@ public int compare(TopicPartition p1, TopicPartition p2) {
 
     private StreamThread streamThread;
 
+    private int userMetadataVersion = SubscriptionInfo.CURRENT_VERSION;
     private int numStandbyReplicas;
     private Map<Integer, TopologyBuilder.TopicsInfo> topicGroups;
     private Map<TopicPartition, Set<TaskId>> partitionToTaskIds;
@@ -111,6 +111,11 @@ public int compare(TopicPartition p1, TopicPartition p2) {
     public void configure(Map<String, ?> configs) {
         numStandbyReplicas = (Integer) configs.get(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG);
 
+        final String upgradeMode = (String) configs.get(StreamsConfig.UPGRADE_FROM_CONFIG);
+        if (StreamsConfig.UPGRADE_FROM_0100.equals(upgradeMode)) {
+            log.info("Downgrading metadata version from 2 to 1 for upgrade from 0.10.0.x.");
+            userMetadataVersion = 1;
+        }
 
         Object o = configs.get(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE);
         if (o == null) {
@@ -174,7 +179,7 @@ public Subscription subscription(Set<String> topics) {
         Set<TaskId> prevTasks = streamThread.prevTasks();
         Set<TaskId> standbyTasks = streamThread.cachedTasks();
         standbyTasks.removeAll(prevTasks);
-        SubscriptionInfo data = new SubscriptionInfo(streamThread.processId, prevTasks, standbyTasks, this.userEndPointConfig);
+        SubscriptionInfo data = new SubscriptionInfo(userMetadataVersion, streamThread.processId, prevTasks, standbyTasks, this.userEndPointConfig);
 
         if (streamThread.builder.sourceTopicPattern() != null) {
             SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates();
@@ -265,12 +270,16 @@ public Subscription subscription(Set<String> topics) {
         Map<UUID, ClientState<TaskId>> states = new HashMap<>();
         Map<UUID, HostInfo> consumerEndPointMap = new HashMap<>();
         // decode subscription info
+        int minUserMetadataVersion = SubscriptionInfo.CURRENT_VERSION;
         for (Map.Entry<String, Subscription> entry : subscriptions.entrySet()) {
             String consumerId = entry.getKey();
             Subscription subscription = entry.getValue();
 
-
             SubscriptionInfo info = SubscriptionInfo.decode(subscription.userData());
+            final int usedVersion = info.version;
+            if (usedVersion < minUserMetadataVersion) {
+                minUserMetadataVersion = usedVersion;
+            }
             if (info.userEndPoint != null) {
                 final String[] hostPort = info.userEndPoint.split(":");
                 consumerEndPointMap.put(info.processId, new HostInfo(hostPort[0], Integer.valueOf(hostPort[1])));
@@ -460,6 +469,7 @@ public Subscription subscription(Set<String> topics) {
 
 
                 assignmentSuppliers.add(new AssignmentSupplier(consumer,
+                                                               minUserMetadataVersion,
                                                                active,
                                                                standby,
                                                                endPointMap,
@@ -483,17 +493,20 @@ public Subscription subscription(Set<String> topics) {
 
     class AssignmentSupplier {
         private final String consumer;
+        private final int metadataVersion;
         private final List<TaskId> active;
         private final Map<TaskId, Set<TopicPartition>> standby;
         private final Map<HostInfo, Set<TopicPartition>> endPointMap;
         private final List<TopicPartition> activePartitions;
 
         AssignmentSupplier(final String consumer,
+                           final int metadataVersion,
                            final List<TaskId> active,
                            final Map<TaskId, Set<TopicPartition>> standby,
                            final Map<HostInfo, Set<TopicPartition>> endPointMap,
                            final List<TopicPartition> activePartitions) {
             this.consumer = consumer;
+            this.metadataVersion = metadataVersion;
             this.active = active;
             this.standby = standby;
             this.endPointMap = endPointMap;
@@ -501,7 +514,8 @@ public Subscription subscription(Set<String> topics) {
         }
 
         Assignment get() {
-            return new Assignment(activePartitions, new AssignmentInfo(active,
+            return new Assignment(activePartitions, new AssignmentInfo(metadataVersion,
+                                                                       active,
                                                                        standby,
                                                                        endPointMap).encode());
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
index 6569f8587bc..ce9aa6309ca 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfo.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.streams.processor.internals.assignment;
 
 import org.apache.kafka.common.record.ByteBufferInputStream;
@@ -56,7 +55,7 @@ public AssignmentInfo(List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>>
         this(CURRENT_VERSION, activeTasks, standbyTasks, hostState);
     }
 
-    protected AssignmentInfo(int version, List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks,
+    public AssignmentInfo(int version, List<TaskId> activeTasks, Map<TaskId, Set<TopicPartition>> standbyTasks,
                              Map<HostInfo, Set<TopicPartition>> hostState) {
         this.version = version;
         this.activeTasks = activeTasks;
@@ -155,9 +154,7 @@ public static AssignmentInfo decode(ByteBuffer data) {
                 }
             }
 
-            return new AssignmentInfo(activeTasks, standbyTasks, hostStateToTopicPartitions);
-
-
+            return new AssignmentInfo(version, activeTasks, standbyTasks, hostStateToTopicPartitions);
         } catch (IOException ex) {
             throw new TaskAssignmentException("Failed to decode AssignmentInfo", ex);
         }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
index c3481c05156..92c50a2a942 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/SubscriptionInfo.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.streams.processor.internals.assignment;
 
 import org.apache.kafka.streams.errors.TaskAssignmentException;
@@ -32,7 +31,7 @@
 
     private static final Logger log = LoggerFactory.getLogger(SubscriptionInfo.class);
 
-    private static final int CURRENT_VERSION = 2;
+    public static final int CURRENT_VERSION = 2;
 
     public final int version;
     public final UUID processId;
@@ -44,7 +43,7 @@ public SubscriptionInfo(UUID processId, Set<TaskId> prevTasks, Set<TaskId> stand
         this(CURRENT_VERSION, processId, prevTasks, standbyTasks, userEndPoint);
     }
 
-    private SubscriptionInfo(int version, UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, String userEndPoint) {
+    public SubscriptionInfo(int version, UUID processId, Set<TaskId> prevTasks, Set<TaskId> standbyTasks, String userEndPoint) {
         this.version = version;
         this.processId = processId;
         this.prevTasks = prevTasks;
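With the constructor made public, callers can pin an explicit metadata version
instead of always encoding CURRENT_VERSION; a sketch along the lines of the
tests further below:

    import java.nio.ByteBuffer;
    import java.util.Collections;
    import java.util.Set;
    import java.util.UUID;
    import org.apache.kafka.streams.processor.TaskId;
    import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo;

    public class EncodeVersionOneSubscription {
        public static void main(final String[] args) {
            final Set<TaskId> emptyTasks = Collections.emptySet();
            // Version 1, so that 0.10.0.x members can still decode the metadata.
            final SubscriptionInfo info =
                new SubscriptionInfo(1, UUID.randomUUID(), emptyTasks, emptyTasks, null);
            final ByteBuffer encoded = info.encode();
            System.out.println("encoded " + encoded.remaining() + " bytes");
        }
    }
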
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index 35b88db583b..e4ba9cdf143 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -22,6 +22,7 @@
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.test.MockMetricsReporter;
+import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -39,11 +40,12 @@
     public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
 
     @Test
-    public void testStartAndClose() throws Exception {
+    public void testStartAndClose() {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testStartAndClose");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
 
         final int oldInitCount = MockMetricsReporter.INIT_COUNT.get();
         final int oldCloseCount = MockMetricsReporter.CLOSE_COUNT.get();
@@ -62,11 +64,12 @@ public void testStartAndClose() throws Exception {
     }
 
     @Test
-    public void testCloseIsIdempotent() throws Exception {
+    public void testCloseIsIdempotent() {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCloseIsIdempotent");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         props.setProperty(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MockMetricsReporter.class.getName());
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
 
         final KStreamBuilder builder = new KStreamBuilder();
         final KafkaStreams streams = new KafkaStreams(builder, props);
@@ -79,10 +82,11 @@ public void testCloseIsIdempotent() throws Exception {
     }
 
     @Test(expected = IllegalStateException.class)
-    public void testCannotStartOnceClosed() throws Exception {
+    public void testCannotStartOnceClosed() {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotStartOnceClosed");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
 
         final KStreamBuilder builder = new KStreamBuilder();
         final KafkaStreams streams = new KafkaStreams(builder, props);
@@ -99,10 +103,11 @@ public void testCannotStartOnceClosed() throws Exception {
     }
 
     @Test(expected = IllegalStateException.class)
-    public void testCannotStartTwice() throws Exception {
+    public void testCannotStartTwice() {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotStartTwice");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
 
         final KStreamBuilder builder = new KStreamBuilder();
         final KafkaStreams streams = new KafkaStreams(builder, props);
@@ -119,25 +124,25 @@ public void testCannotStartTwice() throws Exception {
     }
 
     @Test(expected = IllegalStateException.class)
-    public void shouldNotGetAllTasksWhenNotRunning() throws Exception {
+    public void shouldNotGetAllTasksWhenNotRunning() {
         final KafkaStreams streams = createKafkaStreams();
         streams.allMetadata();
     }
 
     @Test(expected = IllegalStateException.class)
-    public void shouldNotGetAllTasksWithStoreWhenNotRunning() throws Exception {
+    public void shouldNotGetAllTasksWithStoreWhenNotRunning() {
         final KafkaStreams streams = createKafkaStreams();
         streams.allMetadataForStore("store");
     }
 
     @Test(expected = IllegalStateException.class)
-    public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() throws Exception {
+    public void shouldNotGetTaskWithKeyAndSerializerWhenNotRunning() {
         final KafkaStreams streams = createKafkaStreams();
         streams.metadataForKey("store", "key", Serdes.String().serializer());
     }
 
     @Test(expected = IllegalStateException.class)
-    public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() throws Exception {
+    public void shouldNotGetTaskWithKeyAndPartitionerWhenNotRunning() {
         final KafkaStreams streams = createKafkaStreams();
         streams.metadataForKey("store", "key", new StreamPartitioner<String, Object>() {
             @Override
@@ -152,16 +157,18 @@ private KafkaStreams createKafkaStreams() {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "appId");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
 
         final KStreamBuilder builder = new KStreamBuilder();
         return new KafkaStreams(builder, props);
     }
 
     @Test
-    public void testCleanup() throws Exception {
+    public void testCleanup() {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testLocalCleanup");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
 
         final KStreamBuilder builder = new KStreamBuilder();
         final KafkaStreams streams = new KafkaStreams(builder, props);
@@ -173,10 +180,11 @@ public void testCleanup() throws Exception {
     }
 
     @Test(expected = IllegalStateException.class)
-    public void testCannotCleanupWhileRunning() throws Exception {
+    public void testCannotCleanupWhileRunning() {
         final Properties props = new Properties();
         props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "testCannotCleanupWhileRunning");
         props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        props.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
 
         final KStreamBuilder builder = new KStreamBuilder();
         final KafkaStreams streams = new KafkaStreams(builder, props);
diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index f03bed95c1a..9d401487d6c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -58,7 +58,7 @@ public void setUp() {
     }
 
     @Test
-    public void testGetProducerConfigs() throws Exception {
+    public void testGetProducerConfigs() {
         Map<String, Object> returnedProps = streamsConfig.getProducerConfigs("client");
         assertEquals(returnedProps.get(ProducerConfig.CLIENT_ID_CONFIG), "client-producer");
         assertEquals(returnedProps.get(ProducerConfig.LINGER_MS_CONFIG), "100");
@@ -66,7 +66,7 @@ public void testGetProducerConfigs() throws Exception {
     }
 
     @Test
-    public void testGetConsumerConfigs() throws Exception {
+    public void testGetConsumerConfigs() {
         Map<String, Object> returnedProps = streamsConfig.getConsumerConfigs(null, "example-application", "client");
         assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-consumer");
         assertEquals(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG), "example-application");
@@ -75,7 +75,7 @@ public void testGetConsumerConfigs() throws Exception {
     }
 
     @Test
-    public void testGetRestoreConsumerConfigs() throws Exception {
+    public void testGetRestoreConsumerConfigs() {
         Map<String, Object> returnedProps = streamsConfig.getRestoreConsumerConfigs("client");
         assertEquals(returnedProps.get(ConsumerConfig.CLIENT_ID_CONFIG), "client-restore-consumer");
         assertNull(returnedProps.get(ConsumerConfig.GROUP_ID_CONFIG));
@@ -84,7 +84,7 @@ public void testGetRestoreConsumerConfigs() throws Exception {
 
     @Test
     public void defaultSerdeShouldBeConfigured() {
-        Map<String, Object> serializerConfigs = new HashMap<String, Object>();
+        Map<String, Object> serializerConfigs = new HashMap<>();
         serializerConfigs.put("key.serializer.encoding", "UTF8");
         serializerConfigs.put("value.serializer.encoding", "UTF-16");
         Serializer<String> serializer = Serdes.String().serializer();
@@ -115,7 +115,7 @@ public void shouldSupportMultipleBootstrapServers() {
     }
 
     @Test
-    public void shouldSupportPrefixedConsumerConfigs() throws Exception {
+    public void shouldSupportPrefixedConsumerConfigs() {
         props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
         props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -125,7 +125,7 @@ public void shouldSupportPrefixedConsumerConfigs() throws Exception {
     }
 
     @Test
-    public void shouldSupportPrefixedRestoreConsumerConfigs() throws Exception {
+    public void shouldSupportPrefixedRestoreConsumerConfigs() {
         props.put(consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "earliest");
         props.put(consumerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -135,7 +135,7 @@ public void shouldSupportPrefixedRestoreConsumerConfigs() throws Exception {
     }
 
     @Test
-    public void shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig() throws Exception {
+    public void shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig() {
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         props.put(consumerPrefix("interceptor.statsd.host"), "host");
         final Map<String, Object> consumerConfigs = streamsConfig.getConsumerConfigs(null, "groupId", "clientId");
@@ -143,7 +143,7 @@ public void shouldSupportPrefixedPropertiesThatAreNotPartOfConsumerConfig() thro
     }
 
     @Test
-    public void shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig() throws Exception {
+    public void shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig() {
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         props.put(consumerPrefix("interceptor.statsd.host"), "host");
         final Map<String, Object> consumerConfigs = streamsConfig.getRestoreConsumerConfigs("clientId");
@@ -151,7 +151,7 @@ public void shouldSupportPrefixedPropertiesThatAreNotPartOfRestoreConsumerConfig
     }
 
     @Test
-    public void shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig() throws Exception {
+    public void shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig() {
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         props.put(producerPrefix("interceptor.statsd.host"), "host");
         final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("clientId");
@@ -160,7 +160,7 @@ public void shouldSupportPrefixedPropertiesThatAreNotPartOfProducerConfig() thro
 
 
     @Test
-    public void shouldSupportPrefixedProducerConfigs() throws Exception {
+    public void shouldSupportPrefixedProducerConfigs() {
         props.put(producerPrefix(ProducerConfig.BUFFER_MEMORY_CONFIG), 10);
         props.put(producerPrefix(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG), 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -170,7 +170,7 @@ public void shouldSupportPrefixedProducerConfigs() throws Exception {
     }
 
     @Test
-    public void shouldBeSupportNonPrefixedConsumerConfigs() throws Exception {
+    public void shouldBeSupportNonPrefixedConsumerConfigs() {
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -180,7 +180,7 @@ public void shouldBeSupportNonPrefixedConsumerConfigs() throws Exception {
     }
 
     @Test
-    public void shouldBeSupportNonPrefixedRestoreConsumerConfigs() throws Exception {
+    public void shouldBeSupportNonPrefixedRestoreConsumerConfigs() {
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -190,7 +190,7 @@ public void shouldBeSupportNonPrefixedRestoreConsumerConfigs() throws Exception
     }
 
     @Test
-    public void shouldSupportNonPrefixedProducerConfigs() throws Exception {
+    public void shouldSupportNonPrefixedProducerConfigs() {
         props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 10);
         props.put(ConsumerConfig.METRICS_NUM_SAMPLES_CONFIG, 1);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -199,24 +199,22 @@ public void shouldSupportNonPrefixedProducerConfigs() throws Exception {
         assertEquals(1, configs.get(ProducerConfig.METRICS_NUM_SAMPLES_CONFIG));
     }
 
-
-
     @Test(expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionIfKeySerdeConfigFails() throws Exception {
+    public void shouldThrowStreamsExceptionIfKeySerdeConfigFails() {
         props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         streamsConfig.keySerde();
     }
 
     @Test(expected = StreamsException.class)
-    public void shouldThrowStreamsExceptionIfValueSerdeConfigFails() throws Exception {
+    public void shouldThrowStreamsExceptionIfValueSerdeConfigFails() {
         props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, MisconfiguredSerde.class);
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         streamsConfig.valueSerde();
     }
 
     @Test
-    public void shouldOverrideStreamsDefaultConsumerConfigs() throws Exception {
+    public void shouldOverrideStreamsDefaultConsumerConfigs() {
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -226,7 +224,7 @@ public void shouldOverrideStreamsDefaultConsumerConfigs() throws Exception {
     }
 
     @Test
-    public void shouldOverrideStreamsDefaultProducerConfigs() throws Exception {
+    public void shouldOverrideStreamsDefaultProducerConfigs() {
         props.put(StreamsConfig.producerPrefix(ProducerConfig.LINGER_MS_CONFIG), "10000");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         final Map<String, Object> producerConfigs = streamsConfig.getProducerConfigs("client");
@@ -234,7 +232,7 @@ public void shouldOverrideStreamsDefaultProducerConfigs() throws Exception {
     }
 
     @Test
-    public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() throws Exception {
+    public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() {
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest");
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.MAX_POLL_RECORDS_CONFIG), "10");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
@@ -244,14 +242,14 @@ public void shouldOverrideStreamsDefaultConsumerConifgsOnRestoreConsumer() throw
     }
 
     @Test(expected = ConfigException.class)
-    public void shouldThrowExceptionIfConsumerAutoCommitIsOverridden() throws Exception {
+    public void shouldThrowExceptionIfConsumerAutoCommitIsOverridden() {
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         streamsConfig.getConsumerConfigs(null, "a", "b");
     }
 
     @Test(expected = ConfigException.class)
-    public void shouldThrowExceptionIfRestoreConsumerAutoCommitIsOverridden() throws Exception {
+    public void shouldThrowExceptionIfRestoreConsumerAutoCommitIsOverridden() {
         props.put(StreamsConfig.consumerPrefix(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG), "true");
         final StreamsConfig streamsConfig = new StreamsConfig(props);
         streamsConfig.getRestoreConsumerConfigs("client");
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
index a5fb0763187..88098dc679b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
@@ -32,6 +32,7 @@
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.ValueMapper;
+import org.apache.kafka.test.TestUtils;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -79,13 +80,12 @@
     private static final String OUTPUT_TOPIC_C = "C";
 
     @BeforeClass
-    public static void startKafkaCluster() throws Exception {
+    public static void startKafkaCluster() {
         CLUSTER.createTopic(INPUT_TOPIC_A);
         CLUSTER.createTopic(OUTPUT_TOPIC_B);
         CLUSTER.createTopic(OUTPUT_TOPIC_C);
     }
 
-
     @Parameter
     public long cacheSizeBytes;
 
@@ -117,6 +117,7 @@ public void shouldFanoutTheInput() throws Exception {
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
+        streamsConfiguration.setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
 
         final KStream<byte[], String> stream1 = builder.stream(INPUT_TOPIC_A);
         final KStream<byte[], String> stream2 = stream1.mapValues(
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index ab08dbe0f59..eeb455bcc1e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -122,8 +122,8 @@ public void shouldReduce() throws Exception {
 
         List<KeyValue<String, String>> results = receiveMessages(
             new StringDeserializer(),
-            new StringDeserializer()
-            , 5);
+            new StringDeserializer(),
+            5);
 
         Collections.sort(results, new Comparator<KeyValue<String, String>>() {
             @Override
@@ -172,8 +172,8 @@ public String apply(Windowed<String> windowedKey, String value) {
 
         List<KeyValue<String, String>> windowedOutput = receiveMessages(
             new StringDeserializer(),
-            new StringDeserializer()
-            , 10);
+            new StringDeserializer(),
+            10);
 
         Comparator<KeyValue<String, String>>
             comparator =
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index e5560c1b62f..383a79363a8 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -97,11 +97,10 @@ public void before() {
         streamsConfiguration = new Properties();
         final String applicationId = "kgrouped-stream-test-" + testNo;
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
-        streamsConfiguration
-            .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1);
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
 
@@ -155,8 +154,8 @@ public void shouldReduce() throws Exception {
 
         final List<KeyValue<String, String>> results = receiveMessages(
             new StringDeserializer(),
-            new StringDeserializer()
-            , 10);
+            new StringDeserializer(),
+            10);
 
         Collections.sort(results, new Comparator<KeyValue<String, String>>() {
             @Override
@@ -209,8 +208,8 @@ public String apply(final Windowed<String> windowedKey, final String value) {
 
         final List<KeyValue<String, String>> windowedOutput = receiveMessages(
             new StringDeserializer(),
-            new StringDeserializer()
-            , 15);
+            new StringDeserializer(),
+            15);
 
         final Comparator<KeyValue<String, String>>
             comparator =
@@ -263,8 +262,8 @@ public void shouldAggregate() throws Exception {
 
         final List<KeyValue<String, Integer>> results = receiveMessages(
             new StringDeserializer(),
-            new IntegerDeserializer()
-            , 10);
+            new IntegerDeserializer(),
+            10);
 
         Collections.sort(results, new Comparator<KeyValue<String, Integer>>() {
             @Override
@@ -313,8 +312,8 @@ public String apply(final Windowed<String> windowedKey, final Integer value) {
 
         final List<KeyValue<String, Integer>> windowedMessages = receiveMessages(
             new StringDeserializer(),
-            new IntegerDeserializer()
-            , 15);
+            new IntegerDeserializer(),
+            15);
 
         final Comparator<KeyValue<String, Integer>>
             comparator =
@@ -364,8 +363,8 @@ public void shouldCount() throws Exception {
 
         final List<KeyValue<String, Long>> results = receiveMessages(
             new StringDeserializer(),
-            new LongDeserializer()
-            , 10);
+            new LongDeserializer(),
+            10);
         Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
             @Override
             public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) {
@@ -406,8 +405,8 @@ public String apply(final Windowed<Integer> windowedKey, final Long value) {
 
         final List<KeyValue<String, Long>> results = receiveMessages(
             new StringDeserializer(),
-            new LongDeserializer()
-            , 10);
+            new LongDeserializer(),
+            10);
         Collections.sort(results, new Comparator<KeyValue<String, Long>>() {
             @Override
             public int compare(final KeyValue<String, Long> o1, final KeyValue<String, Long> o2) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 5f85536efe3..7848d1b3f2b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -260,7 +260,7 @@ private Properties prepareTest() throws Exception {
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + testNo);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
-        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 8);
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
index e46a016447d..8a1e13a1925 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java
@@ -37,6 +37,7 @@
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockStateStoreSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
+import org.apache.kafka.test.TestUtils;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -99,13 +100,14 @@ private Properties configProps() {
                 setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
                 setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
                 setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
+                setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
             }
         };
     }
 
     @SuppressWarnings("unchecked")
     @Test
-    public void testSubscription() throws Exception {
+    public void testSubscription() {
         StreamsConfig config = new StreamsConfig(configProps());
 
         TopologyBuilder builder = new TopologyBuilder();
@@ -148,7 +150,7 @@ public void testSubscription() throws Exception {
     }
 
     @Test
-    public void testAssignBasic() throws Exception {
+    public void testAssignBasic() {
         StreamsConfig config = new StreamsConfig(configProps());
 
         TopologyBuilder builder = new TopologyBuilder();
@@ -215,7 +217,7 @@ public void testAssignBasic() throws Exception {
     }
 
     @Test
-    public void testAssignWithNewTasks() throws Exception {
+    public void testAssignWithNewTasks() {
         StreamsConfig config = new StreamsConfig(configProps());
 
         TopologyBuilder builder = new TopologyBuilder();
@@ -274,7 +276,7 @@ public void testAssignWithNewTasks() throws Exception {
     }
 
     @Test
-    public void testAssignWithStates() throws Exception {
+    public void testAssignWithStates() {
         StreamsConfig config = new StreamsConfig(configProps());
         String applicationId = "test";
         TopologyBuilder builder = new TopologyBuilder();
@@ -334,7 +336,7 @@ public void testAssignWithStates() throws Exception {
     }
 
     @Test
-    public void testAssignWithStandbyReplicas() throws Exception {
+    public void testAssignWithStandbyReplicas() {
         Properties props = configProps();
         props.setProperty(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1");
         StreamsConfig config = new StreamsConfig(props);
@@ -449,7 +451,7 @@ private AssignmentInfo checkAssignment(PartitionAssignor.Assignment assignment)
     }
 
     @Test
-    public void testOnAssignment() throws Exception {
+    public void testOnAssignment() {
         StreamsConfig config = new StreamsConfig(configProps());
 
         TopicPartition t2p3 = new TopicPartition("topic2", 3);
@@ -482,7 +484,7 @@ public void testOnAssignment() throws Exception {
     }
 
     @Test
-    public void testAssignWithInternalTopics() throws Exception {
+    public void testAssignWithInternalTopics() {
         StreamsConfig config = new StreamsConfig(configProps());
         String applicationId = "test";
         TopologyBuilder builder = new TopologyBuilder();
@@ -521,7 +523,7 @@ public void testAssignWithInternalTopics() throws Exception {
     }
 
     @Test
-    public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() throws Exception {
+    public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() {
         StreamsConfig config = new StreamsConfig(configProps());
         String applicationId = "test";
         TopologyBuilder builder = new TopologyBuilder();
@@ -555,7 +557,7 @@ public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() throw
         subscriptions.put("consumer10",
                           new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks, userEndPoint).encode()));
 
-        Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions);
+        partitionAssignor.assign(metadata, subscriptions);
 
         // check prepared internal topics
         assertEquals(2, internalTopicManager.readyTopics.size());
@@ -563,7 +565,7 @@ public void testAssignWithInternalTopicThatsSourceIsAnotherInternalTopic() throw
     }
 
     @Test
-    public void shouldAddUserDefinedEndPointToSubscription() throws Exception {
+    public void shouldAddUserDefinedEndPointToSubscription() {
         final Properties properties = configProps();
         properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:8080");
         final StreamsConfig config = new StreamsConfig(properties);
@@ -589,7 +591,70 @@ public void shouldAddUserDefinedEndPointToSubscription() throws Exception {
     }
 
     @Test
-    public void shouldMapUserEndPointToTopicPartitions() throws Exception {
+    public void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions() {
+        final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>();
+        final Set<TaskId> emptyTasks = Collections.emptySet();
+        subscriptions.put(
+            "consumer1",
+            new PartitionAssignor.Subscription(
+                Collections.singletonList("topic1"),
+                new SubscriptionInfo(1, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()
+            )
+        );
+        subscriptions.put(
+            "consumer2",
+            new PartitionAssignor.Subscription(
+                Collections.singletonList("topic1"),
+                new SubscriptionInfo(2, UUID.randomUUID(), emptyTasks, emptyTasks, null).encode()
+            )
+        );
+
+        final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+        StreamsConfig config = new StreamsConfig(configProps());
+
+        final TopologyBuilder builder = new TopologyBuilder();
+        final StreamThread streamThread = new StreamThread(
+            builder,
+            config,
+            new MockClientSupplier(),
+            "applicationId",
+            "clientId",
+            UUID.randomUUID(),
+            new Metrics(),
+            new SystemTime(),
+            new StreamsMetadataState(builder));
+
+        partitionAssignor.configure(config.getConsumerConfigs(streamThread, "test", "clientId"));
+        final Map<String, PartitionAssignor.Assignment> assignment = partitionAssignor.assign(metadata, subscriptions);
+
+        assertEquals(2, assignment.size());
+        assertEquals(1, AssignmentInfo.decode(assignment.get("consumer1").userData()).version);
+        assertEquals(1, AssignmentInfo.decode(assignment.get("consumer2").userData()).version);
+    }
+
+    @Test
+    public void shouldDownGradeSubscription() {
+        final Properties properties = configProps();
+        properties.put(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_0100);
+        final StreamsConfig config = new StreamsConfig(properties);
+
+        final TopologyBuilder builder = new TopologyBuilder();
+        builder.addSource("source1", "topic1");
+
+        final String clientId = "client-id";
+        final StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), "test", clientId,
+            UUID.randomUUID(), new Metrics(), new SystemTime(), new StreamsMetadataState(builder));
+
+        final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
+        partitionAssignor.configure(config.getConsumerConfigs(thread, "test", clientId));
+
+        final PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1"));
+
+        assertEquals(1, SubscriptionInfo.decode(subscription.userData()).version);
+    }
+
+    @Test
+    public void shouldMapUserEndPointToTopicPartitions() {
         final Properties properties = configProps();
         final String myEndPoint = "localhost:8080";
         properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint);
@@ -628,7 +693,7 @@ public void shouldMapUserEndPointToTopicPartitions() throws Exception {
     }
 
     @Test
-    public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() throws Exception {
+    public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() {
         final Properties properties = configProps();
         final String myEndPoint = "localhost";
         properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint);
@@ -655,7 +720,7 @@ public void shouldThrowExceptionIfApplicationServerConfigIsNotHostPortPair() thr
     }
     
     @Test
-    public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() throws Exception {
+    public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() {
         final Properties properties = configProps();
         final String myEndPoint = "localhost:j87yhk";
         properties.put(StreamsConfig.APPLICATION_SERVER_CONFIG, myEndPoint);
@@ -682,7 +747,7 @@ public void shouldThrowExceptionIfApplicationServerConfigPortIsNotAnInteger() th
     }
 
     @Test
-    public void shouldExposeHostStateToTopicPartitionsOnAssignment() throws Exception {
+    public void shouldExposeHostStateToTopicPartitionsOnAssignment() {
         final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         List<TopicPartition> topic = Arrays.asList(new TopicPartition("topic", 0));
         final Map<HostInfo, Set<TopicPartition>> hostState =
@@ -696,7 +761,7 @@ public void shouldExposeHostStateToTopicPartitionsOnAssignment() throws Exceptio
     }
 
     @Test
-    public void shouldSetClusterMetadataOnAssignment() throws Exception {
+    public void shouldSetClusterMetadataOnAssignment() {
         final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
 
         final List<TopicPartition> topic = Arrays.asList(new TopicPartition("topic", 0));
@@ -718,7 +783,7 @@ public void shouldSetClusterMetadataOnAssignment() throws Exception {
     }
 
     @Test
-    public void shouldReturnEmptyClusterMetadataIfItHasntBeenBuilt() throws Exception {
+    public void shouldReturnEmptyClusterMetadataIfItHasntBeenBuilt() {
         final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor();
         final Cluster cluster = partitionAssignor.clusterMetadata();
         assertNotNull(cluster);
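
The two tests added above (shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions and shouldDownGradeSubscription) pin down the compatibility rules this PR introduces: the leader must encode its assignment in the lowest metadata version any member advertised, and an instance configured with upgrade.from=0.10.0 must keep sending version-1 subscriptions. A minimal standalone sketch of the first rule (a toy illustration, not the real StreamPartitionAssignor):

{code:java}
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Toy sketch of the "lowest common version" rule asserted above: if any
// member can only decode version 1, the assignment is encoded as version 1
// for everybody, even members that already understand version 2.
public class LowestCommonVersionSketch {

    static int assignmentVersion(final List<Integer> subscriptionVersions) {
        return Collections.min(subscriptionVersions);
    }

    public static void main(final String[] args) {
        // consumer1 advertised version 1, consumer2 advertised version 2
        System.out.println(assignmentVersion(Arrays.asList(1, 2))); // prints 1
    }
}
{code}
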
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 2f252e9fa60..2ea57388f2c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -17,11 +17,6 @@
 
 package org.apache.kafka.streams.processor.internals;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertSame;
-import static org.junit.Assert.assertTrue;
-
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
@@ -40,6 +35,7 @@
 import org.apache.kafka.test.MockClientSupplier;
 import org.apache.kafka.test.MockProcessorSupplier;
 import org.apache.kafka.test.MockTimestampExtractor;
+import org.apache.kafka.test.TestUtils;
 import org.junit.Test;
 
 import java.io.File;
@@ -56,6 +52,11 @@
 import java.util.Set;
 import java.util.UUID;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
 public class StreamThreadTest {
 
     private final String clientId = "clientId";
@@ -117,6 +118,7 @@ private Properties configProps() {
                 setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171");
                 setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3");
                 setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class.getName());
+                setProperty(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getAbsolutePath());
             }
         };
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
index ce94a23a2c9..6c94c18991a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignmentInfoTest.java
@@ -65,10 +65,9 @@ public void shouldDecodePreviousVersion() throws Exception {
         assertEquals(oldVersion.activeTasks, decoded.activeTasks);
         assertEquals(oldVersion.standbyTasks, decoded.standbyTasks);
         assertEquals(0, decoded.partitionsByHostState.size()); // should be empty as wasn't in V1
-        assertEquals(2, decoded.version); // automatically upgraded to v2 on decode;
+        assertEquals(1, decoded.version);
     }
 
-
     /**
      * This is a clone of what the V1 encoding did. The encode method has changed for V2
      * so it is impossible to test compatibility without having this
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
index f920c515c0e..63ad01d8609 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestClient.java
@@ -105,7 +105,7 @@ public boolean test(String key, Integer value) {
             }
         });
 
-        data.process(SmokeTestUtil.<Integer>printProcessorSupplier("data"));
+        data.process(SmokeTestUtil.<String, Integer>printProcessorSupplier("data"));
 
         // min
         KGroupedStream<String, Integer>
@@ -131,7 +131,7 @@ public Integer apply(String aggKey, Integer value, Integer aggregate) {
         ).to(stringSerde, intSerde, "min");
 
         KTable<String, Integer> minTable = builder.table(stringSerde, intSerde, "min", "minStoreName");
-        minTable.toStream().process(SmokeTestUtil.<Integer>printProcessorSupplier("min"));
+        minTable.toStream().process(SmokeTestUtil.<String, Integer>printProcessorSupplier("min"));
 
         // max
         groupedData.aggregate(
@@ -153,7 +153,7 @@ public Integer apply(String aggKey, Integer value, Integer aggregate) {
         ).to(stringSerde, intSerde, "max");
 
         KTable<String, Integer> maxTable = builder.table(stringSerde, intSerde, "max", "maxStoreName");
-        maxTable.toStream().process(SmokeTestUtil.<Integer>printProcessorSupplier("max"));
+        maxTable.toStream().process(SmokeTestUtil.<String, Integer>printProcessorSupplier("max"));
 
         // sum
         groupedData.aggregate(
@@ -176,7 +176,7 @@ public Long apply(String aggKey, Integer value, Long aggregate) {
 
 
         KTable<String, Long> sumTable = builder.table(stringSerde, longSerde, "sum", "sumStoreName");
-        sumTable.toStream().process(SmokeTestUtil.<Long>printProcessorSupplier("sum"));
+        sumTable.toStream().process(SmokeTestUtil.<String, Long>printProcessorSupplier("sum"));
 
         // cnt
         groupedData.count(UnlimitedWindows.of(), "uwin-cnt")
@@ -185,7 +185,7 @@ public Long apply(String aggKey, Integer value, Long aggregate) {
         ).to(stringSerde, longSerde, "cnt");
 
         KTable<String, Long> cntTable = builder.table(stringSerde, longSerde, "cnt", "cntStoreName");
-        cntTable.toStream().process(SmokeTestUtil.<Long>printProcessorSupplier("cnt"));
+        cntTable.toStream().process(SmokeTestUtil.<String, Long>printProcessorSupplier("cnt"));
 
         // dif
         maxTable.join(minTable,
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
index f9d30d50e36..f103355ac9e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestDriver.java
@@ -125,38 +125,48 @@ public void run() {
     }
 
     public static Map<String, Set<Integer>> generate(String kafka, final int numKeys, final int maxRecordsPerKey) throws Exception {
-        Properties props = new Properties();
+        return generate(kafka, numKeys, maxRecordsPerKey, true);
+    }
+
+    public static Map<String, Set<Integer>> generate(final String kafka,
+                                                     final int numKeys,
+                                                     final int maxRecordsPerKey,
+                                                     final boolean autoTerminate) throws Exception {
+        final Properties props = new Properties();
         props.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
         props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
 
-        KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
+        final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
 
         int numRecordsProduced = 0;
 
-        Map<String, Set<Integer>> allData = new HashMap<>();
-        ValueList[] data = new ValueList[numKeys];
+        final Map<String, Set<Integer>> allData = new HashMap<>();
+        final ValueList[] data = new ValueList[numKeys];
         for (int i = 0; i < numKeys; i++) {
             data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
             allData.put(data[i].key, new HashSet<Integer>());
         }
-        Random rand = new Random();
+        final Random rand = new Random();
 
-        int remaining = data.length;
+        int remaining = 1; // dummy value must be positive if <autoTerminate> is false
+        if (autoTerminate) {
+            remaining = data.length;
+        }
 
         while (remaining > 0) {
-            int index = rand.nextInt(remaining);
-            String key = data[index].key;
+            final int index = autoTerminate ? rand.nextInt(remaining) : rand.nextInt(numKeys);
+            final String key = data[index].key;
             int value = data[index].next();
 
-            if (value < 0) {
+            if (autoTerminate && value < 0) {
                 remaining--;
                 data[index] = data[remaining];
                 value = END;
             }
 
-            ProducerRecord<byte[], byte[]> record =
+            final ProducerRecord<byte[], byte[]> record =
                     new ProducerRecord<>("data", stringSerde.serializer().serialize("", key), intSerde.serializer().serialize("", value));
 
             producer.send(record);
@@ -165,8 +175,9 @@ public void run() {
                 numRecordsProduced++;
                 allData.get(key).add(value);
 
-                if (numRecordsProduced % 100 == 0)
+                if (numRecordsProduced % 100 == 0) {
                     System.out.println(numRecordsProduced + " records produced");
+                }
 
                 Thread.sleep(10);
             }
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
index 7ff738f1d95..d9ad745e169 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/SmokeTestUtil.java
@@ -37,27 +37,20 @@
     public final static long START_TIME = 60000L * 60 * 24 * 365 * 30;
     public final static int END = Integer.MAX_VALUE;
 
-    public static <T> ProcessorSupplier<String, T> printProcessorSupplier(final String topic) {
-        return printProcessorSupplier(topic, false);
-    }
-
-    public static <T> ProcessorSupplier<String, T> printProcessorSupplier(final String topic, final boolean printOffset) {
-        return new ProcessorSupplier<String, T>() {
-            public Processor<String, T> get() {
-                return new AbstractProcessor<String, T>() {
+    public static <K, V> ProcessorSupplier<K, V> printProcessorSupplier(final String topic) {
+        return new ProcessorSupplier<K, V>() {
+            public Processor<K, V> get() {
+                return new AbstractProcessor<K, V>() {
                     private int numRecordsProcessed = 0;
-                    private ProcessorContext context;
 
                     @Override
                     public void init(ProcessorContext context) {
                         System.out.println("initializing processor: topic=" + topic + " taskId=" + context.taskId());
                         numRecordsProcessed = 0;
-                        this.context = context;
                     }
 
                     @Override
-                    public void process(String key, T value) {
-                        if (printOffset) System.out.println(">>> " + context.offset());
+                    public void process(K key, V value) {
                         numRecordsProcessed++;
                         if (numRecordsProcessed % 100 == 0) {
                             System.out.println("processed " + numRecordsProcessed + " records from topic=" + topic);
@@ -65,12 +58,10 @@ public void process(String key, T value) {
                     }
 
                     @Override
-                    public void punctuate(long timestamp) {
-                    }
+                    public void punctuate(long timestamp) {}
 
                     @Override
-                    public void close() {
-                    }
+                    public void close() {}
                 };
             }
         };
diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java
index c26544e4fc5..3328ae5e480 100644
--- a/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/StreamsSmokeTest.java
@@ -24,7 +24,7 @@
 public class StreamsSmokeTest {
 
     /**
-     *  args ::= command kafka zookeeper stateDir
+     *  args ::= command kafka zookeeper stateDir disableAutoTerminate
      *  command := "run" | "process"
      *
      * @param args
@@ -34,12 +34,14 @@ public static void main(String[] args) throws Exception {
         String kafka = args.length > 1 ? args[1] : null;
         String zookeeper = args.length > 2 ? args[2] : null;
         String stateDir = args.length > 3 ? args[3] : null;
+        boolean disableAutoTerminate = args.length > 4;
 
-        System.out.println("StreamsSmokeTest instance started");
+        System.out.println("StreamsTest instance started (StreamsSmokeTest)");
         System.out.println("command=" + command);
         System.out.println("kafka=" + kafka);
         System.out.println("zookeeper=" + zookeeper);
         System.out.println("stateDir=" + stateDir);
+        System.out.println("disableAutoTerminate=" + disableAutoTerminate);
 
         switch (command) {
             case "standalone":
@@ -49,8 +51,12 @@ public static void main(String[] args) throws Exception {
                 // this starts the driver (data generation and result verification)
                 final int numKeys = 10;
                 final int maxRecordsPerKey = 500;
-                Map<String, Set<Integer>> allData = SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey);
-                SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey);
+                if (disableAutoTerminate) {
+                    SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey, false);
+                } else {
+                    Map<String, Set<Integer>> allData = SmokeTestDriver.generate(kafka, numKeys, maxRecordsPerKey);
+                    SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey);
+                }
                 break;
             case "process":
                 // this starts a KafkaStreams client
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
new file mode 100644
index 00000000000..e771bead6e8
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.smoketest.SmokeTestUtil;
+
+import java.util.Properties;
+
+public class StreamsUpgradeTest {
+
+    @SuppressWarnings("unchecked")
+    public static void main(final String[] args) {
+        if (args.length < 3) {
+            System.err.println("StreamsUpgradeTest requires three arguments (kafka-url, zookeeper-url, state-dir, [upgradeFrom: optional]) but only " + args.length + " provided: "
+                + (args.length > 0 ? args[0] + " " : "")
+                + (args.length > 1 ? args[1] : ""));
+        }
+        final String kafka = args[0];
+        final String zookeeper = args[1];
+        final String stateDir = args[2];
+        final String upgradeFrom = args.length > 3 ? args[3] : null;
+
+        System.out.println("StreamsTest instance started (StreamsUpgradeTest trunk)");
+        System.out.println("kafka=" + kafka);
+        System.out.println("zookeeper=" + zookeeper);
+        System.out.println("stateDir=" + stateDir);
+        System.out.println("upgradeFrom=" + upgradeFrom);
+
+        final KStreamBuilder builder = new KStreamBuilder();
+
+        final KStream dataStream = builder.stream("data");
+        dataStream.process(SmokeTestUtil.printProcessorSupplier("data"));
+        dataStream.to("echo");
+
+        final Properties config = new Properties();
+        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
+        config.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+        if (upgradeFrom != null) {
+            config.setProperty(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFrom);
+        }
+
+
+        final KafkaStreams streams = new KafkaStreams(builder, config);
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                System.out.println("closing Kafka Streams instance");
+                System.out.flush();
+                streams.close();
+                System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+                System.out.flush();
+            }
+        });
+    }
+}
diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
index 83a9092fa20..1bedd870032 100644
--- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
+++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java
@@ -345,7 +345,7 @@ public synchronized long position(TopicPartition partition) {
             // consumer.subscribe(new TopicPartition(topicName, 1));
             // Set up the partition that matches the ID (which is what ProcessorStateManager expects) ...
             List<PartitionInfo> partitionInfos = new ArrayList<>();
-            partitionInfos.add(new PartitionInfo(topicName , id.partition, null, null, null));
+            partitionInfos.add(new PartitionInfo(topicName, id.partition, null, null, null));
             consumer.updatePartitions(topicName, partitionInfos);
             consumer.updateEndOffsets(Collections.singletonMap(new TopicPartition(topicName, id.partition), 0L));
         }
diff --git a/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
new file mode 100644
index 00000000000..7d3ed436881
--- /dev/null
+++ b/streams/upgrade-system-tests-0100/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.processor.AbstractProcessor;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.ProcessorSupplier;
+
+import java.util.Properties;
+
+public class StreamsUpgradeTest {
+
+    @SuppressWarnings("unchecked")
+    public static void main(final String[] args) {
+        if (args.length < 3) {
+            System.err.println("StreamsUpgradeTest requires three arguments (kafka-url, zookeeper-url, state-dir) but only " + args.length + " provided: "
+                + (args.length > 0 ? args[0] + " " : "")
+                + (args.length > 1 ? args[1] : ""));
+        }
+        final String kafka = args[0];
+        final String zookeeper = args[1];
+        final String stateDir = args[2];
+
+        System.out.println("StreamsTest instance started (StreamsUpgradeTest v0.10.0)");
+        System.out.println("kafka=" + kafka);
+        System.out.println("zookeeper=" + zookeeper);
+        System.out.println("stateDir=" + stateDir);
+
+        final KStreamBuilder builder = new KStreamBuilder();
+        final KStream dataStream = builder.stream("data");
+        dataStream.process(printProcessorSupplier());
+        dataStream.to("echo");
+
+        final Properties config = new Properties();
+        config.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "StreamsUpgradeTest");
+        config.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+        config.setProperty(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
+        config.setProperty(StreamsConfig.STATE_DIR_CONFIG, stateDir);
+        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+
+        final KafkaStreams streams = new KafkaStreams(builder, config);
+        streams.start();
+
+        Runtime.getRuntime().addShutdownHook(new Thread() {
+            @Override
+            public void run() {
+                System.out.println("closing Kafka Streams instance");
+                System.out.flush();
+                streams.close();
+                System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+                System.out.flush();
+            }
+        });
+    }
+
+    private static <K, V> ProcessorSupplier<K, V> printProcessorSupplier() {
+        return new ProcessorSupplier<K, V>() {
+            public Processor<K, V> get() {
+                return new AbstractProcessor<K, V>() {
+                    private int numRecordsProcessed = 0;
+
+                    @Override
+                    public void init(final ProcessorContext context) {
+                        System.out.println("initializing processor: topic=data taskId=" + context.taskId());
+                        numRecordsProcessed = 0;
+                    }
+
+                    @Override
+                    public void process(final K key, final V value) {
+                        numRecordsProcessed++;
+                        if (numRecordsProcessed % 100 == 0) {
+                            System.out.println("processed " + numRecordsProcessed + " records from topic=data");
+                        }
+                    }
+
+                    @Override
+                    public void punctuate(final long timestamp) {}
+
+                    @Override
+                    public void close() {}
+                };
+            }
+        };
+    }
+}
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index a63810e2bd4..b857bd53224 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -33,6 +33,8 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
     LOG4J_CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
     PID_FILE = os.path.join(PERSISTENT_ROOT, "streams.pid")
 
+    CLEAN_NODE_ENABLED = True
+
     logs = {
         "streams_log": {
             "path": LOG_FILE,
@@ -43,6 +45,114 @@ class StreamsSmokeTestBaseService(KafkaPathResolverMixin, Service):
         "streams_stderr": {
             "path": STDERR_FILE,
             "collect_default": True},
+        "streams_log.0-1": {
+            "path": LOG_FILE + ".0-1",
+            "collect_default": True},
+        "streams_stdout.0-1": {
+            "path": STDOUT_FILE + ".0-1",
+            "collect_default": True},
+        "streams_stderr.0-1": {
+            "path": STDERR_FILE + ".0-1",
+            "collect_default": True},
+        "streams_log.0-2": {
+            "path": LOG_FILE + ".0-2",
+            "collect_default": True},
+        "streams_stdout.0-2": {
+            "path": STDOUT_FILE + ".0-2",
+            "collect_default": True},
+        "streams_stderr.0-2": {
+            "path": STDERR_FILE + ".0-2",
+            "collect_default": True},
+        "streams_log.0-3": {
+            "path": LOG_FILE + ".0-3",
+            "collect_default": True},
+        "streams_stdout.0-3": {
+            "path": STDOUT_FILE + ".0-3",
+            "collect_default": True},
+        "streams_stderr.0-3": {
+            "path": STDERR_FILE + ".0-3",
+            "collect_default": True},
+        "streams_log.0-4": {
+            "path": LOG_FILE + ".0-4",
+            "collect_default": True},
+        "streams_stdout.0-4": {
+            "path": STDOUT_FILE + ".0-4",
+            "collect_default": True},
+        "streams_stderr.0-4": {
+            "path": STDERR_FILE + ".0-4",
+            "collect_default": True},
+        "streams_log.0-5": {
+            "path": LOG_FILE + ".0-5",
+            "collect_default": True},
+        "streams_stdout.0-5": {
+            "path": STDOUT_FILE + ".0-5",
+            "collect_default": True},
+        "streams_stderr.0-5": {
+            "path": STDERR_FILE + ".0-5",
+            "collect_default": True},
+        "streams_log.0-6": {
+            "path": LOG_FILE + ".0-6",
+            "collect_default": True},
+        "streams_stdout.0-6": {
+            "path": STDOUT_FILE + ".0-6",
+            "collect_default": True},
+        "streams_stderr.0-6": {
+            "path": STDERR_FILE + ".0-6",
+            "collect_default": True},
+        "streams_log.1-1": {
+            "path": LOG_FILE + ".1-1",
+            "collect_default": True},
+        "streams_stdout.1-1": {
+            "path": STDOUT_FILE + ".1-1",
+            "collect_default": True},
+        "streams_stderr.1-1": {
+            "path": STDERR_FILE + ".1-1",
+            "collect_default": True},
+        "streams_log.1-2": {
+            "path": LOG_FILE + ".1-2",
+            "collect_default": True},
+        "streams_stdout.1-2": {
+            "path": STDOUT_FILE + ".1-2",
+            "collect_default": True},
+        "streams_stderr.1-2": {
+            "path": STDERR_FILE + ".1-2",
+            "collect_default": True},
+        "streams_log.1-3": {
+            "path": LOG_FILE + ".1-3",
+            "collect_default": True},
+        "streams_stdout.1-3": {
+            "path": STDOUT_FILE + ".1-3",
+            "collect_default": True},
+        "streams_stderr.1-3": {
+            "path": STDERR_FILE + ".1-3",
+            "collect_default": True},
+        "streams_log.1-4": {
+            "path": LOG_FILE + ".1-4",
+            "collect_default": True},
+        "streams_stdout.1-4": {
+            "path": STDOUT_FILE + ".1-4",
+            "collect_default": True},
+        "streams_stderr.1-4": {
+            "path": STDERR_FILE + ".1-4",
+            "collect_default": True},
+        "streams_log.1-5": {
+            "path": LOG_FILE + ".1-5",
+            "collect_default": True},
+        "streams_stdout.1-5": {
+            "path": STDOUT_FILE + ".1-5",
+            "collect_default": True},
+        "streams_stderr.1-5": {
+            "path": STDERR_FILE + ".1-5",
+            "collect_default": True},
+        "streams_log.1-6": {
+            "path": LOG_FILE + ".1-6",
+            "collect_default": True},
+        "streams_stdout.1-6": {
+            "path": STDOUT_FILE + ".1-6",
+            "collect_default": True},
+        "streams_stderr.1-6": {
+            "path": STDERR_FILE + ".1-6",
+            "collect_default": True},
     }
 
     def __init__(self, context, kafka, command):
@@ -95,7 +205,8 @@ def wait(self):
 
     def clean_node(self, node):
         node.account.kill_process("streams", clean_shutdown=False, allow_fail=True)
-        node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False)
+        if self.CLEAN_NODE_ENABLED:
+            node.account.ssh("rm -rf " + self.PERSISTENT_ROOT, allow_fail=False)
 
     def start_cmd(self, node):
         args = self.args.copy()
@@ -120,10 +231,10 @@ def start_node(self, node):
 
         node.account.create_file(self.LOG4J_CONFIG_FILE, self.render('tools_log4j.properties', log_file=self.LOG_FILE))
 
-        self.logger.info("Starting StreamsSmokeTest process on " + str(node.account))
+        self.logger.info("Starting StreamsTest process on " + str(node.account))
         with node.account.monitor_log(self.STDOUT_FILE) as monitor:
             node.account.ssh(self.start_cmd(node))
-            monitor.wait_until('StreamsSmokeTest instance started', timeout_sec=15, err_msg="Never saw message indicating StreamsSmokeTest finished startup on " + str(node.account))
+            monitor.wait_until('StreamsTest instance started', timeout_sec=15, err_msg="Never saw message indicating StreamsTest finished startup on " + str(node.account))
 
         if len(self.pids(node)) == 0:
             raise RuntimeError("No process ids recorded")
@@ -132,8 +243,62 @@ def start_node(self, node):
 class StreamsSmokeTestDriverService(StreamsSmokeTestBaseService):
     def __init__(self, context, kafka):
         super(StreamsSmokeTestDriverService, self).__init__(context, kafka, "run")
+        self.DISABLE_AUTO_TERMINATE = ""
 
+    def disable_auto_terminate(self):
+        self.DISABLE_AUTO_TERMINATE = "disableAutoTerminate"
+
+    def start_cmd(self, node):
+        args = self.args.copy()
+        args['kafka'] = self.kafka.bootstrap_servers()
+        args['zk'] = self.kafka.zk.connect_setting()
+        args['state_dir'] = self.PERSISTENT_ROOT
+        args['stdout'] = self.STDOUT_FILE
+        args['stderr'] = self.STDERR_FILE
+        args['pidfile'] = self.PID_FILE
+        args['log4j'] = self.LOG4J_CONFIG_FILE
+        args['disable_auto_terminate'] = self.DISABLE_AUTO_TERMINATE
+        args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
+
+        cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
+              "INCLUDE_TEST_JARS=true %(kafka_run_class)s org.apache.kafka.streams.smoketest.StreamsSmokeTest " \
+              " %(command)s %(kafka)s %(zk)s %(state_dir)s %(disable_auto_terminate)s " \
+              " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
+
+        return cmd
 
 class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService):
     def __init__(self, context, kafka):
         super(StreamsSmokeTestJobRunnerService, self).__init__(context, kafka, "process")
+
+class StreamsUpgradeTestJobRunnerService(StreamsSmokeTestBaseService):
+    def __init__(self, context, kafka):
+        super(StreamsUpgradeTestJobRunnerService, self).__init__(context, kafka, "")
+        self.UPGRADE_FROM = ""
+
+    def set_version(self, kafka_streams_version):
+        self.KAFKA_STREAMS_VERSION = kafka_streams_version
+
+    def set_upgrade_from(self, upgrade_from):
+        self.UPGRADE_FROM = upgrade_from
+
+    def start_cmd(self, node):
+        args = self.args.copy()
+        args['kafka'] = self.kafka.bootstrap_servers()
+        args['zk'] = self.kafka.zk.connect_setting()
+        args['state_dir'] = self.PERSISTENT_ROOT
+        args['stdout'] = self.STDOUT_FILE
+        args['stderr'] = self.STDERR_FILE
+        args['pidfile'] = self.PID_FILE
+        args['log4j'] = self.LOG4J_CONFIG_FILE
+        args['version'] = self.KAFKA_STREAMS_VERSION
+        args['upgrade_from'] = self.UPGRADE_FROM
+        args['kafka_run_class'] = self.path.script("kafka-run-class.sh", node)
+
+        cmd = "( export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%(log4j)s\"; " \
+              " INCLUDE_TEST_JARS=true UPGRADE_KAFKA_STREAMS_TEST_VERSION=%(version)s " \
+              " %(kafka_run_class)s org.apache.kafka.streams.tests.StreamsUpgradeTest " \
+              " %(kafka)s %(zk)s %(state_dir)s %(upgrade_from)s " \
+              " & echo $! >&3 ) 1>> %(stdout)s 2>> %(stderr)s 3> %(pidfile)s" % args
+
+        return cmd
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
new file mode 100644
index 00000000000..8266e073d13
--- /dev/null
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -0,0 +1,200 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsUpgradeTestJobRunnerService
+from kafkatest.version import LATEST_0_10_0, TRUNK_VERSION
+import random
+
+class StreamsUpgradeTest(KafkaTest):
+    """
+    Test upgrading Kafka Streams from 0.10.0.x to 0.10.1.x (i.e., TRUNK)
+    """
+
+    def __init__(self, test_context):
+        super(StreamsUpgradeTest, self).__init__(test_context, num_zk=1, num_brokers=1, topics={
+            'echo' : { 'partitions': 5 },
+            'data' : { 'partitions': 5 }
+        })
+
+        self.driver = StreamsSmokeTestDriverService(test_context, self.kafka)
+        self.driver.disable_auto_terminate()
+        self.processor1 = StreamsUpgradeTestJobRunnerService(test_context, self.kafka)
+        self.processor2 = StreamsUpgradeTestJobRunnerService(test_context, self.kafka)
+        self.processor3 = StreamsUpgradeTestJobRunnerService(test_context, self.kafka)
+
+    def test_upgrade(self):
+        """
+        Starts 3 KafkaStreams instances with version 0.10.0, and upgrades them one by one to 0.10.1
+        """
+
+        self.driver.start()
+        self.start_all_nodes_with_0100()
+
+        self.processors = [self.processor1, self.processor2, self.processor3]
+
+        counter = 1
+        random.seed()
+
+        # first rolling bounce
+        random.shuffle(self.processors)
+        for p in self.processors:
+            p.CLEAN_NODE_ENABLED = False
+            self.do_rolling_bounce(p, "0.10.0", counter)
+            counter = counter + 1
+
+        # second rolling bounce
+        random.shuffle(self.processors)
+        for p in self.processors:
+            self.do_rolling_bounce(p, "", counter)
+            counter = counter + 1
+
+        # shutdown
+        self.driver.stop()
+        self.driver.wait()
+
+        random.shuffle(self.processors)
+        for p in self.processors:
+            node = p.node
+            with node.account.monitor_log(p.STDOUT_FILE) as monitor:
+                p.stop()
+                monitor.wait_until("UPGRADE-TEST-CLIENT-CLOSED",
+                                   timeout_sec=60,
+                                   err_msg="Never saw output 'UPGRADE-TEST-CLIENT-CLOSED' on " + str(node.account))
+
+        self.driver.stop()
+
+    def start_all_nodes_with_0100(self):
+        # start first with 0.10.0
+        self.prepare_for_0100(self.processor1)
+        node1 = self.processor1.node
+        with node1.account.monitor_log(self.processor1.STDOUT_FILE) as monitor:
+            with node1.account.monitor_log(self.processor1.LOG_FILE) as log_monitor:
+                self.processor1.start()
+                log_monitor.wait_until("Kafka version : 0.10.0.1",
+                                       timeout_sec=60,
+                                       err_msg="Could not detect Kafka Streams version 0.10.0.1 on " + str(node1.account))
+                monitor.wait_until("processed 100 records from topic",
+                                   timeout_sec=60,
+                                   err_msg="Never saw output 'processed 100 records from topic' on " + str(node1.account))
+
+        # start second with 0.10.0
+        self.prepare_for_0100(self.processor2)
+        node2 = self.processor2.node
+        with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor:
+            with node2.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor:
+                with node2.account.monitor_log(self.processor2.LOG_FILE) as log_monitor:
+                    self.processor2.start()
+                    log_monitor.wait_until("Kafka version : 0.10.0.1",
+                                           timeout_sec=60,
+                                           err_msg="Could not detect Kafka Streams version 0.10.0.1 on " + str(node2.account))
+                    first_monitor.wait_until("processed 100 records from topic",
+                                             timeout_sec=60,
+                                             err_msg="Never saw output 'processed 100 records from topic' on " + str(node1.account))
+                    second_monitor.wait_until("processed 100 records from topic",
+                                              timeout_sec=60,
+                                              err_msg="Never saw output 'processed 100 records from topic' on " + str(node2.account))
+
+        # start third with 0.10.0
+        self.prepare_for_0100(self.processor3)
+        node3 = self.processor3.node
+        with node1.account.monitor_log(self.processor1.STDOUT_FILE) as first_monitor:
+            with node2.account.monitor_log(self.processor2.STDOUT_FILE) as second_monitor:
+                with node3.account.monitor_log(self.processor3.STDOUT_FILE) as third_monitor:
+                    with node3.account.monitor_log(self.processor3.LOG_FILE) as log_monitor:
+                        self.processor3.start()
+                        log_monitor.wait_until("Kafka version : 0.10.0.1",
+                                               timeout_sec=60,
+                                               err_msg="Could not detect Kafka Streams version 0.10.0.1 on " + str(node3.account))
+                        first_monitor.wait_until("processed 100 records from topic",
+                                                 timeout_sec=60,
+                                                 err_msg="Never saw output 'processed 100 records from topic' on " + str(node1.account))
+                        second_monitor.wait_until("processed 100 records from topic",
+                                                  timeout_sec=60,
+                                                  err_msg="Never saw output 'processed 100 records from topic' on " + str(node2.account))
+                        third_monitor.wait_until("processed 100 records from topic",
+                                                  timeout_sec=60,
+                                                  err_msg="Never saw output 'processed 100 records from topic' on " + str(node3.account))
+
+    @staticmethod
+    def prepare_for_0100(processor):
+        processor.node.account.ssh("rm -rf " + processor.PERSISTENT_ROOT, allow_fail=False)
+        processor.set_version(str(LATEST_0_10_0))
+
+    def do_rolling_bounce(self, processor, upgrade_from, counter):
+        first_other_processor = None
+        second_other_processor = None
+        for p in self.processors:
+            if p != processor:
+                if first_other_processor is None:
+                    first_other_processor = p
+                else:
+                    second_other_processor = p
+
+        node = processor.node
+        first_other_node = first_other_processor.node
+        second_other_node = second_other_processor.node
+
+        # stop processor and wait for rebalance of others
+        with first_other_node.account.monitor_log(first_other_processor.STDOUT_FILE) as first_other_monitor:
+            with second_other_node.account.monitor_log(second_other_processor.STDOUT_FILE) as second_other_monitor:
+                processor.stop()
+                first_other_monitor.wait_until("processed 100 records from topic",
+                                               timeout_sec=60,
+                                               err_msg="Never saw output 'processed 100 records from topic' on " + str(first_other_node.account))
+                second_other_monitor.wait_until("processed 100 records from topic",
+                                                timeout_sec=60,
+                                                err_msg="Never saw output 'processed 100 records from topic' on " + str(second_other_node.account))
+        node.account.ssh_capture("grep UPGRADE-TEST-CLIENT-CLOSED %s" % processor.STDOUT_FILE, allow_fail=False)
+
+        if upgrade_from == "":  # upgrade disabled -- second round of rolling bounces
+            roll_counter = ".1-"  # second round of rolling bounces
+        else:
+            roll_counter = ".0-"  # first round of rolling bounces
+
+        node.account.ssh("mv " + processor.STDOUT_FILE + " " + processor.STDOUT_FILE + roll_counter + str(counter), allow_fail=False)
+        node.account.ssh("mv " + processor.STDERR_FILE + " " + processor.STDERR_FILE + roll_counter + str(counter), allow_fail=False)
+        node.account.ssh("mv " + processor.LOG_FILE + " " + processor.LOG_FILE + roll_counter + str(counter), allow_fail=False)
+
+        processor.set_version("")  # set to TRUNK
+        processor.set_upgrade_from(upgrade_from)
+
+        grep_metadata_error = "grep \"org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode subscription data: version=2\" "
+        with node.account.monitor_log(processor.STDOUT_FILE) as monitor:
+            with node.account.monitor_log(processor.LOG_FILE) as log_monitor:
+                with first_other_node.account.monitor_log(first_other_processor.STDOUT_FILE) as first_other_monitor:
+                    with second_other_node.account.monitor_log(second_other_processor.STDOUT_FILE) as second_other_monitor:
+                        processor.start()
+
+                        log_monitor.wait_until("Kafka version : " + str(TRUNK_VERSION),
+                                               timeout_sec=60,
+                                               err_msg="Could not detect Kafka Streams version " + str(TRUNK_VERSION) + " " + str(node.account))
+                        first_other_monitor.wait_until("processed 100 records from topic",
+                                                       timeout_sec=60,
+                                                       err_msg="Never saw output 'processed 100 records from topic' on " + str(first_other_node.account))
+                        found = list(first_other_node.account.ssh_capture(grep_metadata_error + first_other_processor.STDERR_FILE, allow_fail=True))
+                        if len(found) > 0:
+                            raise Exception("Kafka Streams failed with 'unable to decode subscription data: version=2'")
+
+                        second_other_monitor.wait_until("processed 100 records from topic",
+                                                        timeout_sec=60,
+                                                        err_msg="Never saw output 'processed 100 records from topic' on " + str(second_other_node.account))
+                        found = list(second_other_node.account.ssh_capture(grep_metadata_error + second_other_processor.STDERR_FILE, allow_fail=True))
+                        if len(found) > 0:
+                            raise Exception("Kafka Streams failed with 'unable to decode subscription data: version=2'")
+
+                        monitor.wait_until("processed 100 records from topic",
+                                           timeout_sec=60,
+                                           err_msg="Never saw output 'processed 100 records from topic' on " + str(node.account))
\ No newline at end of file
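
The new system test above automates the two-phase procedure from KIP-268: bounce every instance onto the new jars with upgrade.from set (so they still emit the old metadata version), then bounce again without it. Expressed as the client-side configuration an operator would use; a sketch only, with the start/stop plumbing elided (config constants as in the diff):

{code:java}
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class TwoBounceUpgradeSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();

        // bounce 1: run the new jars but keep encoding version-1 subscription
        // metadata so not-yet-upgraded instances can still decode it
        props.put(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_0100);
        // ... start KafkaStreams with props; repeat instance by instance ...

        // bounce 2: once the whole group runs the new version, drop the flag
        // and the group switches to the new metadata version
        props.remove(StreamsConfig.UPGRADE_FROM_CONFIG);
        // ... restart KafkaStreams with props, again instance by instance ...
    }
}
{code}
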
diff --git a/tests/kafkatest/version.py b/tests/kafkatest/version.py
index 239a9f45d37..ebf5ecf6a51 100644
--- a/tests/kafkatest/version.py
+++ b/tests/kafkatest/version.py
@@ -64,6 +64,7 @@ def get_version(node=None):
         return TRUNK
 
 TRUNK = KafkaVersion("trunk")
+TRUNK_VERSION = KafkaVersion("0.10.1.2-SNAPSHOT")
 
 # 0.8.2.X versions
 V_0_8_2_1 = KafkaVersion("0.8.2.1")
@@ -78,7 +79,11 @@ def get_version(node=None):
 # 0.10.0.X versions
 V_0_10_0_0 = KafkaVersion("0.10.0.0")
 V_0_10_0_1 = KafkaVersion("0.10.0.1")
-# Adding 0.10.0 as the next version will be 0.10.1.x
 LATEST_0_10_0 = V_0_10_0_1
 
-LATEST_0_10 = LATEST_0_10_0
\ No newline at end of file
+# 0.10.1.X versions
+V_0_10_1_0 = KafkaVersion("0.10.1.0")
+V_0_10_1_1 = KafkaVersion("0.10.1.1")
+LATEST_0_10_1 = V_0_10_1_1
+
+LATEST_0_10 = LATEST_0_10_1
diff --git a/vagrant/base.sh b/vagrant/base.sh
index 88878dcc2ed..b8cde3a48cc 100755
--- a/vagrant/base.sh
+++ b/vagrant/base.sh
@@ -52,6 +52,8 @@ get_kafka() {
 
     kafka_dir=/opt/kafka-$version
     url=https://s3-us-west-2.amazonaws.com/kafka-packages-$version/kafka_2.10-$version.tgz
+    # the .tgz above does not include the streams test jar, hence we need to get it separately
+    url_streams_test=https://s3-us-west-2.amazonaws.com/kafka-packages/kafka-streams-$version-test.jar
     if [ ! -d /opt/kafka-$version ]; then
         pushd /tmp
         curl -O $url


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> ERROR "SubscriptionInfo - unable to decode subscription data: version=2" when upgrading from 0.10.0.0 to 0.10.2.1
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-6054
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6054
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.1
>            Reporter: James Cheng
>            Assignee: Matthias J. Sax
>            Priority: Major
>              Labels: kip
>             Fix For: 1.2.0
>
>
> KIP: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-268%3A+Simplify+Kafka+Streams+Rebalance+Metadata+Upgrade]
> We upgraded an app from kafka-streams 0.10.0.0 to 0.10.2.1. We did a rolling upgrade of the app, so at one point there were both 0.10.0.0-based instances and 0.10.2.1-based instances running.
> We observed the following stack trace:
> {code:java}
> 2017-10-11 07:02:19.964 [StreamThread-3] ERROR o.a.k.s.p.i.a.SubscriptionInfo -
> unable to decode subscription data: version=2
> org.apache.kafka.streams.errors.TaskAssignmentException: unable to decode
> subscription data: version=2
>         at org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo.decode(SubscriptionInfo.java:113)
>         at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:235)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:260)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:404)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$900(AbstractCoordinator.java:81)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:358)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:340)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
>         at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
>         at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
>         at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
>         at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
>         at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
>         at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
>         at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
>         at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
>         
> {code}
> I spoke with [~mjsax] and he said this is a known issue that happens when you have both 0.10.0.0 instances and 0.10.2.1 instances running at the same time, because the internal version number of the protocol changed when adding Interactive Queries. Matthias asked me to file this JIRA.
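
The failure itself is mechanical: a 0.10.0 leader reads the version int at the front of the subscription bytes and rejects anything it does not know. A toy reconstruction of that decode path (not the real SubscriptionInfo, which throws TaskAssignmentException):

{code:java}
import java.nio.ByteBuffer;

public class DecodeFailureSketch {

    // Toy stand-in for 0.10.0's subscription decoding: it only knows
    // version 1, so a version-2 header from an upgraded instance is fatal.
    static void decode(final ByteBuffer data) {
        final int version = data.getInt();
        if (version != 1) {
            throw new IllegalStateException("unable to decode subscription data: version=" + version);
        }
        // ... read client UUID, active tasks, standby tasks ...
    }

    public static void main(final String[] args) {
        final ByteBuffer v2 = ByteBuffer.allocate(4);
        v2.putInt(2);
        v2.flip();
        decode(v2); // throws, mirroring the stack trace above
    }
}
{code}
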



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
