kafka-commits mailing list archives

From guozh...@apache.org
Subject [05/15] kafka-site git commit: MINOR: Improve Streams Dev Guide
Date Wed, 20 Dec 2017 21:23:21 GMT
http://git-wip-us.apache.org/repos/asf/kafka-site/blob/12dbff55/10/streams/developer-guide/dsl-api.html
----------------------------------------------------------------------
diff --git a/10/streams/developer-guide/dsl-api.html b/10/streams/developer-guide/dsl-api.html
new file mode 100644
index 0000000..9cfb4bc
--- /dev/null
+++ b/10/streams/developer-guide/dsl-api.html
@@ -0,0 +1,3208 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<script><!--#include virtual="../../js/templateData.js" --></script>
+
+<script id="content-template" type="text/x-handlebars-template">
+  <!-- h1>Developer Guide for Kafka Streams</h1 -->
+  <!-- div class="sub-nav-sticky">
+    <div class="sticky-top">
+      <div style="height:35px">
+        <a href="/{{version}}/documentation/streams/">Introduction</a>
+        <a class="active-menu-item" href="/{{version}}/documentation/streams/developer-guide">Developer Guide</a>
+        <a href="/{{version}}/documentation/streams/core-concepts">Concepts</a>
+        <a href="/{{version}}/documentation/streams/quickstart">Run Demo App</a>
+        <a href="/{{version}}/documentation/streams/tutorial">Tutorial: Write App</a>
+      </div>
+    </div -->
+  </div>
+
+
+    <div class="section" id="streams-dsl">
+        <span id="streams-developer-guide-dsl"></span><h1>Streams DSL<a class="headerlink" href="#streams-dsl" title="Permalink to this headline"></a></h1>
+        <p>The Kafka Streams DSL (Domain Specific Language) is built on top of the Streams Processor API. It is recommended for
+            most users, especially beginners. Most data processing operations can be expressed in just a few lines of DSL code.</p>
+        <div class="contents local topic" id="table-of-contents">
+            <p class="topic-title first">Table of Contents</p>
+            <ul class="simple">
+                <li><a class="reference internal" href="#overview" id="id7">Overview</a></li>
+                <li><a class="reference internal" href="#creating-source-streams-from-kafka" id="id8">Creating source streams from Kafka</a></li>
+                <li><a class="reference internal" href="#transform-a-stream" id="id9">Transform a stream</a><ul>
+                    <li><a class="reference internal" href="#stateless-transformations" id="id10">Stateless transformations</a></li>
+                    <li><a class="reference internal" href="#stateful-transformations" id="id11">Stateful transformations</a><ul>
+                        <li><a class="reference internal" href="#aggregating" id="id12">Aggregating</a></li>
+                        <li><a class="reference internal" href="#joining" id="id13">Joining</a><ul>
+                            <li><a class="reference internal" href="#join-co-partitioning-requirements" id="id14">Join co-partitioning requirements</a></li>
+                            <li><a class="reference internal" href="#kstream-kstream-join" id="id15">KStream-KStream Join</a></li>
+                            <li><a class="reference internal" href="#ktable-ktable-join" id="id16">KTable-KTable Join</a></li>
+                            <li><a class="reference internal" href="#kstream-ktable-join" id="id17">KStream-KTable Join</a></li>
+                            <li><a class="reference internal" href="#kstream-globalktable-join" id="id18">KStream-GlobalKTable Join</a></li>
+                        </ul>
+                        </li>
+                        <li><a class="reference internal" href="#windowing" id="id19">Windowing</a><ul>
+                            <li><a class="reference internal" href="#tumbling-time-windows" id="id20">Tumbling time windows</a></li>
+                            <li><a class="reference internal" href="#hopping-time-windows" id="id21">Hopping time windows</a></li>
+                            <li><a class="reference internal" href="#sliding-time-windows" id="id22">Sliding time windows</a></li>
+                            <li><a class="reference internal" href="#session-windows" id="id23">Session Windows</a></li>
+                        </ul>
+                        </li>
+                    </ul>
+                    </li>
+                    <li><a class="reference internal" href="#applying-processors-and-transformers-processor-api-integration" id="id24">Applying processors and transformers (Processor API integration)</a></li>
+                </ul>
+                </li>
+                <li><a class="reference internal" href="#writing-streams-back-to-kafka" id="id25">Writing streams back to Kafka</a></li>
+            </ul>
+        </div>
+        <div class="section" id="overview">
+            <h2><a class="toc-backref" href="#id7">Overview</a><a class="headerlink" href="#overview" title="Permalink to this headline"></a></h2>
+            <p>In comparison to the <a class="reference internal" href="processor-api.html#streams-developer-guide-processor-api"><span class="std std-ref">Processor API</span></a>, only the DSL supports:</p>
+            <ul class="simple">
+                <li>Built-in abstractions for <a class="reference internal" href="../concepts.html#streams-concepts-duality"><span class="std std-ref">streams and tables</span></a> in the form of
+                    <a class="reference internal" href="../concepts.html#streams-concepts-kstream"><span class="std std-ref">KStream</span></a>, <a class="reference internal" href="../concepts.html#streams-concepts-ktable"><span class="std std-ref">KTable</span></a>, and
+                    <a class="reference internal" href="../concepts.html#streams-concepts-globalktable"><span class="std std-ref">GlobalKTable</span></a>.  Having first-class support for streams and tables is crucial
+                    because, in practice, most use cases require not just either streams or databases/tables, but a combination of both.
+                    For example, if your use case is to create a customer 360-degree view that is updated in real-time, what your
+                    application will be doing is transforming many input <em>streams</em> of customer-related events into an output <em>table</em>
+                    that contains a continuously updated 360-degree view of your customers.</li>
+                <li>Declarative, functional programming style with
+                    <a class="reference internal" href="#streams-developer-guide-dsl-transformations-stateless"><span class="std std-ref">stateless transformations</span></a> (e.g. <code class="docutils literal"><span class="pre">map</span></code> and <code class="docutils literal"><span class="pre">filter</span></code>)
+                    as well as <a class="reference internal" href="#streams-developer-guide-dsl-transformations-stateful"><span class="std std-ref">stateful transformations</span></a>
+                    such as <a class="reference internal" href="#streams-developer-guide-dsl-aggregating"><span class="std std-ref">aggregations</span></a> (e.g. <code class="docutils literal"><span class="pre">count</span></code> and <code class="docutils literal"><span class="pre">reduce</span></code>),
+                    <a class="reference internal" href="#streams-developer-guide-dsl-joins"><span class="std std-ref">joins</span></a> (e.g. <code class="docutils literal"><span class="pre">leftJoin</span></code>), and
+                    <a class="reference internal" href="#streams-developer-guide-dsl-windowing"><span class="std std-ref">windowing</span></a> (e.g. <a class="reference internal" href="#windowing-session"><span class="std std-ref">session windows</span></a>).</li>
+            </ul>
+            <p>With the DSL, you can define <a class="reference internal" href="../concepts.html#streams-concepts-processor-topology"><span class="std std-ref">processor topologies</span></a> (i.e., the logical
+                processing plan) in your application. The steps to accomplish this are:</p>
+            <ol class="arabic simple">
+                <li>Specify <a class="reference internal" href="#streams-developer-guide-dsl-sources"><span class="std std-ref">one or more input streams that are read from Kafka topics</span></a>.</li>
+                <li>Compose <a class="reference internal" href="#streams-developer-guide-dsl-transformations"><span class="std std-ref">transformations</span></a> on these streams.</li>
+                <li>Write the <a class="reference internal" href="#streams-developer-guide-dsl-destinations"><span class="std std-ref">resulting output streams back to Kafka topics</span></a>, or expose the processing results of your application directly to other applications through <a class="reference internal" href="interactive-queries.html#streams-developer-guide-interactive-queries"><span class="std std-ref">interactive queries</span></a> (e.g., via a REST API).</li>
+            </ol>
+            <p>After the application is run, the defined processor topologies are continuously executed (i.e., the processing plan is put into
+                action). A step-by-step guide for writing a stream processing application using the DSL is provided below.</p>
+            <p>For a complete list of available API functionality, see also the <a class="reference internal" href="../javadocs.html#streams-javadocs"><span class="std std-ref">Kafka Streams Javadocs</span></a>.</p>
+        </div>
+        <div class="section" id="creating-source-streams-from-kafka">
+            <span id="streams-developer-guide-dsl-sources"></span><h2><a class="toc-backref" href="#id8">Creating source streams from Kafka</a><a class="headerlink" href="#creating-source-streams-from-kafka" title="Permalink to this headline"></a></h2>
+            <p>You can easily read data from Kafka topics into your application.  The following operations are supported.</p>
+            <table border="1" class="non-scrolling-table width-100-percent docutils">
+                <colgroup>
+                    <col width="22%" />
+                    <col width="78%" />
+                </colgroup>
+                <thead valign="bottom">
+                <tr class="row-odd"><th class="head">Reading from Kafka</th>
+                    <th class="head">Description</th>
+                </tr>
+                </thead>
+                <tbody valign="top">
+                <tr class="row-even"><td><p class="first"><strong>Stream</strong></p>
+                    <ul class="last simple">
+                        <li><em>input topics</em> &rarr; KStream</li>
+                    </ul>
+                </td>
+                    <td><p class="first">Creates a <a class="reference internal" href="../concepts.html#streams-concepts-kstream"><span class="std std-ref">KStream</span></a> from the specified Kafka input topics and interprets the data
+                        as a <a class="reference internal" href="../concepts.html#streams-concepts-kstream"><span class="std std-ref">record stream</span></a>.
+                        A <code class="docutils literal"><span class="pre">KStream</span></code> represents a <em>partitioned</em> record stream.
+                        <a class="reference external" href="../javadocs/org/apache/kafka/streams/StreamsBuilder.html#stream(java.lang.String)">(details)</a></p>
+                        <p>In the case of a KStream, the local KStream instance of every application instance will
+                            be populated with data from only <strong>a subset</strong> of the partitions of the input topic.  Collectively, across
+                            all application instances, all input topic partitions are read and processed.</p>
+                        <div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serdes</span><span class="o">;</span>
+<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.StreamsBuilder</span><span class="o">;</span>
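+<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.Consumed</span><span class="o">;</span>
+<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.kstream.KStream</span><span class="o">;</span>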
+
+<span class="n">StreamsBuilder</span> <span class="n">builder</span> <span class="o">=</span> <span class="k">new</span> <span class="n">StreamsBuilder</span><span class="o">();</span>
+
+<span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">wordCounts</span> <span class="o">=</span> <span class="n">builder</span><span class="o">.</span><span class="na">stream</span><span class="o">(</span>
+    <span class="s">&quot;word-counts-input-topic&quot;</span><span class="o">,</span> <span class="cm">/* input topic */</span>
+    <span class="n">Consumed</span><span class="o">.</span><span class="na">with</span><span class="o">(</span>
+      <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> <span class="cm">/* key serde */</span>
+      <span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">()</span>   <span class="cm">/* value serde */</span>
+    <span class="o">));</span>
+</pre></div>
+                        </div>
+                        <p>If you do not specify SerDes explicitly, the default SerDes from the
+                            <a class="reference internal" href="config-streams.html#streams-developer-guide-configuration"><span class="std std-ref">configuration</span></a> are used.</p>
+                        <p>You <strong>must specify SerDes explicitly</strong> if the key or value types of the records in the Kafka input
+                            topics do not match the configured default SerDes. For information about configuring default SerDes, available
+                            SerDes, and implementing your own custom SerDes see <a class="reference internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std std-ref">Data Types and Serialization</span></a>.</p>
+                        <p class="last">Several variants of <code class="docutils literal"><span class="pre">stream</span></code> exist, for example to specify a regex pattern for the input topics to read from.</p>
+                    </td>
+                </tr>
+                <tr class="row-odd"><td><p class="first"><strong>Table</strong></p>
+                    <ul class="last simple">
+                        <li><em>input topic</em> &rarr; KTable</li>
+                    </ul>
+                </td>
+                    <td><p class="first">Reads the specified Kafka input topic into a <a class="reference internal" href="../concepts.html#streams-concepts-ktable"><span class="std std-ref">KTable</span></a>.  The topic is
+                        interpreted as a changelog stream, where records with the same key are interpreted as UPSERT aka INSERT/UPDATE
+                        (when the record value is not <code class="docutils literal"><span class="pre">null</span></code>) or as DELETE (when the value is <code class="docutils literal"><span class="pre">null</span></code>) for that key.
+                        <a class="reference external" href="../javadocs/org/apache/kafka/streams/StreamsBuilder.html#table-java.lang.String(java.lang.String)">(details)</a></p>
+                        <p>In the case of a KTable, the local KTable instance of every application instance will
+                            be populated with data from only <strong>a subset</strong> of the partitions of the input topic.  Collectively, across
+                            all application instances, all input topic partitions are read and processed.</p>
+                        <p>You must provide a name for the table (more precisely, for the internal
+                            <a class="reference internal" href="../architecture.html#streams-architecture-state"><span class="std std-ref">state store</span></a> that backs the table).  This is required for
+                            supporting <a class="reference internal" href="interactive-queries.html#streams-developer-guide-interactive-queries"><span class="std std-ref">interactive queries</span></a> against the table. When a
+                            name is not provided, the table will not be queryable and an internal name will be provided for the state store.</p>
+                        <p>If you do not specify SerDes explicitly, the default SerDes from the
+                            <a class="reference internal" href="config-streams.html#streams-developer-guide-configuration"><span class="std std-ref">configuration</span></a> are used.</p>
+                        <p>You <strong>must specify SerDes explicitly</strong> if the key or value types of the records in the Kafka input
+                            topics do not match the configured default SerDes. For information about configuring default SerDes, available
+                            SerDes, and implementing your own custom SerDes see <a class="reference internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std std-ref">Data Types and Serialization</span></a>.</p>
+                        <p class="last">Several variants of <code class="docutils literal"><span class="pre">table</span></code> exist, for example to specify the <code class="docutils literal"><span class="pre">auto.offset.reset</span></code> policy to be used when
+                            reading from the input topic.</p>
+                    </td>
+                </tr>
+                <tr class="row-even"><td><p class="first"><strong>Global Table</strong></p>
+                    <ul class="last simple">
+                        <li><em>input topic</em> &rarr; GlobalKTable</li>
+                    </ul>
+                </td>
+                    <td><p class="first">Reads the specified Kafka input topic into a <a class="reference internal" href="../concepts.html#streams-concepts-globalktable"><span class="std std-ref">GlobalKTable</span></a>.  The topic is
+                        interpreted as a changelog stream, where records with the same key are interpreted as UPSERT aka INSERT/UPDATE
+                        (when the record value is not <code class="docutils literal"><span class="pre">null</span></code>) or as DELETE (when the value is <code class="docutils literal"><span class="pre">null</span></code>) for that key.
+                        <a class="reference external" href="../javadocs/org/apache/kafka/streams/StreamsBuilder.html#globalTable-java.lang.String(java.lang.String)">(details)</a></p>
+                        <p>In the case of a GlobalKTable, the local GlobalKTable instance of every application instance will
+                            be populated with data from <strong>all</strong> the partitions of the input topic.</p>
+                        <p>You must provide a name for the table (more precisely, for the internal
+                            <a class="reference internal" href="../architecture.html#streams-architecture-state"><span class="std std-ref">state store</span></a> that backs the table).  This is required for
+                            supporting <a class="reference internal" href="interactive-queries.html#streams-developer-guide-interactive-queries"><span class="std std-ref">interactive queries</span></a> against the table. When a
+                            name is not provided, the table will not be queryable and an internal name will be provided for the state store.</p>
+                        <div class="highlight-java"><div class="highlight"><pre><span></span><span class="kn">import</span> <span class="nn">org.apache.kafka.common.serialization.Serdes</span><span class="o">;</span>
+<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.StreamsBuilder</span><span class="o">;</span>
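+<span class="kn">import</span> <span class="nn">org.apache.kafka.common.utils.Bytes</span><span class="o">;</span>
+<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.kstream.GlobalKTable</span><span class="o">;</span>
+<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.kstream.Materialized</span><span class="o">;</span>
+<span class="kn">import</span> <span class="nn">org.apache.kafka.streams.state.KeyValueStore</span><span class="o">;</span>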
+
+<span class="n">StreamsBuilder</span> <span class="n">builder</span> <span class="o">=</span> <span class="k">new</span> <span class="n">StreamsBuilder</span><span class="o">();</span>
+
+<span class="n">GlobalKTable</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">wordCounts</span> <span class="o">=</span> <span class="n">builder</span><span class="o">.</span><span class="na">globalTable</span><span class="o">(</span>
+    <span class="s">&quot;word-counts-input-topic&quot;</span><span class="o">,</span>
+    <span class="n">Materialized</span><span class="o">.&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">,</span> <span class="n">KeyValueStore</span><span class="o">&lt;</span><span class="n">Bytes</span><span class="o">,</span> <span class="kt">byte</span><span class="o">[]&gt;&gt;</span><span class="n">as</span><span class="o">(</span>
+      <span class="s">&quot;word-counts-global-store&quot;</span> <span class="cm">/* table/store name */</span><span class="o">)</span>
+      <span class="o">.</span><span class="na">withKeySerde</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">())</span> <span class="cm">/* key serde */</span>
+      <span class="o">.</span><span class="na">withValueSerde</span><span class="o">(</span><span class="n">Serdes</span><span class="o">.</span><span class="na">Long</span><span class="o">())</span> <span class="cm">/* value serde */</span>
+    <span class="o">);</span>
+</pre></div>
+                        </div>
+                        <p>You <strong>must specify SerDes explicitly</strong> if the key or value types of the records in the Kafka input
+                            topics do not match the configured default SerDes. For information about configuring default SerDes, available
+                            SerDes, and implementing your own custom SerDes see <a class="reference internal" href="datatypes.html#streams-developer-guide-serdes"><span class="std std-ref">Data Types and Serialization</span></a>.</p>
+                        <p class="last">Several variants of <code class="docutils literal"><span class="pre">globalTable</span></code> exist, for example to specify explicit SerDes.</p>
+                    </td>
+                </tr>
+                </tbody>
+            </table>
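+            <p>The changelog interpretation described above for KTable and GlobalKTable (a non-null value is an UPSERT, a
+                <code class="docutils literal"><span class="pre">null</span></code> value is a DELETE) can be sketched in plain Java. This is only an illustration of the semantics,
+                not Kafka Streams code: the class <code class="docutils literal"><span class="pre">ChangelogSketch</span></code>, its methods, and the sample records are
+                hypothetical, and a <code class="docutils literal"><span class="pre">LinkedHashMap</span></code> stands in for the state store that backs a table.</p>

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only: a plain map stands in for the table's state store.
public class ChangelogSketch {

    // Applies one changelog record to the table.
    static void apply(Map<String, Long> table, String key, Long value) {
        if (value == null) {
            table.remove(key);      // null value: DELETE for that key
        } else {
            table.put(key, value);  // non-null value: UPSERT (INSERT or UPDATE)
        }
    }

    // Replays a small, made-up changelog and returns the resulting table.
    static Map<String, Long> replay() {
        Map<String, Long> table = new LinkedHashMap<>();
        apply(table, "hello", 1L);   // INSERT hello
        apply(table, "world", 1L);   // INSERT world
        apply(table, "hello", 2L);   // UPDATE hello
        apply(table, "world", null); // DELETE world
        return table;
    }

    public static void main(String[] args) {
        System.out.println(replay()); // prints {hello=2}
    }
}
```

+            <p>Reading the same topic as a record stream would instead keep all four records as independent events, which is
+                the practical difference between the stream and table interpretations.</p>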
+        </div>
+        <div class="section" id="transform-a-stream">
+            <span id="streams-developer-guide-dsl-transformations"></span><h2><a class="toc-backref" href="#id9">Transform a stream</a><a class="headerlink" href="#transform-a-stream" title="Permalink to this headline"></a></h2>
+            <p>The KStream and KTable interfaces support a variety of transformation operations.
+                Each of these operations is translated into one or more connected processors in the underlying processor topology.
+                Since KStream and KTable are strongly typed, all of these transformation operations are defined as
+                generic functions for which users can specify the input and output data types.</p>
+            <p>Some KStream transformations may generate one or more KStream objects, for example:</p>
+            <ul class="simple">
+                <li><code class="docutils literal"><span class="pre">filter</span></code> and <code class="docutils literal"><span class="pre">map</span></code> on a KStream will generate another KStream</li>
+                <li><code class="docutils literal"><span class="pre">branch</span></code> on a KStream can generate multiple KStreams</li>
+            </ul>
+            <p>Other transformations may generate a KTable object; for example, an aggregation of a KStream yields a KTable. This allows Kafka Streams to continuously update the computed value when <a class="reference internal" href="../concepts.html#streams-concepts-aggregations"><span class="std std-ref">late records</span></a> arrive after the value
+                has already been produced to the downstream transformation operators.</p>
+            <p>All KTable transformation operations can only generate another KTable. However, the Kafka Streams DSL does provide a special function
+                that converts a KTable representation into a KStream. All of these transformation methods can be chained together to compose
+                a complex processor topology.</p>
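+            <p>The idea that aggregating a stream yields a continuously updated table can be sketched in plain Java. This is a
+                minimal illustration under stated assumptions, not the Kafka Streams API: the class
+                <code class="docutils literal"><span class="pre">RunningCountSketch</span></code>, its fields, and the sample words are hypothetical. A map stands in for
+                the table, and a list stands in for the update records that would be forwarded downstream.</p>

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: a running count per key, updated on every record.
public class RunningCountSketch {

    // Stand-in for the table maintained by the aggregation.
    private final Map<String, Long> counts = new HashMap<>();

    // Stand-in for the updates that would be sent to downstream operators.
    private final List<String> updates = new ArrayList<>();

    // Processes one incoming record: updates the table, then emits the new value.
    void onRecord(String key) {
        long updated = counts.merge(key, 1L, Long::sum);
        updates.add(key + "=" + updated);
    }

    // Feeds three made-up records through the aggregation and returns the emitted updates.
    static List<String> demo() {
        RunningCountSketch aggregation = new RunningCountSketch();
        for (String word : new String[] {"kafka", "streams", "kafka"}) {
            aggregation.onRecord(word);
        }
        return aggregation.updates;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [kafka=1, streams=1, kafka=2]
    }
}
```

+            <p>The second <code class="docutils literal"><span class="pre">kafka</span></code> record does not append a new row; it revises the earlier result, which is
+                why the output of an aggregation is modeled as a table rather than as a stream of independent events.</p>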
+            <p>These transformation operations are described in the following subsections:</p>
+            <ul class="simple">
+                <li><a class="reference internal" href="#streams-developer-guide-dsl-transformations-stateless"><span class="std std-ref">Stateless transformations</span></a></li>
+                <li><a class="reference internal" href="#streams-developer-guide-dsl-transformations-stateful"><span class="std std-ref">Stateful transformations</span></a></li>
+            </ul>
+            <div class="section" id="stateless-transformations">
+                <span id="streams-developer-guide-dsl-transformations-stateless"></span><h3><a class="toc-backref" href="#id10">Stateless transformations</a><a class="headerlink" href="#stateless-transformations" title="Permalink to this headline"></a></h3>
+                <p>Stateless transformations do not require state for processing and they do not require a state store associated with
+                    the stream processor. Kafka 0.11.0 and later allows you to materialize the result from a stateless <code class="docutils literal"><span class="pre">KTable</span></code> transformation. This allows the result to be queried through <a class="reference internal" href="interactive-queries.html#streams-developer-guide-interactive-queries"><span class="std std-ref">interactive queries</span></a>. To materialize a <code class="docutils literal"><span class="pre">KTable</span></code>, each of the below stateless operations <a class="reference internal" href="interactive-queries.html#streams-developer-guide-interactive-queries-local-key-value-stores"><span class="std std-ref">can be augmented</span></a> with an optional <code class="docutils literal"><span class="pre">queryableStoreName</span></code> argument.</p>
+                <table border="1" class="non-scrolling-table width-100-percent docutils">
+                    <colgroup>
+                        <col width="22%" />
+                        <col width="78%" />
+                    </colgroup>
+                    <thead valign="bottom">
+                    <tr class="row-odd"><th class="head">Transformation</th>
+                        <th class="head">Description</th>
+                    </tr>
+                    </thead>
+                    <tbody valign="top">
+                    <tr class="row-even"><td><p class="first"><strong>Branch</strong></p>
+                        <ul class="last simple">
+                            <li>KStream &rarr; KStream[]</li>
+                        </ul>
+                    </td>
+                        <td><p class="first">Branch (or split) a <code class="docutils literal"><span class="pre">KStream</span></code> based on the supplied predicates into one or more <code class="docutils literal"><span class="pre">KStream</span></code> instances.
+                            (<a class="reference external" href="../javadocs/org/apache/kafka/streams/kstream/KStream.html#branch-org.apache.kafka.streams.kstream.Predicate...-">details</a>)</p>
+                            <p>Predicates are evaluated in order.  A record is placed in one and only one output stream, on the first match:
+                                if the n-th predicate evaluates to true, the record is placed in the n-th stream.  If no predicate matches,
+                                the record is dropped.</p>
+                            <p>Branching is useful, for example, to route records to different downstream topics.</p>
+                            <div class="last highlight-java"><div class="highlight"><pre><span></span><span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">stream</span> <span class="o">=</span> <span class="o">...;</span>
+<span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;[]</span> <span class="n">branches</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">branch</span><span class="o">(</span>
+    <span class="o">(</span><span class="n">key</span><span class="o">,</span> <span class="n">value</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">key</span><span class="o">.</span><span class="na">startsWith</span><span class="o">(</span><span class="s">&quot;A&quot;</span><span class="o">),</span> <span class="cm">/* first predicate  */</span>
+    <span class="o">(</span><span class="n">key</span><span class="o">,</span> <span class="n">value</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">key</span><span class="o">.</span><span class="na">startsWith</span><span class="o">(</span><span class="s">&quot;B&quot;</span><span class="o">),</span> <span class="cm">/* second predicate */</span>
+    <span class="o">(</span><span class="n">key</span><span class="o">,</span> <span class="n">value</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="kc">true</span>                 <span class="cm">/* third predicate  */</span>
+  <span class="o">);</span>
+
+<span class="c1">// KStream branches[0] contains all records whose keys start with &quot;A&quot;</span>
+<span class="c1">// KStream branches[1] contains all records whose keys start with &quot;B&quot;</span>
+<span class="c1">// KStream branches[2] contains all other records</span>
+
+<span class="c1">// Java 7 example: cf. `filter` for how to create `Predicate` instances</span>
+</pre></div>
+                            </div>
+                        </td>
+                    </tr>
+                    <tr class="row-odd"><td><p class="first"><strong>Filter</strong></p>
+                        <ul class="last simple">
+                            <li>KStream &rarr; KStream</li>
+                            <li>KTable &rarr; KTable</li>
+                        </ul>
+                    </td>
+                        <td><p class="first">Evaluates a boolean function for each element and retains those for which the function returns true.
+                            (<a class="reference external" href="../javadocs/org/apache/kafka/streams/kstream/KStream.html#filter-org.apache.kafka.streams.kstream.Predicate-">KStream details</a>,
+                            <a class="reference external" href="../javadocs/org/apache/kafka/streams/kstream/KTable.html#filter-org.apache.kafka.streams.kstream.Predicate-">KTable details</a>)</p>
+                            <div class="last highlight-java"><div class="highlight"><pre><span></span><span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">stream</span> <span class="o">=</span> <span class="o">...;</span>
+
+<span class="c1">// A filter that selects (keeps) only positive numbers</span>
+<span class="c1">// Java 8+ example, using lambda expressions</span>
+<span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">onlyPositives</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">filter</span><span class="o">((</span><span class="n">key</span><span class="o">,</span> <span class="n">value</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">value</span> <span class="o">&gt;</span> <span class="mi">0</span><span class="o">);</span>
+
+<span class="c1">// Java 7 example</span>
+<span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">onlyPositives</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">filter</span><span class="o">(</span>
+    <span class="k">new</span> <span class="n">Predicate</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;()</span> <span class="o">{</span>
+      <span class="nd">@Override</span>
+      <span class="kd">public</span> <span class="kt">boolean</span> <span class="nf">test</span><span class="o">(</span><span class="n">String</span> <span class="n">key</span><span class="o">,</span> <span class="n">Long</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span>
+        <span class="k">return</span> <span class="n">value</span> <span class="o">&gt;</span> <span class="mi">0</span><span class="o">;</span>
+      <span class="o">}</span>
+    <span class="o">});</span>
+</pre></div>
+                            </div>
+                        </td>
+                    </tr>
+                    <tr class="row-even"><td><p class="first"><strong>Inverse Filter</strong></p>
+                        <ul class="last simple">
+                            <li>KStream &rarr; KStream</li>
+                            <li>KTable &rarr; KTable</li>
+                        </ul>
+                    </td>
+                        <td><p class="first">Evaluates a boolean function for each element and drops those for which the function returns true.
+                            (<a class="reference external" href="../javadocs/org/apache/kafka/streams/kstream/KStream.html#filterNot-org.apache.kafka.streams.kstream.Predicate-">KStream details</a>,
+                            <a class="reference external" href="../javadocs/org/apache/kafka/streams/kstream/KTable.html#filterNot-org.apache.kafka.streams.kstream.Predicate-">KTable details</a>)</p>
+                            <div class="last highlight-java"><div class="highlight"><pre><span></span><span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">stream</span> <span class="o">=</span> <span class="o">...;</span>
+
+<span class="c1">// An inverse filter that discards any negative numbers or zero</span>
+<span class="c1">// Java 8+ example, using lambda expressions</span>
+<span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">onlyPositives</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">filterNot</span><span class="o">((</span><span class="n">key</span><span class="o">,</span> <span class="n">value</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">value</span> <span class="o">&lt;=</span> <span class="mi">0</span><span class="o">);</span>
+
+<span class="c1">// Java 7 example</span>
+<span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">onlyPositives</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">filterNot</span><span class="o">(</span>
+    <span class="k">new</span> <span class="n">Predicate</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;()</span> <span class="o">{</span>
+      <span class="nd">@Override</span>
+      <span class="kd">public</span> <span class="kt">boolean</span> <span class="nf">test</span><span class="o">(</span><span class="n">String</span> <span class="n">key</span><span class="o">,</span> <span class="n">Long</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span>
+        <span class="k">return</span> <span class="n">value</span> <span class="o">&lt;=</span> <span class="mi">0</span><span class="o">;</span>
+      <span class="o">}</span>
+    <span class="o">});</span>
+</pre></div>
+                            </div>
+                        </td>
+                    </tr>
+                    <tr class="row-odd"><td><p class="first"><strong>FlatMap</strong></p>
+                        <ul class="last simple">
+                            <li>KStream &rarr; KStream</li>
+                        </ul>
+                    </td>
+                        <td><p class="first">Takes one record and produces zero, one, or more records.  You can modify the record keys and values, including
+                            their types.
+                            (<a class="reference external" href="../javadocs/org/apache/kafka/streams/kstream/KStream.html#flatMap-org.apache.kafka.streams.kstream.KeyValueMapper-">details</a>)</p>
+                            <p><strong>Marks the stream for data re-partitioning:</strong>
+                                Applying a grouping or a join after <code class="docutils literal"><span class="pre">flatMap</span></code> will result in re-partitioning of the records.
+                                If possible, use <code class="docutils literal"><span class="pre">flatMapValues</span></code> instead, which will not cause data re-partitioning.</p>
+                            <div class="last highlight-java"><div class="highlight"><pre><span></span><span class="n">KStream</span><span class="o">&lt;</span><span class="n">Long</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">stream</span> <span class="o">=</span> <span class="o">...;</span>
+<span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">transformed</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">flatMap</span><span class="o">(</span>
+    <span class="c1">// Here, we generate two output records for each input record.</span>
+    <span class="c1">// We also change the key and value types.</span>
+    <span class="c1">// Example: (345L, &quot;Hello&quot;) -&gt; (&quot;HELLO&quot;, 1000), (&quot;hello&quot;, 9000)</span>
+    <span class="o">(</span><span class="n">key</span><span class="o">,</span> <span class="n">value</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="o">{</span>
+      <span class="n">List</span><span class="o">&lt;</span><span class="n">KeyValue</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;</span> <span class="n">result</span> <span class="o">=</span> <span class="k">new</span> <span class="n">LinkedList</span><span class="o">&lt;&gt;();</span>
+      <span class="n">result</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">KeyValue</span><span class="o">.</span><span class="na">pair</span><span class="o">(</span><span class="n">value</span><span class="o">.</span><span class="na">toUpperCase</span><span class="o">(),</span> <span class="mi">1000</span><span class="o">));</span>
+      <span class="n">result</span><span class="o">.</span><span class="na">add</span><span class="o">(</span><span class="n">KeyValue</span><span class="o">.</span><span class="na">pair</span><span class="o">(</span><span class="n">value</span><span class="o">.</span><span class="na">toLowerCase</span><span class="o">(),</span> <span class="mi">9000</span><span class="o">));</span>
+      <span class="k">return</span> <span class="n">result</span><span class="o">;</span>
+    <span class="o">}</span>
+  <span class="o">);</span>
+
+<span class="c1">// Java 7 example: cf. `map` for how to create `KeyValueMapper` instances</span>
+</pre></div>
+                            </div>
+                        </td>
+                    </tr>
+                    <tr class="row-even"><td><p class="first"><strong>FlatMap (values only)</strong></p>
+                        <ul class="last simple">
+                            <li>KStream &rarr; KStream</li>
+                        </ul>
+                    </td>
+                        <td><p class="first">Takes one record and produces zero, one, or more records, while retaining the key of the original record.
+                            You can modify the record values and the value type.
+                            (<a class="reference external" href="../javadocs/org/apache/kafka/streams/kstream/KStream.html#flatMapValues-org.apache.kafka.streams.kstream.ValueMapper-">details</a>)</p>
+                            <p><code class="docutils literal"><span class="pre">flatMapValues</span></code> is preferable to <code class="docutils literal"><span class="pre">flatMap</span></code> because it will not cause data re-partitioning.  However, you
+                                cannot modify the key or key type like <code class="docutils literal"><span class="pre">flatMap</span></code> does.</p>
+                            <div class="last highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Split a sentence into words.</span>
+<span class="n">KStream</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">sentences</span> <span class="o">=</span> <span class="o">...;</span>
+<span class="n">KStream</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">words</span> <span class="o">=</span> <span class="n">sentences</span><span class="o">.</span><span class="na">flatMapValues</span><span class="o">(</span><span class="n">value</span> <span class="o">-&gt;</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="n">value</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">&quot;\\s+&quot;</span><span class="o">)));</span>
+
+<span class="c1">// Java 7 example: cf. `mapValues` for how to create `ValueMapper` instances</span>
+</pre></div>
+                            </div>
+                        </td>
+                    </tr>
+                    <tr class="row-odd"><td><p class="first"><strong>Foreach</strong></p>
+                        <ul class="last simple">
+                            <li>KStream &rarr; void</li>
+                            <li>KTable &rarr; void</li>
+                        </ul>
+                    </td>
+                        <td><p class="first"><strong>Terminal operation.</strong>  Performs a stateless action on each record.
+                            (<a class="reference external" href="../javadocs/org/apache/kafka/streams/kstream/KStream.html#foreach-org.apache.kafka.streams.kstream.ForeachAction-">details</a>)</p>
+                            <p>You would use <code class="docutils literal"><span class="pre">foreach</span></code> to cause <em>side effects</em> based on the input data (similar to <code class="docutils literal"><span class="pre">peek</span></code>) and then <em>stop</em>
+                                <em>further processing</em> of the input data (unlike <code class="docutils literal"><span class="pre">peek</span></code>, which is not a terminal operation).</p>
+                            <p><strong>Note on processing guarantees:</strong> Any side effects of an action (such as writing to external systems) are not
+                                trackable by Kafka, which means they will typically not benefit from Kafka&#8217;s processing guarantees.</p>
+                            <div class="last highlight-java"><div class="highlight"><pre><span></span><span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">stream</span> <span class="o">=</span> <span class="o">...;</span>
+
+<span class="c1">// Print the contents of the KStream to the local console.</span>
+<span class="c1">// Java 8+ example, using lambda expressions</span>
+<span class="n">stream</span><span class="o">.</span><span class="na">foreach</span><span class="o">((</span><span class="n">key</span><span class="o">,</span> <span class="n">value</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="n">key</span> <span class="o">+</span> <span class="s">&quot; =&gt; &quot;</span> <span class="o">+</span> <span class="n">value</span><span class="o">));</span>
+
+<span class="c1">// Java 7 example</span>
+<span class="n">stream</span><span class="o">.</span><span class="na">foreach</span><span class="o">(</span>
+    <span class="k">new</span> <span class="n">ForeachAction</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;()</span> <span class="o">{</span>
+      <span class="nd">@Override</span>
+      <span class="kd">public</span> <span class="kt">void</span> <span class="nf">apply</span><span class="o">(</span><span class="n">String</span> <span class="n">key</span><span class="o">,</span> <span class="n">Long</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span>
+        <span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="n">key</span> <span class="o">+</span> <span class="s">&quot; =&gt; &quot;</span> <span class="o">+</span> <span class="n">value</span><span class="o">);</span>
+      <span class="o">}</span>
+    <span class="o">});</span>
+</pre></div>
+                            </div>
+                        </td>
+                    </tr>
+                    <tr class="row-even"><td><p class="first"><strong>GroupByKey</strong></p>
+                        <ul class="last simple">
+                            <li>KStream &rarr; KGroupedStream</li>
+                        </ul>
+                    </td>
+                        <td><p class="first">Groups the records by the existing key.
+                            (<a class="reference external" href="../javadocs/org/apache/kafka/streams/kstream/KStream.html#groupByKey--">details</a>)</p>
+                            <p>Grouping is a prerequisite for <a class="reference internal" href="#streams-developer-guide-dsl-aggregating"><span class="std std-ref">aggregating a stream or a table</span></a>
+                                and ensures that data is properly partitioned (&#8220;keyed&#8221;) for subsequent operations.</p>
+                            <p><strong>When to set explicit SerDes:</strong>
+                                Variants of <code class="docutils literal"><span class="pre">groupByKey</span></code> exist to override the configured default SerDes of your application, which <strong>you</strong>
+                                <strong>must do</strong> if the key and/or value types of the resulting <code class="docutils literal"><span class="pre">KGroupedStream</span></code> do not match the configured default
+                                SerDes.</p>
+                            <div class="admonition note">
+                                <p class="first admonition-title">Note</p>
+                                <p class="last"><strong>Grouping vs. Windowing:</strong>
+                                    A related operation is <a class="reference internal" href="#streams-developer-guide-dsl-windowing"><span class="std std-ref">windowing</span></a>, which lets you control how to
+                                    &#8220;sub-group&#8221; the grouped records <em>of the same key</em> into so-called <em>windows</em> for stateful operations such as
+                                    windowed <a class="reference internal" href="#streams-developer-guide-dsl-aggregating"><span class="std std-ref">aggregations</span></a> or
+                                    windowed <a class="reference internal" href="#streams-developer-guide-dsl-joins"><span class="std std-ref">joins</span></a>.</p>
+                            </div>
+                            <p><strong>Causes data re-partitioning if and only if the stream was marked for re-partitioning.</strong>
+                                <code class="docutils literal"><span class="pre">groupByKey</span></code> is preferable to <code class="docutils literal"><span class="pre">groupBy</span></code> because it re-partitions data only if the stream was already marked
+                                for re-partitioning. However, <code class="docutils literal"><span class="pre">groupByKey</span></code> does not allow you to modify the key or key type like <code class="docutils literal"><span class="pre">groupBy</span></code>
+                                does.</p>
+                            <div class="last highlight-java"><div class="highlight"><pre><span></span><span class="n">KStream</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">stream</span> <span class="o">=</span> <span class="o">...;</span>
+
+<span class="c1">// Group by the existing key, using the application&#39;s configured</span>
+<span class="c1">// default serdes for keys and values.</span>
+<span class="n">KGroupedStream</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">groupedStream</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">groupByKey</span><span class="o">();</span>
+
+<span class="c1">// When the key and/or value types do not match the configured</span>
+<span class="c1">// default serdes, we must explicitly specify serdes.</span>
+<span class="n">KGroupedStream</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">groupedStream</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">groupByKey</span><span class="o">(</span>
+    <span class="n">Serialized</span><span class="o">.</span><span class="na">with</span><span class="o">(</span>
+      <span class="n">Serdes</span><span class="o">.</span><span class="na">ByteArray</span><span class="o">(),</span> <span class="cm">/* key */</span>
+      <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">())</span>     <span class="cm">/* value */</span>
+  <span class="o">);</span>
+</pre></div>
+                            </div>
+                        </td>
+                    </tr>
+                    <tr class="row-odd"><td><p class="first"><strong>GroupBy</strong></p>
+                        <ul class="last simple">
+                            <li>KStream &rarr; KGroupedStream</li>
+                            <li>KTable &rarr; KGroupedTable</li>
+                        </ul>
+                    </td>
+                        <td><p class="first">Groups the records by a <em>new</em> key, which may be of a different key type.
+                            When grouping a table, you may also specify a new value and value type.
+                            For a <code class="docutils literal"><span class="pre">KStream</span></code>, <code class="docutils literal"><span class="pre">groupBy</span></code> is a shorthand for <code class="docutils literal"><span class="pre">selectKey(...).groupByKey()</span></code>.
+                            (<a class="reference external" href="../javadocs/org/apache/kafka/streams/kstream/KStream.html#groupBy-org.apache.kafka.streams.kstream.KeyValueMapper-">KStream details</a>,
+                            <a class="reference external" href="../javadocs/org/apache/kafka/streams/kstream/KTable.html#groupBy-org.apache.kafka.streams.kstream.KeyValueMapper-">KTable details</a>)</p>
+                            <p>Grouping is a prerequisite for <a class="reference internal" href="#streams-developer-guide-dsl-aggregating"><span class="std std-ref">aggregating a stream or a table</span></a>
+                                and ensures that data is properly partitioned (&#8220;keyed&#8221;) for subsequent operations.</p>
+                            <p><strong>When to set explicit SerDes:</strong>
+                                Variants of <code class="docutils literal"><span class="pre">groupBy</span></code> exist to override the configured default SerDes of your application, which <strong>you must</strong>
+                                <strong>do</strong> if the key and/or value types of the resulting <code class="docutils literal"><span class="pre">KGroupedStream</span></code> or <code class="docutils literal"><span class="pre">KGroupedTable</span></code> do not match the
+                                configured default SerDes.</p>
+                            <div class="admonition note">
+                                <p class="first admonition-title">Note</p>
+                                <p class="last"><strong>Grouping vs. Windowing:</strong>
+                                    A related operation is <a class="reference internal" href="#streams-developer-guide-dsl-windowing"><span class="std std-ref">windowing</span></a>, which lets you control how to
+                                    &#8220;sub-group&#8221; the grouped records <em>of the same key</em> into so-called <em>windows</em> for stateful operations such as
+                                    windowed <a class="reference internal" href="#streams-developer-guide-dsl-aggregating"><span class="std std-ref">aggregations</span></a> or
+                                    windowed <a class="reference internal" href="#streams-developer-guide-dsl-joins"><span class="std std-ref">joins</span></a>.</p>
+                            </div>
+                            <p><strong>Always causes data re-partitioning:</strong>  <code class="docutils literal"><span class="pre">groupBy</span></code> always causes data re-partitioning.
+                                If possible, use <code class="docutils literal"><span class="pre">groupByKey</span></code> instead, which will re-partition data only if required.</p>
+                            <div class="last highlight-java"><div class="highlight"><pre><span></span><span class="n">KStream</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">stream</span> <span class="o">=</span> <span class="o">...;</span>
+<span class="n">KTable</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">table</span> <span class="o">=</span> <span class="o">...;</span>
+
+<span class="c1">// Java 8+ examples, using lambda expressions</span>
+
+<span class="c1">// Group the stream by a new key and key type</span>
+<span class="n">KGroupedStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">groupedStream</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span>
+    <span class="o">(</span><span class="n">key</span><span class="o">,</span> <span class="n">value</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">value</span><span class="o">,</span>
+    <span class="n">Serialized</span><span class="o">.</span><span class="na">with</span><span class="o">(</span>
+      <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> <span class="cm">/* key (note: type was modified) */</span>
+      <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">())</span>  <span class="cm">/* value */</span>
+  <span class="o">);</span>
+
+<span class="c1">// Group the table by a new key and key type, and also modify the value and value type.</span>
+<span class="n">KGroupedTable</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">groupedTable</span> <span class="o">=</span> <span class="n">table</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span>
+    <span class="o">(</span><span class="n">key</span><span class="o">,</span> <span class="n">value</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">KeyValue</span><span class="o">.</span><span class="na">pair</span><span class="o">(</span><span class="n">value</span><span class="o">,</span> <span class="n">value</span><span class="o">.</span><span class="na">length</span><span class="o">()),</span>
+    <span class="n">Serialized</span><span class="o">.</span><span class="na">with</span><span class="o">(</span>
+      <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> <span class="cm">/* key (note: type was modified) */</span>
+      <span class="n">Serdes</span><span class="o">.</span><span class="na">Integer</span><span class="o">())</span> <span class="cm">/* value (note: type was modified) */</span>
+  <span class="o">);</span>
+
+
+<span class="c1">// Java 7 examples</span>
+
+<span class="c1">// Group the stream by a new key and key type</span>
+<span class="n">KGroupedStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">groupedStream</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span>
+    <span class="k">new</span> <span class="n">KeyValueMapper</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;()</span> <span class="o">{</span>
+      <span class="nd">@Override</span>
+      <span class="kd">public</span> <span class="n">String</span> <span class="nf">apply</span><span class="o">(</span><span class="kt">byte</span><span class="o">[]</span> <span class="n">key</span><span class="o">,</span> <span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span>
+        <span class="k">return</span> <span class="n">value</span><span class="o">;</span>
+      <span class="o">}</span>
+    <span class="o">},</span>
+    <span class="n">Serialized</span><span class="o">.</span><span class="na">with</span><span class="o">(</span>
+      <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> <span class="cm">/* key (note: type was modified) */</span>
+      <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">())</span>  <span class="cm">/* value */</span>
+  <span class="o">);</span>
+
+<span class="c1">// Group the table by a new key and key type, and also modify the value and value type.</span>
+<span class="n">KGroupedTable</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">groupedTable</span> <span class="o">=</span> <span class="n">table</span><span class="o">.</span><span class="na">groupBy</span><span class="o">(</span>
+    <span class="k">new</span> <span class="n">KeyValueMapper</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">,</span> <span class="n">KeyValue</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;()</span> <span class="o">{</span>
+      <span class="nd">@Override</span>
+      <span class="kd">public</span> <span class="n">KeyValue</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="nf">apply</span><span class="o">(</span><span class="kt">byte</span><span class="o">[]</span> <span class="n">key</span><span class="o">,</span> <span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span>
+        <span class="k">return</span> <span class="n">KeyValue</span><span class="o">.</span><span class="na">pair</span><span class="o">(</span><span class="n">value</span><span class="o">,</span> <span class="n">value</span><span class="o">.</span><span class="na">length</span><span class="o">());</span>
+      <span class="o">}</span>
+    <span class="o">},</span>
+    <span class="n">Serialized</span><span class="o">.</span><span class="na">with</span><span class="o">(</span>
+      <span class="n">Serdes</span><span class="o">.</span><span class="na">String</span><span class="o">(),</span> <span class="cm">/* key (note: type was modified) */</span>
+      <span class="n">Serdes</span><span class="o">.</span><span class="na">Integer</span><span class="o">())</span> <span class="cm">/* value (note: type was modified) */</span>
+  <span class="o">);</span>
+</pre></div>
+                            </div>
+                        </td>
+                    </tr>
+                    <tr class="row-even"><td><p class="first"><strong>Map</strong></p>
+                        <ul class="last simple">
+                            <li>KStream &rarr; KStream</li>
+                        </ul>
+                    </td>
+                        <td><p class="first">Takes one record and produces one record.  You can modify the record key and value, including their types.
+                            (<a class="reference external" href="../javadocs/org/apache/kafka/streams/kstream/KStream.html#map-org.apache.kafka.streams.kstream.KeyValueMapper-">details</a>)</p>
+                            <p><strong>Marks the stream for data re-partitioning:</strong>
+                                Applying a grouping or a join after <code class="docutils literal"><span class="pre">map</span></code> will result in re-partitioning of the records.
+                                If possible, use <code class="docutils literal"><span class="pre">mapValues</span></code> instead, which will not cause data re-partitioning.</p>
+                            <div class="last highlight-java"><div class="highlight"><pre><span></span><span class="n">KStream</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">stream</span> <span class="o">=</span> <span class="o">...;</span>
+
+<span class="c1">// Java 8+ example, using lambda expressions</span>
+<span class="c1">// Note how we change the key and the key type (similar to `selectKey`)</span>
+<span class="c1">// as well as the value and the value type.</span>
+<span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">transformed</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">map</span><span class="o">(</span>
+    <span class="o">(</span><span class="n">key</span><span class="o">,</span> <span class="n">value</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">KeyValue</span><span class="o">.</span><span class="na">pair</span><span class="o">(</span><span class="n">value</span><span class="o">.</span><span class="na">toLowerCase</span><span class="o">(),</span> <span class="n">value</span><span class="o">.</span><span class="na">length</span><span class="o">()));</span>
+
+<span class="c1">// Java 7 example</span>
+<span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="n">transformed</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">map</span><span class="o">(</span>
+    <span class="k">new</span> <span class="n">KeyValueMapper</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">,</span> <span class="n">KeyValue</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;&gt;()</span> <span class="o">{</span>
+      <span class="nd">@Override</span>
+      <span class="kd">public</span> <span class="n">KeyValue</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Integer</span><span class="o">&gt;</span> <span class="nf">apply</span><span class="o">(</span><span class="kt">byte</span><span class="o">[]</span> <span class="n">key</span><span class="o">,</span> <span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span>
+        <span class="k">return</span> <span class="k">new</span> <span class="n">KeyValue</span><span class="o">&lt;&gt;(</span><span class="n">value</span><span class="o">.</span><span class="na">toLowerCase</span><span class="o">(),</span> <span class="n">value</span><span class="o">.</span><span class="na">length</span><span class="o">());</span>
+      <span class="o">}</span>
+    <span class="o">});</span>
+</pre></div>
+                            </div>
+                        </td>
+                    </tr>
+                    <tr class="row-odd"><td><p class="first"><strong>Map (values only)</strong></p>
+                        <ul class="last simple">
+                            <li>KStream &rarr; KStream</li>
+                            <li>KTable &rarr; KTable</li>
+                        </ul>
+                    </td>
+                        <td><p class="first">Takes one record and produces one record, while retaining the key of the original record.
+                            You can modify the record value and the value type.
+                            (<a class="reference external" href="../javadocs/org/apache/kafka/streams/kstream/KStream.html#mapValues-org.apache.kafka.streams.kstream.ValueMapper-">KStream details</a>,
+                            <a class="reference external" href="../javadocs/org/apache/kafka/streams/kstream/KTable.html#mapValues-org.apache.kafka.streams.kstream.ValueMapper-">KTable details</a>)</p>
+                            <p><code class="docutils literal"><span class="pre">mapValues</span></code> is preferable to <code class="docutils literal"><span class="pre">map</span></code> because it will not cause data re-partitioning.  However, it does not
+                                allow you to modify the key or key type like <code class="docutils literal"><span class="pre">map</span></code> does.</p>
+                            <div class="last highlight-java"><div class="highlight"><pre><span></span><span class="n">KStream</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">stream</span> <span class="o">=</span> <span class="o">...;</span>
+
+<span class="c1">// Java 8+ example, using lambda expressions</span>
+<span class="n">KStream</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">uppercased</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">mapValues</span><span class="o">(</span><span class="n">value</span> <span class="o">-&gt;</span> <span class="n">value</span><span class="o">.</span><span class="na">toUpperCase</span><span class="o">());</span>
+
+<span class="c1">// Java 7 example</span>
+<span class="n">KStream</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">uppercased</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">mapValues</span><span class="o">(</span>
+    <span class="k">new</span> <span class="n">ValueMapper</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;()</span> <span class="o">{</span>
+      <span class="nd">@Override</span>
+      <span class="kd">public</span> <span class="n">String</span> <span class="nf">apply</span><span class="o">(</span><span class="n">String</span> <span class="n">s</span><span class="o">)</span> <span class="o">{</span>
+        <span class="k">return</span> <span class="n">s</span><span class="o">.</span><span class="na">toUpperCase</span><span class="o">();</span>
+      <span class="o">}</span>
+    <span class="o">});</span>
+</pre></div>
+                            </div>
+                        </td>
+                    </tr>
+                    <tr class="row-even"><td><p class="first"><strong>Peek</strong></p>
+                        <ul class="last simple">
+                            <li>KStream &rarr; KStream</li>
+                        </ul>
+                    </td>
+                        <td><p class="first">Performs a stateless action on each record, and returns an unchanged stream.
+                            (<a class="reference external" href="../javadocs/org/apache/kafka/streams/kstream/KStream.html#peek-org.apache.kafka.streams.kstream.ForeachAction-">details</a>)</p>
+                            <p>You would use <code class="docutils literal"><span class="pre">peek</span></code> to cause <em>side effects</em> based on the input data (similar to <code class="docutils literal"><span class="pre">foreach</span></code>) and <em>continue</em>
+                                <em>processing</em> the input data (unlike <code class="docutils literal"><span class="pre">foreach</span></code>, which is a terminal operation).  <code class="docutils literal"><span class="pre">peek</span></code> returns the input
+                                stream as-is;  if you need to modify the input stream, use <code class="docutils literal"><span class="pre">map</span></code> or <code class="docutils literal"><span class="pre">mapValues</span></code> instead.</p>
+                            <p><code class="docutils literal"><span class="pre">peek</span></code> is helpful for use cases such as logging or tracking metrics or for debugging and troubleshooting.</p>
+                            <p><strong>Note on processing guarantees:</strong> Any side effects of an action (such as writing to external systems) are not
+                                trackable by Kafka, which means they will typically not benefit from Kafka&#8217;s processing guarantees.</p>
+                            <div class="last highlight-java"><div class="highlight"><pre><span></span><span class="n">KStream</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">stream</span> <span class="o">=</span> <span class="o">...;</span>
+
+<span class="c1">// Java 8+ example, using lambda expressions</span>
+<span class="n">KStream</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">unmodifiedStream</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">peek</span><span class="o">(</span>
+    <span class="o">(</span><span class="n">key</span><span class="o">,</span> <span class="n">value</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">&quot;key=&quot;</span> <span class="o">+</span> <span class="n">key</span> <span class="o">+</span> <span class="s">&quot;, value=&quot;</span> <span class="o">+</span> <span class="n">value</span><span class="o">));</span>
+
+<span class="c1">// Java 7 example</span>
+<span class="n">KStream</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">unmodifiedStream</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">peek</span><span class="o">(</span>
+    <span class="k">new</span> <span class="n">ForeachAction</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;()</span> <span class="o">{</span>
+      <span class="nd">@Override</span>
+      <span class="kd">public</span> <span class="kt">void</span> <span class="nf">apply</span><span class="o">(</span><span class="kt">byte</span><span class="o">[]</span> <span class="n">key</span><span class="o">,</span> <span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span>
+        <span class="n">System</span><span class="o">.</span><span class="na">out</span><span class="o">.</span><span class="na">println</span><span class="o">(</span><span class="s">&quot;key=&quot;</span> <span class="o">+</span> <span class="n">key</span> <span class="o">+</span> <span class="s">&quot;, value=&quot;</span> <span class="o">+</span> <span class="n">value</span><span class="o">);</span>
+      <span class="o">}</span>
+    <span class="o">});</span>
+</pre></div>
+                            </div>
+                        </td>
+                    </tr>
+                    <tr class="row-odd"><td><p class="first"><strong>Print</strong></p>
+                        <ul class="last simple">
+                            <li>KStream &rarr; void</li>
+                        </ul>
+                    </td>
+                        <td><p class="first"><strong>Terminal operation.</strong>  Prints the records to <code class="docutils literal"><span class="pre">System.out</span></code>.  See Javadocs for serde and <code class="docutils literal"><span class="pre">toString()</span></code>
+                            caveats.
+                            (<a class="reference external" href="../javadocs/org/apache/kafka/streams/kstream/KStream.html#print--">details</a>)</p>
+                            <p>Calling <code class="docutils literal"><span class="pre">print()</span></code> is the same as calling <code class="docutils literal"><span class="pre">foreach((key,</span> <span class="pre">value)</span> <span class="pre">-&gt;</span> <span class="pre">System.out.println(key</span> <span class="pre">+</span> <span class="pre">&quot;,</span> <span class="pre">&quot;</span> <span class="pre">+</span> <span class="pre">value))</span></code>.</p>
+                            <div class="last highlight-java"><div class="highlight"><pre><span></span><span class="n">KStream</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">stream</span> <span class="o">=</span> <span class="o">...;</span>
+<span class="c1">// print to sysout</span>
+<span class="n">stream</span><span class="o">.</span><span class="na">print</span><span class="o">();</span>
+
+<span class="c1">// print to file with a custom label</span>
+<span class="n">stream</span><span class="o">.</span><span class="na">print</span><span class="o">(</span><span class="n">Printed</span><span class="o">.</span><span class="na">toFile</span><span class="o">(</span><span class="s">&quot;streams.out&quot;</span><span class="o">).</span><span class="na">withLabel</span><span class="o">(</span><span class="s">&quot;streams&quot;</span><span class="o">));</span>
+</pre></div>
+                            </div>
+                        </td>
+                    </tr>
+                    <tr class="row-even"><td><p class="first"><strong>SelectKey</strong></p>
+                        <ul class="last simple">
+                            <li>KStream &rarr; KStream</li>
+                        </ul>
+                    </td>
+                        <td><p class="first">Assigns a new key &#8211; possibly of a new key type &#8211; to each record.
+                            (<a class="reference external" href="../javadocs/org/apache/kafka/streams/kstream/KStream.html#selectKey-org.apache.kafka.streams.kstream.KeyValueMapper-">details</a>)</p>
+                            <p>Calling <code class="docutils literal"><span class="pre">selectKey(mapper)</span></code> is the same as calling <code class="docutils literal"><span class="pre">map((key,</span> <span class="pre">value)</span> <span class="pre">-&gt;</span> <span class="pre">KeyValue.pair(mapper.apply(key,</span> <span class="pre">value),</span> <span class="pre">value))</span></code>.</p>
+                            <p><strong>Marks the stream for data re-partitioning:</strong>
+                                Applying a grouping or a join after <code class="docutils literal"><span class="pre">selectKey</span></code> will result in re-partitioning of the records.</p>
+                            <div class="last highlight-java"><div class="highlight"><pre><span></span><span class="n">KStream</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">stream</span> <span class="o">=</span> <span class="o">...;</span>
+
+<span class="c1">// Derive a new record key from the record&#39;s value.  Note how the key type changes, too.</span>
+<span class="c1">// Java 8+ example, using lambda expressions</span>
+<span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">rekeyed</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">selectKey</span><span class="o">((</span><span class="n">key</span><span class="o">,</span> <span class="n">value</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">value</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">&quot; &quot;</span><span class="o">)[</span><span class="mi">0</span><span class="o">]);</span>
+
+<span class="c1">// Java 7 example</span>
+<span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">rekeyed</span> <span class="o">=</span> <span class="n">stream</span><span class="o">.</span><span class="na">selectKey</span><span class="o">(</span>
+    <span class="k">new</span> <span class="n">KeyValueMapper</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;()</span> <span class="o">{</span>
+      <span class="nd">@Override</span>
+      <span class="kd">public</span> <span class="n">String</span> <span class="nf">apply</span><span class="o">(</span><span class="kt">byte</span><span class="o">[]</span> <span class="n">key</span><span class="o">,</span> <span class="n">String</span> <span class="n">value</span><span class="o">)</span> <span class="o">{</span>
+        <span class="k">return</span> <span class="n">value</span><span class="o">.</span><span class="na">split</span><span class="o">(</span><span class="s">&quot; &quot;</span><span class="o">)[</span><span class="mi">0</span><span class="o">];</span>
+      <span class="o">}</span>
+    <span class="o">});</span>
+</pre></div>
+                            </div>
+                        </td>
+                    </tr>
+                    <tr class="row-odd"><td><p class="first"><strong>Table to Stream</strong></p>
+                        <ul class="last simple">
+                            <li>KTable &rarr; KStream</li>
+                        </ul>
+                    </td>
+                        <td><p class="first">Get the changelog stream of this table.
+                            (<a class="reference external" href="../javadocs/org/apache/kafka/streams/kstream/KTable.html#toStream--">details</a>)</p>
+                            <div class="last highlight-java"><div class="highlight"><pre><span></span><span class="n">KTable</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">table</span> <span class="o">=</span> <span class="o">...;</span>
+
+<span class="c1">// Also, a variant of `toStream` exists that allows you</span>
+<span class="c1">// to select a new key for the resulting stream.</span>
+<span class="n">KStream</span><span class="o">&lt;</span><span class="kt">byte</span><span class="o">[],</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">stream</span> <span class="o">=</span> <span class="n">table</span><span class="o">.</span><span class="na">toStream</span><span class="o">();</span>
+</pre></div>
+                            </div>
+                        </td>
+                    </tr>
+                    </tbody>
+                </table>
+            </div>
+            <div class="section" id="stateful-transformations">
+                <span id="streams-developer-guide-dsl-transformations-stateful"></span><h3><a class="toc-backref" href="#id11">Stateful transformations</a><a class="headerlink" href="#stateful-transformations" title="Permalink to this headline"></a></h3>
+                <p id="streams-developer-guide-dsl-transformations-stateful-overview">Stateful transformations depend on state for processing inputs and producing outputs and require a <a class="reference internal" href="../architecture.html#streams-architecture-state"><span class="std std-ref">state store</span></a> associated with the stream processor. For example, in aggregating operations, a windowing state store is used to collect the latest aggregation results per
+                    window. In join operations, a windowing state store is used to collect all of the records received so far within the
+                    defined window boundary.</p>
+                <p>Note that state stores are fault-tolerant.
+                    In case of failure, Kafka Streams guarantees to fully restore all state stores before resuming processing.
+                    See <a class="reference internal" href="../architecture.html#streams-architecture-fault-tolerance"><span class="std std-ref">Fault Tolerance</span></a> for further information.</p>
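+                <p>As a rough illustration of the restore step, the following plain-Java sketch rebuilds an in-memory map by replaying a log of updates before it accepts new input. It does not use the Kafka Streams API: the class <code class="docutils literal"><span class="pre">ChangelogRestoreSketch</span></code>, its methods, and the in-memory list standing in for a changelog are hypothetical names chosen for this example only. The actual restore mechanism is described in the Fault Tolerance section.</p>

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a fault-tolerant state store; not a Kafka Streams class.
public class ChangelogRestoreSketch {
    private final Map<String, Long> store = new HashMap<>();
    private final List<Map.Entry<String, Long>> changelog;

    public ChangelogRestoreSketch(List<Map.Entry<String, Long>> changelog) {
        this.changelog = changelog;
        // Restore: replay every logged update, in order, before processing new input.
        for (Map.Entry<String, Long> update : changelog) {
            store.put(update.getKey(), update.getValue());
        }
    }

    // Stateful step: increment the count for a key and append the new value to the log.
    public long increment(String key) {
        Long updated = store.merge(key, 1L, Long::sum);
        changelog.add(new AbstractMap.SimpleImmutableEntry<String, Long>(key, updated));
        return updated;
    }

    public Long get(String key) {
        return store.get(key);
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> log = new ArrayList<>();
        ChangelogRestoreSketch first = new ChangelogRestoreSketch(log);
        first.increment("kafka");
        first.increment("kafka");
        first.increment("streams");

        // Simulate a failure: drop the first instance and rebuild from the log alone.
        ChangelogRestoreSketch restored = new ChangelogRestoreSketch(log);
        System.out.println(restored.get("kafka"));        // prints 2
        System.out.println(restored.increment("kafka"));  // prints 3
    }
}
```

+                <p>In this sketch the restored instance continues counting from 2 rather than from 0, which is the property the paragraph above describes: processing resumes only after the state has been rebuilt.</p>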
+                <p>Available stateful transformations in the DSL include:</p>
+                <ul class="simple">
+                    <li><a class="reference internal" href="#streams-developer-guide-dsl-aggregating"><span class="std std-ref">Aggregating</span></a></li>
+                    <li><a class="reference internal" href="#streams-developer-guide-dsl-joins"><span class="std std-ref">Joining</span></a></li>
+                    <li><a class="reference internal" href="#streams-developer-guide-dsl-windowing"><span class="std std-ref">Windowing</span></a> (as part of aggregations and joins)</li>
+                    <li><a class="reference internal" href="#streams-developer-guide-dsl-process"><span class="std std-ref">Applying custom processors and transformers</span></a>, which may be stateful, for
+                        Processor API integration</li>
+                </ul>
+                <p>The following diagram shows their relationships:</p>
+                <div class="figure align-center" id="id2">
+                    <a class="reference internal image-reference" href="../../../images/streams-stateful_operations.png"><img alt="../../../images/streams-stateful_operations.png" src="../../../images/streams-stateful_operations.png" style="width: 400pt;" /></a>
+                    <p class="caption"><span class="caption-text">Stateful transformations in the DSL.</span></p>
+                </div>
+                <p>Here is an example of a stateful application: the WordCount algorithm.</p>
+                <p>WordCount example in Java 8+, using lambda expressions:</p>
+                <div class="highlight-java"><div class="highlight"><pre><span></span><span class="c1">// Assume the record values represent lines of text.  For the sake of this example, you can ignore</span>
+<span class="c1">// whatever may be stored in the record keys.</span>
+<span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">String</span><span class="o">&gt;</span> <span class="n">textLines</span> <span class="o">=</span> <span class="o">...;</span>
+
+<span class="n">KStream</span><span class="o">&lt;</span><span class="n">String</span><span class="o">,</span> <span class="n">Long</span><span class="o">&gt;</span> <span class="n">wordCounts</span> <span class="o">=</span> <span class="n">textLines</span>
+    <span class="c1">// Split each text line, by whitespace, into words.  The text lines are the record</span>
+    <span class="c1">// values, i.e. you can ignore whatever data is in the record keys and thus invoke</span>
+    <span class="c1">// `flatMapValues` instead of the more generic `flatMap`.</span>
+    <span class="o">.</span><span class="na">flatMapValues</span><span class="o">(</span><span class="n">value</span> <span class="o">-&gt;</span> <span class="n">Arrays</span><span class="o">.</span><span class="na">asList</span><span class="o">(</span><span class="n">value</span><span class="o">.</span><span class="na">toLowerCase</span><span class="o">().</span><span class="na">split</span><span class="o">(</span><span class="s">&quot;\\W+&quot;</span><span class="o">)))</span>
+    <span class="c1">// Group the stream by word to ensure the key of the record is the word.</span>
+    <span class="o">.</span><span class="na">groupBy</span><span class="o">((</span><span class="n">key</span><span class="o">,</span> <span class="n">word</span><span class="o">)</span> <span class="o">-&gt;</span> <span class="n">word</span><span class="o">)</span>
+    <span class="c1">// Count the occurrences of each word (record key).</span>
+    <span class="c1">//</span>
+    <span class="c1">// This will change the stream type from `KGroupedStream&lt;String, String&gt;` to</span>
+    <span class="c1">// `KTable&lt;String, Long&gt;` (word -&gt; count).</span>
+    <span class="o">.</span><span class="na">count</span><span class="o">()</span>
+    <span class="c1">// Convert the `KTable&lt;String, Long&gt;` into a `KStream&lt;String, Long&gt;`.</span>
+    <span class="o">.</span><span class="na">toStream</span><span class="o">();</span>
+</pre></div>
+                </div>
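+                <p>To make the result of the topology above concrete, here is a minimal plain-Java sketch that applies the same steps (lowercase, split on <code class="docutils literal"><span class="pre">\W+</span></code>, group by word, count) to a fixed list of lines. It is a stand-in under stated assumptions, not Kafka Streams code: <code class="docutils literal"><span class="pre">WordCountSketch</span></code> and <code class="docutils literal"><span class="pre">countWords</span></code> are hypothetical names, and unlike a <code class="docutils literal"><span class="pre">KTable</span></code> the sketch yields only the final counts for a bounded input rather than a continuous stream of updates.</p>

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical plain-Java illustration; it does not use the Kafka Streams API.
public class WordCountSketch {

    // Mirrors flatMapValues + groupBy + count for a bounded batch of text lines.
    public static Map<String, Long> countWords(List<String> textLines) {
        return textLines.stream()
            // Split each line into lowercase words, as in the `flatMapValues` step.
            .flatMap(line -> Arrays.stream(line.toLowerCase().split("\\W+")))
            // Group by the word itself and count occurrences; TreeMap keeps keys sorted.
            .collect(Collectors.groupingBy(Function.identity(), TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        Map<String, Long> counts = countWords(Arrays.asList("Hello Kafka Streams", "hello world"));
        System.out.println(counts); // prints {hello=2, kafka=1, streams=1, world=1}
    }
}
```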
+                <p>WordCount example in Ja

<TRUNCATED>
