kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From damian...@apache.org
Subject kafka git commit: KAFKA-5654; add materialized count, reduce, aggregate to KGroupedStream
Date Mon, 18 Sep 2017 10:54:19 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 346d0ca53 -> d83252eba


KAFKA-5654; add materialized count, reduce, aggregate to KGroupedStream

Add overloads of `count`, `reduce`, and `aggregate` that take a `Materialized` argument to `KGroupedStream`.
Refactor common parts between `KGroupedStream` and `WindowedKStream`

Author: Damian Guy <damian.guy@gmail.com>

Reviewers: Matthias J. Sax <matthias@confluent.io>, Guozhang Wang <wangguoz@gmail.com>

Closes #3827 from dguy/kafka-5654


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d83252eb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d83252eb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d83252eb

Branch: refs/heads/trunk
Commit: d83252ebaeeca5bf19584908d95b424beb31b12e
Parents: 346d0ca
Author: Damian Guy <damian.guy@gmail.com>
Authored: Mon Sep 18 11:54:14 2017 +0100
Committer: Damian Guy <damian.guy@gmail.com>
Committed: Mon Sep 18 11:54:14 2017 +0100

----------------------------------------------------------------------
 .../kafka/streams/kstream/KGroupedStream.java   | 210 ++++++++++++++++++-
 .../GroupedStreamAggregateBuilder.java          |  76 +++++++
 .../kstream/internals/KGroupedStreamImpl.java   | 127 +++++++----
 .../streams/kstream/internals/KStreamImpl.java  |  25 +--
 .../kstream/internals/MaterializedInternal.java |  13 +-
 .../kstream/internals/WindowedKStreamImpl.java  |  57 ++---
 .../internals/KGroupedStreamImplTest.java       | 106 ++++++++++
 7 files changed, 515 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d83252eb/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index f12c2b2..08916ef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
@@ -146,6 +147,38 @@ public interface KGroupedStream<K, V> {
     KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
+     * Count the number of records in this stream by the grouped key.
+     * Records with {@code null} key or value are ignored.
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+     * provided by the given {@code materialized} instance.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
+     * <pre>{@code
+     * KafkaStreams streams = ... // counting words
+     * String queryableStoreName = "count-store"; // the queryableStoreName should be the name of the store as defined by the Materialized instance
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-word";
+     * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     *
+     * @param materialized  an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}.
+     * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
+     * represent the latest (rolling) count (i.e., number of records) for each key
+     */
+    KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized);
+
+    /**
      * Count the number of records in this stream by the grouped key and the defined windows.
      * Records with {@code null} key or value are ignored.
      * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
@@ -395,7 +428,7 @@ public interface KGroupedStream<K, V> {
      * and "-changelog" is a fixed suffix.
      * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
      *
-     * @param reducer   a {@link Reducer} that computes a new aggregate result
+     * @param reducer   a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
      */
@@ -452,12 +485,14 @@ public interface KGroupedStream<K, V> {
      * provide {@code queryableStoreName}, and "-changelog" is a fixed suffix.
      * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
      *
-     * @param reducer   a {@link Reducer} that computes a new aggregate result
-     * @param queryableStoreName the name of the underlying {@link KTable} state store; valid characters are ASCII
+     * @param reducer               a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
+     * @param queryableStoreName    the name of the underlying {@link KTable} state store; valid characters are ASCII
      * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#reduce(Reducer)} ()}.
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
+     * @deprecated use {@link #reduce(Reducer, Materialized)}
      */
+    @Deprecated
     KTable<K, V> reduce(final Reducer<V> reducer,
                         final String queryableStoreName);
 
@@ -507,15 +542,69 @@ public interface KGroupedStream<K, V> {
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
      *
-     * @param reducer   a {@link Reducer} that computes a new aggregate result
+     * @param reducer       a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
+     * @deprecated use {@link #reduce(Reducer, Materialized)}
      */
+    @Deprecated
     KTable<K, V> reduce(final Reducer<V> reducer,
                         final StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
+     * Combine the value of records in this stream by the grouped key.
+     * Records with {@code null} key or value are ignored.
+     * Combining implies that the type of the aggregate result is the same as the type of the input value
+     * (c.f. {@link #aggregate(Initializer, Aggregator, Materialized)}).
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+     * provided by the given {@code materialized} instance.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
+     * aggregate (first argument) and the record's value (second argument):
+     * <pre>{@code
+     * // At the example of a Reducer<Long>
+     * new Reducer<Long>() {
+     *   @Override
+     *   public Long apply(Long aggValue, Long currValue) {
+     *     return aggValue + currValue;
+     *   }
+     * }</pre>
+     * <p>
+     * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
+     * value as-is.
+     * Thus, {@code reduce(Reducer, Materialized)} can be used to compute aggregate functions like sum, min, or
+     * max.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
+     * <pre>{@code
+     * KafkaStreams streams = ... // compute sum
+     * String queryableStoreName = "storeName"; // the queryableStoreName should be the name of the store as defined by the Materialized instance
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long sumForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     *
+     * @param reducer       a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
+     * @param materialized  an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}.
+     * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
+     * latest (rolling) aggregate for each key
+     */
+    KTable<K, V> reduce(final Reducer<V> reducer,
+                        final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
+
+    /**
      * Combine the number of records in this stream by the grouped key and the defined windows.
      * Records with {@code null} key or value are ignored.
      * Combining implies that the type of the aggregate result is the same as the type of the input value
@@ -678,7 +767,7 @@ public interface KGroupedStream<K, V> {
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
      *
-     * @param reducer       a {@link Reducer} that computes a new aggregate result
+     * @param reducer       a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
      * @param windows       the specification of the aggregation {@link Windows}
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
@@ -743,7 +832,7 @@ public interface KGroupedStream<K, V> {
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
      * provide {@code queryableStoreName}, and "-changelog" is a fixed suffix.
      * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
-     * @param reducer           the instance of {@link Reducer}
+     * @param reducer           a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
      * @param sessionWindows    the specification of the aggregation {@link SessionWindows}
      * @param queryableStoreName     the name of the state store created from this operation; valid characters are ASCII
      * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#reduce(Reducer, SessionWindows)} ()}.
@@ -778,7 +867,7 @@ public interface KGroupedStream<K, V> {
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
      * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
-     * @param reducer           the instance of {@link Reducer}
+     * @param reducer           a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
      * @param sessionWindows    the specification of the aggregation {@link SessionWindows}
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
@@ -841,7 +930,7 @@ public interface KGroupedStream<K, V> {
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
      * provide {@code queryableStoreName}, and "-changelog" is a fixed suffix.
      * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
-     * @param reducer           the instance of {@link Reducer}
+     * @param reducer           a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
      * @param sessionWindows    the specification of the aggregation {@link SessionWindows}
      * @param storeSupplier     user defined state store supplier. Cannot be {@code null}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
@@ -905,7 +994,9 @@ public interface KGroupedStream<K, V> {
      * @param <VR>          the value type of the resulting {@link KTable}
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
+     * @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized)}
      */
+    @Deprecated
     <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                                  final Aggregator<? super K, ? super V, VR> aggregator,
                                  final Serde<VR> aggValueSerde,
@@ -935,6 +1026,105 @@ public interface KGroupedStream<K, V> {
      * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit intervall}.
      * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // some aggregation on value type Long
+     * ReadOnlyKeyValueStore<String,Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII
+     * alphanumerics, '.', '_' and '-'.
+     * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
+     * provided {@code queryableStoreName}, and "-changelog" is a fixed suffix.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     *
+     * @param initializer   an {@link Initializer} that computes an initial intermediate aggregation result
+     * @param aggregator    an {@link Aggregator} that computes a new aggregate result
+     * @param materialized  an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}.
+     * @param <VR>          the value type of the resulting {@link KTable}
+     * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
+     * latest (rolling) aggregate for each key
+     */
+    <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
+                                 final Aggregator<? super K, ? super V, VR> aggregator,
+                                 final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
+
+
+    /**
+     * Aggregate the values of records in this stream by the grouped key.
+     * Records with {@code null} key or value are ignored.
+     * Aggregating is a generalization of {@link #reduce(Reducer, String) combining via reduce(...)} as it, for example,
+     * allows the result to have a different type than the input values.
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+     * that can be queried using the provided {@code queryableStoreName}.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * The specified {@link Initializer} is applied once directly before the first input record is processed to
+     * provide an initial intermediate aggregation result that is used to process the first record.
+     * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
+     * aggregate (or for the very first record using the intermediate aggregation result provided via the
+     * {@link Initializer}) and the record's value.
+     * Thus, {@code aggregate(Initializer, Aggregator)} can be used to compute aggregate functions like
+     * count (c.f. {@link #count(String)}).
+     * <p>
+     * The default value serde from config will be used for serializing the result.
+     * If a different serde is required then you should use {@link #aggregate(Initializer, Aggregator, Materialized)}.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
+     * and "-changelog" is a fixed suffix.
+     * Note that the internal store name may not be queryable through Interactive Queries.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     *
+     * @param initializer   an {@link Initializer} that computes an initial intermediate aggregation result
+     * @param aggregator    an {@link Aggregator} that computes a new aggregate result
+     * @param <VR>          the value type of the resulting {@link KTable}
+     * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
+     * latest (rolling) aggregate for each key
+     */
+    <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
+                                 final Aggregator<? super K, ? super V, VR> aggregator);
+    /**
+     * Aggregate the values of records in this stream by the grouped key.
+     * Records with {@code null} key or value are ignored.
+     * Aggregating is a generalization of {@link #reduce(Reducer, String) combining via reduce(...)} as it, for example,
+     * allows the result to have a different type than the input values.
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+     * that can be queried using the provided {@code queryableStoreName}.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * The specified {@link Initializer} is applied once directly before the first input record is processed to
+     * provide an initial intermediate aggregation result that is used to process the first record.
+     * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
+     * aggregate (or for the very first record using the intermediate aggregation result provided via the
+     * {@link Initializer}) and the record's value.
+     * Thus, {@code aggregate(Initializer, Aggregator, Serde, String)} can be used to compute aggregate functions like
+     * count (c.f. {@link #count(String)}).
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
      * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
      * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
      * user-specified in {@link StreamsConfig} via parameter
@@ -950,7 +1140,9 @@ public interface KGroupedStream<K, V> {
      * @param <VR>          the value type of the resulting {@link KTable}
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
+     * @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized)}
      */
+    @Deprecated
     <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                                  final Aggregator<? super K, ? super V, VR> aggregator,
                                  final Serde<VR> aggValueSerde);
@@ -999,7 +1191,9 @@ public interface KGroupedStream<K, V> {
      * @param <VR>          the value type of the resulting {@link KTable}
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
+     * @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized)}
      */
+    @Deprecated
     <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                                  final Aggregator<? super K, ? super V, VR> aggregator,
                                  final StateStoreSupplier<KeyValueStore> storeSupplier);

http://git-wip-us.apache.org/repos/asf/kafka/blob/d83252eb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
new file mode 100644
index 0000000..ef0cdfc
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.state.StoreBuilder;
+
+import java.util.Collections;
+import java.util.Set;
+
+class GroupedStreamAggregateBuilder<K, V> {
+    private final InternalStreamsBuilder builder;
+    private final Serde<K> keySerde;
+    private final Serde<V> valueSerde;
+    private final boolean repartitionRequired;
+    private final Set<String> sourceNodes;
+    private final String name;
+
+    GroupedStreamAggregateBuilder(final InternalStreamsBuilder builder,
+                                  final Serde<K> keySerde,
+                                  final Serde<V> valueSerde,
+                                  final boolean repartitionRequired,
+                                  final Set<String> sourceNodes,
+                                  final String name) {
+
+        this.builder = builder;
+        this.keySerde = keySerde;
+        this.valueSerde = valueSerde;
+        this.repartitionRequired = repartitionRequired;
+        this.sourceNodes = sourceNodes;
+        this.name = name;
+    }
+
+    <T> KTable<K, T> build(final KStreamAggProcessorSupplier<K, ?, V, T> aggregateSupplier,
+                           final String functionName,
+                           final StoreBuilder storeBuilder,
+                           final boolean isQueryable) {
+        final String aggFunctionName = builder.newName(functionName);
+        final String sourceName = repartitionIfRequired(storeBuilder.name());
+        builder.internalTopologyBuilder.addProcessor(aggFunctionName, aggregateSupplier, sourceName);
+        builder.internalTopologyBuilder.addStateStore(storeBuilder, aggFunctionName);
+
+        return new KTableImpl<>(
+                builder,
+                aggFunctionName,
+                aggregateSupplier,
+                sourceName.equals(this.name) ? sourceNodes : Collections.singleton(sourceName),
+                storeBuilder.name(),
+                isQueryable);
+    }
+
+    /**
+     * @return the new sourceName if repartitioned. Otherwise the name of this stream
+     */
+    private String repartitionIfRequired(final String queryableStoreName) {
+        if (!repartitionRequired) {
+            return this.name;
+        }
+        return KStreamImpl.createReparitionedSource(builder, keySerde, valueSerde, queryableStoreName, name);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/d83252eb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
index 57114b5..1fab2c5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java
@@ -18,10 +18,12 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.kstream.Aggregator;
 import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Merger;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.SessionWindows;
@@ -32,6 +34,7 @@ import org.apache.kafka.streams.kstream.Windows;
 import org.apache.kafka.streams.processor.StateStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.SessionStore;
+import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.WindowStore;
 
 import java.util.Collections;
@@ -46,6 +49,19 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
     private final Serde<K> keySerde;
     private final Serde<V> valSerde;
     private final boolean repartitionRequired;
+    private final Initializer<Long> countInitializer = new Initializer<Long>() {
+        @Override
+        public Long apply() {
+            return 0L;
+        }
+    };
+    private final Aggregator<K, V, Long> countAggregator = new Aggregator<K, V, Long>() {
+        @Override
+        public Long apply(K aggKey, V value, Long aggregate) {
+            return aggregate + 1;
+        }
+    };
+    private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
     private boolean isQueryable = true;
 
     KGroupedStreamImpl(final InternalStreamsBuilder builder,
@@ -55,6 +71,12 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
                        final Serde<V> valSerde,
                        final boolean repartitionRequired) {
         super(builder, name, sourceNodes);
+        this.aggregateBuilder = new GroupedStreamAggregateBuilder<>(builder,
+                                                                    keySerde,
+                                                                    valSerde,
+                                                                    repartitionRequired,
+                                                                    sourceNodes,
+                                                                    name);
         this.keySerde = keySerde;
         this.valSerde = valSerde;
         this.repartitionRequired = repartitionRequired;
@@ -91,6 +113,19 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
                 storeSupplier);
     }
 
+    @Override
+    public KTable<K, V> reduce(final Reducer<V> reducer,
+                               final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(reducer, "reducer can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        final MaterializedInternal<K, V, KeyValueStore<Bytes, byte[]>> materializedInternal
+                = new MaterializedInternal<>(materialized);
+        return doAggregate(
+                new KStreamReduce<K, V>(materializedInternal.storeName(), reducer),
+                REDUCE_NAME,
+                materializedInternal);
+    }
+
 
     @Override
     public <W extends Window> KTable<Windowed<K>, V> reduce(final Reducer<V> reducer,
@@ -131,6 +166,41 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
     }
 
     @Override
+    public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
+                                        final Aggregator<? super K, ? super V, VR> aggregator,
+                                        final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
+        Objects.requireNonNull(initializer, "initializer can't be null");
+        Objects.requireNonNull(aggregator, "aggregator can't be null");
+        Objects.requireNonNull(materialized, "materialized can't be null");
+        return aggregateMaterialized(initializer, aggregator, materialized);
+    }
+
+    private <VR> KTable<K, VR> aggregateMaterialized(final Initializer<VR> initializer,
+                                                     final Aggregator<? super K, ? super V, VR> aggregator,
+                                                     final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized) {
+        final MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal
+                = new MaterializedInternal<>(materialized);
+        return doAggregate(
+                new KStreamAggregate<>(materializedInternal.storeName(), initializer, aggregator),
+                AGGREGATE_NAME,
+                materializedInternal);
+    }
+
+    @Override
+    public <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer, final Aggregator<? super K, ? super V, VR> aggregator) {
+        Objects.requireNonNull(initializer, "initializer can't be null");
+        Objects.requireNonNull(aggregator, "aggregator can't be null");
+        final String storeName = builder.newStoreName(AGGREGATE_NAME);
+
+        MaterializedInternal<K, VR, KeyValueStore<Bytes, byte[]>> materializedInternal =
+                new MaterializedInternal<>(Materialized.<K, VR, KeyValueStore<Bytes, byte[]>>as(storeName), false);
+        return doAggregate(new KStreamAggregate<>(materializedInternal.storeName(), initializer, aggregator),
+                           AGGREGATE_NAME,
+                           materializedInternal);
+
+    }
+
+    @Override
     public <T> KTable<K, T> aggregate(final Initializer<T> initializer,
                                       final Aggregator<? super K, ? super V, T> aggregator,
                                       final Serde<T> aggValueSerde) {
@@ -198,17 +268,12 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
 
     @Override
     public KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier) {
-        return aggregate(new Initializer<Long>() {
-            @Override
-            public Long apply() {
-                return 0L;
-            }
-        }, new Aggregator<K, V, Long>() {
-            @Override
-            public Long apply(K aggKey, V value, Long aggregate) {
-                return aggregate + 1;
-            }
-        }, storeSupplier);
+        return aggregate(countInitializer, countAggregator, storeSupplier);
+    }
+
+    @Override
+    public KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized) {
+        return aggregate(countInitializer, countAggregator, materialized);
     }
 
     @Override
@@ -227,17 +292,8 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
     public <W extends Window> KTable<Windowed<K>, Long> count(final Windows<W> windows,
                                                               final StateStoreSupplier<WindowStore> storeSupplier) {
         return aggregate(
-                new Initializer<Long>() {
-                    @Override
-                    public Long apply() {
-                        return 0L;
-                    }
-                }, new Aggregator<K, V, Long>() {
-                    @Override
-                    public Long apply(K aggKey, V value, Long aggregate) {
-                        return aggregate + 1;
-                    }
-                },
+                countInitializer,
+                countAggregator,
                 windows,
                 storeSupplier);
     }
@@ -320,18 +376,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
                                            final StateStoreSupplier<SessionStore> storeSupplier) {
         Objects.requireNonNull(sessionWindows, "sessionWindows can't be null");
         Objects.requireNonNull(storeSupplier, "storeSupplier can't be null");
-        final Initializer<Long> initializer = new Initializer<Long>() {
-            @Override
-            public Long apply() {
-                return 0L;
-            }
-        };
-        final Aggregator<K, V, Long> aggregator = new Aggregator<K, V, Long>() {
-            @Override
-            public Long apply(final K aggKey, final V value, final Long aggregate) {
-                return aggregate + 1;
-            }
-        };
         final Merger<K, Long> sessionMerger = new Merger<K, Long>() {
             @Override
             public Long apply(final K aggKey, final Long aggOne, final Long aggTwo) {
@@ -339,7 +383,7 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
             }
         };
 
-        return aggregate(initializer, aggregator, sessionMerger, sessionWindows, Serdes.Long(), storeSupplier);
+        return aggregate(countInitializer, countAggregator, sessionMerger, sessionWindows, Serdes.Long(), storeSupplier);
     }
 
 
@@ -397,6 +441,17 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
         return aggregate(initializer, aggregator, sessionMerger, sessionWindows, valSerde, storeSupplier);
     }
 
+
+    private <T> KTable<K, T> doAggregate(final KStreamAggProcessorSupplier<K, ?, V, T> aggregateSupplier,
+                                         final String functionName,
+                                         final MaterializedInternal<K, T, KeyValueStore<Bytes, byte[]>> materializedInternal) {
+
+        final StoreBuilder<KeyValueStore<K, T>> storeBuilder = new KeyValueStoreMaterializer<>(materializedInternal)
+                .materialize();
+        return aggregateBuilder.build(aggregateSupplier, functionName, storeBuilder, materializedInternal.isQueryable());
+
+    }
+
     private <T> KTable<K, T> doAggregate(
             final KStreamAggProcessorSupplier<K, ?, V, T> aggregateSupplier,
             final String functionName,
@@ -426,6 +481,6 @@ class KGroupedStreamImpl<K, V> extends AbstractStream<K> implements KGroupedStre
         if (!repartitionRequired) {
             return this.name;
         }
-        return KStreamImpl.createReparitionedSource(this, keySerde, valSerde, queryableStoreName);
+        return KStreamImpl.createReparitionedSource(builder, keySerde, valSerde, queryableStoreName, name);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d83252eb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
index 7201a00..6ebbd14 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java
@@ -605,37 +605,38 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V
      */
     private KStreamImpl<K, V> repartitionForJoin(final Serde<K> keySerde,
                                                  final Serde<V> valSerde) {
-        String repartitionedSourceName = createReparitionedSource(this, keySerde, valSerde, null);
+        String repartitionedSourceName = createReparitionedSource(builder, keySerde, valSerde, null, name);
         return new KStreamImpl<>(builder, repartitionedSourceName, Collections
             .singleton(repartitionedSourceName), false);
     }
 
-    static <K1, V1> String createReparitionedSource(final AbstractStream<K1> stream,
+    static <K1, V1> String createReparitionedSource(final InternalStreamsBuilder builder,
                                                     final Serde<K1> keySerde,
                                                     final Serde<V1> valSerde,
-                                                    final String topicNamePrefix) {
+                                                    final String topicNamePrefix,
+                                                    final String name) {
         Serializer<K1> keySerializer = keySerde != null ? keySerde.serializer() : null;
         Serializer<V1> valSerializer = valSerde != null ? valSerde.serializer() : null;
         Deserializer<K1> keyDeserializer = keySerde != null ? keySerde.deserializer() : null;
         Deserializer<V1> valDeserializer = valSerde != null ? valSerde.deserializer() : null;
-        String baseName = topicNamePrefix != null ? topicNamePrefix : stream.name;
+        String baseName = topicNamePrefix != null ? topicNamePrefix : name;
 
         String repartitionTopic = baseName + REPARTITION_TOPIC_SUFFIX;
-        String sinkName = stream.builder.newName(SINK_NAME);
-        String filterName = stream.builder.newName(FILTER_NAME);
-        String sourceName = stream.builder.newName(SOURCE_NAME);
+        String sinkName = builder.newName(SINK_NAME);
+        String filterName = builder.newName(FILTER_NAME);
+        String sourceName = builder.newName(SOURCE_NAME);
 
-        stream.builder.internalTopologyBuilder.addInternalTopic(repartitionTopic);
-        stream.builder.internalTopologyBuilder.addProcessor(filterName, new KStreamFilter<>(new Predicate<K1, V1>() {
+        builder.internalTopologyBuilder.addInternalTopic(repartitionTopic);
+        builder.internalTopologyBuilder.addProcessor(filterName, new KStreamFilter<>(new Predicate<K1, V1>() {
             @Override
             public boolean test(final K1 key, final V1 value) {
                 return key != null;
             }
-        }, false), stream.name);
+        }, false), name);
 
-        stream.builder.internalTopologyBuilder.addSink(sinkName, repartitionTopic, keySerializer, valSerializer,
+        builder.internalTopologyBuilder.addSink(sinkName, repartitionTopic, keySerializer, valSerializer,
             null, filterName);
-        stream.builder.internalTopologyBuilder.addSource(null, sourceName, new FailOnInvalidTimestamp(),
+        builder.internalTopologyBuilder.addSource(null, sourceName, new FailOnInvalidTimestamp(),
             keyDeserializer, valDeserializer, repartitionTopic);
 
         return sourceName;

http://git-wip-us.apache.org/repos/asf/kafka/blob/d83252eb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
index d7ebc65..0ee610f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java
@@ -25,8 +25,15 @@ import java.util.Map;
 
 public class MaterializedInternal<K, V, S extends StateStore> extends Materialized<K, V, S> {
 
+    private final boolean queryable;
+
     public MaterializedInternal(final Materialized<K, V, S> materialized) {
+        this(materialized, true);
+    }
+
+    MaterializedInternal(final Materialized<K, V, S> materialized, final boolean queryable) {
         super(materialized);
+        this.queryable = queryable;
     }
 
     public String storeName() {
@@ -56,7 +63,11 @@ public class MaterializedInternal<K, V, S extends StateStore> extends Materializ
         return topicConfig;
     }
 
-    public boolean cachingEnabled() {
+    boolean cachingEnabled() {
         return cachingEnabled;
     }
+
+    public boolean isQueryable() {
+        return queryable;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d83252eb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java
index 28666b8..b6e38f1 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java
@@ -30,7 +30,6 @@ import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.WindowStore;
 
-import java.util.Collections;
 import java.util.Objects;
 import java.util.Set;
 
@@ -40,9 +39,9 @@ import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDU
 public class WindowedKStreamImpl<K, V, W extends Window> extends AbstractStream<K> implements WindowedKStream<K, V> {
 
     private final Windows<W> windows;
-    private final boolean repartitionRequired;
     private final Serde<K> keySerde;
     private final Serde<V> valSerde;
+    private final GroupedStreamAggregateBuilder<K, V> aggregateBuilder;
 
     WindowedKStreamImpl(final Windows<W> windows,
                         final InternalStreamsBuilder builder,
@@ -55,8 +54,8 @@ public class WindowedKStreamImpl<K, V, W extends Window> extends AbstractStream<
         Objects.requireNonNull(windows, "windows can't be null");
         this.valSerde = valSerde;
         this.keySerde = keySerde;
-        this.repartitionRequired = repartitionRequired;
         this.windows = windows;
+        this.aggregateBuilder = new GroupedStreamAggregateBuilder<>(builder, keySerde, valSerde, repartitionRequired, sourceNodes, name);
     }
 
     @Override
@@ -76,38 +75,34 @@ public class WindowedKStreamImpl<K, V, W extends Window> extends AbstractStream<
                 Serdes.Long());
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public <VR> KTable<Windowed<K>, VR> aggregate(final Initializer<VR> initializer,
                                                   final Aggregator<? super K, ? super V, VR> aggregator,
                                                   final Serde<VR> aggValueSerde) {
         Objects.requireNonNull(initializer, "initializer can't be null");
         Objects.requireNonNull(aggregator, "aggregator can't be null");
-        final String aggFunctionName = builder.newName(AGGREGATE_NAME);
         final String storeName = builder.newStoreName(AGGREGATE_NAME);
-        return doAggregate(aggValueSerde,
-                           aggFunctionName,
-                           storeName,
-                           new KStreamWindowAggregate<>(windows, storeName, initializer, aggregator));
+        return (KTable<Windowed<K>, VR>) aggregateBuilder.build(new KStreamWindowAggregate<>(windows, storeName, initializer, aggregator),
+                                                                AGGREGATE_NAME,
+                                                                windowStoreBuilder(storeName, aggValueSerde),
+                                                                false);
     }
 
+    @SuppressWarnings("unchecked")
     @Override
     public KTable<Windowed<K>, V> reduce(final Reducer<V> reducer) {
         Objects.requireNonNull(reducer, "reducer can't be null");
         final String storeName = builder.newStoreName(REDUCE_NAME);
-        return doAggregate(valSerde,
-                           builder.newName(REDUCE_NAME),
-                           storeName,
-                           new KStreamWindowReduce<>(windows, storeName, reducer));
+        return (KTable<Windowed<K>, V>) aggregateBuilder.build(new KStreamWindowReduce<K, V, W>(windows, storeName, reducer),
+                                                               REDUCE_NAME,
+                                                               windowStoreBuilder(storeName, valSerde),
+                                                               false);
 
     }
 
-    @SuppressWarnings("unchecked")
-    private <VR> KTable<Windowed<K>, VR> doAggregate(final Serde<VR> aggValueSerde,
-                                                     final String aggFunctionName,
-                                                     final String storeName,
-                                                     final KStreamAggProcessorSupplier aggSupplier) {
-        final String sourceName = repartitionIfRequired(storeName);
-        final StoreBuilder<WindowStore<K, VR>> storeBuilder = Stores.windowStoreBuilder(
+    private <VR> StoreBuilder<WindowStore<K, VR>> windowStoreBuilder(final String storeName, final Serde<VR> aggValueSerde) {
+        return Stores.windowStoreBuilder(
                 Stores.persistentWindowStore(
                         storeName,
                         windows.maintainMs(),
@@ -115,29 +110,7 @@ public class WindowedKStreamImpl<K, V, W extends Window> extends AbstractStream<
                         windows.size(),
                         false),
                 keySerde,
-                aggValueSerde)
-                .withCachingEnabled();
-
-        builder.internalTopologyBuilder.addProcessor(aggFunctionName, aggSupplier, sourceName);
-        builder.internalTopologyBuilder.addStateStore(storeBuilder, aggFunctionName);
-
-        return new KTableImpl<>(
-                builder,
-                aggFunctionName,
-                aggSupplier,
-                sourceName.equals(this.name) ? sourceNodes
-                        : Collections.singleton(sourceName),
-                storeName,
-                false);
+                aggValueSerde).withCachingEnabled();
     }
 
-    /**
-     * @return the new sourceName if repartitioned. Otherwise the name of this stream
-     */
-    private String repartitionIfRequired(final String queryableStoreName) {
-        if (!repartitionRequired) {
-            return this.name;
-        }
-        return KStreamImpl.createReparitionedSource(this, keySerde, valSerde, queryableStoreName);
-    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/d83252eb/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
index bc65e09..efa027c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.Consumed;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -27,6 +28,7 @@ import org.apache.kafka.streams.kstream.Initializer;
 import org.apache.kafka.streams.kstream.KGroupedStream;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Merger;
 import org.apache.kafka.streams.kstream.Reducer;
 import org.apache.kafka.streams.kstream.Serialized;
@@ -486,6 +488,110 @@ public class KGroupedStreamImplTest {
         groupedStream.count(SessionWindows.with(90), (StateStoreSupplier<SessionStore>) null);
     }
 
+    @SuppressWarnings("unchecked")
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnReduceWhenMaterializedIsNull() {
+        groupedStream.reduce(MockReducer.STRING_ADDER, (Materialized) null);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnAggregateWhenMaterializedIsNull() {
+        groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, (Materialized) null);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test(expected = NullPointerException.class)
+    public void shouldThrowNullPointerOnCountWhenMaterializedIsNull() {
+        groupedStream.count((Materialized) null);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldCountAndMaterializeResults() {
+        groupedStream.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count")
+                                    .withKeySerde(Serdes.String())
+                                    .withValueSerde(Serdes.Long()));
+
+        processData();
+
+        final KeyValueStore<String, Long> count = (KeyValueStore<String, Long>) driver.allStateStores().get("count");
+
+        assertThat(count.get("1"), equalTo(3L));
+        assertThat(count.get("2"), equalTo(1L));
+        assertThat(count.get("3"), equalTo(2L));
+    }
+
+
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldReduceAndMaterializeResults() {
+        groupedStream.reduce(MockReducer.STRING_ADDER,
+                             Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("reduce")
+                                    .withKeySerde(Serdes.String())
+                                    .withValueSerde(Serdes.String()));
+
+        processData();
+
+        final KeyValueStore<String, String> reduced = (KeyValueStore<String, String>) driver.allStateStores().get("reduce");
+
+        assertThat(reduced.get("1"), equalTo("A+C+D"));
+        assertThat(reduced.get("2"), equalTo("B"));
+        assertThat(reduced.get("3"), equalTo("E+F"));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldAggregateAndMaterializeResults() {
+        groupedStream.aggregate(MockInitializer.STRING_INIT,
+                                MockAggregator.TOSTRING_ADDER,
+                                Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("aggregate")
+                                        .withKeySerde(Serdes.String())
+                                        .withValueSerde(Serdes.String()));
+
+        processData();
+
+        final KeyValueStore<String, String> aggregate = (KeyValueStore<String, String>) driver.allStateStores().get("aggregate");
+
+        assertThat(aggregate.get("1"), equalTo("0+A+C+D"));
+        assertThat(aggregate.get("2"), equalTo("0+B"));
+        assertThat(aggregate.get("3"), equalTo("0+E+F"));
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldAggregateWithDefaultSerdes() {
+        final Map<String, String> results = new HashMap<>();
+        groupedStream.aggregate(MockInitializer.STRING_INIT,
+                                MockAggregator.TOSTRING_ADDER)
+                .toStream()
+                .foreach(new ForeachAction<String, String>() {
+                    @Override
+                    public void apply(final String key, final String value) {
+                        results.put(key, value);
+                    }
+                });
+
+        processData();
+
+        assertThat(results.get("1"), equalTo("0+A+C+D"));
+        assertThat(results.get("2"), equalTo("0+B"));
+        assertThat(results.get("3"), equalTo("0+E+F"));
+    }
+
+    private void processData() {
+        driver.setUp(builder, TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), 0);
+        driver.setTime(0);
+        driver.process(TOPIC, "1", "A");
+        driver.process(TOPIC, "2", "B");
+        driver.process(TOPIC, "1", "C");
+        driver.process(TOPIC, "1", "D");
+        driver.process(TOPIC, "3", "E");
+        driver.process(TOPIC, "3", "F");
+        driver.flushState();
+    }
+
     private void doCountWindowed(final List<KeyValue<Windowed<String>, Long>> results) {
         driver.setUp(builder, TestUtils.tempDirectory(), 0);
         driver.setTime(0);


Mime
View raw message