From: damianguy@apache.org
To: commits@kafka.apache.org
Subject: kafka git commit: KAFKA-5654; add materialized count, reduce, aggregate to KGroupedStream
Date: Mon, 18 Sep 2017 10:54:19 +0000 (UTC)

Repository: kafka
Updated Branches:
  refs/heads/trunk 346d0ca53 -> d83252eba


KAFKA-5654; add materialized count, reduce, aggregate to KGroupedStream

Add overloads of `count`, `reduce`, and `aggregate` that are `Materialized` to `KGroupedStream`.
Refactor common parts between `KGroupedStream` and `WindowedKStream`.

Author: Damian Guy
Reviewers: Matthias J.
Sax, Guozhang Wang

Closes #3827 from dguy/kafka-5654

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d83252eb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d83252eb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d83252eb

Branch: refs/heads/trunk
Commit: d83252ebaeeca5bf19584908d95b424beb31b12e
Parents: 346d0ca
Author: Damian Guy
Authored: Mon Sep 18 11:54:14 2017 +0100
Committer: Damian Guy
Committed: Mon Sep 18 11:54:14 2017 +0100

----------------------------------------------------------------------
 .../kafka/streams/kstream/KGroupedStream.java   | 210 ++++++++++++++++++-
 .../GroupedStreamAggregateBuilder.java          |  76 +++++++
 .../kstream/internals/KGroupedStreamImpl.java   | 127 +++++++----
 .../streams/kstream/internals/KStreamImpl.java  |  25 +--
 .../kstream/internals/MaterializedInternal.java |  13 +-
 .../kstream/internals/WindowedKStreamImpl.java  |  57 ++---
 .../internals/KGroupedStreamImplTest.java       | 106 ++++++++++
 7 files changed, 515 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/d83252eb/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
index f12c2b2..08916ef 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KGroupedStream.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
@@ -146,6 +147,38 @@ public interface KGroupedStream<K, V> {
     KTable<K, Long> count(final StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
+     * Count the number of records in this stream by the grouped key.
+     * Records with {@code null} key or value are ignored.
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+     * provided by the given {@code materialized} instance.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
+     * <pre>{@code
+     * KafkaStreams streams = ... // counting words
+     * String queryableStoreName = "count-store"; // the queryableStoreName should be the name of the store as defined by the Materialized instance
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-word";
+     * Long countForWord = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
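
(A quick usage sketch for the new overload documented here: the topology below would create the "count-store" queried in the example above. The topic name "words-topic" and the surrounding setup are illustrative assumptions, not part of this change.)

    // build a topology that counts words per key into a queryable store named "count-store"
    StreamsBuilder builder = new StreamsBuilder();
    KStream<String, String> words = builder.stream("words-topic", Consumed.with(Serdes.String(), Serdes.String()));
    KTable<String, Long> wordCounts = words
        .groupByKey()
        .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count-store")
                   .withKeySerde(Serdes.String())
                   .withValueSerde(Serdes.Long()));
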
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     *
+     * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}.
+     * @return a {@link KTable} that contains "update" records with unmodified keys and {@link Long} values that
+     * represent the latest (rolling) count (i.e., number of records) for each key
+     */
+    KTable<K, Long> count(final Materialized<K, Long, KeyValueStore<Bytes, byte[]>> materialized);
+
+    /**
     * Count the number of records in this stream by the grouped key and the defined windows.
     * Records with {@code null} key or value are ignored.
     * The specified {@code windows} define either hopping time windows that can be overlapping or tumbling (c.f.
@@ -395,7 +428,7 @@ public interface KGroupedStream<K, V> {
      * and "-changelog" is a fixed suffix.
      * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
      *
-     * @param reducer a {@link Reducer} that computes a new aggregate result
+     * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
      */
@@ -452,12 +485,14 @@ public interface KGroupedStream<K, V> {
      * provided {@code queryableStoreName}, and "-changelog" is a fixed suffix.
      * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
      *
-     * @param reducer a {@link Reducer} that computes a new aggregate result
-     * @param queryableStoreName the name of the underlying {@link KTable} state store; valid characters are ASCII
+     * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
+     * @param queryableStoreName the name of the underlying {@link KTable} state store; valid characters are ASCII
      * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#reduce(Reducer)}.
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
+     * @deprecated use {@link #reduce(Reducer, Materialized)}
      */
+    @Deprecated
     KTable<K, V> reduce(final Reducer<V> reducer,
                         final String queryableStoreName);
 
@@ -507,15 +542,69 @@ public interface KGroupedStream<K, V> {
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
      *
-     * @param reducer a {@link Reducer} that computes a new aggregate result
+     * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
+     * @deprecated use {@link #reduce(Reducer, Materialized)}
      */
+    @Deprecated
     KTable<K, V> reduce(final Reducer<V> reducer,
                         final StateStoreSupplier<KeyValueStore> storeSupplier);
 
     /**
+     * Combine the values of records in this stream by the grouped key.
+     * Records with {@code null} key or value are ignored.
+     * Combining implies that the type of the aggregate result is the same as the type of the input value
+     * (c.f. {@link #aggregate(Initializer, Aggregator, Materialized)}).
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+     * provided by the given {@code materialized} instance.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * The specified {@link Reducer} is applied for each input record and computes a new aggregate using the current
+     * aggregate (first argument) and the record's value (second argument):
+     * <pre>{@code
+     * // For example, a Reducer<Long> that sums two values:
+     * new Reducer<Long>() {
+     *   @Override
+     *   public Long apply(Long aggValue, Long currValue) {
+     *     return aggValue + currValue;
+     *   }
+     * }
+     * }</pre>
+     * <p>
+     * If there is no current aggregate the {@link Reducer} is not applied and the new aggregate will be the record's
+     * value as-is.
+     * Thus, {@code reduce(Reducer, Materialized)} can be used to compute aggregate functions like sum, min, or
+     * max.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}.
+     * <pre>{@code
+     * KafkaStreams streams = ... // compute sum
+     * String queryableStoreName = "storeName"; // the queryableStoreName should be the name of the store as defined by the Materialized instance
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long sumForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
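
(An analogous sketch for the new reduce overload: computing a running sum per key, materialized as "storeName" to match the example above. The "numbers-topic" source and serde choices are illustrative assumptions.)

    // sum Long values per key into a queryable store named "storeName"
    KTable<String, Long> sums = builder
        .stream("numbers-topic", Consumed.with(Serdes.String(), Serdes.Long()))
        .groupByKey()
        .reduce(new Reducer<Long>() {
                    @Override
                    public Long apply(final Long aggValue, final Long currValue) {
                        return aggValue + currValue;
                    }
                },
                Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("storeName")
                    .withKeySerde(Serdes.String())
                    .withValueSerde(Serdes.Long()));
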
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     *
+     * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
+     * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}.
+     * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
+     * latest (rolling) aggregate for each key
+     */
+    KTable<K, V> reduce(final Reducer<V> reducer,
+                        final Materialized<K, V, KeyValueStore<Bytes, byte[]>> materialized);
+
+    /**
     * Combine the values of records in this stream by the grouped key and the defined windows.
     * Records with {@code null} key or value are ignored.
     * Combining implies that the type of the aggregate result is the same as the type of the input value
@@ -678,7 +767,7 @@ public interface KGroupedStream<K, V> {
      * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
      * query the value of the key on a parallel running instance of your Kafka Streams application.
      *
-     * @param reducer a {@link Reducer} that computes a new aggregate result
+     * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
      * @param windows the specification of the aggregation {@link Windows}
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
@@ -743,7 +832,7 @@ public interface KGroupedStream<K, V> {
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
      * provided {@code queryableStoreName}, and "-changelog" is a fixed suffix.
      * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
-     * @param reducer the instance of {@link Reducer}
+     * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
      * @param sessionWindows the specification of the aggregation {@link SessionWindows}
      * @param queryableStoreName the name of the state store created from this operation; valid characters are ASCII
      * alphanumerics, '.', '_' and '-'. If {@code null} then this will be equivalent to {@link KGroupedStream#reduce(Reducer, SessionWindows)}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
@@ -778,7 +867,7 @@ public interface KGroupedStream<K, V> {
      * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
      * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
-     * @param reducer the instance of {@link Reducer}
+     * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
      * @param sessionWindows the specification of the aggregation {@link SessionWindows}
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
      * the latest (rolling) aggregate for each key within a window
@@ -841,7 +930,7 @@ public interface KGroupedStream<K, V> {
      * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
      * provided {@code queryableStoreName}, and "-changelog" is a fixed suffix.
      * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
-     * @param reducer the instance of {@link Reducer}
+     * @param reducer a {@link Reducer} that computes a new aggregate result. Cannot be {@code null}.
      * @param sessionWindows the specification of the aggregation {@link SessionWindows}
      * @param storeSupplier user defined state store supplier. Cannot be {@code null}.
      * @return a windowed {@link KTable} that contains "update" records with unmodified keys, and values that represent
@@ -905,7 +994,9 @@
      * @param <T> the value type of the resulting {@link KTable}
      * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
      * latest (rolling) aggregate for each key
+     * @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized)}
      */
+    @Deprecated
     <T> KTable<K, T> aggregate(final Initializer<T> initializer,
                                final Aggregator<? super K, ? super V, T> aggregator,
                                final Serde<T> aggValueSerde,
@@ -935,6 +1026,105 @@
      * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
      * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
      * <p>
+     * To query the local {@link KeyValueStore} it must be obtained via
+     * {@link KafkaStreams#store(String, QueryableStoreType) KafkaStreams#store(...)}:
+     * <pre>{@code
+     * KafkaStreams streams = ... // some aggregation on value type Long
+     * ReadOnlyKeyValueStore<String, Long> localStore = streams.store(queryableStoreName, QueryableStoreTypes.<String, Long>keyValueStore());
+     * String key = "some-key";
+     * Long aggForKey = localStore.get(key); // key must be local (application state is shared over all running Kafka Streams instances)
+     * }</pre>
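
(A sketch for the new aggregate overload: unlike reduce, the result type (Long) differs from the input value type (String). The "values-topic" source and the store name "aggregate-store" are illustrative assumptions.)

    // aggregate the total length of all String values seen per key
    KTable<String, Long> totalLengths = builder
        .stream("values-topic", Consumed.with(Serdes.String(), Serdes.String()))
        .groupByKey()
        .aggregate(new Initializer<Long>() {
                       @Override
                       public Long apply() {
                           return 0L; // initial intermediate aggregate
                       }
                   },
                   new Aggregator<String, String, Long>() {
                       @Override
                       public Long apply(final String key, final String value, final Long aggregate) {
                           return aggregate + value.length();
                       }
                   },
                   Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("aggregate-store")
                       .withValueSerde(Serdes.Long()));
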
+     * For non-local keys, a custom RPC mechanism must be implemented using {@link KafkaStreams#allMetadata()} to
+     * query the value of the key on a parallel running instance of your Kafka Streams application.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * Therefore, the store name must be a valid Kafka topic name and cannot contain characters other than ASCII
+     * alphanumerics, '.', '_' and '-'.
+     * The changelog topic will be named "${applicationId}-${queryableStoreName}-changelog", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "queryableStoreName" is the
+     * provided {@code queryableStoreName}, and "-changelog" is a fixed suffix.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     *
+     * @param initializer an {@link Initializer} that computes an initial intermediate aggregation result
+     * @param aggregator an {@link Aggregator} that computes a new aggregate result
+     * @param materialized an instance of {@link Materialized} used to materialize a state store. Cannot be {@code null}.
+     * @param <VR> the value type of the resulting {@link KTable}
+     * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
+     * latest (rolling) aggregate for each key
+     */
+    <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
+                                 final Aggregator<? super K, ? super V, VR> aggregator,
+                                 final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
+
+
+    /**
+     * Aggregate the values of records in this stream by the grouped key.
+     * Records with {@code null} key or value are ignored.
+     * Aggregating is a generalization of {@link #reduce(Reducer, String) combining via reduce(...)} as it, for example,
+     * allows the result to have a different type than the input values.
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view).
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * The specified {@link Initializer} is applied once directly before the first input record is processed to
+     * provide an initial intermediate aggregation result that is used to process the first record.
+     * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
+     * aggregate (or for the very first record using the intermediate aggregation result provided via the
+     * {@link Initializer}) and the record's value.
+     * Thus, {@code aggregate(Initializer, Aggregator)} can be used to compute aggregate functions like
+     * count (c.f. {@link #count(String)}).
+     * <p>
+     * The default value serde from config will be used for serializing the result.
+     * If a different serde is required then you should use {@link #aggregate(Initializer, Aggregator, Materialized)}.
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
+     * For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka.
+     * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is
+     * user-specified in {@link StreamsConfig} via parameter
+     * {@link StreamsConfig#APPLICATION_ID_CONFIG APPLICATION_ID_CONFIG}, "internalStoreName" is an internal name
+     * and "-changelog" is a fixed suffix.
+     * Note that the internal store name may not be queryable through Interactive Queries.
+     * You can retrieve all generated internal topic names via {@link KafkaStreams#toString()}.
+     *
+     * @param initializer an {@link Initializer} that computes an initial intermediate aggregation result
+     * @param aggregator an {@link Aggregator} that computes a new aggregate result
+     * @param <VR> the value type of the resulting {@link KTable}
+     * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the
+     * latest (rolling) aggregate for each key
+     */
+    <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
+                                 final Aggregator<? super K, ? super V, VR> aggregator);
+
+    /**
+     * Aggregate the values of records in this stream by the grouped key.
+     * Records with {@code null} key or value are ignored.
+     * Aggregating is a generalization of {@link #reduce(Reducer, String) combining via reduce(...)} as it, for example,
+     * allows the result to have a different type than the input values.
+     * The result is written into a local {@link KeyValueStore} (which is basically an ever-updating materialized view)
+     * that can be queried using the provided {@code queryableStoreName}.
+     * Furthermore, updates to the store are sent downstream into a {@link KTable} changelog stream.
+     * <p>
+     * The specified {@link Initializer} is applied once directly before the first input record is processed to
+     * provide an initial intermediate aggregation result that is used to process the first record.
+     * The specified {@link Aggregator} is applied for each input record and computes a new aggregate using the current
+     * aggregate (or for the very first record using the intermediate aggregation result provided via the
+     * {@link Initializer}) and the record's value.
+     * Thus, {@code aggregate(Initializer, Aggregator, Serde, String)} can be used to compute aggregate functions like
+     * count (c.f. {@link #count(String)}).
+     * <p>
+     * Not all updates might get sent downstream, as an internal cache is used to deduplicate consecutive updates to
+     * the same key.
+     * The rate of propagated updates depends on your input data rate, the number of distinct keys, the number of
+     * parallel running Kafka Streams instances, and the {@link StreamsConfig configuration} parameters for
+     * {@link StreamsConfig#CACHE_MAX_BYTES_BUFFERING_CONFIG cache size}, and
+     * {@link StreamsConfig#COMMIT_INTERVAL_MS_CONFIG commit interval}.
+     * <p>
* For failure and recovery the store will be backed by an internal changelog topic that will be created in Kafka. * The changelog topic will be named "${applicationId}-${internalStoreName}-changelog", where "applicationId" is * user-specified in {@link StreamsConfig} via parameter @@ -950,7 +1140,9 @@ public interface KGroupedStream { * @param the value type of the resulting {@link KTable} * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the * latest (rolling) aggregate for each key + * @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized)} */ + @Deprecated KTable aggregate(final Initializer initializer, final Aggregator aggregator, final Serde aggValueSerde); @@ -999,7 +1191,9 @@ public interface KGroupedStream { * @param the value type of the resulting {@link KTable} * @return a {@link KTable} that contains "update" records with unmodified keys, and values that represent the * latest (rolling) aggregate for each key + * @deprecated use {@link #aggregate(Initializer, Aggregator, Materialized)} */ + @Deprecated KTable aggregate(final Initializer initializer, final Aggregator aggregator, final StateStoreSupplier storeSupplier); http://git-wip-us.apache.org/repos/asf/kafka/blob/d83252eb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java new file mode 100644 index 0000000..ef0cdfc --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/GroupedStreamAggregateBuilder.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.state.StoreBuilder; + +import java.util.Collections; +import java.util.Set; + +class GroupedStreamAggregateBuilder { + private final InternalStreamsBuilder builder; + private final Serde keySerde; + private final Serde valueSerde; + private final boolean repartitionRequired; + private final Set sourceNodes; + private final String name; + + GroupedStreamAggregateBuilder(final InternalStreamsBuilder builder, + final Serde keySerde, + final Serde valueSerde, + final boolean repartitionRequired, + final Set sourceNodes, + final String name) { + + this.builder = builder; + this.keySerde = keySerde; + this.valueSerde = valueSerde; + this.repartitionRequired = repartitionRequired; + this.sourceNodes = sourceNodes; + this.name = name; + } + + KTable build(final KStreamAggProcessorSupplier aggregateSupplier, + final String functionName, + final StoreBuilder storeBuilder, + final boolean isQueryable) { + final String aggFunctionName = builder.newName(functionName); + final String sourceName = repartitionIfRequired(storeBuilder.name()); + builder.internalTopologyBuilder.addProcessor(aggFunctionName, aggregateSupplier, sourceName); + builder.internalTopologyBuilder.addStateStore(storeBuilder, aggFunctionName); + + return new KTableImpl<>( + builder, + aggFunctionName, + aggregateSupplier, + sourceName.equals(this.name) ? sourceNodes : Collections.singleton(sourceName), + storeBuilder.name(), + isQueryable); + } + + /** + * @return the new sourceName if repartitioned. Otherwise the name of this stream + */ + private String repartitionIfRequired(final String queryableStoreName) { + if (!repartitionRequired) { + return this.name; + } + return KStreamImpl.createReparitionedSource(builder, keySerde, valueSerde, queryableStoreName, name); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/d83252eb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java index 57114b5..1fab2c5 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImpl.java @@ -18,10 +18,12 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.kstream.Aggregator; import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.KGroupedStream; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Merger; import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.kstream.SessionWindows; @@ -32,6 +34,7 @@ import org.apache.kafka.streams.kstream.Windows; import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.SessionStore; +import org.apache.kafka.streams.state.StoreBuilder; import 
org.apache.kafka.streams.state.WindowStore; import java.util.Collections; @@ -46,6 +49,19 @@ class KGroupedStreamImpl extends AbstractStream implements KGroupedStre private final Serde keySerde; private final Serde valSerde; private final boolean repartitionRequired; + private final Initializer countInitializer = new Initializer() { + @Override + public Long apply() { + return 0L; + } + }; + private final Aggregator countAggregator = new Aggregator() { + @Override + public Long apply(K aggKey, V value, Long aggregate) { + return aggregate + 1; + } + }; + private final GroupedStreamAggregateBuilder aggregateBuilder; private boolean isQueryable = true; KGroupedStreamImpl(final InternalStreamsBuilder builder, @@ -55,6 +71,12 @@ class KGroupedStreamImpl extends AbstractStream implements KGroupedStre final Serde valSerde, final boolean repartitionRequired) { super(builder, name, sourceNodes); + this.aggregateBuilder = new GroupedStreamAggregateBuilder<>(builder, + keySerde, + valSerde, + repartitionRequired, + sourceNodes, + name); this.keySerde = keySerde; this.valSerde = valSerde; this.repartitionRequired = repartitionRequired; @@ -91,6 +113,19 @@ class KGroupedStreamImpl extends AbstractStream implements KGroupedStre storeSupplier); } + @Override + public KTable reduce(final Reducer reducer, + final Materialized> materialized) { + Objects.requireNonNull(reducer, "reducer can't be null"); + Objects.requireNonNull(materialized, "materialized can't be null"); + final MaterializedInternal> materializedInternal + = new MaterializedInternal<>(materialized); + return doAggregate( + new KStreamReduce(materializedInternal.storeName(), reducer), + REDUCE_NAME, + materializedInternal); + } + @Override public KTable, V> reduce(final Reducer reducer, @@ -131,6 +166,41 @@ class KGroupedStreamImpl extends AbstractStream implements KGroupedStre } @Override + public KTable aggregate(final Initializer initializer, + final Aggregator aggregator, + final Materialized> materialized) { + Objects.requireNonNull(initializer, "initializer can't be null"); + Objects.requireNonNull(aggregator, "aggregator can't be null"); + Objects.requireNonNull(materialized, "materialized can't be null"); + return aggregateMaterialized(initializer, aggregator, materialized); + } + + private KTable aggregateMaterialized(final Initializer initializer, + final Aggregator aggregator, + final Materialized> materialized) { + final MaterializedInternal> materializedInternal + = new MaterializedInternal<>(materialized); + return doAggregate( + new KStreamAggregate<>(materializedInternal.storeName(), initializer, aggregator), + AGGREGATE_NAME, + materializedInternal); + } + + @Override + public KTable aggregate(final Initializer initializer, final Aggregator aggregator) { + Objects.requireNonNull(initializer, "initializer can't be null"); + Objects.requireNonNull(aggregator, "aggregator can't be null"); + final String storeName = builder.newStoreName(AGGREGATE_NAME); + + MaterializedInternal> materializedInternal = + new MaterializedInternal<>(Materialized.>as(storeName), false); + return doAggregate(new KStreamAggregate<>(materializedInternal.storeName(), initializer, aggregator), + AGGREGATE_NAME, + materializedInternal); + + } + + @Override public KTable aggregate(final Initializer initializer, final Aggregator aggregator, final Serde aggValueSerde) { @@ -198,17 +268,12 @@ class KGroupedStreamImpl extends AbstractStream implements KGroupedStre @Override public KTable count(final StateStoreSupplier storeSupplier) { - return aggregate(new 
Initializer() { - @Override - public Long apply() { - return 0L; - } - }, new Aggregator() { - @Override - public Long apply(K aggKey, V value, Long aggregate) { - return aggregate + 1; - } - }, storeSupplier); + return aggregate(countInitializer, countAggregator, storeSupplier); + } + + @Override + public KTable count(final Materialized> materialized) { + return aggregate(countInitializer, countAggregator, materialized); } @Override @@ -227,17 +292,8 @@ class KGroupedStreamImpl extends AbstractStream implements KGroupedStre public KTable, Long> count(final Windows windows, final StateStoreSupplier storeSupplier) { return aggregate( - new Initializer() { - @Override - public Long apply() { - return 0L; - } - }, new Aggregator() { - @Override - public Long apply(K aggKey, V value, Long aggregate) { - return aggregate + 1; - } - }, + countInitializer, + countAggregator, windows, storeSupplier); } @@ -320,18 +376,6 @@ class KGroupedStreamImpl extends AbstractStream implements KGroupedStre final StateStoreSupplier storeSupplier) { Objects.requireNonNull(sessionWindows, "sessionWindows can't be null"); Objects.requireNonNull(storeSupplier, "storeSupplier can't be null"); - final Initializer initializer = new Initializer() { - @Override - public Long apply() { - return 0L; - } - }; - final Aggregator aggregator = new Aggregator() { - @Override - public Long apply(final K aggKey, final V value, final Long aggregate) { - return aggregate + 1; - } - }; final Merger sessionMerger = new Merger() { @Override public Long apply(final K aggKey, final Long aggOne, final Long aggTwo) { @@ -339,7 +383,7 @@ class KGroupedStreamImpl extends AbstractStream implements KGroupedStre } }; - return aggregate(initializer, aggregator, sessionMerger, sessionWindows, Serdes.Long(), storeSupplier); + return aggregate(countInitializer, countAggregator, sessionMerger, sessionWindows, Serdes.Long(), storeSupplier); } @@ -397,6 +441,17 @@ class KGroupedStreamImpl extends AbstractStream implements KGroupedStre return aggregate(initializer, aggregator, sessionMerger, sessionWindows, valSerde, storeSupplier); } + + private KTable doAggregate(final KStreamAggProcessorSupplier aggregateSupplier, + final String functionName, + final MaterializedInternal> materializedInternal) { + + final StoreBuilder> storeBuilder = new KeyValueStoreMaterializer<>(materializedInternal) + .materialize(); + return aggregateBuilder.build(aggregateSupplier, functionName, storeBuilder, materializedInternal.isQueryable()); + + } + private KTable doAggregate( final KStreamAggProcessorSupplier aggregateSupplier, final String functionName, @@ -426,6 +481,6 @@ class KGroupedStreamImpl extends AbstractStream implements KGroupedStre if (!repartitionRequired) { return this.name; } - return KStreamImpl.createReparitionedSource(this, keySerde, valSerde, queryableStoreName); + return KStreamImpl.createReparitionedSource(builder, keySerde, valSerde, queryableStoreName, name); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/d83252eb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 7201a00..6ebbd14 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -605,37 +605,38 @@ public class KStreamImpl extends AbstractStream implements KStream repartitionForJoin(final Serde keySerde, final Serde valSerde) { - String repartitionedSourceName = createReparitionedSource(this, keySerde, valSerde, null); + String repartitionedSourceName = createReparitionedSource(builder, keySerde, valSerde, null, name); return new KStreamImpl<>(builder, repartitionedSourceName, Collections .singleton(repartitionedSourceName), false); } - static String createReparitionedSource(final AbstractStream stream, + static String createReparitionedSource(final InternalStreamsBuilder builder, final Serde keySerde, final Serde valSerde, - final String topicNamePrefix) { + final String topicNamePrefix, + final String name) { Serializer keySerializer = keySerde != null ? keySerde.serializer() : null; Serializer valSerializer = valSerde != null ? valSerde.serializer() : null; Deserializer keyDeserializer = keySerde != null ? keySerde.deserializer() : null; Deserializer valDeserializer = valSerde != null ? valSerde.deserializer() : null; - String baseName = topicNamePrefix != null ? topicNamePrefix : stream.name; + String baseName = topicNamePrefix != null ? topicNamePrefix : name; String repartitionTopic = baseName + REPARTITION_TOPIC_SUFFIX; - String sinkName = stream.builder.newName(SINK_NAME); - String filterName = stream.builder.newName(FILTER_NAME); - String sourceName = stream.builder.newName(SOURCE_NAME); + String sinkName = builder.newName(SINK_NAME); + String filterName = builder.newName(FILTER_NAME); + String sourceName = builder.newName(SOURCE_NAME); - stream.builder.internalTopologyBuilder.addInternalTopic(repartitionTopic); - stream.builder.internalTopologyBuilder.addProcessor(filterName, new KStreamFilter<>(new Predicate() { + builder.internalTopologyBuilder.addInternalTopic(repartitionTopic); + builder.internalTopologyBuilder.addProcessor(filterName, new KStreamFilter<>(new Predicate() { @Override public boolean test(final K1 key, final V1 value) { return key != null; } - }, false), stream.name); + }, false), name); - stream.builder.internalTopologyBuilder.addSink(sinkName, repartitionTopic, keySerializer, valSerializer, + builder.internalTopologyBuilder.addSink(sinkName, repartitionTopic, keySerializer, valSerializer, null, filterName); - stream.builder.internalTopologyBuilder.addSource(null, sourceName, new FailOnInvalidTimestamp(), + builder.internalTopologyBuilder.addSource(null, sourceName, new FailOnInvalidTimestamp(), keyDeserializer, valDeserializer, repartitionTopic); return sourceName; http://git-wip-us.apache.org/repos/asf/kafka/blob/d83252eb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java index d7ebc65..0ee610f 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/MaterializedInternal.java @@ -25,8 +25,15 @@ import java.util.Map; public class MaterializedInternal extends Materialized { + private final boolean queryable; + public MaterializedInternal(final Materialized materialized) { + this(materialized, true); + } + + MaterializedInternal(final 
Materialized materialized, final boolean queryable) { super(materialized); + this.queryable = queryable; } public String storeName() { @@ -56,7 +63,11 @@ public class MaterializedInternal extends Materializ return topicConfig; } - public boolean cachingEnabled() { + boolean cachingEnabled() { return cachingEnabled; } + + public boolean isQueryable() { + return queryable; + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/d83252eb/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java index 28666b8..b6e38f1 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedKStreamImpl.java @@ -30,7 +30,6 @@ import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.Stores; import org.apache.kafka.streams.state.WindowStore; -import java.util.Collections; import java.util.Objects; import java.util.Set; @@ -40,9 +39,9 @@ import static org.apache.kafka.streams.kstream.internals.KGroupedStreamImpl.REDU public class WindowedKStreamImpl extends AbstractStream implements WindowedKStream { private final Windows windows; - private final boolean repartitionRequired; private final Serde keySerde; private final Serde valSerde; + private final GroupedStreamAggregateBuilder aggregateBuilder; WindowedKStreamImpl(final Windows windows, final InternalStreamsBuilder builder, @@ -55,8 +54,8 @@ public class WindowedKStreamImpl extends AbstractStream< Objects.requireNonNull(windows, "windows can't be null"); this.valSerde = valSerde; this.keySerde = keySerde; - this.repartitionRequired = repartitionRequired; this.windows = windows; + this.aggregateBuilder = new GroupedStreamAggregateBuilder<>(builder, keySerde, valSerde, repartitionRequired, sourceNodes, name); } @Override @@ -76,38 +75,34 @@ public class WindowedKStreamImpl extends AbstractStream< Serdes.Long()); } + @SuppressWarnings("unchecked") @Override public KTable, VR> aggregate(final Initializer initializer, final Aggregator aggregator, final Serde aggValueSerde) { Objects.requireNonNull(initializer, "initializer can't be null"); Objects.requireNonNull(aggregator, "aggregator can't be null"); - final String aggFunctionName = builder.newName(AGGREGATE_NAME); final String storeName = builder.newStoreName(AGGREGATE_NAME); - return doAggregate(aggValueSerde, - aggFunctionName, - storeName, - new KStreamWindowAggregate<>(windows, storeName, initializer, aggregator)); + return (KTable, VR>) aggregateBuilder.build(new KStreamWindowAggregate<>(windows, storeName, initializer, aggregator), + AGGREGATE_NAME, + windowStoreBuilder(storeName, aggValueSerde), + false); } + @SuppressWarnings("unchecked") @Override public KTable, V> reduce(final Reducer reducer) { Objects.requireNonNull(reducer, "reducer can't be null"); final String storeName = builder.newStoreName(REDUCE_NAME); - return doAggregate(valSerde, - builder.newName(REDUCE_NAME), - storeName, - new KStreamWindowReduce<>(windows, storeName, reducer)); + return (KTable, V>) aggregateBuilder.build(new KStreamWindowReduce(windows, storeName, reducer), + REDUCE_NAME, + windowStoreBuilder(storeName, valSerde), + false); } - @SuppressWarnings("unchecked") - private 
KTable, VR> doAggregate(final Serde aggValueSerde, - final String aggFunctionName, - final String storeName, - final KStreamAggProcessorSupplier aggSupplier) { - final String sourceName = repartitionIfRequired(storeName); - final StoreBuilder> storeBuilder = Stores.windowStoreBuilder( + private StoreBuilder> windowStoreBuilder(final String storeName, final Serde aggValueSerde) { + return Stores.windowStoreBuilder( Stores.persistentWindowStore( storeName, windows.maintainMs(), @@ -115,29 +110,7 @@ public class WindowedKStreamImpl extends AbstractStream< windows.size(), false), keySerde, - aggValueSerde) - .withCachingEnabled(); - - builder.internalTopologyBuilder.addProcessor(aggFunctionName, aggSupplier, sourceName); - builder.internalTopologyBuilder.addStateStore(storeBuilder, aggFunctionName); - - return new KTableImpl<>( - builder, - aggFunctionName, - aggSupplier, - sourceName.equals(this.name) ? sourceNodes - : Collections.singleton(sourceName), - storeName, - false); + aggValueSerde).withCachingEnabled(); } - /** - * @return the new sourceName if repartitioned. Otherwise the name of this stream - */ - private String repartitionIfRequired(final String queryableStoreName) { - if (!repartitionRequired) { - return this.name; - } - return KStreamImpl.createReparitionedSource(this, keySerde, valSerde, queryableStoreName); - } } http://git-wip-us.apache.org/repos/asf/kafka/blob/d83252eb/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java index bc65e09..efa027c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedStreamImplTest.java @@ -18,6 +18,7 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.streams.Consumed; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; @@ -27,6 +28,7 @@ import org.apache.kafka.streams.kstream.Initializer; import org.apache.kafka.streams.kstream.KGroupedStream; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.Merger; import org.apache.kafka.streams.kstream.Reducer; import org.apache.kafka.streams.kstream.Serialized; @@ -486,6 +488,110 @@ public class KGroupedStreamImplTest { groupedStream.count(SessionWindows.with(90), (StateStoreSupplier) null); } + @SuppressWarnings("unchecked") + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerOnReduceWhenMaterializedIsNull() { + groupedStream.reduce(MockReducer.STRING_ADDER, (Materialized) null); + } + + @SuppressWarnings("unchecked") + @Test(expected = NullPointerException.class) + public void shouldThrowNullPointerOnAggregateWhenMaterializedIsNull() { + groupedStream.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, (Materialized) null); + } + + @SuppressWarnings("unchecked") + @Test(expected = NullPointerException.class) + public void 
shouldThrowNullPointerOnCountWhenMaterializedIsNull() { + groupedStream.count((Materialized) null); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldCountAndMaterializeResults() { + groupedStream.count(Materialized.>as("count") + .withKeySerde(Serdes.String()) + .withValueSerde(Serdes.Long())); + + processData(); + + final KeyValueStore count = (KeyValueStore) driver.allStateStores().get("count"); + + assertThat(count.get("1"), equalTo(3L)); + assertThat(count.get("2"), equalTo(1L)); + assertThat(count.get("3"), equalTo(2L)); + } + + + + @SuppressWarnings("unchecked") + @Test + public void shouldReduceAndMaterializeResults() { + groupedStream.reduce(MockReducer.STRING_ADDER, + Materialized.>as("reduce") + .withKeySerde(Serdes.String()) + .withValueSerde(Serdes.String())); + + processData(); + + final KeyValueStore reduced = (KeyValueStore) driver.allStateStores().get("reduce"); + + assertThat(reduced.get("1"), equalTo("A+C+D")); + assertThat(reduced.get("2"), equalTo("B")); + assertThat(reduced.get("3"), equalTo("E+F")); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldAggregateAndMaterializeResults() { + groupedStream.aggregate(MockInitializer.STRING_INIT, + MockAggregator.TOSTRING_ADDER, + Materialized.>as("aggregate") + .withKeySerde(Serdes.String()) + .withValueSerde(Serdes.String())); + + processData(); + + final KeyValueStore aggregate = (KeyValueStore) driver.allStateStores().get("aggregate"); + + assertThat(aggregate.get("1"), equalTo("0+A+C+D")); + assertThat(aggregate.get("2"), equalTo("0+B")); + assertThat(aggregate.get("3"), equalTo("0+E+F")); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldAggregateWithDefaultSerdes() { + final Map results = new HashMap<>(); + groupedStream.aggregate(MockInitializer.STRING_INIT, + MockAggregator.TOSTRING_ADDER) + .toStream() + .foreach(new ForeachAction() { + @Override + public void apply(final String key, final String value) { + results.put(key, value); + } + }); + + processData(); + + assertThat(results.get("1"), equalTo("0+A+C+D")); + assertThat(results.get("2"), equalTo("0+B")); + assertThat(results.get("3"), equalTo("0+E+F")); + } + + private void processData() { + driver.setUp(builder, TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), 0); + driver.setTime(0); + driver.process(TOPIC, "1", "A"); + driver.process(TOPIC, "2", "B"); + driver.process(TOPIC, "1", "C"); + driver.process(TOPIC, "1", "D"); + driver.process(TOPIC, "3", "E"); + driver.process(TOPIC, "3", "F"); + driver.flushState(); + } + private void doCountWindowed(final List, Long>> results) { driver.setUp(builder, TestUtils.tempDirectory(), 0); driver.setTime(0);
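
(To round out the examples in the javadoc above, a sketch of querying one of these materialized stores from a running application. The properties setup is elided, and "count-store" matches the count sketch earlier; both are assumptions for illustration.)

    // start the application and query the locally materialized count store
    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.start();
    ReadOnlyKeyValueStore<String, Long> localStore =
        streams.store("count-store", QueryableStoreTypes.<String, Long>keyValueStore());
    Long countForWord = localStore.get("some-word"); // null if the key is not hosted on this instance
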