kafka-commits mailing list archives

From damian...@apache.org
Subject kafka git commit: MINOR: Stateless transformation documentation
Date Thu, 24 Aug 2017 13:53:43 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 5b99a288c -> 5e65c0da4


MINOR: Stateless transformation documentation

Needs to come after https://github.com/apache/kafka/pull/3701
Originally reviewed as part of #3490.

Author: Eno Thereska <eno.thereska@gmail.com>

Reviewers: Damian Guy <damian.guy@gmail.com>

Closes #3711 from enothereska/minor-docs-stateless


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5e65c0da
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5e65c0da
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5e65c0da

Branch: refs/heads/trunk
Commit: 5e65c0da45f3a86a25087c4d2e2b501e81e0a285
Parents: 5b99a28
Author: Eno Thereska <eno.thereska@gmail.com>
Authored: Thu Aug 24 14:53:39 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Thu Aug 24 14:53:39 2017 +0100

----------------------------------------------------------------------
 docs/streams/developer-guide.html | 504 +++++++++++++++++++++++++++++----
 1 file changed, 454 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/5e65c0da/docs/streams/developer-guide.html
----------------------------------------------------------------------
diff --git a/docs/streams/developer-guide.html b/docs/streams/developer-guide.html
index e3acf53..e26f6da 100644
--- a/docs/streams/developer-guide.html
+++ b/docs/streams/developer-guide.html
@@ -571,7 +571,458 @@ Note that in the <code>WordCountProcessor</code> implementation,
users need to r
         </tbody>
     </table>
 
-    <h4><a id="streams_dsl_windowing" href="#streams_dsl_windowing">Windowing
a stream</a></h4>
+    <h4><a id="streams_dsl_transform" href="#streams_dsl_transform">Transform
a stream</a></h4>
+    <p>
+    <code>KStream</code> and <code>KTable</code> support a variety of transformation operations. Each of these operations
+    is translated into one or more connected processors in the underlying processor topology. Since <code>KStream</code>
+    and <code>KTable</code> are strongly typed, all of these transformation operations are defined as generic functions
+    that let users specify the input and output data types.
+    </p>
+    <p>
+    Some <code>KStream</code> transformations generate one or more <code>KStream</code> objects (e.g., <code>filter</code> and
+    <code>map</code> on a <code>KStream</code> generate another <code>KStream</code>, while <code>branch</code> can generate
+    multiple <code>KStream</code> instances), while others generate a <code>KTable</code> object (e.g., aggregation), which is
+    interpreted as the changelog stream of the resulting relation. This allows Kafka Streams to continuously update the
+    computed value when late records arrive after the value has already been produced to downstream transformation operators.
+    All <code>KTable</code> transformation operations can only generate another <code>KTable</code> (though the Kafka Streams
+    DSL provides a special function to convert a <code>KTable</code> representation into a <code>KStream</code>, which we
+    describe later). All of these transformation methods can be chained together to compose a complex processor topology.
+    </p>
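+    <p>
+    For example, here is a minimal sketch of chaining two stateless transformations (the stream name and record types are
+    hypothetical and chosen only for illustration):
+    </p>
+    <pre class="brush: java;">
+    // Java 8+ example, using lambda expressions
+    KStream&lt;String, String&gt; textLines = ...;
+
+    // Chain two stateless transformations: drop records with null values, then map each
+    // remaining value to its length. Each call adds one processor to the topology.
+    KStream&lt;String, Integer&gt; lineLengths = textLines
+        .filterNot((key, value) -> value == null)
+        .mapValues(value -> value.length());
+    </pre>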
+    <p>
+    We describe these transformation operations in the following subsections, grouping them into two categories:
+    stateless and stateful transformations.
+    </p>
+    <h5><a id="streams_dsl_transformations_stateless" href="#streams_dsl_transformations_stateless">Stateless
transformations</a></h5>
+    <p>
+    Stateless transformations, by definition, do not depend on any state for processing, and hence
+    implementation-wise they do not require a state store associated with the stream processor.
+    </p>
+    <table class="data-table" border="1">
+        <tbody><tr>
+            <th>Transformation</th>
+            <th>Description</th>
+        </tr>
+        <tr>
+            <td><b>Branch</b>: <code>KStream &rarr; KStream</code></td>
+            <td>
+                <p>
+                Branch (or split) a <code>KStream</code> based on the supplied
predicates into one or more <code>KStream</code> instances.
+                </p>
+                <p>
+                Predicates are evaluated in order. A record is placed into one and only one output stream on the first match:
+                if the n-th predicate evaluates to true, the record is placed into the n-th stream. If no predicate matches,
+                the record is dropped.
+                </p>
+                <p>
+                Branching is useful, for example, to route records to different downstream
topics.
+                </p>
+                <pre class="brush: java;">
+                    KStream&lt;String, Long&gt; stream = ...;
+                    KStream&lt;String, Long&gt;[] branches = stream.branch(
+                            (key, value) -> key.startsWith("A"), /* first predicate  */
+                            (key, value) -> key.startsWith("B"), /* second predicate */
+                            (key, value) -> true                 /* third predicate  */
+                    );
+                    // KStream branches[0] contains all records whose keys start with "A"
+                    // KStream branches[1] contains all records whose keys start with "B"
+                    // KStream branches[2] contains all other records
+                    // Java 7 example: cf. `filter` for how to create `Predicate` instances
+	        </pre>
+            </td>
+        </tr>
+        <tr>
+            <td><b>Filter</b>: <code>KStream &rarr; KStream or
KTable &rarr; KTable</code></td>
+            <td>
+                <p>
+                Evaluates a boolean function for each element and retains those for which
the function returns true.
+                </p>
+                <pre class="brush: java;">
+                     KStream&lt;String, Long&gt; stream = ...;
+
+                     // A filter that selects (keeps) only positive numbers
+                     // Java 8+ example, using lambda expressions
+                     KStream&lt;String, Long&gt; onlyPositives = stream.filter((key, value) -> value > 0);
+
+                     // Java 7 example
+                     KStream&lt;String, Long&gt; onlyPositives = stream.filter(
+                       new Predicate&lt;String, Long&gt;() {
+                         @Override
+                         public boolean test(String key, Long value) {
+                           return value > 0;
+                         }
+                       });
+	            </pre>
+            </td>
+        </tr>
+        <tr>
+            <td><b>Inverse Filter</b>: <code>KStream &rarr; KStream
or KTable &rarr; KTable</code></td>
+            <td>
+                <p>
+                Evaluates a boolean function for each element and drops those for which the
function returns true.
+                </p>
+                <pre class="brush: java;">
+                     KStream&lt;String, Long&gt; stream = ...;
+
+                     // An inverse filter that discards any negative numbers or zero
+                     // Java 8+ example, using lambda expressions
+                     KStream&lt;String, Long&gt; onlyPositives = stream.filterNot((key, value) -> value <= 0);
+
+                     // Java 7 example
+                     KStream&lt;String, Long&gt; onlyPositives = stream.filterNot(
+                      new Predicate&lt;String, Long&gt;() {
+                        @Override
+                        public boolean test(String key, Long value) {
+                            return value <= 0;
+                        }
+                     });
+                </pre>
+            </td>
+        </tr>
+        <tr>
+            <td><b>FlatMap</b>: <code>KStream &rarr; KStream
</code></td>
+            <td>
+                <p>
+                Takes one record and produces zero, one, or more records. You can modify
the record keys and values,
+                including their types.
+                </p>
+
+                <p>
+                <b>Marks the stream for data re-partitioning:</b> Applying a grouping or a join after <code>flatMap</code>
+                will result in re-partitioning of the records. If possible use <code>flatMapValues</code> instead, which will
+                not cause data re-partitioning.
+                </p>
+                <pre class="brush: java;">
+                     KStream&lt;Long, String&gt; stream = ...;
+                     KStream&lt;String, Integer&gt; transformed = stream.flatMap(
+                         // Here, we generate two output records for each input record.
+                         // We also change the key and value types.
+                         // Example: (345L, "Hello") -> ("HELLO", 1000), ("hello", 9000)
+                         (key, value) -> {
+                             List&lt;KeyValue&lt;String, Integer&gt;&gt; result = new LinkedList&lt;&gt;();
+                             result.add(KeyValue.pair(value.toUpperCase(), 1000));
+                             result.add(KeyValue.pair(value.toLowerCase(), 9000));
+                             return result;
+                         }
+                     );
+                     // Java 7 example: cf. `map` for how to create `KeyValueMapper` instances
+                </pre>
+            </td>
+        </tr>
+        <tr>
+            <td><b>FlatMap (values only)</b>: <code>KStream &rarr;
KStream </code></td>
+            <td>
+                <p>
+                Takes one record and produces zero, one, or more records, while retaining
the key of the original record.
+                You can modify the record values and the value type.
+                </p>
+                <p>
+                <code>flatMapValues</code> is preferable to <code>flatMap</code>
because it will not cause data re-partitioning. However,
+                it does not allow you to modify the key or key type like <code>flatMap</code>
does.
+                </p>
+                <pre class="brush: java;">
+                   // Split a sentence into words.
+                   KStream&lt;byte[], String&gt; sentences = ...;
+                   KStream&lt;byte[], String&gt; words = sentences.flatMapValues(value -> Arrays.asList(value.split("\\s+")));
+
+                   // Java 7 example: cf. `mapValues` for how to create `ValueMapper` instances
+               </pre>
+            </td>
+        </tr>
+        <tr>
+            <td><b>Foreach</b>: <code>KStream &rarr; void or
KTable &rarr; void </code></td>
+            <td>
+                <p>
+                Terminal operation. Performs a stateless action on each record.
+                </p>
+                <p>
+                Note on processing guarantees: Any side effects of an action (such as writing
to external systems)
+                are not trackable by Kafka, which means they will typically not benefit from
Kafka's processing guarantees.
+                </p>
+                <pre class="brush: java;">
+                       KStream&lt;String, Long&gt; stream = ...;
+
+                       // Print the contents of the KStream to the local console.
+                       // Java 8+ example, using lambda expressions
+                       stream.foreach((key, value) -> System.out.println(key + " => " + value));
+
+                       // Java 7 example
+                       stream.foreach(
+                           new ForeachAction&lt;String, Long&gt;() {
+                               @Override
+                               public void apply(String key, Long value) {
+                                 System.out.println(key + " => " + value);
+                               }
+                       });
+                </pre>
+            </td>
+        </tr>
+        <tr>
+            <td><b>GroupByKey</b>: <code>KStream &rarr; KGroupedStream
</code></td>
+            <td>
+                <p>
+                Groups the records by the existing key.
+                </p>
+                <p>
+                Grouping is a prerequisite for aggregating a stream or a table and ensures
that data is properly
+                partitioned ("keyed") for subsequent operations.
+                </p>
+                <p>
+                <b>When to set explicit serdes</b>: Variants of <code>groupByKey</code>
exist to override the configured default serdes of
+                your application, which you must do if the key and/or value types of the
resulting <code>KGroupedStream</code> do
+                not match the configured default serdes.
+                </p>
+                <p>
+                <b>Note:</b>
+                Grouping vs. Windowing: A related operation is windowing, which lets you
control how to "sub-group" the
+                grouped records of the same key into so-called windows for stateful operations
such as windowed aggregations
+                or windowed joins.
+                </p>
+                <p>
+                Causes data re-partitioning if and only if the stream was marked for re-partitioning.
<code>groupByKey</code> is
+                preferable to <code>groupBy</code> because it re-partitions data
only if the stream was already marked for re-partitioning.
+                However, <code>groupByKey</code> does not allow you to modify
the key or key type like <code>groupBy</code> does.
+                </p>
+                <pre class="brush: java;">
+                       KStream&lt;byte[], String&gt; stream = ...;
+
+                       // Group by the existing key, using the application's configured
+                       // default serdes for keys and values.
+                       KGroupedStream&lt;byte[], String&gt; groupedStream = stream.groupByKey();
+
+                       // When the key and/or value types do not match the configured
+                       // default serdes, we must explicitly specify serdes.
+                       KGroupedStream&lt;byte[], String&gt; groupedStream = stream.groupByKey(
+                           Serdes.ByteArray(), /* key */
+                           Serdes.String()     /* value */
+                       );
+                </pre>
+            </td>
+        </tr>
+        <tr>
+            <td><b>GroupBy</b>: <code>KStream &rarr; KGroupedStream
or KTable &rarr; KGroupedTable</code></td>
+            <td>
+                <p>
+                Groups the records by a new key, which may be of a different key type. When grouping a table,
+                you may also specify a new value and value type. <code>groupBy</code> is a shorthand for
+                <code>selectKey(...).groupByKey()</code>; see the equivalence sketch at the end of this example.
+                </p>
+                <p>
+                Grouping is a prerequisite for aggregating a stream or a table and ensures
that data is properly
+                partitioned ("keyed") for subsequent operations.
+                </p>
+                <p>
+                <b>When to set explicit serdes</b>: Variants of groupBy exist
to override the configured default serdes of your
+                application, which you must do if the key and/or value types of the resulting
KGroupedStream or
+                KGroupedTable do not match the configured default serdes.
+                </p>
+                <p>
+                <b>Note:</b>
+                Grouping vs. Windowing: A related operation is windowing, which lets you
control how to “sub-group” the
+                grouped records of the same key into so-called windows for stateful operations
such as windowed aggregations
+                or windowed joins.
+                </p>
+                <p>
+                <b>Always causes data re-partitioning:</b> groupBy always causes
data re-partitioning. If possible use groupByKey
+                instead, which will re-partition data only if required.
+                </p>
+                <pre class="brush: java;">
+                       KStream&lt;byte[], String&gt; stream = ...;
+                       KTable&lt;byte[], String&gt; table = ...;
+
+                       // Java 8+ examples, using lambda expressions
+
+                       // Group the stream by a new key and key type
+                       KGroupedStream&lt;String, String&gt; groupedStream = stream.groupBy(
+                           (key, value) -> value,
+                           Serdes.String(), /* key (note: type was modified) */
+                           Serdes.String()  /* value */
+                       );
+
+                       // Group the table by a new key and key type, and also modify the value and value type.
+                       KGroupedTable&lt;String, Integer&gt; groupedTable = table.groupBy(
+                           (key, value) -> KeyValue.pair(value, value.length()),
+                           Serdes.String(), /* key (note: type was modified) */
+                           Serdes.Integer() /* value (note: type was modified) */
+                       );
+
+
+                       // Java 7 examples
+
+                       // Group the stream by a new key and key type
+                       KGroupedStream&lt;String, String&gt; groupedStream = stream.groupBy(
+                           new KeyValueMapper&lt;byte[], String, String&gt;() {
+                               @Override
+                               public String apply(byte[] key, String value) {
+                                  return value;
+                               }
+                           },
+                           Serdes.String(), /* key (note: type was modified) */
+                           Serdes.String()  /* value */
+                       );
+
+                       // Group the table by a new key and key type, and also modify the value and value type.
+                       KGroupedTable&lt;String, Integer&gt; groupedTable = table.groupBy(
+                            new KeyValueMapper&lt;byte[], String, KeyValue&lt;String, Integer&gt;&gt;() {
+                                @Override
+                                public KeyValue&lt;String, Integer&gt; apply(byte[] key, String value) {
+                                   return KeyValue.pair(value, value.length());
+                                }
+                            },
+                            Serdes.String(), /* key (note: type was modified) */
+                            Serdes.Integer() /* value (note: type was modified) */
+                       );
+                </pre>
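+                <p>
+                As a rough sketch of the shorthand relationship mentioned above (the variable names here are hypothetical),
+                the two groupings below are equivalent:
+                </p>
+                <pre class="brush: java;">
+                       // `groupBy(mapper, ...)` is shorthand for `selectKey(mapper)` followed by `groupByKey(...)`.
+                       KGroupedStream&lt;String, String&gt; viaGroupBy = stream.groupBy(
+                           (key, value) -> value,
+                           Serdes.String(), /* key */
+                           Serdes.String()  /* value */
+                       );
+
+                       KGroupedStream&lt;String, String&gt; viaSelectKey = stream
+                           .selectKey((key, value) -> value)
+                           .groupByKey(
+                               Serdes.String(), /* key */
+                               Serdes.String()  /* value */
+                           );
+                </pre>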
+            </td>
+        </tr>
+        <tr>
+            <td><b>Map</b>: <code>KStream &rarr; KStream</code></td>
+            <td>
+                <p>
+                Takes one record and produces one record. You can modify the record key and
value, including their types.
+                </p>
+
+                <p>
+                <b>Marks the stream for data re-partitioning:</b> Applying a grouping or a join after <code>map</code>
+                will result in re-partitioning of the records. If possible use <code>mapValues</code> instead, which will
+                not cause data re-partitioning.
+                </p>
+
+                <pre class="brush: java;">
+                       KStream&lt;byte[], String&gt; stream = ...;
+
+                       // Java 8+ example, using lambda expressions
+                       // Note how we change the key and the key type (similar to `selectKey`)
+                       // as well as the value and the value type.
+                       KStream&lt;String, Integer&gt; transformed = stream.map(
+                           (key, value) -> KeyValue.pair(value.toLowerCase(), value.length()));
+
+                       // Java 7 example
+                       KStream&lt;String, Integer&gt; transformed = stream.map(
+                           new KeyValueMapper&lt;byte[], String, KeyValue&lt;String, Integer&gt;&gt;() {
+                           @Override
+                           public KeyValue&lt;String, Integer&gt; apply(byte[] key, String value) {
+                               return new KeyValue&lt;&gt;(value.toLowerCase(), value.length());
+                           }
+                       });
+                </pre>
+            </td>
+        </tr>
+        <tr>
+            <td><b>Map (values only)</b>: <code>KStream &rarr;
KStream or KTable &rarr; KTable</code></td>
+            <td>
+                <p>
+                Takes one record and produces one record, while retaining the key of the
original record. You can modify
+                the record value and the value type.
+                </p>
+                <p>
+                <code>mapValues</code> is preferable to <code>map</code>
because it will not cause data re-partitioning. However, it does not
+                allow you to modify the key or key type like <code>map</code>
does.
+                </p>
+
+                <pre class="brush: java;">
+                       KStream&lt;byte[], String&gt; stream = ...;
+
+                       // Java 8+ example, using lambda expressions
+                       KStream&lt;byte[], String&gt; uppercased = stream.mapValues(value -> value.toUpperCase());
+
+                       // Java 7 example
+                       KStream&lt;byte[], String&gt; uppercased = stream.mapValues(
+                          new ValueMapper&lt;String, String&gt;() {
+                          @Override
+                          public String apply(String s) {
+                             return s.toUpperCase();
+                          }
+                       });
+                </pre>
+            </td>
+        </tr>
+        <tr>
+            <td><b>Print</b>: <code>KStream &rarr; void or KTable
&rarr; void</code></td>
+            <td>
+                <p>
+                Terminal operation. Prints the records to <code>System.out</code>.
See Javadocs for serde and <code>toString()</code> caveats.
+                </p>
+                <pre class="brush: java;">
+                       KStream&lt;byte[], String&gt; stream = ...;
+                       stream.print();
+
+                       // Several variants of `print` exist to e.g. override the
+                       // default serdes for record keys and record values.
+                       stream.print(Serdes.ByteArray(), Serdes.String());
+                </pre>
+            </td>
+        </tr>
+        <tr>
+            <td><b>SelectKey</b>: <code>KStream &rarr; KStream</code></td>
+            <td>
+                <p>
+                Assigns a new key, possibly of a new key type, to each record.
+                </p>
+                <p>
+                <b>Marks the stream for data re-partitioning:</b> Applying a grouping or a join after <code>selectKey</code>
+                will result in re-partitioning of the records.
+                </p>
+
+                <pre class="brush: java;">
+                       KStream&lt;byte[], String&gt; stream = ...;
+
+                       // Derive a new record key from the record's value. Note how the key type changes, too.
+                       // Java 8+ example, using lambda expressions
+                       KStream&lt;String, String&gt; rekeyed = stream.selectKey((key, value) -> value.split(" ")[0]);
+
+                       // Java 7 example
+                       KStream&lt;String, String&gt; rekeyed = stream.selectKey(
+                           new KeyValueMapper&lt;byte[], String, String&gt;() {
+                           @Override
+                           public String apply(byte[] key, String value) {
+                              return value.split(" ")[0];
+                           }
+                         });
+                </pre>
+            </td>
+        </tr>
+        <tr>
+            <td><b>Table to Stream</b>: <code>KTable &rarr; KStream</code></td>
+            <td>
+                <p>
+                Converts this table into a stream.
+                </p>
+                <pre class="brush: java;">
+                       KTable&lt;byte[], String&gt; table = ...;
+
+                       // Also, a variant of `toStream` exists that allows you
+                       // to select a new key for the resulting stream.
+                       KStream&lt;byte[], String&gt; stream = table.toStream();
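+
+                       // Hypothetical sketch of the keyed variant mentioned above: derive a new
+                       // (String-typed) key for the resulting stream from the record value.
+                       KStream&lt;String, String&gt; rekeyedStream = table.toStream((key, value) -> value);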
+                </pre>
+            </td>
+        </tr>
+        <tr>
+            <td><b>WriteAsText</b>: <code>KStream &rarr; void
or KTable &rarr; void</code></td>
+            <td>
+                <p>
+                Terminal operation. Write the records to a file. See Javadocs for serde and
<code>toString()</code> caveats.
+                </p>
+                <pre class="brush: java;">
+                       KStream&lt;byte[], String&gt; stream = ...;
+                       stream.writeAsText("/path/to/local/output.txt");
+
+                       // Several variants of `writeAsText` exist to e.g. override the
+                       // default serdes for record keys and record values.
+                       stream.writeAsText("/path/to/local/output.txt", Serdes.ByteArray(),
Serdes.String());
+                </pre>
+            </td>
+        </tr>
+        </tbody>
+    </table>
+
+
+    <h5><a id="streams_dsl_transformations_stateful" href="#streams_dsl_transformations_stateful">Stateful
transformations</a></h5>
+    Stateless transformations, by definition, do not depend on any state for processing,
and hence implementation-wise they do not
+    require a state store associated with the stream processor.Stateful transformations,
by definition, depend on state for processing
+    inputs and producing outputs, and hence implementation-wise they require a state store
associated with the stream processor. For
+    example, in aggregating operations, a windowing state store is used to store the latest
aggregation results per window; in join
+    operations, a windowing state store is used to store all the records received so far
within the defined window boundary.
+
+
+
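+    <p>
+    As a minimal sketch of why such transformations need state (the stream, store name, and window size below are hypothetical,
+    default serdes are assumed to match the record types, and the exact <code>count()</code>/<code>aggregate()</code> overloads
+    vary across Kafka Streams versions), a windowed count must keep the running count per key and window in a state store:
+    </p>
+    <pre class="brush: java;">
+    KStream&lt;String, String&gt; clicks = ...;
+
+    // Count clicks per key in 5-minute windows. The running counts are kept in a windowed
+    // state store (named "ClickCounts" here) so that late-arriving records can still
+    // update the aggregate of the window they belong to.
+    KTable&lt;Windowed&lt;String&gt;, Long&gt; clickCounts = clicks
+        .groupByKey()
+        .count(TimeWindows.of(TimeUnit.MINUTES.toMillis(5)), "ClickCounts");
+    </pre>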
+    <h6><a id="streams_dsl_windowing" href="#streams_dsl_windowing">Windowing
a stream</a></h6>
     A stream processor may need to divide data records into time buckets, i.e. to <b>window</b>
the stream by time. This is usually needed for join and aggregation operations, etc. Kafka
Streams currently defines the following types of windows:
     <ul>
         <li><b>Hopping time windows</b> are windows based on time intervals.
They model fixed-sized, (possibly) overlapping windows. A hopping window is defined by two
properties: the window's size and its advance interval (aka "hop"). The advance interval specifies
by how much a window moves forward relative to the previous one. For example, you can configure
a hopping window with a size of 5 minutes and an advance interval of 1 minute. Since hopping
windows can overlap, a data record may belong to more than one such window.</li>
@@ -597,7 +1048,7 @@ Note that in the <code>WordCountProcessor</code> implementation,
users need to r
         Kafka Streams is able to properly handle late-arriving records.
     </p>
 
-    <h4><a id="streams_dsl_joins" href="#streams_dsl_joins">Join multiple streams</a></h4>
+    <h6><a id="streams_dsl_joins" href="#streams_dsl_joins">Join multiple streams</a></h6>
     A <b>join</b> operation merges two streams based on the keys of their data
records, and yields a new stream. A join over record streams usually needs to be performed
on a windowing basis because otherwise the number of records that must be maintained for performing
the join may grow indefinitely. In Kafka Streams, you may perform the following join operations:
     <ul>
         <li><b>KStream-to-KStream Joins</b> are always windowed joins,
since otherwise the memory and state required to compute the join would grow infinitely in
size. Here, a newly received record from one of the streams is joined with the other stream's
records within the specified window interval to produce one result for each matching pair
based on user-provided <code>ValueJoiner</code>. A new <code>KStream</code>
instance representing the result stream of the join is returned from this operator.</li>
@@ -615,7 +1066,7 @@ Note that in the <code>WordCountProcessor</code> implementation,
users need to r
     Depending on the operands the following join operations are supported: <b>inner
joins</b>, <b>outer joins</b> and <b>left joins</b>.
     Their <a href="https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics">semantics</a>
are similar to the corresponding operators in relational databases.
 
-    <h5><a id="streams_dsl_aggregations" href="#streams_dsl_aggregations">Aggregate
a stream</a></h5>
+    <h6><a id="streams_dsl_aggregations" href="#streams_dsl_aggregations">Aggregate
a stream</a></h6>
     An <b>aggregation</b> operation takes one input stream, and yields a new
stream by combining multiple input records into a single output record. Examples of aggregations
are computing counts or sum. An aggregation over record streams usually needs to be performed
on a windowing basis because otherwise the number of records that must be maintained for performing
the aggregation may grow indefinitely.
 
     <p>
@@ -624,53 +1075,6 @@ Note that in the <code>WordCountProcessor</code> implementation,
users need to r
         When such late arrival happens, the aggregating <code>KStream</code>
or <code>KTable</code> simply emits a new aggregate value. Because the output
is a <code>KTable</code>, the new value is considered to overwrite the old value
with the same key in subsequent processing steps.
     </p>
 
-    <h4><a id="streams_dsl_transform" href="#streams_dsl_transform">Transform
a stream</a></h4>
-
-    <p>
-        Besides join and aggregation operations, there is a list of other transformation
operations provided for <code>KStream</code> and <code>KTable</code>
respectively.
-        Each of these operations may generate either one or more <code>KStream</code>
and <code>KTable</code> objects and
-        can be translated into one or more connected processors into the underlying processor
topology.
-        All these transformation methods can be chained together to compose a complex processor
topology.
-        Since <code>KStream</code> and <code>KTable</code> are strongly
typed, all these transformation operations are defined as
-        generics functions where users could specify the input and output data types.
-    </p>
-
-    <p>
-        Among these transformations, <code>filter</code>, <code>map</code>,
<code>mapValues</code>, etc, are stateless
-        transformation operations and can be applied to both <code>KStream</code>
and <code>KTable</code>,
-        where users can usually pass a customized function to these functions as a parameter,
such as <code>Predicate</code> for <code>filter</code>,
-        <code>KeyValueMapper</code> for <code>map</code>, etc:
-
-    </p>
-
-    <pre class="brush: java;">
-    // written in Java 8+, using lambda expressions
-    KStream&lt;String, GenericRecord&gt; mapped = source1.mapValue(record -> record.get("category"));
-    </pre>
-
-    <p>
-        Stateless transformations, by definition, do not depend on any state for processing,
and hence implementation-wise
-        they do not require a state store associated with the stream processor; Stateful
transformations, on the other hand,
-        require accessing an associated state for processing and producing outputs.
-        For example, in <code>join</code> and <code>aggregate</code>
operations, a windowing state is usually used to store all the received records
-        within the defined window boundary so far. The operators can then access these accumulated
records in the store and compute
-        based on them.
-    </p>
-
-    <pre class="brush: java;">
-    // written in Java 8+, using lambda expressions
-    KTable&lt;Windowed&lt;String&gt;, Long&gt; counts = source1.groupByKey().aggregate(
-    () -> 0L,  // initial value
-    (aggKey, value, aggregate) -> aggregate + 1L,   // aggregating value
-    TimeWindows.of("counts", 5000L).advanceBy(1000L), // intervals in milliseconds
-    Serdes.Long() // serde for aggregated value
-    );
-
-    KStream&lt;String, String&gt; joined = source1.leftJoin(source2,
-    (record1, record2) -> record1.get("user") + "-" + record2.get("region");
-    );
-    </pre>
-
     <h4><a id="streams_dsl_sink" href="#streams_dsl_sink">Write streams back
to Kafka</a></h4>
 
     <p>

