kafka-commits mailing list archives

From guozh...@apache.org
Subject [2/2] kafka git commit: KAFKA-4060: Remove zk client dependency in kafka streams
Date Wed, 11 Jan 2017 17:15:30 GMT
KAFKA-4060: Remove zk client dependency in kafka streams

@dguy @guozhangwang This is a new PR for KAFKA-4060.

Author: Hojjat Jafarpour <hojjat@Hojjat-Jafarpours-MBP.local>
Author: Hojjat Jafarpour <hojjat@HojjatJpoursMBP.attlocal.net>

Reviewers: Damian Guy, Matthias J. Sax, Ismael Juma, Guozhang Wang

Closes #1884 from hjafarpour/KAFKA-4060-Remove-ZkClient-dependency-in-Kafka-Streams-new
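
For context on the user-facing effect: with internal topic management moved from ZooKeeper to the new broker-backed StreamsKafkaClient, a Streams application only needs to know its brokers. A minimal sketch, assuming an illustrative application id and broker address (the example demos updated below follow the same pattern):

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class NoZooKeeperConfig {
        public static Properties streamsProps() {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // No StreamsConfig.ZOOKEEPER_CONNECT_CONFIG entry is needed any more:
            // internal topics are created and deleted through the brokers (KIP-4 APIs).
            return props;
        }
    }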


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4b71c0bd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4b71c0bd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4b71c0bd

Branch: refs/heads/trunk
Commit: 4b71c0bdc1acf244e3c96fa809a1a0e48471d586
Parents: a95170f
Author: Hojjat Jafarpour <hojjat@Hojjat-Jafarpours-MBP.local>
Authored: Wed Jan 11 09:15:26 2017 -0800
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed Jan 11 09:15:26 2017 -0800

----------------------------------------------------------------------
 build.gradle                                    |  15 -
 .../examples/pageview/PageViewTypedDemo.java    |   1 -
 .../examples/pageview/PageViewUntypedDemo.java  |   1 -
 .../examples/wordcount/WordCountDemo.java       |   1 -
 .../wordcount/WordCountProcessorDemo.java       |   1 -
 .../org/apache/kafka/streams/StreamsConfig.java | 301 ++++++++++++-------
 .../internals/InternalTopicManager.java         | 264 +++++-----------
 .../internals/StreamPartitionAssignor.java      |  22 +-
 .../processor/internals/StreamThread.java       |  11 +-
 .../processor/internals/StreamsKafkaClient.java | 270 +++++++++++++++++
 .../integration/FanoutIntegrationTest.java      |   1 -
 .../InternalTopicIntegrationTest.java           |  33 +-
 .../KStreamAggregationDedupIntegrationTest.java |   1 -
 .../KStreamAggregationIntegrationTest.java      |   2 +-
 .../KStreamKTableJoinIntegrationTest.java       |   1 -
 .../integration/KStreamRepartitionJoinTest.java |   1 -
 .../QueryableStateIntegrationTest.java          |   1 -
 .../integration/ResetIntegrationTest.java       |   1 -
 .../integration/utils/EmbeddedKafkaCluster.java |   1 -
 .../kafka/streams/perf/SimpleBenchmark.java     |   4 -
 .../internals/StreamPartitionAssignorTest.java  |  70 +++--
 .../processor/internals/StreamThreadTest.java   |  24 +-
 .../streams/smoketest/SmokeTestClient.java      |   1 -
 .../kafka/test/MockInternalTopicManager.java    |  55 ++++
 24 files changed, 672 insertions(+), 411 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4b71c0bd/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index dba3376..96bfa62 100644
--- a/build.gradle
+++ b/build.gradle
@@ -699,21 +699,6 @@ project(':streams') {
     compile libs.slf4jApi
     compile libs.rocksDBJni
 
-    // the following 3 dependencies should be removed after KIP-4 (zkclient, zookeeper, jacksonDatabind)
-    compile (libs.zkclient) {
-      exclude module: 'zookeeper'
-    }
-
-    compile (libs.zookeeper) {
-      exclude module: 'jline'
-      exclude module: 'netty'
-      // users should be able to choose the logging implementation (and slf4j bridge)
-      exclude module: 'slf4j-log4j12'
-      exclude module: 'log4j'
-    }
-
-    compile libs.jacksonDatabind
-
     testCompile project(':clients').sourceSets.test.output
     testCompile project(':core')
     testCompile project(':core').sourceSets.test.output

http://git-wip-us.apache.org/repos/asf/kafka/blob/4b71c0bd/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
index f88f62f..4756722 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java
@@ -83,7 +83,6 @@ public class PageViewTypedDemo {
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-typed");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
         props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
 
         // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data

http://git-wip-us.apache.org/repos/asf/kafka/blob/4b71c0bd/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
index 77cf0ca..67e8cc5 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java
@@ -59,7 +59,6 @@ public class PageViewUntypedDemo {
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-untyped");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
         props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class);
 
         // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data

http://git-wip-us.apache.org/repos/asf/kafka/blob/4b71c0bd/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
index bf1d8cb..99fbc15 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java
@@ -49,7 +49,6 @@ public class WordCountDemo {
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
         props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4b71c0bd/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
index 1ee6928..e0560a9 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
@@ -106,7 +106,6 @@ public class WordCountProcessorDemo {
         Properties props = new Properties();
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor");
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
         props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4b71c0bd/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 53f49ec..6b20e98 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -141,123 +141,194 @@ public class StreamsConfig extends AbstractConfig {
     public static final String CACHE_MAX_BYTES_BUFFERING_CONFIG = "cache.max.bytes.buffering";
     public static final String CACHE_MAX_BYTES_BUFFERING_DOC = "Maximum number of memory bytes to be used for buffering across all threads";
 
+    public static final String SECURITY_PROTOCOL_CONFIG = CommonClientConfigs.SECURITY_PROTOCOL_CONFIG;
+    public static final String SECURITY_PROTOCOL_DOC = CommonClientConfigs.SECURITY_PROTOCOL_DOC;
+    public static final String DEFAULT_SECURITY_PROTOCOL = CommonClientConfigs.DEFAULT_SECURITY_PROTOCOL;
+
+    public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG;
+    public static final String CONNECTIONS_MAX_IDLE_MS_DOC = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_DOC;
+
+    public static final String RETRY_BACKOFF_MS_CONFIG = CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG;
+    public static final String RETRY_BACKOFF_MS_DOC = CommonClientConfigs.RETRY_BACKOFF_MS_DOC;
+
+    public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG;
+    public static final String METADATA_MAX_AGE_DOC = CommonClientConfigs.METADATA_MAX_AGE_DOC;
+
+    public static final String RECONNECT_BACKOFF_MS_CONFIG = CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG;
+    public static final String RECONNECT_BACKOFF_MS_DOC = CommonClientConfigs.RECONNECT_BACKOFF_MS_DOC;
+
+    public static final String SEND_BUFFER_CONFIG = CommonClientConfigs.SEND_BUFFER_CONFIG;
+    public static final String SEND_BUFFER_DOC = CommonClientConfigs.SEND_BUFFER_DOC;
+
+    public static final String RECEIVE_BUFFER_CONFIG = CommonClientConfigs.RECEIVE_BUFFER_CONFIG;
+    public static final String RECEIVE_BUFFER_DOC = CommonClientConfigs.RECEIVE_BUFFER_DOC;
+
+    public static final String REQUEST_TIMEOUT_MS_CONFIG = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG;
+    public static final String REQUEST_TIMEOUT_MS_DOC = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC;
+
     static {
         CONFIG = new ConfigDef().define(APPLICATION_ID_CONFIG,      // required with no default value
-                                        Type.STRING,
-                                        Importance.HIGH,
-                                        StreamsConfig.APPLICATION_ID_DOC)
-                                .define(BOOTSTRAP_SERVERS_CONFIG,       // required with no default value
-                                        Type.LIST,
-                                        Importance.HIGH,
-                                        CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
-                                .define(CLIENT_ID_CONFIG,
-                                        Type.STRING,
-                                        "",
-                                        Importance.HIGH,
-                                        CommonClientConfigs.CLIENT_ID_DOC)
-                                .define(ZOOKEEPER_CONNECT_CONFIG,
-                                        Type.STRING,
-                                        "",
-                                        Importance.HIGH,
-                                        StreamsConfig.ZOOKEEPER_CONNECT_DOC)
-                                .define(STATE_DIR_CONFIG,
-                                        Type.STRING,
-                                        "/tmp/kafka-streams",
-                                        Importance.MEDIUM,
-                                        STATE_DIR_DOC)
-                                .define(REPLICATION_FACTOR_CONFIG,
-                                        Type.INT,
-                                        1,
-                                        Importance.MEDIUM,
-                                        REPLICATION_FACTOR_DOC)
-                                .define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
-                                        Type.CLASS,
-                                        FailOnInvalidTimestamp.class.getName(),
-                                        Importance.MEDIUM,
-                                        TIMESTAMP_EXTRACTOR_CLASS_DOC)
-                                .define(PARTITION_GROUPER_CLASS_CONFIG,
-                                        Type.CLASS,
-                                        DefaultPartitionGrouper.class.getName(),
-                                        Importance.MEDIUM,
-                                        PARTITION_GROUPER_CLASS_DOC)
-                                .define(KEY_SERDE_CLASS_CONFIG,
-                                        Type.CLASS,
-                                        Serdes.ByteArraySerde.class.getName(),
-                                        Importance.MEDIUM,
-                                        KEY_SERDE_CLASS_DOC)
-                                .define(VALUE_SERDE_CLASS_CONFIG,
-                                        Type.CLASS,
-                                        Serdes.ByteArraySerde.class.getName(),
-                                        Importance.MEDIUM,
-                                        VALUE_SERDE_CLASS_DOC)
-                                .define(COMMIT_INTERVAL_MS_CONFIG,
-                                        Type.LONG,
-                                        30000,
-                                        Importance.LOW,
-                                        COMMIT_INTERVAL_MS_DOC)
-                                .define(POLL_MS_CONFIG,
-                                        Type.LONG,
-                                        100,
-                                        Importance.LOW,
-                                        POLL_MS_DOC)
-                                .define(NUM_STREAM_THREADS_CONFIG,
-                                        Type.INT,
-                                        1,
-                                        Importance.LOW,
-                                        NUM_STREAM_THREADS_DOC)
-                                .define(NUM_STANDBY_REPLICAS_CONFIG,
-                                        Type.INT,
-                                        0,
-                                        Importance.LOW,
-                                        NUM_STANDBY_REPLICAS_DOC)
-                                .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
-                                        Type.INT,
-                                        1000,
-                                        Importance.LOW,
-                                        BUFFERED_RECORDS_PER_PARTITION_DOC)
-                                .define(STATE_CLEANUP_DELAY_MS_CONFIG,
-                                        Type.LONG,
-                                        60000,
-                                        Importance.LOW,
-                                        STATE_CLEANUP_DELAY_MS_DOC)
-                                .define(METRIC_REPORTER_CLASSES_CONFIG,
-                                        Type.LIST,
-                                        "",
-                                        Importance.LOW,
-                                        CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
-                                .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
-                                        Type.LONG,
-                                        30000,
-                                        atLeast(0),
-                                        Importance.LOW,
-                                        CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
-                                .define(METRICS_NUM_SAMPLES_CONFIG,
-                                        Type.INT,
-                                        2,
-                                        atLeast(1),
-                                        Importance.LOW,
-                                        CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
-                                .define(APPLICATION_SERVER_CONFIG,
-                                        Type.STRING,
-                                        "",
-                                        Importance.LOW,
-                                        APPLICATION_SERVER_DOC)
-                                .define(ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
-                                        Type.CLASS,
-                                        null,
-                                        Importance.LOW,
-                                        ROCKSDB_CONFIG_SETTER_CLASS_DOC)
-                                .define(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
-                                        Type.LONG,
-                                        24 * 60 * 60 * 1000,
-                                        Importance.MEDIUM,
-                                        WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC)
-                                .define(CACHE_MAX_BYTES_BUFFERING_CONFIG,
-                                        Type.LONG,
-                                        10 * 1024 * 1024L,
-                                        atLeast(0),
-                                        Importance.LOW,
-                                        CACHE_MAX_BYTES_BUFFERING_DOC);
+                Type.STRING,
+                Importance.HIGH,
+                StreamsConfig.APPLICATION_ID_DOC)
+                .define(BOOTSTRAP_SERVERS_CONFIG,       // required with no default value
+                        Type.LIST,
+                        Importance.HIGH,
+                        CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
+                .define(CLIENT_ID_CONFIG,
+                        Type.STRING,
+                        "",
+                        Importance.HIGH,
+                        CommonClientConfigs.CLIENT_ID_DOC)
+                .define(ZOOKEEPER_CONNECT_CONFIG,
+                        Type.STRING,
+                        "",
+                        Importance.HIGH,
+                        StreamsConfig.ZOOKEEPER_CONNECT_DOC)
+                .define(STATE_DIR_CONFIG,
+                        Type.STRING,
+                        "/tmp/kafka-streams",
+                        Importance.MEDIUM,
+                        STATE_DIR_DOC)
+                .define(REPLICATION_FACTOR_CONFIG,
+                        Type.INT,
+                        1,
+                        Importance.MEDIUM,
+                        REPLICATION_FACTOR_DOC)
+                .define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
+                        Type.CLASS,
+                        FailOnInvalidTimestamp.class.getName(),
+                        Importance.MEDIUM,
+                        TIMESTAMP_EXTRACTOR_CLASS_DOC)
+                .define(PARTITION_GROUPER_CLASS_CONFIG,
+                        Type.CLASS,
+                        DefaultPartitionGrouper.class.getName(),
+                        Importance.MEDIUM,
+                        PARTITION_GROUPER_CLASS_DOC)
+                .define(KEY_SERDE_CLASS_CONFIG,
+                        Type.CLASS,
+                        Serdes.ByteArraySerde.class.getName(),
+                        Importance.MEDIUM,
+                        KEY_SERDE_CLASS_DOC)
+                .define(VALUE_SERDE_CLASS_CONFIG,
+                        Type.CLASS,
+                        Serdes.ByteArraySerde.class.getName(),
+                        Importance.MEDIUM,
+                        VALUE_SERDE_CLASS_DOC)
+                .define(COMMIT_INTERVAL_MS_CONFIG,
+                        Type.LONG,
+                        30000,
+                        Importance.LOW,
+                        COMMIT_INTERVAL_MS_DOC)
+                .define(POLL_MS_CONFIG,
+                        Type.LONG,
+                        100,
+                        Importance.LOW,
+                        POLL_MS_DOC)
+                .define(NUM_STREAM_THREADS_CONFIG,
+                        Type.INT,
+                        1,
+                        Importance.LOW,
+                        NUM_STREAM_THREADS_DOC)
+                .define(NUM_STANDBY_REPLICAS_CONFIG,
+                        Type.INT,
+                        0,
+                        Importance.LOW,
+                        NUM_STANDBY_REPLICAS_DOC)
+                .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG,
+                        Type.INT,
+                        1000,
+                        Importance.LOW,
+                        BUFFERED_RECORDS_PER_PARTITION_DOC)
+                .define(STATE_CLEANUP_DELAY_MS_CONFIG,
+                        Type.LONG,
+                        60000,
+                        Importance.LOW,
+                        STATE_CLEANUP_DELAY_MS_DOC)
+                .define(METRIC_REPORTER_CLASSES_CONFIG,
+                        Type.LIST,
+                        "",
+                        Importance.LOW,
+                        CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
+                .define(METRICS_SAMPLE_WINDOW_MS_CONFIG,
+                        Type.LONG,
+                        30000,
+                        atLeast(0),
+                        Importance.LOW,
+                        CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_DOC)
+                .define(METRICS_NUM_SAMPLES_CONFIG,
+                        Type.INT,
+                        2,
+                        atLeast(1),
+                        Importance.LOW,
+                        CommonClientConfigs.METRICS_NUM_SAMPLES_DOC)
+                .define(APPLICATION_SERVER_CONFIG,
+                        Type.STRING,
+                        "",
+                        Importance.LOW,
+                        APPLICATION_SERVER_DOC)
+                .define(ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
+                        Type.CLASS,
+                        null,
+                        Importance.LOW,
+                        ROCKSDB_CONFIG_SETTER_CLASS_DOC)
+                .define(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG,
+                        Type.LONG,
+                        24 * 60 * 60 * 1000,
+                        Importance.MEDIUM,
+                        WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_DOC)
+                .define(CACHE_MAX_BYTES_BUFFERING_CONFIG,
+                        Type.LONG,
+                        10 * 1024 * 1024L,
+                        atLeast(0),
+                        Importance.LOW,
+                        CACHE_MAX_BYTES_BUFFERING_DOC)
+                .define(SECURITY_PROTOCOL_CONFIG,
+                        Type.STRING,
+                        DEFAULT_SECURITY_PROTOCOL,
+                        Importance.MEDIUM,
+                        SECURITY_PROTOCOL_DOC)
+                .define(CONNECTIONS_MAX_IDLE_MS_CONFIG,
+                        ConfigDef.Type.LONG,
+                        9 * 60 * 1000,
+                        ConfigDef.Importance.MEDIUM,
+                        CONNECTIONS_MAX_IDLE_MS_DOC)
+                .define(RETRY_BACKOFF_MS_CONFIG,
+                        ConfigDef.Type.LONG,
+                        100L,
+                        atLeast(0L),
+                        ConfigDef.Importance.LOW,
+                        RETRY_BACKOFF_MS_DOC)
+                .define(METADATA_MAX_AGE_CONFIG,
+                        ConfigDef.Type.LONG,
+                        5 * 60 * 1000,
+                        atLeast(0),
+                        ConfigDef.Importance.LOW,
+                        METADATA_MAX_AGE_DOC)
+                .define(RECONNECT_BACKOFF_MS_CONFIG,
+                        ConfigDef.Type.LONG,
+                        50L,
+                        atLeast(0L),
+                        ConfigDef.Importance.LOW,
+                        RECONNECT_BACKOFF_MS_DOC)
+                .define(SEND_BUFFER_CONFIG,
+                        ConfigDef.Type.INT,
+                        128 * 1024,
+                        atLeast(0),
+                        ConfigDef.Importance.MEDIUM,
+                        SEND_BUFFER_DOC)
+                .define(RECEIVE_BUFFER_CONFIG,
+                        ConfigDef.Type.INT,
+                        32 * 1024,
+                        atLeast(0),
+                        ConfigDef.Importance.MEDIUM,
+                        RECEIVE_BUFFER_DOC)
+                .define(REQUEST_TIMEOUT_MS_CONFIG,
+                        ConfigDef.Type.INT,
+                        40 * 1000,
+                        atLeast(0),
+                        ConfigDef.Importance.MEDIUM,
+                        REQUEST_TIMEOUT_MS_DOC);
     }
 
     // this is the list of configs for underlying clients
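
The definitions added above surface several client-level settings (security protocol, timeouts, buffer sizes) so the embedded StreamsKafkaClient can be tuned directly through the Streams configuration. A hedged sketch of overriding a few of them; the values are illustrative, and the defaults defined above apply when they are omitted:

    import java.util.Properties;
    import org.apache.kafka.streams.StreamsConfig;

    public class ClientTuningExample {
        public static Properties props() {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "tuned-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
            props.put(StreamsConfig.SECURITY_PROTOCOL_CONFIG, "PLAINTEXT");
            props.put(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000); // default: 40s
            props.put(StreamsConfig.RETRY_BACKOFF_MS_CONFIG, 200L);    // default: 100ms
            return props;
        }
    }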

http://git-wip-us.apache.org/repos/asf/kafka/blob/4b71c0bd/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
index c586779..5794e30 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java
@@ -17,242 +17,116 @@
 
 package org.apache.kafka.streams.processor.internals;
 
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.core.type.TypeReference;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.exception.ZkNoNodeException;
-import org.I0Itec.zkclient.exception.ZkNodeExistsException;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
+import org.apache.kafka.common.requests.MetadataResponse;
 import org.apache.kafka.streams.errors.StreamsException;
-import org.apache.zookeeper.ZooDefs;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.io.UnsupportedEncodingException;
-import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Collection;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 public class InternalTopicManager {
 
     private static final Logger log = LoggerFactory.getLogger(InternalTopicManager.class);
-
-    // TODO: the following ZK dependency should be removed after KIP-4
-    private static final String ZK_TOPIC_PATH = "/brokers/topics";
-    private static final String ZK_BROKER_PATH = "/brokers/ids";
-    private static final String ZK_DELETE_TOPIC_PATH = "/admin/delete_topics";
-    private static final String ZK_ENTITY_CONFIG_PATH = "/config/topics";
-    // TODO: the following LogConfig dependency should be removed after KIP-4
     public static final String CLEANUP_POLICY_PROP = "cleanup.policy";
     public static final String RETENTION_MS = "retention.ms";
     public static final Long WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS);
+    private static final int MAX_TOPIC_READY_TRY = 5;
 
-    final ZkClient zkClient;
-    private final int replicationFactor;
     private final long windowChangeLogAdditionalRetention;
 
-    private class ZKStringSerializer implements ZkSerializer {
-
-        /**
-         * @throws AssertionError if the byte String encoding type is not supported
-         */
-        @Override
-        public byte[] serialize(Object data) {
-            try {
-                return ((String) data).getBytes("UTF-8");
-            } catch (UnsupportedEncodingException e) {
-                throw new AssertionError(e);
-            }
-        }
-
-        /**
-         * @throws AssertionError if the byte String encoding type is not supported
-         */
-        @Override
-        public Object deserialize(byte[] bytes) {
-            try {
-                if (bytes == null)
-                    return null;
-                else
-                    return new String(bytes, "UTF-8");
-            } catch (UnsupportedEncodingException e) {
-                throw new AssertionError(e);
-            }
-        }
-    }
-
-    public InternalTopicManager() {
-        this.zkClient = null;
-        this.replicationFactor = 0;
-        this.windowChangeLogAdditionalRetention = WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT;
-    }
+    private final int replicationFactor;
+    private final StreamsKafkaClient streamsKafkaClient;
 
-    public InternalTopicManager(String zkConnect, final int replicationFactor, long windowChangeLogAdditionalRetention) {
-        this.zkClient = new ZkClient(zkConnect, 30 * 1000, 30 * 1000, new ZKStringSerializer());
+    public InternalTopicManager(final StreamsKafkaClient streamsKafkaClient, final int replicationFactor, final long windowChangeLogAdditionalRetention) {
+        this.streamsKafkaClient = streamsKafkaClient;
         this.replicationFactor = replicationFactor;
         this.windowChangeLogAdditionalRetention = windowChangeLogAdditionalRetention;
     }
 
-    public void makeReady(InternalTopicConfig topic, int numPartitions) {
-        boolean topicNotReady = true;
-
-        while (topicNotReady) {
-            Map<Integer, List<Integer>> topicMetadata = getTopicMetadata(topic.name());
+    /**
+     * Prepares the given internal topic. If the topic already exists with the correct number of partitions it is
+     * left as-is. A topic that exists with a different number of partitions is deleted and recreated with the
+     * correct partition count, and a topic that does not exist yet is created.
+     *
+     * @param topic         the internal topic to prepare
+     * @param numPartitions the required number of partitions
+     */
+    public void makeReady(final InternalTopicConfig topic, int numPartitions) {
 
-            if (topicMetadata == null) {
-                try {
-                    createTopic(topic, numPartitions, replicationFactor);
-                } catch (ZkNodeExistsException e) {
-                    // ignore and continue
-                }
-            } else {
-                if (topicMetadata.size() > numPartitions) {
-                    // else if topic exists with more #.partitions than needed, delete in order to re-create it
-                    try {
-                        deleteTopic(topic.name());
-                    } catch (ZkNodeExistsException e) {
-                        // ignore and continue
-                    }
-                } else if (topicMetadata.size() < numPartitions) {
-                    // else if topic exists with less #.partitions than needed, add partitions
-                    try {
-                        addPartitions(topic.name(), numPartitions - topicMetadata.size(), replicationFactor, topicMetadata);
-                    } catch (ZkNoNodeException e) {
-                        // ignore and continue
-                    }
-                } else {
-                    topicNotReady = false;
-                }
+        Map<InternalTopicConfig, Integer> topics = new HashMap<>();
+        topics.put(topic, numPartitions);
+        for (int i = 0; i < MAX_TOPIC_READY_TRY; i++) {
+            try {
+                Collection<MetadataResponse.TopicMetadata> topicMetadatas = streamsKafkaClient.fetchTopicMetadata();
+                Map<InternalTopicConfig, Integer> topicsToBeDeleted = getTopicsToBeDeleted(topics, topicMetadatas);
+                Map<InternalTopicConfig, Integer> topicsToBeCreated = filterExistingTopics(topics, topicMetadatas);
+                topicsToBeCreated.putAll(topicsToBeDeleted);
+                streamsKafkaClient.deleteTopics(topicsToBeDeleted);
+                streamsKafkaClient.createTopics(topicsToBeCreated, replicationFactor, windowChangeLogAdditionalRetention);
+                return;
+            } catch (StreamsException ex) {
+                log.debug("Could not create internal topics: " + ex.getMessage());
+                log.debug("Retry #" + i);
             }
         }
+        throw new StreamsException("Could not create internal topics.");
     }
 
-    private List<Integer> getBrokers() {
-        List<Integer> brokers = new ArrayList<>();
-        for (String broker: zkClient.getChildren(ZK_BROKER_PATH)) {
-            brokers.add(Integer.parseInt(broker));
-        }
-        Collections.sort(brokers);
-
-        log.debug("Read brokers {} from ZK in partition assignor.", brokers);
-
-        return brokers;
-    }
-
-    @SuppressWarnings("unchecked")
-    private Map<Integer, List<Integer>> getTopicMetadata(String topic) {
-        String data = zkClient.readData(ZK_TOPIC_PATH + "/" + topic, true);
-
-        if (data == null) return null;
-
+    public void close() {
         try {
-            ObjectMapper mapper = new ObjectMapper();
-
-            Map<String, Object> dataMap = mapper.readValue(data, new TypeReference<Map<String, Object>>() {
-
-            });
-
-            Map<Integer, List<Integer>> partitions = (Map<Integer, List<Integer>>) dataMap.get("partitions");
-
-            log.debug("Read partitions {} for topic {} from ZK in partition assignor.", partitions, topic);
-
-            return partitions;
+            streamsKafkaClient.close();
         } catch (IOException e) {
-            throw new StreamsException("Error while reading topic metadata from ZK for internal topic " + topic, e);
+            log.warn("Could not close StreamsKafkaClient.");
         }
     }
 
-    private void createTopic(InternalTopicConfig topic, int numPartitions, int replicationFactor) throws ZkNodeExistsException {
-        log.debug("Creating topic {} with {} partitions from ZK in partition assignor.", topic.name(), numPartitions);
-        ObjectMapper mapper = new ObjectMapper();
-        List<Integer> brokers = getBrokers();
-        int numBrokers = brokers.size();
-        if (numBrokers < replicationFactor) {
-            log.warn("Not enough brokers found. The replication factor is reduced from " + replicationFactor + " to " +  numBrokers);
-            replicationFactor = numBrokers;
-        }
-
-        Map<Integer, List<Integer>> assignment = new HashMap<>();
-
-        for (int i = 0; i < numPartitions; i++) {
-            ArrayList<Integer> brokerList = new ArrayList<>();
-            for (int r = 0; r < replicationFactor; r++) {
-                int shift = r * numBrokers / replicationFactor;
-                brokerList.add(brokers.get((i + shift) % numBrokers));
+    /**
+     * Return the requested topics that do not exist yet.
+     *
+     * @param topicsPartitionsMap the requested topics and their partition counts
+     * @param topicsMetadata      metadata for the topics that currently exist
+     * @return the topics that still need to be created
+     */
+    private Map<InternalTopicConfig, Integer> filterExistingTopics(final Map<InternalTopicConfig, Integer> topicsPartitionsMap, Collection<MetadataResponse.TopicMetadata> topicsMetadata) {
+        Map<String, Integer> existingTopicNamesPartitions = getExistingTopicNamesPartitions(topicsMetadata);
+        Map<InternalTopicConfig, Integer> nonExistingTopics = new HashMap<>();
+        // Add the topics that don't exist to the nonExistingTopics.
+        for (InternalTopicConfig topic: topicsPartitionsMap.keySet()) {
+            if (existingTopicNamesPartitions.get(topic.name()) == null) {
+                nonExistingTopics.put(topic, topicsPartitionsMap.get(topic));
             }
-            assignment.put(i, brokerList);
-        }
-        // write out config first just like in AdminUtils.scala createOrUpdateTopicPartitionAssignmentPathInZK()
-        try {
-            Map<String, Object> dataMap = new HashMap<>();
-            dataMap.put("version", 1);
-            dataMap.put("config", topic.toProperties(windowChangeLogAdditionalRetention));
-            String data = mapper.writeValueAsString(dataMap);
-            zkClient.createPersistent(ZK_ENTITY_CONFIG_PATH + "/" + topic.name(), data, ZooDefs.Ids.OPEN_ACL_UNSAFE);
-        } catch (JsonProcessingException e) {
-            throw new StreamsException("Error while creating topic config in ZK for internal topic " + topic, e);
-        }
-
-        // try to write to ZK with open ACL
-        try {
-            Map<String, Object> dataMap = new HashMap<>();
-            dataMap.put("version", 1);
-            dataMap.put("partitions", assignment);
-            String data = mapper.writeValueAsString(dataMap);
-
-            zkClient.createPersistent(ZK_TOPIC_PATH + "/" + topic.name(), data, ZooDefs.Ids.OPEN_ACL_UNSAFE);
-        } catch (JsonProcessingException e) {
-            throw new StreamsException("Error while creating topic metadata in ZK for internal topic " + topic, e);
         }
+        return nonExistingTopics;
     }
 
-    private void deleteTopic(String topic) throws ZkNodeExistsException {
-        log.debug("Deleting topic {} from ZK in partition assignor.", topic);
-
-        zkClient.createPersistent(ZK_DELETE_TOPIC_PATH + "/" + topic, "", ZooDefs.Ids.OPEN_ACL_UNSAFE);
-    }
-
-    private void addPartitions(String topic, int numPartitions, int replicationFactor, Map<Integer, List<Integer>> existingAssignment) {
-        log.debug("Adding {} partitions topic {} from ZK with existing partitions assigned as {} in partition assignor.", topic, numPartitions, existingAssignment);
-
-        List<Integer> brokers = getBrokers();
-        int numBrokers = brokers.size();
-        if (numBrokers < replicationFactor) {
-            log.warn("Not enough brokers found. The replication factor is reduced from " + replicationFactor + " to " +  numBrokers);
-            replicationFactor = numBrokers;
-        }
-
-        int startIndex = existingAssignment.size();
-
-        Map<Integer, List<Integer>> newAssignment = new HashMap<>(existingAssignment);
-
-        for (int i = 0; i < numPartitions; i++) {
-            ArrayList<Integer> brokerList = new ArrayList<>();
-            for (int r = 0; r < replicationFactor; r++) {
-                int shift = r * numBrokers / replicationFactor;
-                brokerList.add(brokers.get((i + shift) % numBrokers));
+    /**
+     * Return the topics that exist but with a different number of partitions than requested, and hence must be
+     * deleted (and later recreated).
+     *
+     * @param topicsPartitionsMap the requested topics and their partition counts
+     * @param topicsMetadata      metadata for the topics that currently exist
+     * @return the existing topics whose partition count does not match the request
+     */
+    private Map<InternalTopicConfig, Integer> getTopicsToBeDeleted(final Map<InternalTopicConfig, Integer> topicsPartitionsMap, Collection<MetadataResponse.TopicMetadata> topicsMetadata) {
+        Map<String, Integer> existingTopicNamesPartitions = getExistingTopicNamesPartitions(topicsMetadata);
+        Map<InternalTopicConfig, Integer> deleteTopics = new HashMap<>();
+        // Collect the existing topics whose partition count differs from the request; these must be deleted.
+        for (InternalTopicConfig topic: topicsPartitionsMap.keySet()) {
+            if (existingTopicNamesPartitions.get(topic.name()) != null) {
+                if (!existingTopicNamesPartitions.get(topic.name()).equals(topicsPartitionsMap.get(topic))) {
+                    deleteTopics.put(topic, topicsPartitionsMap.get(topic));
+                }
             }
-            newAssignment.put(i + startIndex, brokerList);
         }
+        return deleteTopics;
+    }
 
-        // try to write to ZK with open ACL
-        try {
-            Map<String, Object> dataMap = new HashMap<>();
-            dataMap.put("version", 1);
-            dataMap.put("partitions", newAssignment);
-
-            ObjectMapper mapper = new ObjectMapper();
-            String data = mapper.writeValueAsString(dataMap);
-
-            zkClient.writeData(ZK_TOPIC_PATH + "/" + topic, data);
-        } catch (JsonProcessingException e) {
-            throw new StreamsException("Error while updating topic metadata in ZK for internal topic " + topic, e);
+    private Map<String, Integer> getExistingTopicNamesPartitions(Collection<MetadataResponse.TopicMetadata> topicsMetadata) {
+        // The names of existing topics
+        Map<String, Integer> existingTopicNamesPartitions = new HashMap<>();
+        for (MetadataResponse.TopicMetadata topicMetadata: topicsMetadata) {
+            existingTopicNamesPartitions.put(topicMetadata.topic(), topicMetadata.partitionMetadata().size());
         }
+        return existingTopicNamesPartitions;
     }
 
 }
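
A hedged usage sketch (not part of the commit) of how a caller in the same package might prepare a single internal topic with the rewritten manager; the topic object and partition count are illustrative:

    package org.apache.kafka.streams.processor.internals;

    import org.apache.kafka.streams.StreamsConfig;

    final class InternalTopicManagerSketch {
        static void prepare(final StreamsConfig config, final InternalTopicConfig changelogTopic) {
            final InternalTopicManager manager = new InternalTopicManager(
                    new StreamsKafkaClient(config),
                    1, // replication factor (the StreamsConfig default)
                    InternalTopicManager.WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT);
            try {
                // makeReady retries up to five times internally: a topic that exists
                // with the right partition count is left alone, a mismatched one is
                // deleted and recreated, and a missing one is created.
                manager.makeReady(changelogTopic, 4 /* partitions, illustrative */);
            } finally {
                manager.close();
            }
        }
    }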

http://git-wip-us.apache.org/repos/asf/kafka/blob/4b71c0bd/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 53607e8..7b48a6f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -205,16 +205,12 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
             this.userEndPoint = userEndPoint;
         }
 
-        if (configs.containsKey(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG)) {
-            internalTopicManager = new InternalTopicManager(
-                    (String) configs.get(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG),
-                    configs.containsKey(StreamsConfig.REPLICATION_FACTOR_CONFIG) ? (Integer) configs.get(StreamsConfig.REPLICATION_FACTOR_CONFIG) : 1,
-                    configs.containsKey(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG) ?
-                            (Long) configs.get(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG)
-                            : WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT);
-        } else {
-            log.info("stream-thread [{}] Config '{}' isn't supplied and hence no internal topics will be created.",  streamThread.getName(), StreamsConfig.ZOOKEEPER_CONNECT_CONFIG);
-        }
+        internalTopicManager = new InternalTopicManager(
+                new StreamsKafkaClient(this.streamThread.config),
+                configs.containsKey(StreamsConfig.REPLICATION_FACTOR_CONFIG) ? (Integer) configs.get(StreamsConfig.REPLICATION_FACTOR_CONFIG) : 1,
+                configs.containsKey(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG) ?
+                        (Long) configs.get(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG)
+                        : WINDOW_CHANGE_LOG_ADDITIONAL_RETENTION_DEFAULT);
     }
 
     @Override
@@ -340,7 +336,6 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                                 }
                             }
                         }
-
                         // if we still have not find the right number of partitions,
                         // another iteration is needed
                         if (numPartitions == UNKNOWN)
@@ -535,7 +530,6 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
 
                 // finally, encode the assignment before sending back to coordinator
                 assignment.put(consumer, new Assignment(activePartitions, new AssignmentInfo(active, standby, partitionsByHostState).encode()));
-
                 i++;
             }
         }
@@ -682,7 +676,6 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
                 }
             }
         }
-
         // enforce co-partitioning restrictions to repartition topics by updating their number of partitions
         for (Map.Entry<String, InternalTopicMetadata> entry : allRepartitionTopicsNumPartitions.entrySet()) {
             if (copartitionGroup.contains(entry.getKey())) {
@@ -752,4 +745,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable
         }
     }
 
+    public void close() {
+        internalTopicManager.close();
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4b71c0bd/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 07c348d..65e857f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -404,15 +404,12 @@ public class StreamThread extends Thread {
         } catch (Throwable e) {
             log.error("{} Failed to close restore consumer: ", logPrefix, e);
         }
-
-        // TODO remove this
-        // hotfix to improve ZK behavior als long as KAFKA-4060 is not fixed (c.f. KAFKA-4369)
-        // when removing this, make StreamPartitionAssignor#internalTopicManager "private" again
-        if (partitionAssignor != null && partitionAssignor.internalTopicManager != null) {
-            partitionAssignor.internalTopicManager.zkClient.close();
+        try {
+            partitionAssignor.close();
+        } catch (Throwable e) {
+            log.error("stream-thread [{}] Failed to close KafkaStreamClient: ", this.getName(), e);
         }
 
-        // remove all tasks
         removeStreamTasks();
         removeStandbyTasks();
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/4b71c0bd/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
new file mode 100644
index 0000000..132c1af
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClient.java
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE
+ * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file
+ * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the
+ * License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.clients.ClientUtils;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.NetworkClient;
+import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.ClientRequest;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.metrics.JmxReporter;
+import org.apache.kafka.common.metrics.MetricConfig;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.MetricsReporter;
+import org.apache.kafka.common.network.ChannelBuilder;
+import org.apache.kafka.common.network.Selector;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.CreateTopicsRequest;
+import org.apache.kafka.common.requests.CreateTopicsResponse;
+import org.apache.kafka.common.requests.DeleteTopicsRequest;
+import org.apache.kafka.common.requests.DeleteTopicsResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.AbstractRequest;
+import org.apache.kafka.common.requests.RequestHeader;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.errors.StreamsException;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.HashSet;
+import java.util.List;
+import java.util.LinkedHashMap;
+import java.util.Properties;
+import java.util.Collection;
+import java.util.concurrent.TimeUnit;
+
+public class StreamsKafkaClient {
+
+    private final KafkaClient kafkaClient;
+    private final List<MetricsReporter> reporters;
+    private final StreamsConfig streamsConfig;
+
+    private static final int MAX_INFLIGHT_REQUESTS = 100;
+
+    public StreamsKafkaClient(final StreamsConfig streamsConfig) {
+
+        this.streamsConfig = streamsConfig;
+        final Time time = new SystemTime();
+
+        final Map<String, String> metricTags = new LinkedHashMap<>();
+        metricTags.put("client-id", streamsConfig.getString(StreamsConfig.CLIENT_ID_CONFIG));
+
+        final Metadata metadata = new Metadata(streamsConfig.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG), streamsConfig.getLong(StreamsConfig.METADATA_MAX_AGE_CONFIG));
+        final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(streamsConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+        metadata.update(Cluster.bootstrap(addresses), time.milliseconds());
+
+        final MetricConfig metricConfig = new MetricConfig().samples(streamsConfig.getInt(CommonClientConfigs.METRICS_NUM_SAMPLES_CONFIG))
+                .timeWindow(streamsConfig.getLong(CommonClientConfigs.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS)
+                .tags(metricTags);
+        reporters = streamsConfig.getConfiguredInstances(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG,
+                MetricsReporter.class);
+        // TODO: This should come from the KafkaStream
+        reporters.add(new JmxReporter("kafka.admin"));
+        final Metrics metrics = new Metrics(metricConfig, reporters, time);
+
+        final ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(streamsConfig.values());
+
+        final Selector selector = new Selector(streamsConfig.getLong(StreamsConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG), metrics, time, "kafka-client", channelBuilder);
+
+        kafkaClient = new NetworkClient(
+                selector,
+                metadata,
+                streamsConfig.getString(StreamsConfig.CLIENT_ID_CONFIG),
+                MAX_INFLIGHT_REQUESTS, // a fixed large enough value will suffice
+                streamsConfig.getLong(StreamsConfig.RECONNECT_BACKOFF_MS_CONFIG),
+                streamsConfig.getInt(StreamsConfig.SEND_BUFFER_CONFIG),
+                streamsConfig.getInt(StreamsConfig.RECEIVE_BUFFER_CONFIG),
+                streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG), time);
+    }
+
+    public void close() throws IOException {
+        for (MetricsReporter metricsReporter: this.reporters) {
+            metricsReporter.close();
+        }
+    }
+
+    /**
+     * Create a set of new topics in a single batch request.
+     *
+     * @param topicsMap                          the topics to create, mapped to their required partition counts
+     * @param replicationFactor                  the replication factor for the new topics
+     * @param windowChangeLogAdditionalRetention additional retention time for windowed changelog topics
+     */
+    public void createTopics(final Map<InternalTopicConfig, Integer> topicsMap, final int replicationFactor, final long windowChangeLogAdditionalRetention) {
+
+        final Map<String, CreateTopicsRequest.TopicDetails> topicRequestDetails = new HashMap<>();
+        for (InternalTopicConfig internalTopicConfig:topicsMap.keySet()) {
+            final Properties topicProperties = internalTopicConfig.toProperties(windowChangeLogAdditionalRetention);
+            final Map<String, String> topicConfig = new HashMap<>();
+            for (String key : topicProperties.stringPropertyNames()) {
+                topicConfig.put(key, topicProperties.getProperty(key));
+            }
+            final CreateTopicsRequest.TopicDetails topicDetails = new CreateTopicsRequest.TopicDetails(topicsMap.get(internalTopicConfig), (short) replicationFactor, topicConfig);
+
+            topicRequestDetails.put(internalTopicConfig.name(), topicDetails);
+        }
+        final CreateTopicsRequest createTopicsRequest = new CreateTopicsRequest(topicRequestDetails, streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG));
+        final ClientResponse clientResponse = sendRequest(createTopicsRequest, ApiKeys.CREATE_TOPICS);
+        if (!(clientResponse.responseBody() instanceof CreateTopicsResponse)) {
+            throw new StreamsException("Inconsistent response type for internal topic creation request. Expected CreateTopicsResponse but received " + clientResponse.responseBody().getClass().getName());
+        }
+        final CreateTopicsResponse createTopicsResponse =  (CreateTopicsResponse) clientResponse.responseBody();
+
+        for (InternalTopicConfig internalTopicConfig:topicsMap.keySet()) {
+            short errorCode = createTopicsResponse.errors().get(internalTopicConfig.name()).code();
+            if (errorCode > 0) {
+                if (errorCode == Errors.TOPIC_ALREADY_EXISTS.code()) {
+                    continue;
+                } else {
+                    throw new StreamsException("Could not create topic: " + internalTopicConfig.name() + ". " + createTopicsResponse.errors().get(internalTopicConfig.name()).name());
+                }
+            }
+        }
+    }
+
+    /**
+     * Delete the given set of internal topics.
+     *
+     * @param topics the internal topics to delete
+     */
+    public void deleteTopics(final Map<InternalTopicConfig, Integer> topics) {
+
+        final Set<String> topicNames = new HashSet<>();
+        for (InternalTopicConfig internalTopicConfig: topics.keySet()) {
+            topicNames.add(internalTopicConfig.name());
+        }
+        deleteTopics(topicNames);
+    }
+
+    /**
+     * Delete a set of topics in one request.
+     *
+     * @param topics the names of the topics to delete
+     */
+    private void deleteTopics(final Set<String> topics) {
+
+        final DeleteTopicsRequest deleteTopicsRequest = new DeleteTopicsRequest(topics, streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG));
+        final ClientResponse clientResponse = sendRequest(deleteTopicsRequest, ApiKeys.DELETE_TOPICS);
+        if (!(clientResponse.responseBody() instanceof DeleteTopicsResponse)) {
+            throw new StreamsException("Inconsistent response type for internal topic deletion request. Expected DeleteTopicsResponse but received " + clientResponse.responseBody().getClass().getName());
+        }
+        final DeleteTopicsResponse deleteTopicsResponse = (DeleteTopicsResponse) clientResponse.responseBody();
+        for (String topicName: deleteTopicsResponse.errors().keySet()) {
+            if (deleteTopicsResponse.errors().get(topicName).code() > 0) {
+                throw new StreamsException("Could not delete topic: " + topicName);
+            }
+        }
+
+    }
+
+    /**
+     * Send a request to a Kafka broker through this client, polling until the corresponding response is received.
+     *
+     * @param request the request to send
+     * @param apiKeys the API key identifying the type of the request
+     * @return the response received from the broker
+     */
+    private ClientResponse sendRequest(final AbstractRequest request, final ApiKeys apiKeys) {
+
+        String brokerId = null;
+
+        final Metadata metadata = new Metadata(streamsConfig.getLong(StreamsConfig.RETRY_BACKOFF_MS_CONFIG), streamsConfig.getLong(StreamsConfig.METADATA_MAX_AGE_CONFIG));
+        final List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(streamsConfig.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
+        metadata.update(Cluster.bootstrap(addresses), Time.SYSTEM.milliseconds());
+
+        final List<Node> nodes = metadata.fetch().nodes();
+        final long readyTimeout = Time.SYSTEM.milliseconds() + streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG);
+        boolean foundNode = false;
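+        // Poll until a connection to some broker is ready, or the ready timeout expires.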
+        while (!foundNode && (Time.SYSTEM.milliseconds() < readyTimeout)) {
+            for (Node node: nodes) {
+                if (kafkaClient.ready(node, Time.SYSTEM.milliseconds())) {
+                    brokerId = Integer.toString(node.id());
+                    foundNode = true;
+                    break;
+                }
+            }
+            kafkaClient.poll(streamsConfig.getLong(StreamsConfig.POLL_MS_CONFIG), Time.SYSTEM.milliseconds());
+        }
+        if (brokerId == null) {
+            throw new StreamsException("Could not find any available broker.");
+        }
+
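+        // Send the request to the selected broker, then poll until its single matching response arrives.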
+        final RequestHeader requestHeader = kafkaClient.nextRequestHeader(apiKeys);
+
+        final ClientRequest clientRequest = new ClientRequest(brokerId, Time.SYSTEM.milliseconds(), true, requestHeader, request, null);
+
+        kafkaClient.send(clientRequest, Time.SYSTEM.milliseconds());
+
+        final long responseTimeout = Time.SYSTEM.milliseconds() + streamsConfig.getInt(StreamsConfig.REQUEST_TIMEOUT_MS_CONFIG);
+        // Poll for the response.
+        while (Time.SYSTEM.milliseconds() < responseTimeout) {
+            List<ClientResponse> responseList = kafkaClient.poll(streamsConfig.getLong(StreamsConfig.POLL_MS_CONFIG), Time.SYSTEM.milliseconds());
+            if (!responseList.isEmpty()) {
+                if (responseList.size() > 1) {
+                    throw new StreamsException("Sent one request but received multiple or no responses.");
+                }
+                if (responseList.get(0).requestHeader().equals(requestHeader)) {
+                    return responseList.get(0);
+                } else {
+                    throw new StreamsException("Inconsistent response received.");
+                }
+            }
+        }
+        throw new StreamsException("Failed to get response from broker within timeout");
+    }
+
+
+    /**
+     * Get the metadata for a topic.
+     * @param topic the topic name
+     * @return the metadata for the topic, or null if no such topic was found
+     */
+    public MetadataResponse.TopicMetadata getTopicMetadata(final String topic) {
+
+        final ClientResponse clientResponse = sendRequest(MetadataRequest.allTopics(), ApiKeys.METADATA);
+        if (!(clientResponse.responseBody() instanceof MetadataResponse)) {
+            throw new StreamsException("Inconsistent response type for internal topic metadata request. Expected MetadataResponse but received " + clientResponse.responseBody().getClass().getName());
+        }
+        final MetadataResponse metadataResponse = (MetadataResponse) clientResponse.responseBody();
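+        // Note: this comparison is case-insensitive, although Kafka topic names are case-sensitive.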
+        for (MetadataResponse.TopicMetadata topicMetadata: metadataResponse.topicMetadata()) {
+            if (topicMetadata.topic().equalsIgnoreCase(topic)) {
+                return topicMetadata;
+            }
+        }
+        return null;
+    }
+
+
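+    /**
+     * Fetch the metadata for all topics.
+     */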
+    public Collection<MetadataResponse.TopicMetadata> fetchTopicMetadata() {
+        final ClientResponse clientResponse = sendRequest(MetadataRequest.allTopics(), ApiKeys.METADATA);
+        if (!(clientResponse.responseBody() instanceof MetadataResponse)) {
+            throw new StreamsException("Inconsistent response type for internal topic metadata request. Expected MetadataResponse but received " + clientResponse.responseBody().getClass().getName());
+        }
+        final MetadataResponse metadataResponse = (MetadataResponse) clientResponse.responseBody();
+        return metadataResponse.topicMetadata();
+    }
+
+}
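
For illustration, a minimal usage sketch of the new client. The
StreamsKafkaClient(StreamsConfig) constructor is an assumption (only the
createTopics, deleteTopics, getTopicMetadata and fetchTopicMetadata methods
appear in this diff); the application id, broker address and topic name are
placeholders.

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;

    public class StreamsKafkaClientSketch {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "client-sketch");      // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
            final StreamsConfig config = new StreamsConfig(props);

            // Assumed constructor taking the streams configuration:
            // final StreamsKafkaClient client = new StreamsKafkaClient(config);

            // getTopicMetadata() returns null when the topic does not exist:
            // final MetadataResponse.TopicMetadata metadata = client.getTopicMetadata("my-topic");
        }
    }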

http://git-wip-us.apache.org/repos/asf/kafka/blob/4b71c0bd/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
index 03df4cc..1d2a3e2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
@@ -112,7 +112,6 @@ public class FanoutIntegrationTest {
         final Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "fanout-integration-test");
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, cacheSizeBytes);
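
With the ZooKeeper dependency removed, tests (and applications) need only
bootstrap servers, as this and the following hunks show. A minimal sketch of
such a configuration; the application id and broker address are placeholders:

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;

    public class NoZkConfigSketch {
        public static void main(final String[] args) {
            final Properties config = new Properties();
            config.put(StreamsConfig.APPLICATION_ID_CONFIG, "no-zk-example");      // placeholder
            config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // brokers only; no ZOOKEEPER_CONNECT_CONFIG
            new StreamsConfig(config);  // constructing validates the configuration
        }
    }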

http://git-wip-us.apache.org/repos/asf/kafka/blob/4b71c0bd/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index ac83f1c..52bd6e7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -48,10 +48,10 @@ import scala.Tuple2;
 import scala.collection.Iterator;
 import scala.collection.Map;
 
+import java.util.Properties;
 import java.util.Arrays;
-import java.util.List;
 import java.util.Locale;
-import java.util.Properties;
+import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import static org.junit.Assert.assertEquals;
@@ -83,24 +83,22 @@ public class InternalTopicIntegrationTest {
         streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
     }
 
-
     private Properties getTopicConfigProperties(final String changelog) {
         // Note: You must initialize the ZkClient with ZKStringSerializer.  If you don't, then
-        // createTopic() will only seem to work (it will return without error).  The topic will exist in
+        // createTopics() will only seem to work (it will return without error).  The topic will exist in
         // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
         // topic.
         final ZkClient zkClient = new ZkClient(
-            CLUSTER.zKConnectString(),
-            DEFAULT_ZK_SESSION_TIMEOUT_MS,
-            DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
-            ZKStringSerializer$.MODULE$);
+                CLUSTER.zKConnectString(),
+                DEFAULT_ZK_SESSION_TIMEOUT_MS,
+                DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
+                ZKStringSerializer$.MODULE$);
         try {
             final boolean isSecure = false;
             final ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(CLUSTER.zKConnectString()), isSecure);
@@ -132,7 +130,6 @@ public class InternalTopicIntegrationTest {
         final Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "compact-topics-integration-test");
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
@@ -142,13 +139,13 @@ public class InternalTopicIntegrationTest {
         final KStream<String, String> textLines = builder.stream(DEFAULT_INPUT_TOPIC);
 
         final KStream<String, Long> wordCounts = textLines
-            .flatMapValues(new ValueMapper<String, Iterable<String>>() {
-                @Override
-                public Iterable<String> apply(final String value) {
-                    return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
-                }
-            }).groupBy(MockKeyValueMapper.<String, String>SelectValueMapper())
-            .count("Counts").toStream();
+                .flatMapValues(new ValueMapper<String, Iterable<String>>() {
+                    @Override
+                    public Iterable<String> apply(final String value) {
+                        return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
+                    }
+                }).groupBy(MockKeyValueMapper.<String, String>SelectValueMapper())
+                .count("Counts").toStream();
 
         wordCounts.to(stringSerde, longSerde, DEFAULT_OUTPUT_TOPIC);
 
@@ -222,4 +219,4 @@ public class InternalTopicIntegrationTest {
         final long retention = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS) + durationMs;
         assertEquals(retention, Long.parseLong(properties.getProperty(LogConfig.RetentionMsProp())));
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/4b71c0bd/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
index d672fd0..9397e03 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java
@@ -82,7 +82,6 @@ public class KStreamAggregationDedupIntegrationTest {
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
         streamsConfiguration
             .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 2000);

http://git-wip-us.apache.org/repos/asf/kafka/blob/4b71c0bd/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index cac8384..5cc2a59 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -98,11 +98,11 @@ public class KStreamAggregationIntegrationTest {
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
         streamsConfiguration
             .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0);
 
+
         final KeyValueMapper<Integer, String, String> mapper = MockKeyValueMapper.SelectValueMapper();
         stream = builder.stream(Serdes.Integer(), Serdes.String(), streamOneInput);
         groupedStream = stream

http://git-wip-us.apache.org/repos/asf/kafka/blob/4b71c0bd/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
index a939ef2..3618f15 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
@@ -86,7 +86,6 @@ public class KStreamKTableJoinIntegrationTest {
         streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-integration-test-" + testNo);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

http://git-wip-us.apache.org/repos/asf/kafka/blob/4b71c0bd/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
index 33354ac..e8a042a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
@@ -101,7 +101,6 @@ public class KStreamRepartitionJoinTest {
         streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);

http://git-wip-us.apache.org/repos/asf/kafka/blob/4b71c0bd/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 6dc1782..b8f91fa 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -137,7 +137,6 @@ public class QueryableStateIntegrationTest {
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
         streamsConfiguration
             .put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory("qs-test").getPath());
         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());

http://git-wip-us.apache.org/repos/asf/kafka/blob/4b71c0bd/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index efb5c81..f1a8f68 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -275,7 +275,6 @@ public class ResetIntegrationTest {
         final Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + testNo);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
-        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Long().getClass());
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

http://git-wip-us.apache.org/repos/asf/kafka/blob/4b71c0bd/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
index 9bef23e..519b1f5 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -60,7 +60,6 @@ public class EmbeddedKafkaCluster extends ExternalResource {
 
         brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zKConnectString());
         brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), DEFAULT_BROKER_PORT);
-
         putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true);
         putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.LogCleanerDedupeBufferSizeProp(), 2 * 1024 * 1024L);
         putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.GroupMinSessionTimeoutMsProp(), 0);

http://git-wip-us.apache.org/repos/asf/kafka/blob/4b71c0bd/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
index 630167a..264be3b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
+++ b/streams/src/test/java/org/apache/kafka/streams/perf/SimpleBenchmark.java
@@ -142,7 +142,6 @@ public class SimpleBenchmark {
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);
         props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
         props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
@@ -425,7 +424,6 @@ public class SimpleBenchmark {
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-benchmark-streams");
         props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
@@ -468,7 +466,6 @@ public class SimpleBenchmark {
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-benchmark-streams-with-sink");
         props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
@@ -560,7 +557,6 @@ public class SimpleBenchmark {
         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "simple-benchmark-streams-with-store");
         props.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.toString());
         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeeper);
         props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
         props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 

