kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: KAFKA-4105: Queryable state tests
Date Mon, 05 Sep 2016 04:49:52 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk af9fc503d -> a960faf5f


KAFKA-4105: Queryable state tests

Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Damian Guy, Guozhang Wang

Closes #1806 from enothereska/queryable-state-tests


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a960faf5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a960faf5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a960faf5

Branch: refs/heads/trunk
Commit: a960faf5f48025548de71a84b16e0ccf2a1837ca
Parents: af9fc50
Author: Eno Thereska <eno.thereska@gmail.com>
Authored: Sun Sep 4 21:49:48 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Sun Sep 4 21:49:48 2016 -0700

----------------------------------------------------------------------
 .../apache/kafka/streams/KafkaStreamsTest.java  |   5 +-
 .../integration/FanoutIntegrationTest.java      |   5 +-
 .../InternalTopicIntegrationTest.java           |   5 +-
 .../KStreamAggregationIntegrationTest.java      |   8 +-
 .../KStreamKTableJoinIntegrationTest.java       |   6 +-
 .../integration/KStreamRepartitionJoinTest.java |   8 +-
 .../QueryableStateIntegrationTest.java          | 410 ++++++++++++++++++-
 .../integration/RegexSourceIntegrationTest.java |   5 +-
 .../integration/ResetIntegrationTest.java       |   5 +-
 .../integration/utils/EmbeddedKafkaCluster.java | 144 +++++++
 .../utils/EmbeddedSingleNodeKafkaCluster.java   | 135 ------
 11 files changed, 566 insertions(+), 170 deletions(-)
----------------------------------------------------------------------
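
In short, the patch does two things: it replaces the single-broker test harness EmbeddedSingleNodeKafkaCluster with an EmbeddedKafkaCluster whose broker count is a constructor argument, and it extends QueryableStateIntegrationTest to cover querying state stores during rebalances and under concurrent writes. A minimal sketch of the interactive-query pattern these tests exercise (topic and store names here are illustrative, not taken from the patch):

    import java.util.Properties;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "queryable-state-example");
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    config.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    KStreamBuilder builder = new KStreamBuilder();
    // Count records per key and materialize the result in a queryable store.
    builder.stream(Serdes.String(), Serdes.String(), "input-topic")
           .groupByKey()
           .count("my-count");

    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();

    // Query the local store once the instance is up and the store is ready.
    ReadOnlyKeyValueStore<String, Long> store =
        streams.store("my-count", QueryableStoreTypes.<String, Long>keyValueStore());
    Long count = store.get("some-key");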


http://git-wip-us.apache.org/repos/asf/kafka/blob/a960faf5/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index b15c2ee..7330810 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -18,7 +18,7 @@
 package org.apache.kafka.streams;
 
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.test.MockMetricsReporter;
@@ -35,10 +35,11 @@ import static org.junit.Assert.assertTrue;
 
 public class KafkaStreamsTest {
 
+    private static final int NUM_BROKERS = 1;
     // We need this to avoid the KafkaConsumer hanging on poll (this may occur if the test doesn't complete
     // quickly enough)
     @ClassRule
-    public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
 
     @Test
     public void testStartAndClose() throws Exception {

http://git-wip-us.apache.org/repos/asf/kafka/blob/a960faf5/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
index 6494533..56cba58 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
@@ -38,7 +38,7 @@ import java.util.List;
 import java.util.Locale;
 import java.util.Properties;
 
-import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -64,8 +64,9 @@ import static org.junit.Assert.assertThat;
  * </pre>
  */
 public class FanoutIntegrationTest {
+    private static final int NUM_BROKERS = 1;
     @ClassRule
-    public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
     private static final String INPUT_TOPIC_A = "A";
     private static final String OUTPUT_TOPIC_B = "B";
     private static final String OUTPUT_TOPIC_C = "C";

http://git-wip-us.apache.org/repos/asf/kafka/blob/a960faf5/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 968e060..f88c1b2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsConfig;
@@ -55,8 +55,9 @@ import scala.collection.Map;
  * Tests related to internal topics in streams
  */
 public class InternalTopicIntegrationTest {
+    private static final int NUM_BROKERS = 1;
     @ClassRule
-    public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
     private static final String DEFAULT_INPUT_TOPIC = "inputTopic";
     private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
     private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 10 * 1000;

http://git-wip-us.apache.org/repos/asf/kafka/blob/a960faf5/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
index 71aebad..17e197c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java
@@ -21,7 +21,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
@@ -51,10 +51,10 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
 
 public class KStreamAggregationIntegrationTest {
-
+    private static final int NUM_BROKERS = 1;
     @ClassRule
-    public static final EmbeddedSingleNodeKafkaCluster CLUSTER =
-        new EmbeddedSingleNodeKafkaCluster();
+    public static final EmbeddedKafkaCluster CLUSTER =
+        new EmbeddedKafkaCluster(NUM_BROKERS);
     private static volatile int testNo = 0;
     private KStreamBuilder builder;
     private Properties streamsConfiguration;

http://git-wip-us.apache.org/repos/asf/kafka/blob/a960faf5/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
index e290eb0..536ad24 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.java
@@ -41,7 +41,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Properties;
 
-import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -52,8 +52,10 @@ import static org.junit.Assert.assertThat;
  * KTable (think: KStream.leftJoin(KTable)), i.e. an example of a stateful computation.
  */
 public class KStreamKTableJoinIntegrationTest {
+
+    private static final int NUM_BROKERS = 1;
     @ClassRule
-    public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
     private static final String USER_CLICKS_TOPIC = "user-clicks";
     private static final String USER_REGIONS_TOPIC = "user-regions";
     private static final String USER_REGIONS_STORE_NAME = "user-regions-store-name";

http://git-wip-us.apache.org/repos/asf/kafka/blob/a960faf5/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
index ba6956d..de9c2c9 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java
@@ -21,7 +21,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.kstream.KStream;
@@ -47,10 +47,10 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.Is.is;
 
 public class KStreamRepartitionJoinTest {
-
+    private static final int NUM_BROKERS = 1;
     @ClassRule
-    public static final EmbeddedSingleNodeKafkaCluster CLUSTER =
-        new EmbeddedSingleNodeKafkaCluster();
+    public static final EmbeddedKafkaCluster CLUSTER =
+        new EmbeddedKafkaCluster(NUM_BROKERS);
 
     private static final long WINDOW_SIZE = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS);
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/a960faf5/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
index 974f109..02e8eb7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java
@@ -15,24 +15,35 @@
 package org.apache.kafka.streams.integration;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
 import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.ValueMapper;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.apache.kafka.streams.state.ReadOnlyWindowStore;
+import org.apache.kafka.streams.state.StreamsMetadata;
 import org.apache.kafka.streams.state.WindowStoreIterator;
+import org.apache.kafka.test.MockKeyValueMapper;
 import org.apache.kafka.test.TestUtils;
+import org.apache.kafka.test.TestCondition;
+import static org.junit.Assert.fail;
+import static org.junit.Assert.assertTrue;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -43,24 +54,35 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Locale;
 import java.util.Properties;
 import java.util.Set;
+import java.util.Map;
+import java.util.HashMap;
 import java.util.TreeSet;
 
+
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.core.IsEqual.equalTo;
-
 public class QueryableStateIntegrationTest {
-
+    private static final int NUM_BROKERS = 2;
     @ClassRule
-    public static final EmbeddedSingleNodeKafkaCluster CLUSTER =
-        new EmbeddedSingleNodeKafkaCluster();
+    public static final EmbeddedKafkaCluster CLUSTER =
+        new EmbeddedKafkaCluster(NUM_BROKERS);
     private static final String STREAM_ONE = "stream-one";
-    private static final String STREAM_TWO = "stream-two";
+    private static final String STREAM_CONCURRENT = "stream-concurrent";
     private static final String OUTPUT_TOPIC = "output";
-
+    private static final String OUTPUT_TOPIC_CONCURRENT = "output-concurrent";
+    private static final String STREAM_THREE = "stream-three";
+    private static final int NUM_PARTITIONS = NUM_BROKERS;
+    private static final int NUM_REPLICAS = NUM_BROKERS;
+    private static final long WINDOW_SIZE = 60000L;
+    private static final String OUTPUT_TOPIC_THREE = "output-three";
     private Properties streamsConfiguration;
-    private KStreamBuilder builder;
+    private List<String> inputValues;
+    private Set<String> inputValuesKeys;
     private KafkaStreams kafkaStreams;
     private Comparator<KeyValue<String, String>> stringComparator;
     private Comparator<KeyValue<String, Long>> stringLongComparator;
@@ -68,13 +90,15 @@ public class QueryableStateIntegrationTest {
     @BeforeClass
     public static void createTopics() {
         CLUSTER.createTopic(STREAM_ONE);
-        CLUSTER.createTopic(STREAM_TWO);
+        CLUSTER.createTopic(STREAM_CONCURRENT);
+        CLUSTER.createTopic(STREAM_THREE, NUM_PARTITIONS, NUM_REPLICAS);
         CLUSTER.createTopic(OUTPUT_TOPIC);
+        CLUSTER.createTopic(OUTPUT_TOPIC_CONCURRENT);
+        CLUSTER.createTopic(OUTPUT_TOPIC_THREE);
     }
 
     @Before
     public void before() throws IOException {
-        builder = new KStreamBuilder();
         streamsConfiguration = new Properties();
         final String applicationId = "queryable-state";
 
@@ -89,6 +113,7 @@ public class QueryableStateIntegrationTest {
         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
         streamsConfiguration
             .put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
+
         stringComparator = new Comparator<KeyValue<String, String>>() {
 
             @Override
@@ -105,6 +130,26 @@ public class QueryableStateIntegrationTest {
                 return o1.key.compareTo(o2.key);
             }
         };
+        inputValues = Arrays.asList("hello world",
+                                    "all streams lead to kafka",
+                                    "streams",
+                                    "kafka streams",
+                                    "the cat in the hat",
+                                    "green eggs and ham",
+                                    "that sam i am",
+                                    "up the creek without a paddle",
+                                    "run forest run",
+                                    "a tank full of gas",
+                                    "eat sleep rave repeat",
+                                    "one jolly sailor",
+                                    "king of the world");
+        inputValuesKeys = new HashSet<>();
+        for (String sentence : inputValues) {
+            String[] words = sentence.split("\\W+");
+            for (String word : words) {
+                inputValuesKeys.add(word);
+            }
+        }
     }
 
     @After
@@ -115,8 +160,216 @@ public class QueryableStateIntegrationTest {
         IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
     }
 
+
+    /**
+     * Creates a typical word count topology.
+     * @param inputTopic topic to consume sentences from
+     * @param outputTopic topic the all-time word counts are written to
+     * @param streamsConfiguration the streams configuration to use
+     * @return an unstarted KafkaStreams instance for the topology
+     */
+    private KafkaStreams createCountStream(String inputTopic, String outputTopic, Properties streamsConfiguration) {
+        KStreamBuilder builder = new KStreamBuilder();
+        final Serde<String> stringSerde = Serdes.String();
+        final KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, inputTopic);
+
+        final KGroupedStream<String, String> groupedByWord = textLines
+            .flatMapValues(new ValueMapper<String, Iterable<String>>() {
+                @Override
+                public Iterable<String> apply(String value) {
+                    return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
+                }
+            })
+            .groupBy(MockKeyValueMapper.<String, String>SelectValueMapper());
+
+        // Create a state store for the all-time word count
+        groupedByWord.count("word-count-store-" + inputTopic).to(Serdes.String(), Serdes.Long(), outputTopic);
+
+        // Create a windowed state store holding the word count per 1-minute window
+        groupedByWord.count(TimeWindows.of(WINDOW_SIZE), "windowed-word-count-store-" + inputTopic);
+
+        return new KafkaStreams(builder, streamsConfiguration);
+    }
+
+    private class StreamRunnable implements Runnable {
+        private final KafkaStreams myStream;
+        private boolean closed = false;
+        StreamRunnable(String inputTopic, String outputTopic, int queryPort) {
+            Properties props = (Properties) streamsConfiguration.clone();
+            props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, "localhost:" + queryPort);
+            this.myStream = createCountStream(inputTopic, outputTopic, props);
+        }
+
+        @Override
+        public void run() {
+            this.myStream.start();
+
+        }
+
+        public void close() {
+            if (!closed) {
+                this.myStream.close();
+                closed = true;
+            }
+        }
+
+        public boolean isClosed() {
+            return closed;
+        }
+
+        public final KafkaStreams getStream() {
+            return myStream;
+        }
+    }
+
+    private void verifyAllKVKeys(final StreamRunnable[] streamRunnables, final KafkaStreams streams,
+                                 final Set<String> keys, final String storeName) throws Exception {
+        for (final String key : keys) {
+            TestUtils.waitForCondition(new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer());
+                    if (metadata == null) {
+                        return false;
+                    }
+                    final int index = metadata.hostInfo().port();
+                    final KafkaStreams streamsWithKey = streamRunnables[index].getStream();
+                    final ReadOnlyKeyValueStore<String, Long> store;
+                    try {
+                        store = streamsWithKey.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+                    } catch (IllegalStateException e) {
+                        // Kafka Streams instance may have closed but rebalance hasn't happened
+                        return false;
+                    }
+                    return store != null && store.get(key) != null;
+                }
+            }, 30000, "waiting for metadata, store and value to be non null");
+        }
+    }
+
+
+    private void verifyAllWindowedKeys(final StreamRunnable[] streamRunnables, final KafkaStreams streams,
+                                       final Set<String> keys, final String storeName,
+                                       final Long from, final Long to) throws Exception {
+        for (final String key : keys) {
+            TestUtils.waitForCondition(new TestCondition() {
+                @Override
+                public boolean conditionMet() {
+                    final StreamsMetadata metadata = streams.metadataForKey(storeName, key, new StringSerializer());
+                    if (metadata == null) {
+                        return false;
+                    }
+                    final int index = metadata.hostInfo().port();
+                    final KafkaStreams streamsWithKey = streamRunnables[index].getStream();
+                    final ReadOnlyWindowStore<String, Long> store;
+                    try {
+                        store = streamsWithKey.store(storeName, QueryableStoreTypes.<String, Long>windowStore());
+                    } catch (IllegalStateException e) {
+                        // Kafka Streams instance may have closed but rebalance hasn't happened
+                        return false;
+                    }
+                    return store != null && store.fetch(key, from, to) != null;
+                }
+            }, 30000, "waiting for metadata, store and value to be non null");
+        }
+    }
+
+
+    @Test
+    public void queryOnRebalance() throws Exception {
+        int numThreads = NUM_PARTITIONS;
+        StreamRunnable[] streamRunnables = new StreamRunnable[numThreads];
+        Thread[] streamThreads = new Thread[numThreads];
+        final int numIterations = 500000;
+
+        // create concurrent producer
+        ProducerRunnable producerRunnable = new ProducerRunnable(STREAM_THREE, inputValues, numIterations);
+        Thread producerThread = new Thread(producerRunnable);
+
+        // create one stream thread per partition
+        for (int i = 0; i < numThreads; i++) {
+            streamRunnables[i] = new StreamRunnable(STREAM_THREE, OUTPUT_TOPIC_THREE, i);
+            streamThreads[i] = new Thread(streamRunnables[i]);
+            streamThreads[i].start();
+        }
+        producerThread.start();
+
+        try {
+            waitUntilAtLeastNumRecordProcessed(OUTPUT_TOPIC_THREE, 1);
+
+            for (int i = 0; i < numThreads; i++) {
+                verifyAllKVKeys(streamRunnables, streamRunnables[i].getStream(), inputValuesKeys,
+                    "word-count-store-" + STREAM_THREE);
+                verifyAllWindowedKeys(streamRunnables, streamRunnables[i].getStream(), inputValuesKeys,
+                    "windowed-word-count-store-" + STREAM_THREE, 0L, WINDOW_SIZE);
+            }
+
+            // kill N-1 threads
+            for (int i = 1; i < numThreads; i++) {
+                streamRunnables[i].close();
+                streamThreads[i].interrupt();
+                streamThreads[i].join();
+            }
+
+            // query from the remaining thread
+            verifyAllKVKeys(streamRunnables, streamRunnables[0].getStream(), inputValuesKeys,
+                "word-count-store-" + STREAM_THREE);
+            verifyAllWindowedKeys(streamRunnables, streamRunnables[0].getStream(), inputValuesKeys,
+                "windowed-word-count-store-" + STREAM_THREE, 0L, WINDOW_SIZE);
+        } finally {
+            for (int i = 0; i < numThreads; i++) {
+                if (!streamRunnables[i].isClosed()) {
+                    streamRunnables[i].close();
+                    streamThreads[i].interrupt();
+                    streamThreads[i].join();
+                }
+            }
+            producerRunnable.shutdown();
+            producerThread.interrupt();
+            producerThread.join();
+        }
+    }
+
+    @Test
+    public void concurrentAccesses() throws Exception {
+
+        final int numIterations = 500000;
+
+        ProducerRunnable producerRunnable = new ProducerRunnable(STREAM_CONCURRENT, inputValues, numIterations);
+        Thread producerThread = new Thread(producerRunnable);
+        kafkaStreams = createCountStream(STREAM_CONCURRENT, OUTPUT_TOPIC_CONCURRENT, streamsConfiguration);
+        kafkaStreams.start();
+        producerThread.start();
+
+        try {
+            waitUntilAtLeastNumRecordProcessed(OUTPUT_TOPIC_CONCURRENT, 1);
+
+            final ReadOnlyKeyValueStore<String, Long>
+                keyValueStore = kafkaStreams.store("word-count-store-" + STREAM_CONCURRENT, QueryableStoreTypes.<String, Long>keyValueStore());
+
+            final ReadOnlyWindowStore<String, Long> windowStore =
+                kafkaStreams.store("windowed-word-count-store-" + STREAM_CONCURRENT, QueryableStoreTypes.<String, Long>windowStore());
+
+
+            Map<String, Long> expectedWindowState = new HashMap<>();
+            Map<String, Long> expectedCount = new HashMap<>();
+            while (producerRunnable.getCurrIteration() < numIterations) {
+                verifyGreaterOrEqual(inputValuesKeys.toArray(new String[inputValuesKeys.size()]), expectedWindowState,
+                    expectedCount, windowStore, keyValueStore, false);
+            }
+            // finally check if all keys are there
+            verifyGreaterOrEqual(inputValuesKeys.toArray(new String[inputValuesKeys.size()]), expectedWindowState,
+                expectedCount, windowStore, keyValueStore, true);
+        } finally {
+            producerRunnable.shutdown();
+            producerThread.interrupt();
+            producerThread.join();
+        }
+    }
+
     @Test
     public void shouldBeAbleToQueryState() throws Exception {
+        KStreamBuilder builder = new KStreamBuilder();
         final String[] keys = {"hello", "goodbye", "welcome", "go", "kafka"};
 
         final Set<KeyValue<String, String>> batch1 = new TreeSet<>(stringComparator);
@@ -147,11 +400,11 @@ public class QueryableStateIntegrationTest {
         // Non Windowed
         s1.groupByKey().count("my-count").to(Serdes.String(), Serdes.Long(), OUTPUT_TOPIC);
 
-        s1.groupByKey().count(TimeWindows.of(60000L), "windowed-count");
+        s1.groupByKey().count(TimeWindows.of(WINDOW_SIZE), "windowed-count");
         kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
         kafkaStreams.start();
 
-        waitUntilAtLeastOneRecordProcessed();
+        waitUntilAtLeastNumRecordProcessed(OUTPUT_TOPIC, 1);
 
         final ReadOnlyKeyValueStore<String, Long>
             myCount = kafkaStreams.store("my-count", QueryableStoreTypes.<String, Long>keyValueStore());
@@ -225,7 +478,74 @@ public class QueryableStateIntegrationTest {
         assertThat(countState, equalTo(expectedCount));
     }
 
-    private void waitUntilAtLeastOneRecordProcessed() throws InterruptedException {
+    /**
+     * Verify that the new count is greater than or equal to the previous count.
+     * Note: this method changes the values in expectedWindowedCount and expectedCount
+     * @param keys All the keys we ever expect to find
+     * @param expectedWindowedCount Expected windowed count
+     * @param expectedCount Expected count
+     * @param windowStore Window Store
+     * @param keyValueStore Key-value store
+     * @param failIfKeyNotFound if true, the test fails if an expected key is not found in a store.
+     *                          If false, the method merely inserts the newly found key into the
+     *                          map of expected keys.
+     * @throws InterruptedException
+     */
+    private void verifyGreaterOrEqual(final String[] keys,
+                                      Map<String, Long> expectedWindowedCount,
+                                      Map<String, Long> expectedCount,
+                                      final ReadOnlyWindowStore<String, Long> windowStore,
+                                      final ReadOnlyKeyValueStore<String, Long> keyValueStore,
+                                      boolean failIfKeyNotFound)
+        throws InterruptedException {
+        final Map<String, Long> windowState = new HashMap<>();
+        final Map<String, Long> countState = new HashMap<>();
+
+        for (String key : keys) {
+            Map<String, Long> map = fetchMap(windowStore, key);
+            if (map.equals(Collections.<String, Long>emptyMap()) && failIfKeyNotFound) {
+                fail("Key not found " + key);
+            }
+            windowState.putAll(map);
+            final Long value = keyValueStore.get(key);
+            if (value != null) {
+                countState.put(key, value);
+            } else {
+                if (failIfKeyNotFound) {
+                    fail("Key not found " + key);
+                }
+            }
+        }
+
+        for (Map.Entry<String, Long> actualWindowStateEntry : windowState.entrySet()) {
+            if (expectedWindowedCount.containsKey(actualWindowStateEntry.getKey())) {
+                Long expectedValue = expectedWindowedCount.get(actualWindowStateEntry.getKey());
+                assertTrue(actualWindowStateEntry.getValue() >= expectedValue);
+            } else {
+                if (failIfKeyNotFound) {
+                    fail("Key not found " + actualWindowStateEntry.getKey());
+                }
+            }
+            // return this for next round of comparisons
+            expectedWindowedCount.put(actualWindowStateEntry.getKey(), actualWindowStateEntry.getValue());
+        }
+
+        for (Map.Entry<String, Long> actualCountStateEntry : countState.entrySet()) {
+            if (expectedCount.containsKey(actualCountStateEntry.getKey())) {
+                Long expectedValue = expectedCount.get(actualCountStateEntry.getKey());
+                assertTrue(actualCountStateEntry.getValue() >= expectedValue);
+            } else {
+                if (failIfKeyNotFound) {
+                    fail("Key not found " + actualCountStateEntry.getKey());
+                }
+            }
+            // return this for next round of comparisons
+            expectedCount.put(actualCountStateEntry.getKey(), actualCountStateEntry.getValue());
+        }
+
+    }
+
+    private void waitUntilAtLeastNumRecordProcessed(String topic, int numRecs) throws InterruptedException {
         final Properties config = new Properties();
         config.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         config.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "queryable-state-consumer");
@@ -235,8 +555,8 @@ public class QueryableStateIntegrationTest {
         config.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                            LongDeserializer.class.getName());
         IntegrationTestUtils.waitUntilMinValuesRecordsReceived(config,
-                                                               OUTPUT_TOPIC,
-                                                               1,
+                                                               topic,
+                                                               numRecs,
                                                                60 *
                                                                1000);
     }
@@ -252,5 +572,65 @@ public class QueryableStateIntegrationTest {
         return Collections.emptySet();
     }
 
+    private Map<String, Long> fetchMap(final ReadOnlyWindowStore<String, Long> store,
+                                       final String key) {
+
+        final WindowStoreIterator<Long> fetch = store.fetch(key, 0, System.currentTimeMillis());
+        if (fetch.hasNext()) {
+            KeyValue<Long, Long> next = fetch.next();
+            return Collections.singletonMap(key, next.value);
+        }
+        return Collections.emptyMap();
+    }
+
+
+    /**
+     * A class that produces records in a separate thread until shut down or the iteration limit is reached
+     */
+    private class ProducerRunnable implements Runnable {
+        private String topic;
+        private final List<String> inputValues;
+        private final int numIterations;
+        private int currIteration = 0;
+        private volatile boolean shutdown = false;
+
+        ProducerRunnable(String topic, List<String> inputValues, int numIterations) {
+            this.topic = topic;
+            this.inputValues = inputValues;
+            this.numIterations = numIterations;
+        }
+
+        private synchronized void incrementIteration() {
+            currIteration++;
+        }
+        public synchronized int getCurrIteration() {
+            return currIteration;
+        }
+        public synchronized void shutdown() {
+            shutdown = true;
+        }
+
+        @Override
+        public void run() {
+            Properties producerConfig = new Properties();
+            producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+            producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+            producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
+            producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+            producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+            final KafkaProducer<String, String>
+                producer =
+                new KafkaProducer<>(producerConfig, new StringSerializer(), new StringSerializer());
+
+            while (getCurrIteration() < numIterations && !shutdown) {
+                for (int i = 0; i < inputValues.size(); i++) {
+                    producer.send(new ProducerRecord<String, String>(topic, inputValues.get(i)));
+                }
+                incrementIteration();
+            }
+        }
+    }
+
 
 }
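
The multi-instance tests above rely on a small routing convention: each StreamRunnable registers "localhost:<queryPort>" via StreamsConfig.APPLICATION_SERVER_CONFIG, so the port carried in a key's StreamsMetadata doubles as an index into the array of local KafkaStreams instances. A minimal sketch of that lookup, with illustrative names (lookup, instances) not taken from the patch:

    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
    import org.apache.kafka.streams.state.StreamsMetadata;

    // Find the instance that hosts `key` and read its current count.
    // Assumes instance i registered "localhost:" + i as its application server.
    static Long lookup(KafkaStreams[] instances, String storeName, String key) {
        StreamsMetadata metadata =
            instances[0].metadataForKey(storeName, key, new StringSerializer());
        if (metadata == null) {
            return null; // metadata not yet available, e.g. mid-rebalance
        }
        KafkaStreams owner = instances[metadata.hostInfo().port()];
        ReadOnlyKeyValueStore<String, Long> store =
            owner.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
        return store.get(key); // may be null until the store is populated
    }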

http://git-wip-us.apache.org/repos/asf/kafka/blob/a960faf5/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index 51fa06a..dd43af6 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -28,7 +28,7 @@ import org.apache.kafka.streams.KafkaClientSupplier;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
@@ -67,8 +67,9 @@ import static org.junit.Assert.fail;
  */
 
 public class RegexSourceIntegrationTest {
+    private static final int NUM_BROKERS = 1;
     @ClassRule
-    public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
 
     private static final String TOPIC_1 = "topic-1";
     private static final String TOPIC_2 = "topic-2";

http://git-wip-us.apache.org/repos/asf/kafka/blob/a960faf5/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
index 155ec10..79ec117 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationTest.java
@@ -29,7 +29,7 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KStreamBuilder;
@@ -58,8 +58,9 @@ import static org.hamcrest.MatcherAssert.assertThat;
  * Tests local state store and global application cleanup.
  */
 public class ResetIntegrationTest {
+    private static final int NUM_BROKERS = 1;
     @ClassRule
-    public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
+    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
 
     private static final String APP_ID = "cleanup-integration-test";
     private static final String INPUT_TOPIC = "inputTopic";

http://git-wip-us.apache.org/repos/asf/kafka/blob/a960faf5/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
new file mode 100644
index 0000000..8e9101d
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.integration.utils;
+
+import kafka.server.KafkaConfig$;
+import kafka.zk.EmbeddedZookeeper;
+import org.junit.rules.ExternalResource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Runs an in-memory, "embedded" Kafka cluster with 1 ZooKeeper instance and a configurable number of Kafka brokers.
+ */
+public class EmbeddedKafkaCluster extends ExternalResource {
+
+    private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
+    private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random port being selected
+    private EmbeddedZookeeper zookeeper = null;
+    private final KafkaEmbedded[] brokers;
+
+    public EmbeddedKafkaCluster(int numBrokers) {
+        this.brokers = new KafkaEmbedded[numBrokers];
+    }
+
+    /**
+     * Creates and starts a Kafka cluster.
+     */
+    public void start() throws IOException, InterruptedException {
+        Properties brokerConfig = new Properties();
+
+        log.debug("Initiating embedded Kafka cluster startup");
+        log.debug("Starting a ZooKeeper instance");
+        zookeeper = new EmbeddedZookeeper();
+        log.debug("ZooKeeper instance is running at {}", zKConnectString());
+        brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zKConnectString());
+        brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), DEFAULT_BROKER_PORT);
+        brokerConfig.put(KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true);
+        brokerConfig.put(KafkaConfig$.MODULE$.LogCleanerDedupeBufferSizeProp(), 2 * 1024 * 1024L);
+        brokerConfig.put(KafkaConfig$.MODULE$.GroupMinSessionTimeoutMsProp(), 0);
+
+        for (int i = 0; i < this.brokers.length; i++) {
+            brokerConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), i);
+            log.debug("Starting a Kafka instance on port {} ...", brokerConfig.getProperty(KafkaConfig$.MODULE$.PortProp()));
+            brokers[i] = new KafkaEmbedded(brokerConfig);
+
+            log.debug("Kafka instance is running at {}, connected to ZooKeeper at {}",
+                brokers[i].brokerList(), brokers[i].zookeeperConnect());
+        }
+    }
+
+    /**
+     * Stop the Kafka cluster.
+     */
+    public void stop() {
+        for (int i = 0; i < this.brokers.length; i++) {
+            brokers[i].stop();
+        }
+        zookeeper.shutdown();
+    }
+
+    /**
+     * The ZooKeeper connection string aka `zookeeper.connect` in `hostnameOrIp:port` format.
+     * Example: `127.0.0.1:2181`.
+     *
+     * You can use this to e.g. tell Kafka brokers how to connect to this instance.
+     */
+    public String zKConnectString() {
+        return "localhost:" + zookeeper.port();
+    }
+
+    /**
+     * This cluster's `bootstrap.servers` value.  Example: `127.0.0.1:9092`.
+     *
+     * You can use this to tell Kafka producers how to connect to this cluster.
+     */
+    public String bootstrapServers() {
+        return brokers[0].brokerList();
+    }
+
+    protected void before() throws Throwable {
+        start();
+    }
+
+    protected void after() {
+        stop();
+    }
+
+    /**
+     * Create a Kafka topic with 1 partition and a replication factor of 1.
+     *
+     * @param topic The name of the topic.
+     */
+    public void createTopic(String topic) {
+        createTopic(topic, 1, 1, new Properties());
+    }
+
+    /**
+     * Create a Kafka topic with the given parameters.
+     *
+     * @param topic       The name of the topic.
+     * @param partitions  The number of partitions for this topic.
+     * @param replication The replication factor for (the partitions of) this topic.
+     */
+    public void createTopic(String topic, int partitions, int replication) {
+        createTopic(topic, partitions, replication, new Properties());
+    }
+
+    /**
+     * Create a Kafka topic with the given parameters.
+     *
+     * @param topic       The name of the topic.
+     * @param partitions  The number of partitions for this topic.
+     * @param replication The replication factor for (partitions of) this topic.
+     * @param topicConfig Additional topic-level configuration settings.
+     */
+    public void createTopic(String topic,
+                            int partitions,
+                            int replication,
+                            Properties topicConfig) {
+        brokers[0].createTopic(topic, partitions, replication, topicConfig);
+    }
+
+    public void deleteTopic(String topic) {
+        brokers[0].deleteTopic(topic);
+    }
+}
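
With the new rule in place, a test can size the embedded cluster and create replicated topics up front; a sketch of the typical wiring (the test class and topic names are illustrative):

    import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
    import org.junit.BeforeClass;
    import org.junit.ClassRule;

    public class MyStreamsIntegrationTest {
        private static final int NUM_BROKERS = 2;

        // Starts ZooKeeper plus NUM_BROKERS brokers before the class, stops them after.
        @ClassRule
        public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);

        @BeforeClass
        public static void createTopics() {
            // two partitions, each replicated on both brokers
            CLUSTER.createTopic("my-input", 2, 2);
        }
    }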

http://git-wip-us.apache.org/repos/asf/kafka/blob/a960faf5/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java
deleted file mode 100644
index 92290f5..0000000
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedSingleNodeKafkaCluster.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.streams.integration.utils;
-
-import kafka.server.KafkaConfig$;
-import kafka.zk.EmbeddedZookeeper;
-import org.junit.rules.ExternalResource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Properties;
-
-/**
- * Runs an in-memory, "embedded" Kafka cluster with 1 ZooKeeper instance and 1 Kafka broker.
- */
-public class EmbeddedSingleNodeKafkaCluster extends ExternalResource {
-
-    private static final Logger log = LoggerFactory.getLogger(EmbeddedSingleNodeKafkaCluster.class);
-    private static final int DEFAULT_BROKER_PORT = 0; // 0 results in a random port being selected
-    private EmbeddedZookeeper zookeeper = null;
-    private KafkaEmbedded broker = null;
-
-    /**
-     * Creates and starts a Kafka cluster.
-     */
-    public void start() throws IOException, InterruptedException {
-        Properties brokerConfig = new Properties();
-
-        log.debug("Initiating embedded Kafka cluster startup");
-        log.debug("Starting a ZooKeeper instance");
-        zookeeper = new EmbeddedZookeeper();
-        log.debug("ZooKeeper instance is running at {}", zKConnectString());
-        brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zKConnectString());
-        brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), DEFAULT_BROKER_PORT);
-        brokerConfig.put(KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true);
-        brokerConfig.put(KafkaConfig$.MODULE$.LogCleanerDedupeBufferSizeProp(), 2 * 1024 * 1024L);
-        brokerConfig.put(KafkaConfig$.MODULE$.GroupMinSessionTimeoutMsProp(), 0);
-
-        log.debug("Starting a Kafka instance on port {} ...", brokerConfig.getProperty(KafkaConfig$.MODULE$.PortProp()));
-        broker = new KafkaEmbedded(brokerConfig);
-
-        log.debug("Kafka instance is running at {}, connected to ZooKeeper at {}",
-            broker.brokerList(), broker.zookeeperConnect());
-    }
-
-    /**
-     * Stop the Kafka cluster.
-     */
-    public void stop() {
-        broker.stop();
-        zookeeper.shutdown();
-    }
-
-    /**
-     * The ZooKeeper connection string aka `zookeeper.connect` in `hostnameOrIp:port` format.
-     * Example: `127.0.0.1:2181`.
-     *
-     * You can use this to e.g. tell Kafka brokers how to connect to this instance.
-     */
-    public String zKConnectString() {
-        return "localhost:" + zookeeper.port();
-    }
-
-    /**
-     * This cluster's `bootstrap.servers` value.  Example: `127.0.0.1:9092`.
-     *
-     * You can use this to tell Kafka producers how to connect to this cluster.
-     */
-    public String bootstrapServers() {
-        return broker.brokerList();
-    }
-
-    protected void before() throws Throwable {
-        start();
-    }
-
-    protected void after() {
-        stop();
-    }
-
-    /**
-     * Create a Kafka topic with 1 partition and a replication factor of 1.
-     *
-     * @param topic The name of the topic.
-     */
-    public void createTopic(String topic) {
-        createTopic(topic, 1, 1, new Properties());
-    }
-
-    /**
-     * Create a Kafka topic with the given parameters.
-     *
-     * @param topic       The name of the topic.
-     * @param partitions  The number of partitions for this topic.
-     * @param replication The replication factor for (the partitions of) this topic.
-     */
-    public void createTopic(String topic, int partitions, int replication) {
-        createTopic(topic, partitions, replication, new Properties());
-    }
-
-    /**
-     * Create a Kafka topic with the given parameters.
-     *
-     * @param topic       The name of the topic.
-     * @param partitions  The number of partitions for this topic.
-     * @param replication The replication factor for (partitions of) this topic.
-     * @param topicConfig Additional topic-level configuration settings.
-     */
-    public void createTopic(String topic,
-                            int partitions,
-                            int replication,
-                            Properties topicConfig) {
-        broker.createTopic(topic, partitions, replication, topicConfig);
-    }
-
-    public void deleteTopic(String topic) {
-        broker.deleteTopic(topic);
-    }
-}

