From guozh...@apache.org
Subject [2/3] kafka git commit: MINOR: Update KTable JavaDoc
Date Fri, 27 Jan 2017 16:41:06 GMT
http://git-wip-us.apache.org/repos/asf/kafka/blob/e992aca9/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
index 9e23c07..c736a85 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedTable.java
@@ -19,139 +19,488 @@ package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.QueryableStoreType;
 
 /**
- * {@link KGroupedTable} is an abstraction of a <i>grouped changelog stream</i> from a primary-keyed table,
+ * {@link KGroupedTable} is an abstraction of a <i>re-grouped changelog stream</i> from a primary-keyed table,
  * usually on a different grouping key than the original primary key.
  * <p>
- * It is an intermediate representation after a re-grouping of a {@link KTable} before an aggregation is applied
- * to the new partitions resulting in a new {@link KTable}.
+ * It is an intermediate representation after a re-grouping of a {@link KTable} before an aggregation is applied to the
+ * new partitions resulting in a new {@link KTable}.
+ * <p>
+ * A {@link KGroupedTable} must be obtained from a {@link KTable} via {@link KTable#groupBy(KeyValueMapper)
+ * groupBy(...)}.
  *
- * @param <K> Type of primary keys
- * @param <V> Type of value changes
+ * @param <K> Type of keys
+ * @param <V> Type of values
+ * @see KTable
  */
 @InterfaceStability.Unstable
 public interface KGroupedTable<K, V> {
 
     /**
-     * Combine updating values of this stream by the selected key into a new instance of {@link KTable}.
-     * The resulting {@link KTable} will be materialized in a local state
-     * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
-     * will be automatically created in Kafka for failure recovery, where "applicationID"
-     * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
+     * Count the number of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper) mapped} to
+     * the same key into a new instance of {@link KTable}.
+     * Records with {@code null} key are ignored.
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+     * that can be queried using the provided {@code storeName}.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size} and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // counting words
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-word";
+     * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
+     * provided {@code storeName}, and "-changelog" is a fixed suffix.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
      *
-     * @param adder         the instance of {@link Reducer} for addition
-     * @param subtractor    the instance of {@link Reducer} for subtraction
-     * @param storeName     the name of the underlying {@link KTable} state store
-     * @return a {@link KTable} with the same key and value types as this {@link KGroupedTable},
-     *         containing aggregated values for each key
+     * @param storeName the name of the underlying {@link KTable} state store
+     * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
+     * represent the latest (rolling) count (i.e., number of records) for each key
      */
-    KTable<K, V> reduce(Reducer<V> adder,
-                        Reducer<V> subtractor,
-                        String storeName);
+    KTable<K, Long> count(final String storeName);
 
     /**
-     * Combine updating values of this stream by the selected key into a new instance of {@link KTable}.
-     * The resulting {@link KTable} will be materialized in a state
-     * store provided by the {@link StateStoreSupplier}.
+     * Count the number of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper) mapped} to
+     * the same key into a new instance of {@link KTable}.
+     * Records with {@code null} key are ignored.
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+     * provided by the given {@code storeSupplier}.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size} and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // counting words
+     * String storeName = storeSupplier.name();
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-word";
+     * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
+     * provided {@code storeName}, and "-changelog" is a fixed suffix.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
      *
-     * @param adder         the instance of {@link Reducer} for addition
-     * @param subtractor    the instance of {@link Reducer} for subtraction
-     * @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
-     * @return a {@link KTable} with the same key and value types as this {@link KGroupedTable},
-     *         containing aggregated values for each key
+     * @param storeSupplier user defined state store supplier
+     * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
+     * represent the latest (rolling) count (i.e., number of records) for each key
      */
-    KTable<K, V> reduce(Reducer<V> adder,
-                        Reducer<V> subtractor,
-                        final StateStoreSupplier<KeyValueStore> storeSupplier);
+    KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
-     * Aggregate updating values of this stream by the selected key into a new instance of {@link KTable}.
-     * The resulting {@link KTable} will be materialized in a local state
-     * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
-     * will be automatically created in Kafka for failure recovery, where "applicationID"
-     * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
+     * Combine the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
+     * mapped} to the same key into a new instance of {@link KTable}.
+     * Records with {@code null} key are ignored.
+     * Combining implies that the type of the aggregate result is the same as the type of the input value
+     * (cf. {@link #aggregate(Initializer, Aggregator, Aggregator, Serde, String)}).
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+     * that can be queried using the provided {@code storeName}.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * Each update to the original {@link KTable} results in a two-step update of the result {@link KTable}.
+     * The specified {@link Reducer adder} is applied for each update record and computes a new aggregate using the
+     * current aggregate and the record's value by adding the new record to the aggregate.
+     * The specified {@link Reducer subtractor} is applied for each "replaced" record of the original {@link KTable}
+     * and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced"
+     * record from the aggregate.
+     * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
+     * value as-is.
+     * Thus, {@code reduce(Reducer, Reducer, String)} can be used to compute aggregate functions like sum.
+     * For sum, the adder and subtractor would work as follows:
+     * <pre>{@code
+     * public class SumAdder implements Reducer<Integer> {
+     *   public Integer apply(Integer currentAgg, Integer newValue) {
+     *     return currentAgg + newValue;
+     *   }
+     * }
      *
-     * @param initializer   the instance of {@link Initializer}
-     * @param adder         the instance of {@link Aggregator} for addition
-     * @param subtractor    the instance of {@link Aggregator} for subtraction
-     * @param aggValueSerde value serdes for materializing the aggregated table,
-     *                      if not specified the default serdes defined in the configs will be used
-     * @param storeName     the name of the underlying {@link KTable} state store
-     * @param <T>           the value type of the aggregated {@link KTable}
-     * @return a {@link KTable} with same key and aggregated value type {@code T},
-     *         containing aggregated values for each key
+     * public class SumSubtractor implements Reducer<Integer> {
+     *   public Integer apply(Integer currentAgg, Integer oldValue) {
+     *     return currentAgg - oldValue;
+     *   }
+     * }
+     * }</pre>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size} and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // counting words
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-word";
+     * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
+     * provided {@code storeName}, and "-changelog" is a fixed suffix.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     *
+     * @param adder      a {@link Reducer} that adds a new value to the aggregate result
+     * @param subtractor a {@link Reducer} that removes an old value from the aggregate result
+     * @param storeName  the name of the underlying {@link KTable} state store
+     * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
+     * latest (rolling) aggregate for each key
      */
-    <T> KTable<K, T> aggregate(Initializer<T> initializer,
-                               Aggregator<? super K, ? super V, T> adder,
-                               Aggregator<? super K, ? super V, T> subtractor,
-                               Serde<T> aggValueSerde,
-                               String storeName);
+    KTable<K, V> reduce(final Reducer<V> adder,
+                        final Reducer<V> subtractor,
+                        final String storeName);
 
     /**
-     * Aggregate updating values of this stream by the selected key into a new instance of {@link KTable}
-     * using default serializers and deserializers.
-     * The resulting {@link KTable} will be materialized in a local state
-     * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
-     * will be automatically created in Kafka for failure recovery, where "applicationID"
-     * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
+     * Combine the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
+     * mapped} to the same key into a new instance of {@link KTable}.
+     * Records with {@code null} key are ignored.
+     * Combining implies that the type of the aggregate result is the same as the type of the input value
+     * (cf. {@link #aggregate(Initializer, Aggregator, Aggregator, Serde, String)}).
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+     * provided by the given {@code storeSupplier}.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * Each update to the original {@link KTable} results in a two-step update of the result {@link KTable}.
+     * The specified {@link Reducer adder} is applied for each update record and computes a new aggregate using the
+     * current aggregate and the record's value by adding the new record to the aggregate.
+     * The specified {@link Reducer subtractor} is applied for each "replaced" record of the original {@link KTable}
+     * and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced"
+     * record from the aggregate.
+     * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
+     * value as-is.
+     * Thus, {@code reduce(Reducer, Reducer, String)} can be used to compute aggregate functions like sum.
+     * For sum, the adder and subtractor would work as follows:
+     * <pre>{@code
+     * public class SumAdder implements Reducer<Integer> {
+     *   public Integer apply(Integer currentAgg, Integer newValue) {
+     *     return currentAgg + newValue;
+     *   }
+     * }
      *
-     * @param initializer   the instance of {@link Initializer}
-     * @param adder         the instance of {@link Aggregator} for addition
-     * @param subtractor   the instance of {@link Aggregator} for subtraction
-     * @param storeName     the name of the underlying {@link KTable} state store
-     * @param <T>           the value type of the aggregated {@link KTable}
-     * @return a {@link KTable} with same key and aggregated value type {@code T},
-     *         containing aggregated values for each key
+     * public class SumSubtractor implements Reducer<Integer> {
+     *   public Integer apply(Integer currentAgg, Integer oldValue) {
+     *     return currentAgg - oldValue;
+     *   }
+     * }
+     * }</pre>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size} and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // counting words
+     * String storeName = storeSupplier.name();
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-word";
+     * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
+     * provided {@code storeName}, and "-changelog" is a fixed suffix.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     *
+     * @param adder         a {@link Reducer} that adds a new value to the aggregate result
+     * @param subtractor    a {@link Reducer} that removes an old value from the aggregate result
+     * @param storeSupplier user defined state store supplier
+     * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
+     * latest (rolling) aggregate for each key
      */
-    <T> KTable<K, T> aggregate(Initializer<T> initializer,
-                               Aggregator<? super K, ? super V, T> adder,
-                               Aggregator<? super K, ? super V, T> subtractor,
-                               String storeName);
+    KTable<K, V> reduce(final Reducer<V> adder,
+                        final Reducer<V> subtractor,
+                        final StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
-     * Aggregate updating values of this stream by the selected key into a new instance of {@link KTable}
-     * using default serializers and deserializers.
-     * The resulting {@link KTable} will be materialized in a state
-     * store provided by the {@link StateStoreSupplier}.
+     * Aggregate the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
+     * mapped} to the same key into a new instance of {@link KTable} using default serializers and deserializers.
+     * Records with {@code null} key are ignored.
+     * Aggregating is a generalization of {@link #reduce(Reducer, Reducer, String) combining via reduce(...)} as it,
+     * for example, allows the result to have a different type than the input values.
+     * If the result value type does not match the {@link StreamsConfig#VALUE_SERDE_CLASS_CONFIG default value
+     * serde} you should use {@link KGroupedTable#aggregate(Initializer, Aggregator, Aggregator, Serde, String)
+     * aggregate(Initializer, Aggregator, Aggregator, Serde, String)}.
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+     * that can be queried using the provided {@code storeName}.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * The specified {@link Initializer} is applied once directly before the first input record is processed to
+     * provide an initial intermediate aggregation result that is used to process the first record.
+     * Each update to the original {@link KTable} results in a two-step update of the result {@link KTable}.
+     * The specified {@link Aggregator adder} is applied for each update record and computes a new aggregate using the
+     * current aggregate (or for the very first record using the intermediate aggregation result provided via the
+     * {@link Initializer}) and the record's value by adding the new record to the aggregate.
+     * The specified {@link Aggregator subtractor} is applied for each "replaced" record of the original {@link KTable}
+     * and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced"
+     * record from the aggregate.
+     * Thus, {@code aggregate(Initializer, Aggregator, Aggregator, String)} can be used to compute aggregate functions
+     * like sum.
+     * For sum, the initializer, adder, and subtractor would work as follows:
+     * <pre>{@code
+     * // in this example, LongSerde.class must be set as default value serde in StreamsConfig
+     * public class SumInitializer implements Initializer<Long> {
+     *   public Long apply() {
+     *     return 0L;
+     *   }
+     * }
+     *
+     * public class SumAdder implements Aggregator<String, Integer, Long> {
+     *   public Long apply(String key, Integer newValue, Long aggregate) {
+     *     return aggregate + newValue;
+     *   }
+     * }
+     *
+     * public class SumSubtractor implements Aggregator<String, Integer, Long> {
+     *   public Long apply(String key, Integer oldValue, Long aggregate) {
+     *     return aggregate - oldValue;
+     *   }
+     * }
+     * }</pre>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size} and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // counting words
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-word";
+     * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
+     * provided {@code storeName}, and "-changelog" is a fixed suffix.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
      *
-     * @param initializer   the instance of {@link Initializer}
-     * @param adder         the instance of {@link Aggregator} for addition
-     * @param subtractor    the instance of {@link Aggregator} for subtraction
-     * @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
-     * @param <T>           the value type of the aggregated {@link KTable}
-     * @return a {@link KTable} with same key and aggregated value type {@code T},
-     *         containing aggregated values for each key
+     * @param initializer an {@link Initializer} that provides an initial aggregate result value
+     * @param adder       an {@link Aggregator} that adds a new record to the aggregate result
+     * @param subtractor  an {@link Aggregator} that removes an old record from the aggregate result
+     * @param storeName   the name of the underlying {@link KTable} state store
+     * @param <VR>        the value type of the aggregated {@link KTable}
+     * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
+     * latest (rolling) aggregate for each key
      */
-    <T> KTable<K, T> aggregate(Initializer<T> initializer,
-                               Aggregator<? super K, ? super V, T> adder,
-                               Aggregator<? super K, ? super V, T> subtractor,
-                               final StateStoreSupplier<KeyValueStore> storeSupplier);
+    <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
+                                 final Aggregator<? super K, ? super V, VR> adder,
+                                 final Aggregator<? super K, ? super V, VR> subtractor,
+                                 final String storeName);
 
     /**
-     * Count number of records of this stream by the selected key into a new instance of {@link KTable}.
-     * The resulting {@link KTable} will be materialized in a local state
-     * store with the given store name. Also a changelog topic named "${applicationId}-${storeName}-changelog"
-     * will be automatically created in Kafka for failure recovery, where "applicationID"
-     * is specified by the user in {@link org.apache.kafka.streams.StreamsConfig}.
+     * Aggregate the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
+     * mapped} to the same key into a new instance of {@link KTable} using default serializers and deserializers.
+     * Records with {@code null} key are ignored.
+     * Aggregating is a generalization of {@link #reduce(Reducer, Reducer, String) combining via reduce(...)} as it,
+     * for example, allows the result to have a different type than the input values.
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+     * that can be queried using the provided {@code storeName}.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * The specified {@link Initializer} is applied once directly before the first input record is processed to
+     * provide an initial intermediate aggregation result that is used to process the first record.
+     * Each update to the original {@link KTable} results in a two-step update of the result {@link KTable}.
+     * The specified {@link Aggregator adder} is applied for each update record and computes a new aggregate using the
+     * current aggregate (or for the very first record using the intermediate aggregation result provided via the
+     * {@link Initializer}) and the record's value by adding the new record to the aggregate.
+     * The specified {@link Aggregator subtractor} is applied for each "replaced" record of the original {@link KTable}
+     * and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced"
+     * record from the aggregate.
+     * Thus, {@code aggregate(Initializer, Aggregator, Aggregator, String)} can be used to compute aggregate functions
+     * like sum.
+     * For sum, the initializer, adder, and subtractor would work as follows:
+     * <pre>{@code
+     * public class SumInitializer implements Initializer<Long> {
+     *   public Long apply() {
+     *     return 0L;
+     *   }
+     * }
+     *
+     * public class SumAdder implements Aggregator<String, Integer, Long> {
+     *   public Long apply(String key, Integer newValue, Long aggregate) {
+     *     return aggregate + newValue;
+     *   }
+     * }
+     *
+     * public class SumSubtractor implements Aggregator<String, Integer, Long> {
+     *   public Long apply(String key, Integer oldValue, Long aggregate) {
+     *     return aggregate - oldValue;
+     *   }
+     * }
+     * }</pre>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size} and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // counting words
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-word";
+     * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
+     * provided {@code storeName}, and "-changelog" is a fixed suffix.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
      *
+     * @param initializer   an {@link Initializer} that provides an initial aggregate result value
+     * @param adder         an {@link Aggregator} that adds a new record to the aggregate result
+     * @param subtractor    an {@link Aggregator} that removes an old record from the aggregate result
+     * @param aggValueSerde aggregate value serdes for materializing the aggregated table,
+     *                      if not specified the default serdes defined in the configs will be used
      * @param storeName     the name of the underlying {@link KTable} state store
-     * @return a {@link KTable} with same key and {@link Long} value type as this {@link KGroupedTable},
-     *         containing the number of values for each key
+     * @param <VR>          the value type of the aggregated {@link KTable}
+     * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
+     * latest (rolling) aggregate for each key
      */
-    KTable<K, Long> count(String storeName);
+    <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
+                                 final Aggregator<? super K, ? super V, VR> adder,
+                                 final Aggregator<? super K, ? super V, VR> subtractor,
+                                 final Serde<VR> aggValueSerde,
+                                 final String storeName);
 
     /**
-     * Count number of records of this stream by the selected key into a new instance of {@link KTable}.
-     * The resulting {@link KTable} will be materialized in a state
-     * store provided by the {@link StateStoreSupplier}.
+     * Aggregate the value of records of the original {@link KTable} that got {@link KTable#groupBy(KeyValueMapper)
+     * mapped} to the same key into a new instance of {@link KTable} using default serializers and deserializers.
+     * Records with {@code null} key are ignored.
+     * Aggregating is a generalization of {@link #reduce(Reducer, Reducer, String) combining via reduce(...)} as it,
+     * for example, allows the result to have a different type than the input values.
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+     * provided by the given {@code storeSupplier}.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * The specified {@link Initializer} is applied once directly before the first input record is processed to
+     * provide an initial intermediate aggregation result that is used to process the first record.
+     * Each update to the original {@link KTable} results in a two-step update of the result {@link KTable}.
+     * The specified {@link Aggregator adder} is applied for each update record and computes a new aggregate using the
+     * current aggregate (or for the very first record using the intermediate aggregation result provided via the
+     * {@link Initializer}) and the record's value by adding the new record to the aggregate.
+     * The specified {@link Aggregator subtractor} is applied for each "replaced" record of the original {@link KTable}
+     * and computes a new aggregate using the current aggregate and the record's value by "removing" the "replaced"
+     * record from the aggregate.
+     * Thus, {@code aggregate(Initializer, Aggregator, Aggregator, String)} can be used to compute aggregate functions
+     * like sum.
+     * For sum, the initializer, adder, and subtractor would work as follows:
+     * <pre>{@code
+     * public class SumInitializer implements Initializer<Long> {
+     *   public Long apply() {
+     *     return 0L;
+     *   }
+     * }
+     *
+     * public class SumAdder implements Aggregator<String, Integer, Long> {
+     *   public Long apply(String key, Integer newValue, Long aggregate) {
+     *     return aggregate + newValue;
+     *   }
+     * }
      *
-     * @param storeSupplier user defined state store supplier {@link StateStoreSupplier}
-     * @return a {@link KTable} with same key and {@link Long} value type as this {@link KGroupedTable},
-     * containing the number of values for each key
+     * public class SumSubtractor implements Aggregator<String, Integer, Long> {
+     *   public Long apply(String key, Integer oldValue, Long aggregate) {
+     *     return aggregate - oldValue;
+     *   }
+     * }
+     * }</pre>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size} and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // counting words
+     * String storeName = storeSupplier.name();
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(storeName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-word";
+     * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * The changelog topic will be named "${applicationId}-${storeName}-changelog", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "storeName" is the
+     * provided {@code storeName}, and "-changelog" is a fixed suffix.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     *
+     * @param initializer   an {@link Initializer} that provides an initial aggregate result value
+     * @param adder         an {@link Aggregator} that adds a new record to the aggregate result
+     * @param subtractor    an {@link Aggregator} that removes an old record from the aggregate result
+     * @param storeSupplier user defined state store supplier
+     * @param <VR>          the value type of the aggregated {@link KTable}
+     * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
+     * latest (rolling) aggregate for each key
      */
-    KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier);
+    <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
+                                 final Aggregator<? super K, ? super V, VR> adder,
+                                 final Aggregator<? super K, ? super V, VR> subtractor,
+                                 final StateStoreSupplier<KeyValueStore> storeSupplier);
 
 }

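Taken together, the new KGroupedTable JavaDoc describes a groupBy-then-aggregate pattern. The following is only a minimal sketch that sticks to methods shown or referenced in this patch; the topic names, store names, and application id are made up for illustration.

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KTable;

public class UsersPerRegionExample {

    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "users-per-region-example"); // hypothetical id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final KStreamBuilder builder = new KStreamBuilder();

        // primary-keyed table: userId -> region (topic and store names are hypothetical)
        final KTable<String, String> userRegions = builder.table("user-regions", "user-regions-store");

        // re-group the table by region and count the users per region;
        // "users-per-region" is the queryable store name, backed by the
        // "${applicationId}-users-per-region-changelog" topic as described in the JavaDoc above
        final KTable<String, Long> usersPerRegion = userRegions
            .groupBy((userId, region) -> KeyValue.pair(region, region))
            .count("users-per-region");

        // write the continuously updated counts to an output topic
        usersPerRegion.to(Serdes.String(), Serdes.Long(), "users-per-region-topic");

        final KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
    }
}

reduce(...) and aggregate(...) plug into the same spot as count(...): they additionally take an adder and a subtractor so that a record replaced in the original table can be subtracted from the running aggregate, exactly as the updated JavaDoc explains.
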
http://git-wip-us.apache.org/repos/asf/kafka/blob/e992aca9/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
index 2b8e040..15eb058 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
@@ -31,14 +31,14 @@ import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.TopologyBuilder;
 
 /**
- * {@link KStream} is an abstraction of a <i>record stream</i> of key-value pairs, i.e., each record is an independent
- * entity/event in the real world.
+ * {@link KStream} is an abstraction of a <i>record stream</i> of {@link KeyValue} pairs, i.e., each record is an
+ * independent entity/event in the real world.
  * For example a user X might buy two items I1 and I2, and thus there might be two records {@code <K:I1>, <K:I2>}
  * in the stream.
  * <p>
- * A {@link KStream} is either defined from one or multiple Kafka topics that are consumed message by message or the
- * result of a {@link KStream} transformation.
- * A {@link KTable} can also be converted into a {@link KStream}.
+ * A {@link KStream} is either {@link KStreamBuilder#stream(String...) defined from one or multiple Kafka topics} that
+ * are consumed message by message or the result of a {@link KStream} transformation.
+ * A {@link KTable} can also be {@link KTable#toStream() converted} into a {@link KStream}.
  * <p>
  * A {@link KStream} can be transformed record by record, joined with another {@link KStream}, {@link KTable},
  * {@link GlobalKTable}, or can be aggregated into a {@link KTable}.
@@ -51,6 +51,7 @@ import org.apache.kafka.streams.processor.TopologyBuilder;
  * @param <V> Type of values
  * @see KTable
  * @see KGroupedStream
+ * @see KStreamBuilder#stream(String...)
  */
 @SuppressWarnings("unused")
 @InterfaceStability.Unstable
@@ -265,8 +266,8 @@ public interface KStream<K, V> {
 
     /**
      * Print the records of this stream to {@code System.out}.
-     * This function will use the generated name of the parent processor node to label the key/value pairs printed out
-     * to the console.
+     * This function will use the generated name of the parent processor node to label the key/value pairs printed to
+     * the console.
      * <p>
      * The default serde will be used to deserialize the key or value in case the type is {@code byte[]} before calling
      * {@code toString()} on the deserialized object.
@@ -278,7 +279,7 @@ public interface KStream<K, V> {
 
     /**
      * Print the records of this stream to {@code System.out}.
-     * This function will use the given name to label the key/value pairs printed out to the console.
+     * This function will use the given name to label the key/value pairs printed to the console.
      * <p>
      * The default serde will be used to deserialize the key or value in case the type is {@code byte[]} before calling
      * {@code toString()} on the deserialized object.
@@ -286,14 +287,14 @@ public interface KStream<K, V> {
      * Implementors will need to override {@code toString()} for keys and values that are not of type {@link String},
      * {@link Integer} etc. to get meaningful information.
      *
-     * @param streamName the name used to label the key/value pairs printed out to the console
+     * @param streamName the name used to label the key/value pairs printed to the console
      */
     void print(final String streamName);
 
     /**
      * Print the records of this stream to {@code System.out}.
-     * This function will use the generated name of the parent processor node to label the key/value pairs printed out
-     * to the console.
+     * This function will use the generated name of the parent processor node to label the key/value pairs printed to
+     * the console.
      * <p>
      * The provided serde will be used to deserialize the key or value in case the type is {@code byte[]} before calling
      * {@code toString()} on the deserialized object.
@@ -318,7 +319,7 @@ public interface KStream<K, V> {
      *
      * @param keySerde   key serde used to deserialize key if type is {@code byte[]},
      * @param valSerde   value serde used to deserialize value if type is {@code byte[]},
-     * @param streamName the name used to label the key/value pairs printed out to the console
+     * @param streamName the name used to label the key/value pairs printed to the console
      */
     void print(final Serde<K> keySerde,
                final Serde<V> valSerde,
@@ -326,8 +327,8 @@ public interface KStream<K, V> {
 
     /**
      * Write the records of this stream to a file at the given path.
-     * This function will use the generated name of the parent processor node to label the key/value pairs printed out
-     * to the file.
+     * This function will use the generated name of the parent processor node to label the key/value pairs printed to
+     * the file.
      * <p>
      * The default serde will be used to deserialize the key or value in case the type is {@code byte[]} before calling
      * {@code toString()} on the deserialized object.
@@ -341,7 +342,7 @@ public interface KStream<K, V> {
 
     /**
      * Write the records of this stream to a file at the given path.
-     * This function will use the given name to label the key/value printed out to the file.
+     * This function will use the given name to label the key/value printed to the file.
      * <p>
      * The default serde will be used to deserialize the key or value in case the type is {@code byte[]} before calling
      * {@code toString()} on the deserialized object.
@@ -357,8 +358,8 @@ public interface KStream<K, V> {
 
     /**
      * Write the records of this stream to a file at the given path.
-     * This function will use the generated name of the parent processor node to label the key/value pairs printed out
-     * to the file.
+     * This function will use the generated name of the parent processor node to label the key/value pairs printed to
+     * the file.
      * <p>
      * The provided serde will be used to deserialize the key or value in case the type is {@code byte[]} before calling
      * {@code toString()} on the deserialized object.
@@ -376,7 +377,7 @@ public interface KStream<K, V> {
 
     /**
      * Write the records of this stream to a file at the given path.
-     * This function will use the given name to label the key/value printed out to the file.
+     * This function will use the given name to label the key/value printed to the file.
      * <p>
      * The provided serde will be used to deserialize the key or value in case the type is {@code byte[]}
      * before calling {@code toString()} on the deserialized object.
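The print(...) variants touched above differ only in how the printed records are labeled and whether explicit serdes are supplied. A minimal, hypothetical sketch (the topic name and labels are invented here):

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class PrintVariantsExample {

    public static void main(final String[] args) {
        final KStreamBuilder builder = new KStreamBuilder();

        // topic name is hypothetical; explicit serdes give the stream typed keys and values
        final KStream<String, Long> wordCounts =
            builder.stream(Serdes.String(), Serdes.Long(), "word-counts");

        // labels each printed record with the generated name of the parent processor node
        wordCounts.print();

        // labels each printed record with the given name
        wordCounts.print("word-counts");

        // the serde variant matters when keys/values arrive as byte[] and must be
        // deserialized before toString() is called
        wordCounts.print(Serdes.String(), Serdes.Long(), "word-counts");

        // (a KafkaStreams instance built from this builder still has to be started
        // before anything is printed)
    }
}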

