kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From guozh...@apache.org
Subject [1/3] kafka git commit: MINOR: Update KTable JavaDoc
Date Fri, 27 Jan 2017 16:40:56 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk ec76ea343 -> 1bd11b23a


http://git-wip-us.apache.org/repos/asf/kafka/blob/1bd11b23/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
index d95af0e..bfb470a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KTable.java
@@ -17,378 +17,832 @@
 
 package org.apache.kafka.streams.kstream;
 
+import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.internals.WindowedSerializer;
+import org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner;
 import org.apache.kafka.streams.processor.StreamPartitioner;
-import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 
 /**
  * {@link KTable} is an abstraction of a <i>changelog stream</i> from a primary-keyed table.
- * Each record in this stream is an update on the primary-keyed table with the record key as the primary key.
+ * Each record in this changelog stream is an update on the primary-keyed table with the record key as the primary key.
  * <p>
- * A {@link KTable} is either defined from one or multiple Kafka topics that are consumed message by message or
- * the result of a {@link KTable} transformation. An aggregation of a {@link KStream} also yields a {@link KTable}.
+ * A {@link KTable} is either {@link KStreamBuilder#table(String, String) defined from a single Kafka topic} that is
+ * consumed message by message or the result of a {@link KTable} transformation.
+ * An aggregation of a {@link KStream} also yields a {@link KTable}.
  * <p>
  * A {@link KTable} can be transformed record by record, joined with another {@link KTable} or {@link KStream}, or
  * can be re-partitioned and aggregated into a new {@link KTable}.
+ * <p>
+ * Some {@link KTable}s have an internal state (a {@link ReadOnlyKeyValueStore}) and are therefore queryable via the
+ * interactive queries API.
+ * For example:
+ * <pre>{@code
+ *     final KTable table = ...
+ *     ...
+ *     final KafkaStreams streams = ...;
+ *     streams.start();
+ *     ...
+ *     final String queryableStoreName = table.getStoreName(); // returns null if KTable is not queryable
+ *     ReadOnlyKeyValueStore view = streams.store(queryableStoreName, QueryableStoreTypes.keyValueStore());
+ *     view.get(key);
+ *}</pre>
  *
  * @param <K> Type of primary keys
  * @param <V> Type of value changes
- *
  * @see KStream
+ * @see KGroupedTable
+ * @see GlobalKTable
+ * @see KStreamBuilder#table(String, String)
  */
 @InterfaceStability.Unstable
 public interface KTable<K, V> {
 
     /**
-     * Create a new instance of {@link KTable} that consists of all elements of this stream which satisfy a predicate.
-     *
-     * @param predicate     the instance of {@link Predicate}
-     *
+     * Create a new {@link KTable} that consists of all records of this {@link KTable} which satisfy the given
+     * predicate.
+     * All records that do not satisfy the predicate are dropped.
+     * For each {@link KTable} update the filter is evaluated on the update record to produce an update record for the
+     * result {@link KTable}.
+     * This is a stateless record-by-record operation.
+     * <p>
+     * Note that {@code filter} for a <i>changelog stream</i> works differently than {@link KStream#filter(Predicate)
+     * record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
+     * have delete semantics.
+     * Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded
+     * directly if required (i.e., if there is anything to be deleted).
+     * Furthermore, for each record that gets dropped (i.e., does not satisfy the given predicate) a tombstone record
+     * is forwarded.
+     *
+     * @param predicate a filter {@link Predicate} that is applied to each record
      * @return a {@link KTable} that contains only those records that satisfy the given predicate
+     * @see #filterNot(Predicate)
      */
-    KTable<K, V> filter(Predicate<? super K, ? super V> predicate);
+    KTable<K, V> filter(final Predicate<? super K, ? super V> predicate);
 
     /**
-     * Create a new instance of {@link KTable} that consists all elements of this stream which do not satisfy a predicate.
-     *
-     * @param predicate     the instance of {@link Predicate}
-     *
-     * @return a {@link KTable} that contains only those records that do not satisfy the given predicate
+     * Create a new {@link KTable} that consists of all records of this {@link KTable} which do <em>not</em> satisfy the
+     * given predicate.
+     * All records that <em>do</em> satisfy the predicate are dropped.
+     * For each {@link KTable} update the filter is evaluated on the update record to produce an update record for the
+     * result {@link KTable}.
+     * This is a stateless record-by-record operation.
+     * <p>
+     * Note that {@code filterNot} for a <i>changelog stream</i> works differently than {@link KStream#filterNot(Predicate)
+     * record stream filters}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
+     * have delete semantics.
+     * Thus, for tombstones the provided filter predicate is not evaluated but the tombstone record is forwarded
+     * directly if required (i.e., if there is anything to be deleted).
+     * Furthermore, for each record that gets dropped (i.e., does satisfy the given predicate) a tombstone record is
+     * forwarded.
+     *
+     * @param predicate a filter {@link Predicate} that is applied to each record
+     * @return a {@link KTable} that contains only those records that do <em>not</em> satisfy the given predicate
+     * @see #filter(Predicate)
      */
-    KTable<K, V> filterNot(Predicate<? super K, ? super V> predicate);
+    KTable<K, V> filterNot(final Predicate<? super K, ? super V> predicate);
 
     /**
-     * Create a new instance of {@link KTable} by transforming the value of each element in this stream into a new value in the new stream.
-     *
-     * @param mapper        the instance of {@link ValueMapper}
-     * @param <V1>          the value type of the new stream
-     *
-     * @return a {@link KTable} that contains records with unmodified keys and new values of different type
+     * Create a new {@link KTable} by transforming the value of each record in this {@link KTable} into a new value
+     * (with possibly a new type) in the new {@link KTable}.
+     * For each {@link KTable} update the provided {@link ValueMapper} is applied to the value of the update record and
+     * computes a new value for it, resulting in an update record for the result {@link KTable}.
+     * Thus, an input record {@code <K,V>} can be transformed into an output record {@code <K,V'>}.
+     * This is a stateless record-by-record operation.
+     * <p>
+     * The example below counts the number of tokens of the value string.
+     * <pre>{@code
+     * KTable<String, String> inputTable = builder.table("topic");
+     * KTable<String, Integer> outputTable = inputTable.mapValues(new ValueMapper<String, Integer> {
+     *     Integer apply(String value) {
+     *         return value.split(" ").length;
+     *     }
+     * });
+     * }</pre>
+     * <p>
+     * This operation preserves data co-location with respect to the key.
+     * Thus, <em>no</em> internal data redistribution is required if a key based operator (like a join) is applied to
+     * the result {@link KTable}.
+     * <p>
+     * Note that {@code mapValues} for a <i>changelog stream</i> works differently than {@link KStream#mapValues(ValueMapper)
+     * record stream value mappers}, because {@link KeyValue records} with {@code null} values (so-called tombstone records)
+     * have delete semantics.
+     * Thus, for tombstones the provided value-mapper is not evaluated but the tombstone record is forwarded directly to
+     * delete the corresponding record in the result {@link KTable}.
+     *
+     * @param mapper a {@link ValueMapper} that computes a new output value
+     * @param <VR>   the value type of the result {@link KTable}
+     * @return a {@link KTable} that contains records with unmodified keys and new values (possibly of different type)
      */
-    <V1> KTable<K, V1> mapValues(ValueMapper<? super V, ? extends V1> mapper);
+    <VR> KTable<K, VR> mapValues(final ValueMapper<? super V, ? extends VR> mapper);
 
 
     /**
-     * Print the elements of this stream to {@code System.out}. This function
-     * will use the generated name of the parent processor node to label the key/value pairs
-     * printed out to the console.
-     *
-     * Implementors will need to override toString for keys and values that are not of
-     * type String, Integer etc to get meaningful information.
+     * Print the update records of this {@link KTable} to {@code System.out}.
+     * This function will use the generated name of the parent processor node to label the key/value pairs printed to
+     * the console.
+     * <p>
+     * The provided serde will be used to deserialize the key or value in case the type is {@code byte[]} before calling
+     * {@code toString()} on the deserialized object.
+     * <p>
+     * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String},
+     * {@link Integer} etc. to get meaningful information.
+     * <p>
+     * Note that {@code print()} is not applied to the internal state store and only called for each new {@link KTable}
+     * update record.
      */
     void print();
 
     /**
-     * Print the elements of this stream to {@code System.out}.  This function
-     * will use the given name to label the key/value printed out to the console.
-     *
-     * @param streamName the name used to label the key/value pairs printed out to the console
-     *
-     * Implementors will need to override toString for keys and values that are not of
-     * type String, Integer etc to get meaningful information.
+     * Print the update records of this {@link KTable} to {@code System.out}.
+     * This function will use the given name to label the key/value pairs printed to the console.
+     * <p>
+     * The provided serde will be used to deserialize the key or value in case the type is {@code byte[]} before calling
+     * {@code toString()} on the deserialized object.
+     * <p>
+     * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String},
+     * {@link Integer} etc. to get meaningful information.
+     * <p>
+     * Note that {@code print()} is not applied to the internal state store and only called for each new {@link KTable}
+     * update record.
+     *
+     * @param streamName the name used to label the key/value pairs printed to the console
      */
-    void print(String streamName);
+    void print(final String streamName);
 
     /**
-     * Print the elements of this stream to {@code System.out}
-     * @param keySerde key serde used to send key-value pairs,
-     *                 if not specified the default serde defined in the configs will be used
-     * @param valSerde value serde used to send key-value pairs,
-     *                 if not specified the default serde defined in the configs will be used
-     *
-     * Implementors will need to override toString for keys and values that are not of
-     * type String, Integer etc to get meaningful information.
+     * Print the update records of this {@link KTable} to {@code System.out}.
+     * This function will use the generated name of the parent processor node to label the key/value pairs printed to
+     * the console.
+     * <p>
+     * The provided serde will be used to deserialize the key or value in case the type is {@code byte[]} before calling
+     * {@code toString()} on the deserialized object.
+     * <p>
+     * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String},
+     * {@link Integer} etc. to get meaningful information.
+     * <p>
+     * Note that {@code print()} is not applied to the internal state store and only called for each new {@link KTable}
+     * update record.
+     *
+     * @param keySerde key serde used to deserialize key if type is {@code byte[]}
+     * @param valSerde value serde used to deserialize value if type is {@code byte[]}
      */
-    void print(Serde<K> keySerde, Serde<V> valSerde);
+    void print(final Serde<K> keySerde,
+               final Serde<V> valSerde);
 
     /**
-     * Print the elements of this stream to System.out
-     *
-     * @param keySerde key serde used to send key-value pairs,
-     *                 if not specified the default serde defined in the configs will be used
-     * @param valSerde value serde used to send key-value pairs,
-     *                 if not specified the default serde defined in the configs will be used
-     * @param streamName the name used to label the key/value pairs printed out to the console
-     *
-     * Implementors will need to override toString for keys and values that are not of
-     * type String, Integer etc to get meaningful information.
+     * Print the update records of this {@link KTable} to {@code System.out}.
+     * This function will use the given name to label the key/value pairs printed to the console.
+     * <p>
+     * The provided serde will be used to deserialize the key or value in case the type is {@code byte[]} before calling
+     * {@code toString()} on the deserialized object.
+     * <p>
+     * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String},
+     * {@link Integer} etc. to get meaningful information.
+     * <p>
+     * Note that {@code print()} is not applied to the internal state store and only called for each new {@link KTable}
+     * update record.
+     *
+     * @param keySerde   key serde used to deserialize key if type is {@code byte[]}
+     * @param valSerde   value serde used to deserialize value if type is {@code byte[]}
+     * @param streamName the name used to label the key/value pairs printed to the console
      */
-    void print(Serde<K> keySerde, Serde<V> valSerde, String streamName);
+    void print(final Serde<K> keySerde,
+               final Serde<V> valSerde,
+               final String streamName);
 
     /**
-     * Write the elements of this stream to a file at the given path using default serializers and deserializers.
-     * @param filePath name of file to write to
+     * Write the update records of this {@link KTable} to a file at the given path.
+     * This function will use the generated name of the parent processor node to label the key/value pairs printed to
+     * the file.
+     * <p>
+     * The default serde will be used to deserialize the key or value in case the type is {@code byte[]} before calling
+     * {@code toString()} on the deserialized object.
+     * <p>
+     * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String},
+     * {@link Integer} etc. to get meaningful information.
+     * <p>
+     * Note that {@code writeAsText()} is not applied to the internal state store and only called for each new
+     * {@link KTable} update record.
      *
-     * Implementors will need to override {@code toString} for keys and values that are not of
-     * type {@link String}, {@link Integer} etc. to get meaningful information.
+     * @param filePath name of file to write to
      */
-    void writeAsText(String filePath);
+    void writeAsText(final String filePath);
 
     /**
-     * Write the elements of this stream to a file at the given path.
-     *
-     * @param filePath name of file to write to
+     * Write the update records of this {@link KTable} to a file at the given path.
+     * This function will use the given name to label the key/value printed to the file.
+     * <p>
+     * The default serde will be used to deserialize the key or value in case the type is {@code byte[]} before calling
+     * {@code toString()} on the deserialized object.
+     * <p>
+     * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String},
+     * {@link Integer} etc. to get meaningful information.
+     * <p>
+     * Note that {@code writeAsText()} is not applied to the internal state store and only called for each new
+     * {@link KTable} update record.
+     *
+     * @param filePath   name of file to write to
      * @param streamName the name used to label the key/value pairs printed out to the console
-     *
-     * Implementors will need to override {@code toString} for keys and values that are not of
-     * type {@link String}, {@link Integer} etc. to get meaningful information.
      */
-    void writeAsText(String filePath, String streamName);
+    void writeAsText(final String filePath,
+                     final String streamName);
 
     /**
-     * Write the elements of this stream to a file at the given path.
+     * Write the update records of this {@link KTable} to a file at the given path.
+     * This function will use the generated name of the parent processor node to label the key/value pairs printed to
+     * the file.
+     * <p>
+     * The provided serde will be used to deserialize the key or value in case the type is {@code byte[]} before calling
+     * {@code toString()} on the deserialized object.
+     * <p>
+     * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String},
+     * {@link Integer} etc. to get meaningful information.
+     * <p>
+     * Note that {@code writeAsText()} is not applied to the internal state store and only called for each new
+     * {@link KTable} update record.
      *
      * @param filePath name of file to write to
-     * @param keySerde key serde used to send key-value pairs,
-     *                 if not specified the default serde defined in the configs will be used
-     * @param valSerde value serde used to send key-value pairs,
-     *                 if not specified the default serde defined in the configs will be used
-     *
-     * Implementors will need to override {@code toString} for keys and values that are not of
-     * type {@link String}, {@link Integer} etc. to get meaningful information.
+     * @param keySerde key serde used to deserialize key if type is {@code byte[]}
+     * @param valSerde value serde used to deserialize value if type is {@code byte[]}
      */
-    void  writeAsText(String filePath, Serde<K> keySerde, Serde<V> valSerde);
+    void  writeAsText(final String filePath,
+                      final Serde<K> keySerde,
+                      final Serde<V> valSerde);
 
     /**
-     * @param filePath name of file to write to
-     * @param streamName the name used to label the key/value pairs printed out to the console
-     * @param keySerde key serde used to send key-value pairs,
-     *                 if not specified the default serde defined in the configs will be used
-     * @param valSerde value serde used to send key-value pairs,
-     *                 if not specified the default serde defined in the configs will be used
+     * Write the update records of this {@link KTable} to a file at the given path.
+     * This function will use the given name to label the key/value printed to the file.
+     * <p>
+     * The default serde will be used to deserialize the key or value in case the type is {@code byte[]} before calling
+     * {@code toString()} on the deserialized object.
+     * <p>
+     * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String},
+     * {@link Integer} etc. to get meaningful information.
+     * <p>
+     * Note that {@code writeAsText()} is not applied to the internal state store and only called for each new
+     * {@link KTable} update record.
      *
-     * Implementors will need to override {@code toString} for keys and values that are not of
-     * type {@link String}, {@link Integer} etc. to get meaningful information.
+     * @param filePath name of file to write to
+     * @param streamName the name used to label the key/value pairs printed to the file
+     * @param keySerde key serde used to deserialize key if type is {@code byte[]}
+     * @param valSerde value serde used to deserialize value if type is {@code byte[]}
      */
-
-    void writeAsText(String filePath, String streamName, Serde<K> keySerde, Serde<V> valSerde);
+    void writeAsText(final String filePath,
+                     final String streamName,
+                     final Serde<K> keySerde,
+                     final Serde<V> valSerde);
 
     /**
-     * Materialize this stream to a topic, also creates a new instance of {@link KTable} from the topic
-     * using default serializers and deserializers and producer's {@link DefaultPartitioner}.
-     * This is equivalent to calling {@link #to(String)} and {@link org.apache.kafka.streams.kstream.KStreamBuilder#table(String, String)}.
-     * The resulting {@link KTable} will be materialized in a local state
-     * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
-     * will be automatically created in Kafka for failure recovery, where "applicationID"
-     * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
+     * Perform an action on each update record of this {@link KTable}.
+     * Note that this is a terminal operation that returns void.
+     * <p>
+     * Note that {@code foreach()} is not applied to the internal state store and only called for each new
+     * {@link KTable} update record.
      *
-     * @param topic         the topic name
-     * @param storeName     the state store name used for this KTable
-     * @return a new {@link KTable} that contains the exact same records as this {@link KTable}
+     * @param action an action to perform on each record
      */
-    KTable<K, V> through(String topic, String storeName);
+    void foreach(final ForeachAction<? super K, ? super V> action);
 
     /**
-     * Materialize this stream to a topic, also creates a new instance of {@link KTable} from the topic using default serializers
-     * and deserializers and a customizable {@link StreamPartitioner} to determine the distribution of records to partitions.
-     * This is equivalent to calling {@link #to(String)} and {@link org.apache.kafka.streams.kstream.KStreamBuilder#table(String, String)}.
-     * The resulting {@link KTable} will be materialized in a local state
-     * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
-     * will be automatically created in Kafka for failure recovery, where "applicationID"
-     * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
+     * Convert this changelog stream to a {@link KStream}.
+     * <p>
+     * Note that this is a logical operation and only changes the "interpretation" of the stream, i.e., each record of
+     * this changelog stream is no longer treated as an update record (cf. {@link KStream} vs {@link KTable}).
      *
-     * @param partitioner  the function used to determine how records are distributed among partitions of the topic,
-     *                     if not specified producer's {@link DefaultPartitioner} will be used
-     * @param topic        the topic name
-     * @param storeName    the state store name used for this KTable
-     * @return a new {@link KTable} that contains the exact same records as this {@link KTable}
+     * @return a {@link KStream} that contains the same records as this {@link KTable}
      */
-    KTable<K, V> through(StreamPartitioner<? super K, ? super V> partitioner, String topic, String storeName);
+    KStream<K, V> toStream();
 
     /**
-     * Materialize this stream to a topic, also creates a new instance of {@link KTable} from the topic.
-     * If {@code keySerde} provides a {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer}
-     * for the key {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} is used
-     * &mdash; otherwise producer's {@link DefaultPartitioner} is used.
-     * This is equivalent to calling {@link #to(Serde, Serde, String)} and
-     * {@link org.apache.kafka.streams.kstream.KStreamBuilder#table(Serde, Serde, String, String)}.
-     * The resulting {@link KTable} will be materialized in a local state
-     * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
-     * will be automatically created in Kafka for failure recovery, where "applicationID"
-     * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
-     *
-     * @param keySerde     key serde used to send key-value pairs,
-     *                     if not specified the default key serde defined in the configuration will be used
-     * @param valSerde     value serde used to send key-value pairs,
-     *                     if not specified the default value serde defined in the configuration will be used
-     * @param topic        the topic name
-     * @param storeName    the state store name used for this KTable
-     * @return a new {@link KTable} that contains the exact same records as this {@link KTable}
+     * Convert this changelog stream to a {@link KStream} using the given {@link KeyValueMapper} to select the new key.
+     * <p>
+     * For example, you can compute the new key as the length of the value string.
+     * <pre>{@code
+     * KTable<String, String> table = builder.table("topic");
+     * KStream<Integer, String> keyedStream = table.toStream(new KeyValueMapper<String, String, Integer> {
+     *     Integer apply(String key, String value) {
+     *         return value.length();
+     *     }
+     * });
+     * }</pre>
+     * Setting a new key might result in an internal data redistribution if a key based operator (like an aggregation or
+     * join) is applied to the result {@link KStream}.
+     * <p>
+     * This operation is equivalent to calling
+     * {@code table.}{@link #toStream() toStream}{@code ().}{@link KStream#selectKey(KeyValueMapper) selectKey(KeyValueMapper)}.
+     * <p>
+     * Note that {@link #toStream()} is a logical operation and only changes the "interpretation" of the stream, i.e.,
+     * each record of this changelog stream is no longer treated as an update record (cf. {@link KStream} vs {@link KTable}).
+     *
+     * @param mapper a {@link KeyValueMapper} that computes a new key for each record
+     * @param <KR> the new key type of the result stream
+     * @return a {@link KStream} that contains the same records as this {@link KTable}
      */
-    KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, String topic, String storeName);
+    <KR> KStream<KR, V> toStream(final KeyValueMapper<? super K, ? super V, ? extends KR> mapper);
 
     /**
-     * Materialize this stream to a topic, also creates a new instance of {@link KTable} from the topic
-     * using a customizable {@link StreamPartitioner} to determine the distribution of records to partitions.
-     * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)} and
-     * {@link org.apache.kafka.streams.kstream.KStreamBuilder#table(Serde, Serde, String, String)}.
-     * The resulting {@link KTable} will be materialized in a local state
-     * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
-     * will be automatically created in Kafka for failure recovery, where "applicationID"
-     * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
-     *
-     * @param keySerde     key serde used to send key-value pairs,
-     *                     if not specified the default key serde defined in the configuration will be used
-     * @param valSerde     value serde used to send key-value pairs,
-     *                     if not specified the default value serde defined in the configuration will be used
-     * @param partitioner  the function used to determine how records are distributed among partitions of the topic,
-     *                     if not specified and {@code keySerde} provides a {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer} for the key
-     *                     {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} will be used
-     *                     &mdash; otherwise {@link DefaultPartitioner} will be used
-     * @param topic        the topic name
-     * @param storeName    the state store name used for this KTable
-     * @return a new {@link KTable} that contains the exact same records as this {@link KTable}
+     * Materialize this changelog stream to a topic and creates a new {@link KTable} from the topic using default
+     * serializers and deserializers and producer's {@link DefaultPartitioner}.
+     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
+     * started).
+     * <p>
+     * This is equivalent to calling {@link #to(String) #to(someTopicName)} and
+     * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
+     * <p>
+     * The resulting {@link KTable} will be materialized in a local state store with the given store name (cf.
+     * {@link KStreamBuilder#table(String, String)})
+     *
+     * @param topic     the topic name
+     * @param storeName the state store name used for the result {@link KTable}
+     * @return a {@link KTable} that contains the exact same (and potentially repartitioned) records as this {@link KTable}
      */
-    KTable<K, V> through(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<? super K, ? super V> partitioner, String topic, String storeName);
+    KTable<K, V> through(final String topic,
+                         final String storeName);
 
     /**
-     * Materialize this stream to a topic using default serializers specified in the config
-     * and producer's {@link DefaultPartitioner}.
-     *
-     * @param topic         the topic name
+     * Materialize this changelog stream to a topic and creates a new {@link KTable} from the topic using default
+     * serializers and deserializers and a customizable {@link StreamPartitioner} to determine the distribution of
+     * records to partitions.
+     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
+     * started).
+     * <p>
+     * This is equivalent to calling {@link #to(StreamPartitioner, String) #to(partitioner, someTopicName)} and
+     * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
+     * <p>
+     * The resulting {@link KTable} will be materialized in a local state store with the given store name (cf.
+     * {@link KStreamBuilder#table(String, String)})
+     *
+     * @param partitioner the function used to determine how records are distributed among partitions of the topic,
+     *                    if not specified producer's {@link DefaultPartitioner} will be used
+     * @param topic       the topic name
+     * @param storeName   the state store name used for the result {@link KTable}
+     * @return a {@link KTable} that contains the exact same (and potentially repartitioned) records as this {@link KTable}
      */
-    void to(String topic);
+    KTable<K, V> through(final StreamPartitioner<? super K, ? super V> partitioner,
+                         final String topic,
+                         final String storeName);
 
     /**
-     * Materialize this stream to a topic using default serializers specified in the config
-     * and a customizable {@link StreamPartitioner} to determine the distribution of records to partitions.
-     *
-     * @param partitioner  the function used to determine how records are distributed among partitions of the topic,
-     *                     if not specified producer's {@link DefaultPartitioner} will be used
-     * @param topic        the topic name
+     * Materialize this changelog stream to a topic and creates a new {@link KTable} from the topic.
+     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
+     * started).
+     * <p>
+     * If {@code keySerde} provides a {@link WindowedSerializer} for the key {@link WindowedStreamPartitioner} is
+     * used&mdash;otherwise producer's {@link DefaultPartitioner} is used.
+     * <p>
+     * This is equivalent to calling {@link #to(Serde, Serde, String) #to(keySerde, valueSerde, someTopicName)} and
+     * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
+     * <p>
+     * The resulting {@link KTable} will be materialized in a local state store with the given store name (cf.
+     * {@link KStreamBuilder#table(String, String)})
+     *
+     * @param keySerde  key serde used to send key-value pairs,
+     *                  if not specified the default key serde defined in the configuration will be used
+     * @param valSerde  value serde used to send key-value pairs,
+     *                  if not specified the default value serde defined in the configuration will be used
+     * @param topic     the topic name
+     * @param storeName the state store name used for the result {@link KTable}
+     * @return a {@link KTable} that contains the exact same (and potentially repartitioned) records as this {@link KTable}
      */
-    void to(StreamPartitioner<? super K, ? super V> partitioner, String topic);
+    KTable<K, V> through(final Serde<K> keySerde,
+                         final Serde<V> valSerde,
+                         final String topic,
+                         final String storeName);
+                         final String storeName);
 
     /**
-     * Materialize this stream to a topic. If {@code keySerde} provides a
-     * {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer} for the key
-     * {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} is used
-     * &mdash; otherwise producer's {@link DefaultPartitioner} is used.
-     *
-     * @param keySerde     key serde used to send key-value pairs,
-     *                     if not specified the default serde defined in the configs will be used
-     * @param valSerde     value serde used to send key-value pairs,
-     *                     if not specified the default serde defined in the configs will be used
-     * @param topic        the topic name
+     * Materialize this changelog stream to a topic and creates a new {@link KTable} from the topic using a customizable
+     * {@link StreamPartitioner} to determine the distribution of records to partitions.
+     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
+     * started).
+     * <p>
+     * This is equivalent to calling {@link #to(Serde, Serde, StreamPartitioner, String)
+     * #to(keySerde, valueSerde, partitioner, someTopicName)} and
+     * {@link KStreamBuilder#table(String, String) KStreamBuilder#table(someTopicName, queryableStoreName)}.
+     * <p>
+     * The resulting {@link KTable} will be materialized in a local state store with the given store name (cf.
+     * {@link KStreamBuilder#table(String, String)})
+     *
+     * @param keySerde    key serde used to send key-value pairs,
+     *                    if not specified the default key serde defined in the configuration will be used
+     * @param valSerde    value serde used to send key-value pairs,
+     *                    if not specified the default value serde defined in the configuration will be used
+     * @param partitioner the function used to determine how records are distributed among partitions of the topic,
+     *                    if not specified and {@code keySerde} provides a {@link WindowedSerializer} for the key
+     *                    {@link WindowedStreamPartitioner} will be used&mdash;otherwise {@link DefaultPartitioner} will
+     *                    be used
+     * @param topic       the topic name
+     * @param storeName   the state store name used for the result {@link KTable}
+     * @return a {@link KTable} that contains the exact same (and potentially repartitioned) records as this {@link KTable}
      */
-    void to(Serde<K> keySerde, Serde<V> valSerde, String topic);
+    KTable<K, V> through(final Serde<K> keySerde,
+                         final Serde<V> valSerde,
+                         final StreamPartitioner<? super K, ? super V> partitioner,
+                         final String topic,
+                         final String storeName);
 
     /**
-     * Materialize this stream to a topic using a customizable {@link StreamPartitioner} to determine the distribution of records to partitions.
+     * Materialize this changelog stream to a topic using default serializers and deserializers and producer's
+     * {@link DefaultPartitioner}.
+     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
+     * started).
      *
-     * @param keySerde     key serde used to send key-value pairs,
-     *                     if not specified the default serde defined in the configs will be used
-     * @param valSerde     value serde used to send key-value pairs,
-     *                     if not specified the default serde defined in the configs will be used
-     * @param partitioner  the function used to determine how records are distributed among partitions of the topic,
-     *                     if not specified and {@code keySerde} provides a {@link org.apache.kafka.streams.kstream.internals.WindowedSerializer} for the key
-     *                     {@link org.apache.kafka.streams.kstream.internals.WindowedStreamPartitioner} will be used
-     *                     &mdash; otherwise {@link DefaultPartitioner} will be used
-     * @param topic        the topic name
+     * @param topic the topic name
      */
-    void to(Serde<K> keySerde, Serde<V> valSerde, StreamPartitioner<? super K, ? super V> partitioner, String topic);
+    void to(final String topic);
 
     /**
-     * Convert this stream to a new instance of {@link KStream}.
-     *
-     * @return a {@link KStream} that contains the same records as this {@link KTable};
-     *         the records are no longer treated as updates on a primary-keyed table,
-     *         but rather as normal key-value pairs in a record stream
+     * Materialize this changelog stream to a topic using default serializers and deserializers and a customizable
+     * {@link StreamPartitioner} to determine the distribution of records to partitions.
+     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
+     * started).
+     *
+     * @param partitioner the function used to determine how records are distributed among partitions of the topic,
+     *                    if not specified producer's {@link DefaultPartitioner} will be used
+     * @param topic       the topic name
      */
-    KStream<K, V> toStream();
+    void to(final StreamPartitioner<? super K, ? super V> partitioner,
+            final String topic);
 
     /**
-     *  Convert this stream to a new instance of {@link KStream} using the given {@link KeyValueMapper} to select
-     *  the new key.
-     *
-     * @param mapper  @param mapper  the instance of {@link KeyValueMapper}
-     * @param <K1> the new key type
+     * Materialize this changelog stream to a topic.
+     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
+     * started).
+     * <p>
+     * If {@code keySerde} provides a {@link WindowedSerializer} for the key {@link WindowedStreamPartitioner} is
+     * used&mdash;otherwise producer's {@link DefaultPartitioner} is used.
      *
-     * @return a {@link KStream} that contains the transformed records from this {@link KTable};
-     *         the records are no longer treated as updates on a primary-keyed table,
-     *         but rather as normal key-value pairs in a record stream
+     * @param keySerde key serde used to send key-value pairs,
+     *                 if not specified the default key serde defined in the configuration will be used
+     * @param valSerde value serde used to send key-value pairs,
+     *                 if not specified the default value serde defined in the configuration will be used
+     * @param topic    the topic name
      */
-    <K1> KStream<K1, V> toStream(KeyValueMapper<? super K, ? super V, ? extends K1> mapper);
+    void to(final Serde<K> keySerde,
+            final Serde<V> valSerde,
+            final String topic);
 
     /**
-     * Combine values of this stream with another {@link KTable} stream's elements of the same key using Inner Join.
-     *
-     * @param other         the instance of {@link KTable} joined with this stream
-     * @param joiner        the instance of {@link ValueJoiner}
-     * @param <V1>          the value type of the other stream
-     * @param <R>           the value type of the new stream
-     *
-     * @return a {@link KTable} that contains join-records for each key and values computed by the given {@link ValueJoiner},
-     *         one for each matched record-pair with the same key
+     * Materialize this changelog stream to a topic using a customizable {@link StreamPartitioner} to determine the
+     * distribution of records to partitions.
+     * The specified topic should be manually created before it is used (i.e., before the Kafka Streams application is
+     * started).
+     *
+     * @param keySerde    key serde used to send key-value pairs,
+     *                    if not specified the default key serde defined in the configuration will be used
+     * @param valSerde    value serde used to send key-value pairs,
+     *                    if not specified the default value serde defined in the configuration will be used
+     * @param partitioner the function used to determine how records are distributed among partitions of the topic,
+     *                    if not specified and {@code keySerde} provides a {@link WindowedSerializer} for the key
+     *                    {@link WindowedStreamPartitioner} will be used&mdash;otherwise {@link DefaultPartitioner} will
+     *                    be used
+     * @param topic       the topic name
      */
-    <V1, R> KTable<K, R> join(KTable<K, V1> other, ValueJoiner<? super V, ? super V1, ? extends R> joiner);
+    void to(final Serde<K> keySerde,
+            final Serde<V> valSerde,
+            final StreamPartitioner<? super K, ? super V> partitioner,
+            final String topic);
 
     /**
-     * Combine values of this stream with another {@link KTable} stream's elements of the same key using Outer Join.
-     *
-     * @param other         the instance of {@link KTable} joined with this stream
-     * @param joiner        the instance of {@link ValueJoiner}
-     * @param <V1>          the value type of the other stream
-     * @param <R>           the value type of the new stream
-     *
-     * @return a {@link KTable} that contains join-records for each key and values computed by the given {@link ValueJoiner},
-     *         one for each matched record-pair with the same key
+     * Re-groups the records of this {@link KTable} using the provided {@link KeyValueMapper} and default serializers
+     * and deserializers.
+     * Each {@link KeyValue} pair of this {@link KTable} is mapped to a new {@link KeyValue} pair by applying the
+     * provided {@link KeyValueMapper}.
+     * Re-grouping a {@link KTable} is required before an aggregation operator can be applied to the data
+     * (cf. {@link KGroupedTable}).
+     * The {@link KeyValueMapper} selects a new key and value (which should both have unmodified type).
+     * If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedTable}.
+     * <p>
+     * Because a new key is selected, an internal repartitioning topic will be created in Kafka.
+     * This topic will be named "${applicationId}-XXX-repartition", where "applicationId" is user-specified in
+     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is
+     * an internally generated name, and "-repartition" is a fixed suffix.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     * <p>
+     * All data of this {@link KTable} will be redistributed through the repartitioning topic by writing all update
+     * records to and rereading all update records from it, such that the resulting {@link KGroupedTable} is partitioned
+     * on the new key.
+     * <p>
+     * If the key or value type is changed, it is recommended to use {@link #groupBy(KeyValueMapper, Serde, Serde)}
+     * instead.
+     *
+     * @param selector a {@link KeyValueMapper} that computes a new grouping key and value to be aggregated
+     * @param <KR>     the key type of the result {@link KGroupedTable}
+     * @param <VR>     the value type of the result {@link KGroupedTable}
+     * @return a {@link KGroupedTable} that contains the re-grouped records of the original {@link KTable}
      */
-    <V1, R> KTable<K, R> outerJoin(KTable<K, V1> other, ValueJoiner<? super V, ? super V1, ? extends R> joiner);
+    <KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector);
 
     /**
-     * Combine values of this stream with another {@link KTable} stream's elements of the same key using Left Join.
-     *
-     * @param other         the instance of {@link KTable} joined with this stream
-     * @param joiner        the instance of {@link ValueJoiner}
-     * @param <V1>          the value type of the other stream
-     * @param <R>           the value type of the new stream
-     *
-     * @return a {@link KTable} that contains join-records for each key and values computed by the given {@link ValueJoiner},
-     *         one for each matched record-pair with the same key
+     * Re-groups the records of this {@link KTable} using the provided {@link KeyValueMapper}.
+     * Each {@link KeyValue} pair of this {@link KTable} is mapped to a new {@link KeyValue} pair by applying the
+     * provided {@link KeyValueMapper}.
+     * Re-grouping a {@link KTable} is required before an aggregation operator can be applied to the data
+     * (cf. {@link KGroupedTable}).
+     * The {@link KeyValueMapper} selects a new key and value (both with potentially different type).
+     * If the new record key is {@code null} the record will not be included in the resulting {@link KGroupedTable}.
+     * <p>
+     * Because a new key is selected, an internal repartitioning topic will be created in Kafka.
+     * This topic will be named "${applicationId}-XXX-repartition", where "applicationId" is user-specified in
+     * {@link StreamsConfig} via parameter {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is
+     * an internally generated name, and "-repartition" is a fixed suffix.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     * <p>
+     * All data of this {@link KTable} will be redistributed through the repartitioning topic by writing all update
+     * records to and rereading all update records from it, such that the resulting {@link KGroupedTable} is partitioned
+     * on the new key.
+     *
+     * @param selector   a {@link KeyValueMapper} that computes a new grouping key and value to be aggregated
+     * @param keySerde   key serdes for materializing this stream,
+     *                   if not specified the default serdes defined in the configs will be used
+     * @param valueSerde value serdes for materializing this stream,
+     *                   if not specified the default serdes defined in the configs will be used
+     * @param <KR>       the key type of the result {@link KGroupedTable}
+     * @param <VR>       the value type of the result {@link KGroupedTable}
+     * @return a {@link KGroupedTable} that contains the re-grouped records of the original {@link KTable}
      */
-    <V1, R> KTable<K, R> leftJoin(KTable<K, V1> other, ValueJoiner<? super V, ? super V1, ? extends R> joiner);
-
+    <KR, VR> KGroupedTable<KR, VR> groupBy(final KeyValueMapper<? super K, ? super V, KeyValue<KR, VR>> selector,
+                                           final Serde<KR> keySerde,
+                                           final Serde<VR> valueSerde);
 
     /**
-     * Group the records of this {@link KTable} using the provided {@link KeyValueMapper}.
-     * 
-     * @param selector      select the grouping key and value to be aggregated
-     * @param keySerde      key serdes for materializing this stream,
-     *                      if not specified the default serdes defined in the configs will be used
-     * @param valueSerde    value serdes for materializing this stream,
-     *                      if not specified the default serdes defined in the configs will be used
-     * @param <K1>          the key type of the {@link KGroupedTable}
-     * @param <V1>          the value type of the {@link KGroupedTable}
-     *
-     * @return a {@link KGroupedTable} that contains the re-partitioned records of this {@link KTable}
+     * Join records of this {@link KTable} with another {@link KTable}'s records using non-windowed inner equi join.
+     * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}.
+     * The result is an ever updating {@link KTable} that represents the <em>current</em> (i.e., processing time) result
+     * of the join.
+     * <p>
+     * The join is computed by (1) updating the internal state of one {@link KTable} and (2) performing a lookup for a
+     * matching record in the <em>current</em> (i.e., processing time) internal state of the other {@link KTable}.
+     * This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input
+     * {@link KTable} the result gets updated.
+     * <p>
+     * For each {@link KTable} record that finds a corresponding record in the other {@link KTable} the provided
+     * {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
+     * The key of the result record is the same as for both joining input records.
+     * <p>
+     * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics.
+     * Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded
+     * directly to delete a record in the result {@link KTable} if required (i.e., if there is anything to be deleted).
+     * <p>
+     * A {@link KTable} input record key cannot be {@code null}.
+     * <p>
+     * Example:
+     * <table border='1'>
+     * <tr>
+     * <th>thisKTable</th>
+     * <th>thisState</th>
+     * <th>otherKTable</th>
+     * <th>otherState</th>
+     * <th>result update record</th>
+     * </tr>
+     * <tr>
+     * <td>&lt;K1:A&gt;</td>
+     * <td>&lt;K1:A&gt;</td>
+     * <td></td>
+     * <td></td>
+     * <td></td>
+     * </tr>
+     * <tr>
+     * <td></td>
+     * <td>&lt;K1:A&gt;</td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:ValueJoiner(A,b)&gt;</td>
+     * </tr>
+     * <tr>
+     * <td>&lt;K1:C&gt;</td>
+     * <td>&lt;K1:C&gt;</td>
+     * <td></td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:ValueJoiner(C,b)&gt;</td>
+     * </tr>
+     * <tr>
+     * <td></td>
+     * <td>&lt;K1:C&gt;</td>
+     * <td>&lt;K1:null&gt;</td>
+     * <td></td>
+     * <td>&lt;K1:null&gt;</td>
+     * </tr>
+     * </table>
+     * Both input streams need to be co-partitioned on the join key.
+     * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
+     * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
+     * The repartitioning topic will be named "${applicationId}-XXX-repartition", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is an internally generated name, and
+     * "-repartition" is a fixed suffix.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     * <p>
+     * Repartitioning can happen for both input {@link KTable}s.
+     * For this case, all data of a {@link KTable} will be redistributed through the repartitioning topic by writing all
+     * update records to and rereading all update records from it, such that the join input {@link KTable} is
+     * partitioned correctly on its key.
+     *
+     * @param other  the other {@link KTable} to be joined with this {@link KTable}
+     * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
+     * @param <VO>   the value type of the other {@link KTable}
+     * @param <VR>   the value type of the result {@link KTable}
+     * @return a {@link KTable} that contains join-records for each key and values computed by the given
+     * {@link ValueJoiner}, one for each matched record-pair with the same key
+     * @see #leftJoin(KTable, ValueJoiner)
+     * @see #outerJoin(KTable, ValueJoiner)
      */
-    <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> selector, Serde<K1> keySerde, Serde<V1> valueSerde);
+    <VO, VR> KTable<K, VR> join(final KTable<K, VO> other,
+                                final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);
 
     /**
-     * Group the records of this {@link KTable} using the provided {@link KeyValueMapper} and default serializers and deserializers.
-     *
-     * @param selector      select the grouping key and value to be aggregated
-     * @param <K1>          the key type of the {@link KGroupedTable}
-     * @param <V1>          the value type of the {@link KGroupedTable}
-     *
-     * @return a {@link KGroupedTable} that contains the re-partitioned records of this {@link KTable}
+     * Join records of this {@link KTable} (left input) with another {@link KTable}'s (right input) records using
+     * non-windowed left equi join.
+     * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}.
+     * In contrast to {@link #join(KTable, ValueJoiner) inner-join}, all records from left {@link KTable} will produce
+     * an output record (cf. below).
+     * The result is an ever updating {@link KTable} that represents the <em>current</em> (i.e., processing time) result
+     * of the join.
+     * <p>
+     * The join is computed by (1) updating the internal state of one {@link KTable} and (2) performing a lookup for a
+     * matching record in the <em>current</em> (i.e., processing time) internal state of the other {@link KTable}.
+     * This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input
+     * {@link KTable} the result gets updated.
+     * <p>
+     * For each {@link KTable} record that finds a corresponding record in the other {@link KTable}'s state the
+     * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
+     * Additionally, for each record of left {@link KTable} that does not find a corresponding record in the
+     * right {@link KTable}'s state the provided {@link ValueJoiner} will be called with {@code rightValue =
+     * null} to compute a value (with arbitrary type) for the result record.
+     * The key of the result record is the same as for both joining input records.
+     * <p>
+     * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics.
+     * For example, for left input tombstones the provided value-joiner is not called but a tombstone record is
+     * forwarded directly to delete a record in the result {@link KTable} if required (i.e., if there is anything to be
+     * deleted).
+     * <p>
+     * A {@link KTable} input record key cannot be {@code null}.
+     * <p>
+     * Example:
+     * <table border='1'>
+     * <tr>
+     * <th>thisKTable</th>
+     * <th>thisState</th>
+     * <th>otherKTable</th>
+     * <th>otherState</th>
+     * <th>result update record</th>
+     * </tr>
+     * <tr>
+     * <td>&lt;K1:A&gt;</td>
+     * <td>&lt;K1:A&gt;</td>
+     * <td></td>
+     * <td></td>
+     * <td>&lt;K1:ValueJoiner(A,null)&gt;</td>
+     * </tr>
+     * <tr>
+     * <td></td>
+     * <td>&lt;K1:A&gt;</td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:ValueJoiner(A,b)&gt;</td>
+     * </tr>
+     * <tr>
+     * <td>&lt;K1:null&gt;</td>
+     * <td></td>
+     * <td></td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:null&gt;</td>
+     * </tr>
+     * <tr>
+     * <td></td>
+     * <td></td>
+     * <td>&lt;K1:null&gt;</td>
+     * <td></td>
+     * <td></td>
+     * </tr>
+     * </table>
+     * Both input streams need to be co-partitioned on the join key.
+     * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
+     * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
+     * The repartitioning topic will be named "${applicationId}-XXX-repartition", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is an internally generated name, and
+     * "-repartition" is a fixed suffix.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     * <p>
+     * Repartitioning can happen for both input {@link KTable}s.
+     * For this case, all data of a {@link KTable} will be redistributed through the repartitioning topic by writing all
+     * update records to and rereading all update records from it, such that the join input {@link KTable} is
+     * partitioned correctly on its key.
+     *
+     * @param other  the other {@link KTable} to be joined with this {@link KTable}
+     * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
+     * @param <VO>   the value type of the other {@link KTable}
+     * @param <VR>   the value type of the result {@link KTable}
+     * @return a {@link KTable} that contains join-records for each key and values computed by the given
+     * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of
+     * left {@link KTable}
+     * @see #join(KTable, ValueJoiner)
+     * @see #outerJoin(KTable, ValueJoiner)
      */
-    <K1, V1> KGroupedTable<K1, V1> groupBy(KeyValueMapper<? super K, ? super V, KeyValue<K1, V1>> selector);
+    <VO, VR> KTable<K, VR> leftJoin(final KTable<K, VO> other,
+                                    final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);
 
     /**
-     * Perform an action on each element of {@link KTable}.
-     * Note that this is a terminal operation that returns void.
-     *
-     * @param action an action to perform on each element
+     * Join records of this {@link KTable} (left input) with another {@link KTable}'s (right input) records using
+     * non-windowed outer equi join.
+     * The join is a primary key join with join attribute {@code thisKTable.key == otherKTable.key}.
+     * In contrast to {@link #join(KTable, ValueJoiner) inner-join} or {@link #leftJoin(KTable, ValueJoiner) left-join},
+     * all records from both input {@link KTable}s will produce an output record (cf. below).
+     * The result is an ever updating {@link KTable} that represents the <em>current</em> (i.e., processing time) result
+     * of the join.
+     * <p>
+     * The join is computed by (1) updating the internal state of one {@link KTable} and (2) performing a lookup for a
+     * matching record in the <em>current</em> (i.e., processing time) internal state of the other {@link KTable}.
+     * This happens in a symmetric way, i.e., for each update of either {@code this} or the {@code other} input
+     * {@link KTable} the result gets updated.
+     * <p>
+     * For each {@link KTable} record that finds a corresponding record in the other {@link KTable}'s state the
+     * provided {@link ValueJoiner} will be called to compute a value (with arbitrary type) for the result record.
+     * Additionally, for each record that does not find a corresponding record in the corresponding other
+     * {@link KTable}'s state the provided {@link ValueJoiner} will be called with {@code null} value for the
+     * corresponding other value to compute a value (with arbitrary type) for the result record.
+     * The key of the result record is the same as for both joining input records.
+     * <p>
+     * Note that {@link KeyValue records} with {@code null} values (so-called tombstone records) have delete semantics.
+     * Thus, for input tombstones the provided value-joiner is not called but a tombstone record is forwarded directly
+     * to delete a record in the result {@link KTable} if required (i.e., if there is anything to be deleted).
+     * <p>
+     * A {@link KTable} input record key cannot be {@code null}.
+     * <p>
+     * Example:
+     * <table border='1'>
+     * <tr>
+     * <th>thisKTable</th>
+     * <th>thisState</th>
+     * <th>otherKTable</th>
+     * <th>otherState</th>
+     * <th>result update record</th>
+     * </tr>
+     * <tr>
+     * <td>&lt;K1:A&gt;</td>
+     * <td>&lt;K1:A&gt;</td>
+     * <td></td>
+     * <td></td>
+     * <td>&lt;K1:ValueJoiner(A,null)&gt;</td>
+     * </tr>
+     * <tr>
+     * <td></td>
+     * <td>&lt;K1:A&gt;</td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:ValueJoiner(A,b)&gt;</td>
+     * </tr>
+     * <tr>
+     * <td>&lt;K1:null&gt;</td>
+     * <td></td>
+     * <td></td>
+     * <td>&lt;K1:b&gt;</td>
+     * <td>&lt;K1:ValueJoiner(null,b)&gt;</td>
+     * </tr>
+     * <tr>
+     * <td></td>
+     * <td></td>
+     * <td>&lt;K1:null&gt;</td>
+     * <td></td>
+     * <td>&lt;K1:null&gt;</td>
+     * </tr>
+     * </table>
+     * Both input streams need to be co-partitioned on the join key.
+     * If this requirement is not met, Kafka Streams will automatically repartition the data, i.e., it will create an
+     * internal repartitioning topic in Kafka and write and re-read the data via this topic before the actual join.
+     * The repartitioning topic will be named "${applicationId}-XXX-repartition", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "XXX" is an internally generated name, and
+     * "-repartition" is a fixed suffix.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     * <p>
+     * Repartitioning can happen for both input {@link KTable}s.
+     * For this case, all data of a {@link KTable} will be redistributed through the repartitioning topic by writing all
+     * update records to and re-reading all update records from it, such that the join input {@link KTable} is
+     * partitioned correctly on its key.
+     *
+     * @param other  the other {@link KTable} to be joined with this {@link KTable}
+     * @param joiner a {@link ValueJoiner} that computes the join result for a pair of matching records
+     * @param <VO>   the value type of the other {@link KTable}
+     * @param <VR>   the value type of the result {@link KTable}
+     * @return a {@link KTable} that contains join-records for each key and values computed by the given
+     * {@link ValueJoiner}, one for each matched record-pair with the same key plus one for each non-matching record of
+     * both {@link KTable}s
+     * @see #join(KTable, ValueJoiner)
+     * @see #leftJoin(KTable, ValueJoiner)
      */
-    void foreach(ForeachAction<? super K, ? super V> action);
+    <VO, VR> KTable<K, VR> outerJoin(final KTable<K, VO> other,
+                                     final ValueJoiner<? super V, ? super VO, ? extends VR> joiner);
 
     /**
-     * Get the name of the local state store used for materializing this {@link KTable}
-     * @return the underlying state store name, or {@code null} if KTable does not have one
+     * Get the name of the local state store used for materializing this {@link KTable}.
+     *
+     * @return the underlying state store name, or {@code null} if this {@link KTable} is not materialized
      */
     String getStoreName();
 }


Mime
View raw message