kafka-commits mailing list archives

From guozh...@apache.org
Subject kafka git commit: MINOR: Added more integration tests
Date Wed, 04 May 2016 21:04:32 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.10.0 9b6761ccd -> f4bb4a651


MINOR: Added more integration tests

Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Ismael Juma, Michael G. Noll, Guozhang Wang

Closes #1285 from enothereska/more-integration-tests


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/f4bb4a65
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/f4bb4a65
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/f4bb4a65

Branch: refs/heads/0.10.0
Commit: f4bb4a6515dfbd5986ae9b3190bbb58dd8cf4e41
Parents: 9b6761c
Author: Eno Thereska <eno.thereska@gmail.com>
Authored: Tue May 3 13:26:22 2016 -0700
Committer: Guozhang Wang <wangguoz@gmail.com>
Committed: Wed May 4 14:04:20 2016 -0700

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |   1 +
 .../integration/FanoutIntegrationTest.java      | 166 ++++++++++++
 .../InternalTopicIntegrationTest.java           |  16 +-
 .../integration/JoinIntegrationTest.java        | 266 +++++++++++++++++++
 .../integration/MapFunctionIntegrationTest.java | 127 +++++++++
 .../integration/PassThroughIntegrationTest.java | 113 ++++++++
 .../integration/WordCountIntegrationTest.java   | 149 +++++++++++
 7 files changed, 830 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
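
All five new test files below share the same end-to-end shape: create topics on an embedded single-node cluster, build and start a KafkaStreams topology, produce input synchronously, give the application a few seconds to process, then consume the output topic and assert on it. The following condensed sketch of that shared skeleton is not part of the commit; the class, topic, and application-id names are illustrative, but it uses only the EmbeddedSingleNodeKafkaCluster and IntegrationTestUtils helpers that appear in the diffs below.

import static org.hamcrest.CoreMatchers.equalTo;
import static org.junit.Assert.assertThat;

import java.util.Arrays;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.junit.ClassRule;
import org.junit.Test;

public class SkeletonIntegrationTest {
    @ClassRule
    public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();

    @Test
    public void shouldProcessInputEndToEnd() throws Exception {
        CLUSTER.createTopic("skeleton-input");
        CLUSTER.createTopic("skeleton-output");
        List<String> inputValues = Arrays.asList("a", "b");

        // Step 1: Configure and start the processor topology (here: the simplest possible one).
        KStreamBuilder builder = new KStreamBuilder();
        builder.stream("skeleton-input").to("skeleton-output");
        Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "skeleton-integration-test");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
        streams.start();
        Thread.sleep(5000);  // wait for the topology to be up before producing

        // Step 2: Produce some input data to the input topic.
        Properties producerConfig = new Properties();
        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        IntegrationTestUtils.produceValuesSynchronously("skeleton-input", inputValues, producerConfig);

        // Step 3: Give the application time to process, shut it down, then verify the output.
        Thread.sleep(5000);
        streams.close();
        Properties consumerConfig = new Properties();
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "skeleton-integration-test-consumer");
        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        List<String> actualValues = IntegrationTestUtils.readValues("skeleton-output", consumerConfig, inputValues.size());
        assertThat(actualValues, equalTo(inputValues));
    }
}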


http://git-wip-us.apache.org/repos/asf/kafka/blob/f4bb4a65/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 7a45515..5f52cce 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -148,6 +148,7 @@
       <allow pkg="scala" />
       <allow pkg="scala.collection" />
       <allow pkg="org.I0Itec.zkclient" />
+      <allow pkg="org.hamcrest.CoreMatchers" />
     </subpackage>
 
     <subpackage name="state">

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4bb4a65/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
new file mode 100644
index 0000000..a7b4785
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/FanoutIntegrationTest.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.Properties;
+
+import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * End-to-end integration test that demonstrates "fan-out", using an embedded Kafka cluster.
+ *
+ * This example shows how you can read from one input topic/stream, transform the data (here:
+ * trivially) in two different ways via two intermediate streams, and then write the respective
+ * results to two output topics.
+ *
+ * <pre>
+ * {@code
+ *
+ *                                         +---map()---> stream2 ---to()---> Kafka topic B
+ *                                         |
+ * Kafka topic A ---stream()--> stream1 ---+
+ *                                         |
+ *                                         +---map()---> stream3 ---to()---> Kafka topic C
+ *
+ * }
+ * </pre>
+ */
+public class FanoutIntegrationTest {
+    @ClassRule
+    public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
+    private static final String INPUT_TOPIC_A = "A";
+    private static final String OUTPUT_TOPIC_B = "B";
+    private static final String OUTPUT_TOPIC_C = "C";
+
+    @BeforeClass
+    public static void startKafkaCluster() throws Exception {
+        CLUSTER.createTopic(INPUT_TOPIC_A);
+        CLUSTER.createTopic(OUTPUT_TOPIC_B);
+        CLUSTER.createTopic(OUTPUT_TOPIC_C);
+    }
+
+    @Test
+    public void shouldFanoutTheInput() throws Exception {
+        List<String> inputValues = Arrays.asList("Hello", "World");
+        List<String> expectedValuesForB = new ArrayList<>();
+        List<String> expectedValuesForC = new ArrayList<>();
+        for (String input : inputValues) {
+            expectedValuesForB.add(input.toUpperCase(Locale.getDefault()));
+            expectedValuesForC.add(input.toLowerCase(Locale.getDefault()));
+        }
+
+        //
+        // Step 1: Configure and start the processor topology.
+        //
+        KStreamBuilder builder = new KStreamBuilder();
+
+        Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "fanout-integration-test");
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
+        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+
+        KStream<byte[], String> stream1 = builder.stream(INPUT_TOPIC_A);
+        KStream<byte[], String> stream2 = stream1.mapValues(
+            new ValueMapper<String, String>() {
+                @Override
+                public String apply(String value) {
+                    return value.toUpperCase(Locale.getDefault());
+                }
+            });
+        KStream<byte[], String> stream3 = stream1.mapValues(
+            new ValueMapper<String, String>() {
+                @Override
+                public String apply(String value) {
+                    return value.toLowerCase(Locale.getDefault());
+                }
+            });
+        stream2.to(OUTPUT_TOPIC_B);
+        stream3.to(OUTPUT_TOPIC_C);
+
+        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        streams.start();
+
+        // Wait briefly for the topology to be fully up and running (otherwise it might miss some or all
+        // of the input data we produce below).
+        Thread.sleep(5000);
+
+        //
+        // Step 2: Produce some input data to the input topic.
+        //
+        Properties producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        IntegrationTestUtils.produceValuesSynchronously(INPUT_TOPIC_A, inputValues, producerConfig);
+
+        // Give the stream processing application some time to do its work.
+        Thread.sleep(5000);
+        streams.close();
+
+        //
+        // Step 3: Verify the application's output data.
+        //
+
+        // Verify output topic B
+        Properties consumerConfigB = new Properties();
+        consumerConfigB.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        consumerConfigB.put(ConsumerConfig.GROUP_ID_CONFIG, "fanout-integration-test-standard-consumer-topicB");
+        consumerConfigB.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerConfigB.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        consumerConfigB.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        List<String> actualValuesForB = IntegrationTestUtils.readValues(OUTPUT_TOPIC_B, consumerConfigB, inputValues.size());
+        assertThat(actualValuesForB, equalTo(expectedValuesForB));
+
+        // Verify output topic C
+        Properties consumerConfigC = new Properties();
+        consumerConfigC.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        consumerConfigC.put(ConsumerConfig.GROUP_ID_CONFIG, "fanout-integration-test-standard-consumer-topicC");
+        consumerConfigC.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerConfigC.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        consumerConfigC.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        List<String> actualValuesForC = IntegrationTestUtils.readValues(OUTPUT_TOPIC_C, consumerConfigC, inputValues.size());
+        assertThat(actualValuesForC, equalTo(expectedValuesForC));
+    }
+
+}
\ No newline at end of file
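
For readers skimming the diff: the fan-out topology above is one source stream feeding two independent mapValues() branches, each written to its own output topic. A condensed sketch of the same wiring (a fragment, not part of the commit; constants, serdes, and imports as in the test above, and the Java 8 lambdas are an assumption, since the committed test spells them out as anonymous ValueMapper classes):

KStreamBuilder builder = new KStreamBuilder();
KStream<byte[], String> stream1 = builder.stream(INPUT_TOPIC_A);   // read from Kafka topic A
KStream<byte[], String> stream2 = stream1.mapValues(v -> v.toUpperCase(Locale.getDefault()));
KStream<byte[], String> stream3 = stream1.mapValues(v -> v.toLowerCase(Locale.getDefault()));
stream2.to(OUTPUT_TOPIC_B);   // topic B gets the uppercased copy of every input value
stream3.to(OUTPUT_TOPIC_C);   // topic C gets the lowercased copy of every input value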

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4bb4a65/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
index 2a3e767..66111c4 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java
@@ -55,7 +55,7 @@ import scala.collection.Map;
  */
 public class InternalTopicIntegrationTest {
     @ClassRule
-    public static EmbeddedSingleNodeKafkaCluster cluster = new EmbeddedSingleNodeKafkaCluster();
+    public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
     private static final String DEFAULT_INPUT_TOPIC = "inputTopic";
     private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
     private static final int DEFAULT_ZK_SESSION_TIMEOUT_MS = 10 * 1000;
@@ -63,8 +63,8 @@ public class InternalTopicIntegrationTest {
 
     @BeforeClass
     public static void startKafkaCluster() throws Exception {
-        cluster.createTopic(DEFAULT_INPUT_TOPIC);
-        cluster.createTopic(DEFAULT_OUTPUT_TOPIC);
+        CLUSTER.createTopic(DEFAULT_INPUT_TOPIC);
+        CLUSTER.createTopic(DEFAULT_OUTPUT_TOPIC);
     }
 
     /**
@@ -79,12 +79,12 @@ public class InternalTopicIntegrationTest {
         // only ZooKeeper and will be returned when listing topics, but Kafka itself does not create the
         // topic.
         ZkClient zkClient = new ZkClient(
-            cluster.zKConnectString(),
+            CLUSTER.zKConnectString(),
             DEFAULT_ZK_SESSION_TIMEOUT_MS,
             DEFAULT_ZK_CONNECTION_TIMEOUT_MS,
             ZKStringSerializer$.MODULE$);
         boolean isSecure = false;
-        ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(cluster.zKConnectString()), isSecure);
+        ZkUtils zkUtils = new ZkUtils(zkClient, new ZkConnection(CLUSTER.zKConnectString()), isSecure);
 
         Map<String, Properties> topicConfigs = AdminUtils.fetchAllTopicConfigs(zkUtils);
         Iterator it = topicConfigs.iterator();
@@ -118,8 +118,8 @@ public class InternalTopicIntegrationTest {
 
         Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "compact-topics-integration-test");
-        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
-        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, cluster.zKConnectString());
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
         streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
@@ -153,7 +153,7 @@ public class InternalTopicIntegrationTest {
         // Step 2: Produce some input data to the input topic.
         //
         Properties producerConfig = new Properties();
-        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, cluster.bootstrapServers());
+        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
         producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
         producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
         producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4bb4a65/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
new file mode 100644
index 0000000..1fc0ba6
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/JoinIntegrationTest.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.kafka.streams.integration;
+
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Reducer;
+import org.apache.kafka.streams.kstream.ValueJoiner;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * End-to-end integration test that demonstrates how to perform a join between a KStream and a
+ * KTable (think: KStream.leftJoin(KTable)), i.e. an example of a stateful computation.
+ */
+public class JoinIntegrationTest {
+    @ClassRule
+    public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
+    private static final String USER_CLICKS_TOPIC = "user-clicks";
+    private static final String USER_REGIONS_TOPIC = "user-regions";
+    private static final String OUTPUT_TOPIC = "output-topic";
+
+    @BeforeClass
+    public static void startKafkaCluster() throws Exception {
+        CLUSTER.createTopic(USER_CLICKS_TOPIC);
+        CLUSTER.createTopic(USER_REGIONS_TOPIC);
+        CLUSTER.createTopic(OUTPUT_TOPIC);
+    }
+
+    /**
+     * Tuple for a region and its associated number of clicks.
+     */
+    private static final class RegionWithClicks {
+
+        private final String region;
+        private final long clicks;
+
+        public RegionWithClicks(String region, long clicks) {
+            if (region == null || region.isEmpty()) {
+                throw new IllegalArgumentException("region must be set");
+            }
+            if (clicks < 0) {
+                throw new IllegalArgumentException("clicks must not be negative");
+            }
+            this.region = region;
+            this.clicks = clicks;
+        }
+
+        public String getRegion() {
+            return region;
+        }
+
+        public long getClicks() {
+            return clicks;
+        }
+
+    }
+
+    @Test
+    public void shouldCountClicksPerRegion() throws Exception {
+        // Input 1: Clicks per user (multiple records allowed per user).
+        List<KeyValue<String, Long>> userClicks = Arrays.asList(
+            new KeyValue<>("alice", 13L),
+            new KeyValue<>("bob", 4L),
+            new KeyValue<>("chao", 25L),
+            new KeyValue<>("bob", 19L),
+            new KeyValue<>("dave", 56L),
+            new KeyValue<>("eve", 78L),
+            new KeyValue<>("alice", 40L),
+            new KeyValue<>("fang", 99L)
+        );
+
+        // Input 2: Region per user (multiple records allowed per user).
+        List<KeyValue<String, String>> userRegions = Arrays.asList(
+            new KeyValue<>("alice", "asia"),   /* Alice lived in Asia originally... */
+            new KeyValue<>("bob", "americas"),
+            new KeyValue<>("chao", "asia"),
+            new KeyValue<>("dave", "europe"),
+            new KeyValue<>("alice", "europe"), /* ...but moved to Europe some time later. */
+            new KeyValue<>("eve", "americas"),
+            new KeyValue<>("fang", "asia")
+        );
+
+        List<KeyValue<String, Long>> expectedClicksPerRegion = Arrays.asList(
+            new KeyValue<>("europe", 13L),
+            new KeyValue<>("americas", 4L),
+            new KeyValue<>("asia", 25L),
+            new KeyValue<>("americas", 23L),
+            new KeyValue<>("europe", 69L),
+            new KeyValue<>("americas", 101L),
+            new KeyValue<>("europe", 109L),
+            new KeyValue<>("asia", 124L)
+        );
+
+        //
+        // Step 1: Configure and start the processor topology.
+        //
+        final Serde<String> stringSerde = Serdes.String();
+        final Serde<Long> longSerde = Serdes.Long();
+
+        Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-integration-test");
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
+        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        // Explicitly place the state directory under /tmp so that we can remove it via
+        // `purgeLocalStreamsState` below.  Once Streams is updated to expose the effective
+        // StreamsConfig configuration (so we can retrieve whatever state directory Streams came up
+        // with automatically) we don't need to set this anymore and can update `purgeLocalStreamsState`
+        // accordingly.
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
+
+        // Remove any state from previous test runs
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        // This KStream contains information such as "alice" -> 13L.
+        //
+        // Because this is a KStream ("record stream"), multiple records for the same user will be
+        // considered as separate click-count events, each of which will be added to the total count.
+        KStream<String, Long> userClicksStream = builder.stream(stringSerde, longSerde, USER_CLICKS_TOPIC);
+
+        // This KTable contains information such as "alice" -> "europe".
+        //
+        // Because this is a KTable ("changelog stream"), only the latest value (here: region) for a
+        // record key will be considered at the time when a new user-click record (see above) is
+        // received for the `leftJoin` below.  Any previous region values are considered out of
+        // date.  This behavior is quite different from the KStream for user clicks above.
+        //
+        // For example, the user "alice" will be considered to live in "europe" (although originally she
+        // lived in "asia") because, at the time her first user-click record is being received and
+        // subsequently processed in the `leftJoin`, the latest region update for "alice" is "europe"
+        // (which overrides her previous region value of "asia").
+        KTable<String, String> userRegionsTable =
+            builder.table(stringSerde, stringSerde, USER_REGIONS_TOPIC);
+
+        // Compute the number of clicks per region, e.g. "europe" -> 13L.
+        //
+        // The resulting KTable is continuously being updated as new data records are arriving in the
+        // input KStream `userClicksStream` and input KTable `userRegionsTable`.
+        KTable<String, Long> clicksPerRegion = userClicksStream
+            // Join the stream against the table.
+            //
+            // Null values possible: In general, null values are possible for region (i.e. the value of
+            // the KTable we are joining against) so we must guard against that (here: by setting the
+            // fallback region "UNKNOWN").  In this specific example this is not really needed because
+            // we know, based on the test setup, that all users have appropriate region entries at the
+            // time we perform the join.
+            //
+            // Also, we need to return a tuple of (region, clicks) for each user.  But because Java does
+            // not support tuples out-of-the-box, we must use a custom class `RegionWithClicks` to
+            // achieve the same effect.
+            .leftJoin(userRegionsTable, new ValueJoiner<Long, String, RegionWithClicks>() {
+                @Override
+                public RegionWithClicks apply(Long clicks, String region) {
+                    RegionWithClicks regionWithClicks = new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks);
+                    return regionWithClicks;
+                }
+            })
+            // Change the stream from <user> -> <region, clicks> to <region> -> <clicks>
+            .map(new KeyValueMapper<String, RegionWithClicks, KeyValue<String, Long>>() {
+                @Override
+                public KeyValue<String, Long> apply(String key, RegionWithClicks value) {
+                    return new KeyValue<>(value.getRegion(), value.getClicks());
+                }
+            })
+            // Compute the total per region by summing the individual click counts per region.
+            .reduceByKey(new Reducer<Long>() {
+                @Override
+                public Long apply(Long value1, Long value2) {
+                    return value1 + value2;
+                }
+            }, stringSerde, longSerde, "ClicksPerRegionUnwindowed");
+
+        // Write the (continuously updating) results to the output topic.
+        clicksPerRegion.to(stringSerde, longSerde, OUTPUT_TOPIC);
+
+        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        streams.start();
+
+        // Wait briefly for the topology to be fully up and running (otherwise it might miss some or all
+        // of the input data we produce below).
+        Thread.sleep(5000);
+
+        //
+        // Step 2: Publish user-region information.
+        //
+        // To keep this code example simple and easier to understand/reason about, we publish all
+        // user-region records before any user-click records (cf. step 3).  In practice though,
+        // data records would typically be arriving concurrently in both input streams/topics.
+        Properties userRegionsProducerConfig = new Properties();
+        userRegionsProducerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        userRegionsProducerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        userRegionsProducerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
+        userRegionsProducerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        userRegionsProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        IntegrationTestUtils.produceKeyValuesSynchronously(USER_REGIONS_TOPIC, userRegions, userRegionsProducerConfig);
+
+        //
+        // Step 3: Publish some user click events.
+        //
+        Properties userClicksProducerConfig = new Properties();
+        userClicksProducerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        userClicksProducerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        userClicksProducerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
+        userClicksProducerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        userClicksProducerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class);
+        IntegrationTestUtils.produceKeyValuesSynchronously(USER_CLICKS_TOPIC, userClicks, userClicksProducerConfig);
+
+        // Give the stream processing application some time to do its work.
+        Thread.sleep(5000);
+        streams.close();
+
+        //
+        // Step 4: Verify the application's output data.
+        //
+        Properties consumerConfig = new Properties();
+        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "join-integration-test-standard-consumer");
+        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        List<KeyValue<String, Long>> actualClicksPerRegion = IntegrationTestUtils.readKeyValues(OUTPUT_TOPIC, consumerConfig);
+        assertThat(actualClicksPerRegion, equalTo(expectedClicksPerRegion));
+    }
+
+}
\ No newline at end of file
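
The heart of the test above is a three-stage computation: enrich each (user, clicks) record with the user's latest region via leftJoin, re-key the result from user to region, then sum the clicks per region. A condensed sketch of that chain (a fragment, not part of the commit; inputs, serdes, and the RegionWithClicks helper as in the test above, and the Java 8 lambdas are an assumption, since the committed code spells them out as anonymous classes):

KTable<String, Long> clicksPerRegion = userClicksStream
    // 1. Enrich each click with the user's latest region; a missing region falls back to "UNKNOWN".
    .leftJoin(userRegionsTable, (clicks, region) ->
        new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks))
    // 2. Re-key from <user, RegionWithClicks> to <region, clicks>.
    .map((user, regionWithClicks) ->
        new KeyValue<>(regionWithClicks.getRegion(), regionWithClicks.getClicks()))
    // 3. Sum the per-user click counts into a continuously updated total per region.
    .reduceByKey((v1, v2) -> v1 + v2, stringSerde, longSerde, "ClicksPerRegionUnwindowed");

clicksPerRegion.to(stringSerde, longSerde, OUTPUT_TOPIC);

Because every update to the resulting KTable is forwarded to the output topic, the expected output above contains running totals per region rather than only the final values.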

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4bb4a65/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java
new file mode 100644
index 0000000..47c00c1
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/MapFunctionIntegrationTest.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.ValueMapper;
+
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.Properties;
+
+import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * End-to-end integration test based on a simple map, using an embedded Kafka cluster.
+ */
+public class MapFunctionIntegrationTest {
+    @ClassRule
+    public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
+    private static final String DEFAULT_INPUT_TOPIC = "inputTopic";
+    private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
+
+    @BeforeClass
+    public static void startKafkaCluster() throws Exception {
+        CLUSTER.createTopic(DEFAULT_INPUT_TOPIC);
+        CLUSTER.createTopic(DEFAULT_OUTPUT_TOPIC);
+    }
+
+    @Test
+    public void shouldUppercaseTheInput() throws Exception {
+        List<String> inputValues = Arrays.asList("hello", "world");
+        List<String> expectedValues = new ArrayList<>();
+        for (String input : inputValues) {
+            expectedValues.add(input.toUpperCase(Locale.getDefault()));
+        }
+
+        //
+        // Step 1: Configure and start the processor topology.
+        //
+        KStreamBuilder builder = new KStreamBuilder();
+
+        Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "map-function-integration-test");
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
+        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.ByteArray().getClass().getName());
+        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+
+        KStream<byte[], String> input = builder.stream(DEFAULT_INPUT_TOPIC);
+        KStream<byte[], String> uppercased = input.mapValues(new ValueMapper<String, String>() {
+            @Override
+            public String apply(String value) {
+                return value.toUpperCase(Locale.getDefault());
+            }
+        });
+        uppercased.to(DEFAULT_OUTPUT_TOPIC);
+
+        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        streams.start();
+
+        // Wait briefly for the topology to be fully up and running (otherwise it might miss some or all
+        // of the input data we produce below).
+        Thread.sleep(5000);
+
+        //
+        // Step 2: Produce some input data to the input topic.
+        //
+        Properties producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerConfig);
+
+        // Give the stream processing application some time to do its work.
+        Thread.sleep(5000);
+        streams.close();
+
+        //
+        // Step 3: Verify the application's output data.
+        //
+        Properties consumerConfig = new Properties();
+        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "map-function-integration-test-standard-consumer");
+        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        List<String> actualValues = IntegrationTestUtils.readValues(DEFAULT_OUTPUT_TOPIC, consumerConfig, inputValues.size());
+        assertThat(actualValues, equalTo(expectedValues));
+    }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4bb4a65/streams/src/test/java/org/apache/kafka/streams/integration/PassThroughIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/PassThroughIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/PassThroughIntegrationTest.java
new file mode 100644
index 0000000..2627a3a
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/PassThroughIntegrationTest.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+/**
+ * End-to-end integration test that reads data from an input topic and writes the same data as-is to
+ * a new output topic, using an embedded Kafka cluster.
+ */
+public class PassThroughIntegrationTest {
+    @ClassRule
+    public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
+    private static final String DEFAULT_INPUT_TOPIC = "inputTopic";
+    private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
+
+    @BeforeClass
+    public static void startKafkaCluster() throws Exception {
+        CLUSTER.createTopic(DEFAULT_INPUT_TOPIC);
+        CLUSTER.createTopic(DEFAULT_OUTPUT_TOPIC);
+    }
+
+    @Test
+    public void shouldWriteTheInputDataAsIsToTheOutputTopic() throws Exception {
+        List<String> inputValues = Arrays.asList(
+            "hello world",
+            "the world is not enough",
+            "the world of the stock market is coming to an end"
+        );
+
+        //
+        // Step 1: Configure and start the processor topology.
+        //
+        KStreamBuilder builder = new KStreamBuilder();
+
+        Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "pass-through-integration-test");
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
+        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+
+        // Write the input data as-is to the output topic.
+        builder.stream(DEFAULT_INPUT_TOPIC).to(DEFAULT_OUTPUT_TOPIC);
+
+        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        streams.start();
+
+        // Wait briefly for the topology to be fully up and running (otherwise it might miss some or all
+        // of the input data we produce below).
+        Thread.sleep(5000);
+
+        //
+        // Step 2: Produce some input data to the input topic.
+        //
+        Properties producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerConfig);
+
+        // Give the stream processing application some time to do its work.
+        Thread.sleep(5000);
+        streams.close();
+
+        //
+        // Step 3: Verify the application's output data.
+        //
+        Properties consumerConfig = new Properties();
+        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "pass-through-integration-test-standard-consumer");
+        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        List<String> actualValues = IntegrationTestUtils.readValues(DEFAULT_OUTPUT_TOPIC, consumerConfig, inputValues.size());
+        assertThat(actualValues, equalTo(inputValues));
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/f4bb4a65/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java
new file mode 100644
index 0000000..5c32a6c
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/WordCountIntegrationTest.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements.  See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.  You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KStreamBuilder;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.ValueMapper;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.junit.Assert.assertThat;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Locale;
+import java.util.Properties;
+
+import org.apache.kafka.streams.integration.utils.EmbeddedSingleNodeKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+
+
+/**
+ * End-to-end integration test based on a simple word count example, using an embedded Kafka
+ * cluster.
+ */
+public class WordCountIntegrationTest {
+    @ClassRule
+    public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();
+    private static final String DEFAULT_INPUT_TOPIC = "inputTopic";
+    private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
+
+    @BeforeClass
+    public static void startKafkaCluster() throws Exception {
+        CLUSTER.createTopic(DEFAULT_INPUT_TOPIC);
+        CLUSTER.createTopic(DEFAULT_OUTPUT_TOPIC);
+    }
+
+    @Test
+    public void shouldCountWords() throws Exception {
+        List<String> inputValues = Arrays.asList("hello", "world", "world", "hello world");
+        List<KeyValue<String, Long>> expectedWordCounts = Arrays.asList(
+            new KeyValue<>("hello", 1L),
+            new KeyValue<>("world", 1L),
+            new KeyValue<>("world", 2L),
+            new KeyValue<>("hello", 2L),
+            new KeyValue<>("world", 3L)
+        );
+
+        //
+        // Step 1: Configure and start the processor topology.
+        //
+        final Serde<String> stringSerde = Serdes.String();
+        final Serde<Long> longSerde = Serdes.Long();
+
+        Properties streamsConfiguration = new Properties();
+        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-integration-test");
+        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, CLUSTER.zKConnectString());
+        streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
+        // Explicitly place the state directory under /tmp so that we can remove it via
+        // `purgeLocalStreamsState` below.  Once Streams is updated to expose the effective
+        // StreamsConfig configuration (so we can retrieve whatever state directory Streams came up
+        // with automatically) we don't need to set this anymore and can update `purgeLocalStreamsState`
+        // accordingly.
+        streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
+
+        KStreamBuilder builder = new KStreamBuilder();
+
+        KStream<String, String> textLines = builder.stream(DEFAULT_INPUT_TOPIC);
+
+        KStream<String, Long> wordCounts = textLines
+            .flatMapValues(new ValueMapper<String, Iterable<String>>() {
+                @Override
+                public Iterable<String> apply(String value) {
+                    return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
+                }
+            }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
+                @Override
+                public KeyValue<String, String> apply(String key, String value) {
+                    return new KeyValue<String, String>(value, value);
+                }
+            }).countByKey("Counts")
+            .toStream();
+
+        wordCounts.to(stringSerde, longSerde, DEFAULT_OUTPUT_TOPIC);
+
+        // Remove any state from previous test runs
+        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
+
+        KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
+        streams.start();
+
+        // Wait briefly for the topology to be fully up and running (otherwise it might miss some or all
+        // of the input data we produce below).
+        Thread.sleep(5000);
+
+        //
+        // Step 2: Produce some input data to the input topic.
+        //
+        Properties producerConfig = new Properties();
+        producerConfig.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        producerConfig.put(ProducerConfig.ACKS_CONFIG, "all");
+        producerConfig.put(ProducerConfig.RETRIES_CONFIG, 0);
+        producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        IntegrationTestUtils.produceValuesSynchronously(DEFAULT_INPUT_TOPIC, inputValues, producerConfig);
+
+        //
+        // Step 3: Verify the application's output data.
+        //
+        Thread.sleep(5000);
+        streams.close();
+        Properties consumerConfig = new Properties();
+        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "wordcount-integration-test-standard-consumer");
+        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
+        List<KeyValue<String, Long>> actualWordCounts = IntegrationTestUtils.readKeyValues(DEFAULT_OUTPUT_TOPIC, consumerConfig);
+        assertThat(actualWordCounts, equalTo(expectedWordCounts));
+    }
+
+}
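
The word-count topology above, condensed: split each input line into lowercase words, re-key each word onto itself, count occurrences per key, and stream every updated count to the output topic. A sketch of the same chain (a fragment, not part of the commit; textLines, serdes, and topic constants as in the test above, and the Java 8 lambdas are an assumption, since the committed code spells them out as anonymous classes):

KStream<String, Long> wordCounts = textLines
    // Split each line into lowercase words, emitting one record per word.
    .flatMapValues(line -> Arrays.asList(line.toLowerCase(Locale.getDefault()).split("\\W+")))
    // Re-key each record so the word itself becomes the key.
    .map((key, word) -> new KeyValue<>(word, word))
    // Count occurrences per word; each update flows downstream as a new (word, count) record.
    .countByKey("Counts")
    .toStream();

wordCounts.to(stringSerde, longSerde, DEFAULT_OUTPUT_TOPIC);

This is also why the expected output above contains running counts such as ("world", 1L) followed by ("world", 2L): every update to the count table is emitted downstream.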

