From: mbalassi@apache.org
To: commits@flink.incubator.apache.org
Date: Sat, 20 Sep 2014 13:10:49 -0000
Message-Id: <9bc10cc8b7044096ac8dd5309437e47f@git.apache.org>
In-Reply-To: <71119b52c5884cc6ad3ff528ad13c2f1@git.apache.org>
References: <71119b52c5884cc6ad3ff528ad13c2f1@git.apache.org>
Subject: [06/18] git commit: [streaming] window and batch operator added to DataStream + Documentation updated accordingly

[streaming] window and batch operator added to DataStream + Documentation updated accordingly

Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/5f601cf9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/5f601cf9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/5f601cf9

Branch: refs/heads/master
Commit: 5f601cf9b18fef0b54a92e42405c0179e639f5da
Parents: 47d02a0
Author: gyfora
Authored: Mon Sep 8 02:05:20 2014 +0200
Committer: mbalassi
Committed: Sat Sep 20 13:42:04 2014 +0200

----------------------------------------------------------------------
 docs/streaming_guide.md                          |  50 ++--
 .../api/datastream/BatchedDataStream.java        | 238 +++++++++++++++++++
 .../streaming/api/datastream/DataStream.java     | 197 ++++++---------
 .../api/datastream/GroupedDataStream.java        |   8 +-
 .../api/datastream/WindowDataStream.java         |  87 +++++++
 .../GroupedWindowGroupReduceInvokable.java       |   4 +-
 .../operator/GroupedWindowReduceInvokable.java   |  19 +-
 .../operator/WindowGroupReduceInvokable.java     |  12 +-
 .../operator/WindowReduceInvokable.java          |  18 +-
 .../api/invokable/util/DefaultTimeStamp.java     |  39 +++
 .../api/invokable/util/DefaultTimestamp.java     |  34 ---
 .../streaming/api/invokable/util/TimeStamp.java  |  46 ++++
 .../streaming/api/invokable/util/Timestamp.java  |  38 ---
.../WindowGroupReduceInvokableTest.java | 19 +- .../operator/WindowReduceInvokableTest.java | 18 +- .../ml/IncrementalLearningSkeleton.java | 6 +- 16 files changed, 561 insertions(+), 272 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/docs/streaming_guide.md ---------------------------------------------------------------------- diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md index 04e2f2e..6ed53df 100644 --- a/docs/streaming_guide.md +++ b/docs/streaming_guide.md @@ -242,42 +242,42 @@ Merges two or more `DataStream` instances creating a new DataStream containing a dataStream.merge(otherStream1, otherStream2…) ``` +### Grouped operators + +Some transformations require that the `DataStream` is grouped on some key value. The user can create a `GroupedDataStream` by calling the `groupBy(keyPosition)` method of a non-grouped `DataStream`. The user can apply different reduce transformations on the obtained `GroupedDataStream`: + +#### Reduce on GroupedDataStream +When the reduce operator is applied on a grouped data stream, the user-defined `ReduceFunction` will combine subsequent pairs of elements having the same key value. The combined results are sent to the output stream. + +### Aggregations + +The Flink streaming API supports different types of aggregation operators similarly to the core API. For grouped data streams the aggregations work in a grouped fashion. + +Types of aggregations: `sum(fieldPosition)`, `min(fieldPosition)`, `max(fieldPosition)` + +For every incoming tuple the selected field is replaced with the current aggregated value. If the aggregations are used without defining field position, 0 is used as default. + ### Window/Batch operators Window and batch operators allow the user to execute function on slices or windows of the DataStream in a sliding fashion. If the stepsize for the slide is not defined then the window/batchsize is used as stepsize by default. -#### Window reduce -The transformation calls a user-defined `GroupReduceFunction` on records received during the predefined time window. The window is shifted after each reduce call. -A window reduce that sums the elements in the last minute with 10 seconds stepsize: +When applied to grouped data streams the operators applied will be executed on groups of elements grouped by the selected key position. -```java -dataStream.windowReduce(new GroupReduceFunction() { - @Override - public void reduce(Iterable values, Collector out) throws Exception { - Integer sum = 0; - for(Integer val: values){ - sum+=val; - } - } - }, 60000, 10000); -``` +#### Reduce on windowed/batched data streams +The transformation calls a user-defined `ReduceFunction` on records received in the batch or during the predefined time window. The window is shifted after each reduce call. The user can also use the different streaming aggregations. -#### Batch reduce -The transformation calls a `GroupReduceFunction` for each data batch of the predefined size. The batch slides by the predefined number of elements after each call. Works similarly to window reduce. +A window reduce that sums the elements in the last minute with 10 seconds slide interval: ```java -dataStream.batchReduce(reducer, batchSize, slideSize) +dataStream.window(60000, 10000).sum(); ``` -### Grouped operators - -Some transformations require that the `DataStream` is grouped on some key value. 
The user can create a `GroupedDataStream` by calling the `groupBy(keyPosition)` method of a non-grouped `DataStream`. The user can apply different reduce transformations on the obtained `GroupedDataStream`: - -#### Reduce on GroupedDataStream -When the reduce operator is applied on a grouped data stream, the user-defined `ReduceFunction` will combine subsequent pairs of elements having the same key value. The combined results are sent to the output stream. +#### ReduceGroup on windowed/batched data streams +The transformation calls a `GroupReduceFunction` for each data batch or data window. The batch/window slides by the predefined number of elements/time after each call. -#### Window/Batchreduce on GroupedDataStream -Similarly to the grouped reduce operator the window and batch reduce operators work the same way as in the non-grouped case except that in a data window/batch every `GroupReduceFunction` call will receive data elements for only the same keys. +```java +dataStream.batch(1000, 100).reduceGroup(reducer); +``` ### Co operators http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java new file mode 100755 index 0000000..0aa5de6 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java @@ -0,0 +1,238 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.datastream; + +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.java.functions.RichGroupReduceFunction; +import org.apache.flink.api.java.functions.RichReduceFunction; +import org.apache.flink.streaming.api.function.aggregation.AggregationFunction; +import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction; +import org.apache.flink.streaming.api.function.aggregation.MinAggregationFunction; +import org.apache.flink.streaming.api.function.aggregation.SumAggregationFunction; +import org.apache.flink.streaming.api.invokable.operator.BatchGroupReduceInvokable; +import org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable; +import org.apache.flink.streaming.api.invokable.operator.GroupedBatchGroupReduceInvokable; +import org.apache.flink.streaming.api.invokable.operator.GroupedBatchReduceInvokable; +import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper; +import org.apache.flink.types.TypeInformation; + +/** + * A {@link BatchedDataStream} represents a data stream whose elements are + * batched together in a sliding batch. operations like + * {@link #reduce(ReduceFunction)} or {@link #reduceGroup(GroupReduceFunction)} + * are applied for each batch and the batch is slid afterwards. + * + * @param + * The output type of the {@link BatchedDataStream} + */ +public class BatchedDataStream { + + protected DataStream dataStream; + protected boolean isGrouped; + protected int keyPosition; + protected long batchSize; + protected long slideSize; + + protected BatchedDataStream(DataStream dataStream, long batchSize, long slideSize) { + if (dataStream instanceof GroupedDataStream) { + this.isGrouped = true; + this.keyPosition = ((GroupedDataStream) dataStream).keyPosition; + } else { + this.isGrouped = false; + } + this.dataStream = dataStream.copy(); + this.batchSize = batchSize; + this.slideSize = slideSize; + } + + protected BatchedDataStream(BatchedDataStream batchedDataStream) { + this.dataStream = batchedDataStream.dataStream.copy(); + this.isGrouped = batchedDataStream.isGrouped; + this.keyPosition = batchedDataStream.keyPosition; + this.batchSize = batchedDataStream.batchSize; + this.slideSize = batchedDataStream.slideSize; + } + + /** + * Groups the elements of the {@link BatchedDataStream} by the given key + * position to be used with grouped operators. + * + * @param keyPosition + * The position of the field on which the + * {@link BatchedDataStream} will be grouped. + * @return The transformed {@link BatchedDataStream} + */ + public BatchedDataStream groupBy(int keyPosition) { + return new BatchedDataStream(dataStream.groupBy(keyPosition), batchSize, slideSize); + } + + /** + * Applies a reduce transformation on every sliding batch/window of the data + * stream. If the data stream is grouped then the reducer is applied on + * every group of elements sharing the same key. This type of reduce is much + * faster than reduceGroup since the reduce function can be applied + * incrementally. The user can also extend the {@link RichReduceFunction} to + * gain access to other features provided by the {@link RichFuntion} + * interface. + * + * @param reducer + * The {@link ReduceFunction} that will be called for every + * element of the input values in the batch/window. + * @return The transformed DataStream. 
+ */ + public SingleOutputStreamOperator reduce(ReduceFunction reducer) { + return dataStream.addFunction("batchReduce", reducer, new FunctionTypeWrapper(reducer, + ReduceFunction.class, 0), new FunctionTypeWrapper(reducer, + ReduceFunction.class, 0), getReduceInvokable(reducer)); + } + + /** + * Applies a reduceGroup transformation on preset batches/windows of the + * DataStream. The transformation calls a {@link GroupReduceFunction} for + * each batch/window. Each GroupReduceFunction call can return any number of + * elements including none. The user can also extend + * {@link RichGroupReduceFunction} to gain access to other features provided + * by the {@link RichFuntion} interface. + * + * @param reducer + * The {@link GroupReduceFunction} that will be called for every + * batch/window. + * @return The transformed DataStream. + */ + public SingleOutputStreamOperator reduceGroup(GroupReduceFunction reducer) { + return dataStream.addFunction("batchReduce", reducer, new FunctionTypeWrapper(reducer, + GroupReduceFunction.class, 0), new FunctionTypeWrapper(reducer, GroupReduceFunction.class, + 1), getGroupReduceInvokable(reducer)); + } + + /** + * Applies an aggregation that sums every sliding batch/window of the data + * stream at the given position. + * + * @param positionToSum + * The position in the data point to sum + * @return The transformed DataStream. + */ + @SuppressWarnings("unchecked") + public SingleOutputStreamOperator sum(int positionToSum) { + dataStream.checkFieldRange(positionToSum); + return aggregate((AggregationFunction) SumAggregationFunction.getSumFunction( + positionToSum, dataStream.getClassAtPos(positionToSum))); + } + + /** + * Syntactic sugar for sum(0) + * + * @return The transformed DataStream. + */ + public SingleOutputStreamOperator sum() { + return sum(0); + } + + /** + * Applies an aggregation that that gives the minimum of every sliding + * batch/window of the data stream at the given position. + * + * @param positionToMin + * The position in the data point to minimize + * @return The transformed DataStream. + */ + public SingleOutputStreamOperator min(int positionToMin) { + dataStream.checkFieldRange(positionToMin); + return aggregate(new MinAggregationFunction(positionToMin)); + } + + /** + * Syntactic sugar for min(0) + * + * @return The transformed DataStream. + */ + public SingleOutputStreamOperator min() { + return min(0); + } + + /** + * Applies an aggregation that gives the maximum of every sliding + * batch/window of the data stream at the given position. + * + * @param positionToMax + * The position in the data point to maximize + * @return The transformed DataStream. + */ + public SingleOutputStreamOperator max(int positionToMax) { + dataStream.checkFieldRange(positionToMax); + return aggregate(new MaxAggregationFunction(positionToMax)); + } + + /** + * Syntactic sugar for max(0) + * + * @return The transformed DataStream. + */ + public SingleOutputStreamOperator max() { + return max(0); + } + + /** + * Gets the output type. + * + * @return The output type. 
+ */ + public TypeInformation getOutputType() { + return dataStream.getOutputType(); + } + + private SingleOutputStreamOperator aggregate(AggregationFunction aggregate) { + BatchReduceInvokable invokable = getReduceInvokable(aggregate); + + SingleOutputStreamOperator returnStream = dataStream.addFunction("batchReduce", + aggregate, null, null, invokable); + + dataStream.jobGraphBuilder.setTypeWrappersFrom(dataStream.getId(), returnStream.getId()); + return returnStream; + } + + protected BatchReduceInvokable getReduceInvokable(ReduceFunction reducer) { + BatchReduceInvokable invokable; + if (isGrouped) { + invokable = new GroupedBatchReduceInvokable(reducer, batchSize, slideSize, + keyPosition); + } else { + invokable = new BatchReduceInvokable(reducer, batchSize, slideSize); + } + return invokable; + } + + protected BatchGroupReduceInvokable getGroupReduceInvokable( + GroupReduceFunction reducer) { + BatchGroupReduceInvokable invokable; + if (isGrouped) { + invokable = new GroupedBatchGroupReduceInvokable(reducer, batchSize, slideSize, + keyPosition); + } else { + invokable = new BatchGroupReduceInvokable(reducer, batchSize, slideSize); + } + return invokable; + } + + protected BatchedDataStream copy() { + return new BatchedDataStream(this); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 70348d6..bebda91 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -26,12 +26,10 @@ import org.apache.commons.lang3.SerializationUtils; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.Function; -import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.functions.RichFilterFunction; import org.apache.flink.api.java.functions.RichFlatMapFunction; -import org.apache.flink.api.java.functions.RichGroupReduceFunction; import org.apache.flink.api.java.functions.RichMapFunction; import org.apache.flink.api.java.functions.RichReduceFunction; import org.apache.flink.api.java.typeutils.TupleTypeInfo; @@ -49,14 +47,12 @@ import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByBatches; import org.apache.flink.streaming.api.function.sink.WriteSinkFunctionByMillis; import org.apache.flink.streaming.api.invokable.SinkInvokable; import org.apache.flink.streaming.api.invokable.StreamOperatorInvokable; -import org.apache.flink.streaming.api.invokable.operator.BatchGroupReduceInvokable; import org.apache.flink.streaming.api.invokable.operator.FilterInvokable; import org.apache.flink.streaming.api.invokable.operator.FlatMapInvokable; import org.apache.flink.streaming.api.invokable.operator.MapInvokable; import 
org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable; -import org.apache.flink.streaming.api.invokable.operator.WindowGroupReduceInvokable; -import org.apache.flink.streaming.api.invokable.util.DefaultTimestamp; -import org.apache.flink.streaming.api.invokable.util.Timestamp; +import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp; +import org.apache.flink.streaming.api.invokable.util.TimeStamp; import org.apache.flink.streaming.partitioner.BroadcastPartitioner; import org.apache.flink.streaming.partitioner.DistributePartitioner; import org.apache.flink.streaming.partitioner.FieldsPartitioner; @@ -385,160 +381,113 @@ public class DataStream { ReduceFunction.class, 0), new StreamReduceInvokable(reducer)); } - public GroupedDataStream groupBy(int keyPosition) { - return new GroupedDataStream(this, keyPosition); - } - /** - * Applies a reduce transformation on preset chunks of the DataStream. The - * transformation calls a {@link GroupReduceFunction} for each tuple batch - * of the predefined size. Each GroupReduceFunction call can return any - * number of elements including none. The user can also extend - * {@link RichGroupReduceFunction} to gain access to other features provided - * by the {@link RichFuntion} interface. + * Groups the elements of a {@link DataStream} by the given key position to + * be used with grouped operators like + * {@link GroupedDataStream#reduce(ReduceFunction)} * - * - * @param reducer - * The GroupReduceFunction that is called for each tuple batch. - * @param batchSize - * The number of tuples grouped together in the batch. - * @param - * output type - * @return The transformed {@link DataStream}. + * @param keyPosition + * The position of the field on which the {@link DataStream} will + * be grouped. + * @return The transformed {@link DataStream} */ - public SingleOutputStreamOperator batchReduce(GroupReduceFunction reducer, - long batchSize) { - return batchReduce(reducer, batchSize, batchSize); + public GroupedDataStream groupBy(int keyPosition) { + return new GroupedDataStream(this, keyPosition); } /** - * Applies a reduce transformation on preset sliding chunks of the - * DataStream. The transformation calls a {@link GroupReduceFunction} for - * each tuple batch of the predefined size. The tuple batch gets slid by the - * given number of tuples. Each GroupReduceFunction call can return any - * number of elements including none. The user can also extend - * {@link RichGroupReduceFunction} to gain access to other features provided - * by the {@link RichFuntion} interface. - * + * Collects the data stream elements into sliding batches creating a new + * {@link BatchedDataStream}. The user can apply transformations like + * {@link BatchedDataStream#reduce}, {@link BatchedDataStream#reduceGroup} + * or aggregations on the {@link BatchedDataStream}. * - * @param reducer - * The GroupReduceFunction that is called for each tuple batch. * @param batchSize - * The number of tuples grouped together in the batch. + * The number of elements in each batch at each operator * @param slideSize - * The number of tuples the batch is slid by. - * @param - * output type - * @return The transformed {@link DataStream}. + * The number of elements with which the batches are slid by + * after each transformation. 
+ * @return The transformed {@link DataStream} */ - public SingleOutputStreamOperator batchReduce(GroupReduceFunction reducer, - long batchSize, long slideSize) { + public BatchedDataStream batch(long batchSize, long slideSize) { if (batchSize < 1) { throw new IllegalArgumentException("Batch size must be positive"); } if (slideSize < 1) { throw new IllegalArgumentException("Slide size must be positive"); } - - FunctionTypeWrapper inTypeWrapper = new FunctionTypeWrapper(reducer, - GroupReduceFunction.class, 0); - FunctionTypeWrapper outTypeWrapper = new FunctionTypeWrapper(reducer, - GroupReduceFunction.class, 1); - - return addFunction("batchReduce", reducer, inTypeWrapper, outTypeWrapper, - new BatchGroupReduceInvokable(reducer, batchSize, slideSize)); + return new BatchedDataStream(this, batchSize, slideSize); } /** - * Applies a reduce transformation on preset "time" chunks of the - * DataStream. The transformation calls a {@link GroupReduceFunction} on - * records received during the predefined time window. The window is shifted - * after each reduce call. Each GroupReduceFunction call can return any - * number of elements including none.The user can also extend - * {@link RichGroupReduceFunction} to gain access to other features provided - * by the {@link RichFuntion} interface. - * + * Collects the data stream elements into sliding batches creating a new + * {@link BatchedDataStream}. The user can apply transformations like + * {@link BatchedDataStream#reduce}, {@link BatchedDataStream#reduceGroup} + * or aggregations on the {@link BatchedDataStream}. * - * @param reducer - * The GroupReduceFunction that is called for each time window. - * @param windowSize - * SingleOutputStreamOperator The time window to run the reducer - * on, in milliseconds. - * @param - * output type - * @return The transformed DataStream. - */ - public SingleOutputStreamOperator windowReduce(GroupReduceFunction reducer, - long windowSize) { - return windowReduce(reducer, windowSize, windowSize); - } - - /** - * Applies a reduce transformation on preset "time" chunks of the - * DataStream. The transformation calls a {@link GroupReduceFunction} on - * records received during the predefined time window. The window is shifted - * after each reduce call. Each GroupReduceFunction call can return any - * number of elements including none.The user can also extend - * {@link RichGroupReduceFunction} to gain access to other features provided - * by the {@link RichFuntion} interface. - * - * - * @param reducer - * The GroupReduceFunction that is called for each time window. - * @param windowSize - * SingleOutputStreamOperator The time window to run the reducer - * on, in milliseconds. - * @param slideInterval - * The time interval, batch is slid by. - * @param - * output type - * @return The transformed DataStream. + * @param batchSize + * The number of elements in each batch at each operator + * @return The transformed {@link DataStream} */ - public SingleOutputStreamOperator windowReduce(GroupReduceFunction reducer, - long windowSize, long slideInterval) { - return windowReduce(reducer, windowSize, slideInterval, new DefaultTimestamp()); + public BatchedDataStream batch(long batchSize) { + return batch(batchSize, batchSize); } /** - * Applies a reduce transformation on preset "time" chunks of the - * DataStream. The transformation calls a {@link GroupReduceFunction} on - * records received during the predefined time window. The window is shifted - * after each reduce call. 
Each GroupReduceFunction call can return any - * number of elements including none. The time is determined by a - * user-defined timestamp. The user can also extend - * {@link RichGroupReduceFunction} to gain access to other features provided - * by the {@link RichFuntion} interface. + * Collects the data stream elements into sliding windows creating a new + * {@link WindowDataStream}. The user can apply transformations like + * {@link WindowDataStream#reduce}, {@link WindowDataStream#reduceGroup} or + * aggregations on the {@link WindowDataStream}. * - * - * @param reducer - * The GroupReduceFunction that is called for each time window. * @param windowSize - * SingleOutputStreamOperator The time window to run the reducer - * on, in milliseconds. + * The length of the window in milliseconds. * @param slideInterval - * The time interval, batch is slid by. + * The number of milliseconds with which the windows are slid by + * after each transformation. * @param timestamp - * Timestamp function to retrieve a timestamp from an element. - * @param - * output type - * @return The transformed DataStream. + * User defined function for extracting time-stamps from each + * element + * @return The transformed {@link DataStream} */ - public SingleOutputStreamOperator windowReduce(GroupReduceFunction reducer, - long windowSize, long slideInterval, Timestamp timestamp) { + public WindowDataStream window(long windowSize, long slideInterval, + TimeStamp timestamp) { if (windowSize < 1) { throw new IllegalArgumentException("Window size must be positive"); } if (slideInterval < 1) { throw new IllegalArgumentException("Slide interval must be positive"); } + return new WindowDataStream(this, windowSize, slideInterval, timestamp); + } - FunctionTypeWrapper inTypeWrapper = new FunctionTypeWrapper(reducer, - GroupReduceFunction.class, 0); - FunctionTypeWrapper outTypeWrapper = new FunctionTypeWrapper(reducer, - GroupReduceFunction.class, 1); + /** + * Collects the data stream elements into sliding windows creating a new + * {@link WindowDataStream}. The user can apply transformations like + * {@link WindowDataStream#reduce}, {@link WindowDataStream#reduceGroup} or + * aggregations on the {@link WindowDataStream}. + * + * @param windowSize + * The length of the window in milliseconds. + * @param slideInterval + * The number of milliseconds with which the windows are slid by + * after each transformation. + * @return The transformed {@link DataStream} + */ + public WindowDataStream window(long windowSize, long slideInterval) { + return window(windowSize, slideInterval, new DefaultTimeStamp()); + } - return addFunction("batchReduce", reducer, inTypeWrapper, outTypeWrapper, - new WindowGroupReduceInvokable(reducer, windowSize, slideInterval, timestamp)); + /** + * Collects the data stream elements into sliding windows creating a new + * {@link WindowDataStream}. The user can apply transformations like + * {@link WindowDataStream#reduce}, {@link WindowDataStream#reduceGroup} or + * aggregations on the {@link WindowDataStream}. + * + * @param windowSize + * The length of the window in milliseconds. 
+ * @return The transformed {@link DataStream} + */ + public WindowDataStream window(long windowSize) { + return window(windowSize, windowSize); } /** @@ -1115,7 +1064,7 @@ public class DataStream { * * @return The copy */ - protected DataStream copy(){ + protected DataStream copy() { return new DataStream(this); } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java index e513f2d..138a6f8 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/GroupedDataStream.java @@ -25,8 +25,8 @@ import org.apache.flink.streaming.api.function.aggregation.AggregationFunction; import org.apache.flink.streaming.api.invokable.operator.GroupedBatchGroupReduceInvokable; import org.apache.flink.streaming.api.invokable.operator.GroupReduceInvokable; import org.apache.flink.streaming.api.invokable.operator.GroupedWindowGroupReduceInvokable; -import org.apache.flink.streaming.api.invokable.util.DefaultTimestamp; -import org.apache.flink.streaming.api.invokable.util.Timestamp; +import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp; +import org.apache.flink.streaming.api.invokable.util.TimeStamp; import org.apache.flink.streaming.partitioner.StreamPartitioner; import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper; @@ -166,7 +166,7 @@ public class GroupedDataStream extends DataStream { */ public SingleOutputStreamOperator windowReduce(GroupReduceFunction reducer, long windowSize, long slideInterval) { - return windowReduce(reducer, windowSize, slideInterval, new DefaultTimestamp()); + return windowReduce(reducer, windowSize, slideInterval, new DefaultTimeStamp()); } /** @@ -191,7 +191,7 @@ public class GroupedDataStream extends DataStream { * @return The transformed DataStream. 
*/ public SingleOutputStreamOperator windowReduce(GroupReduceFunction reducer, - long windowSize, long slideInterval, Timestamp timestamp) { + long windowSize, long slideInterval, TimeStamp timestamp) { return addFunction("batchReduce", reducer, new FunctionTypeWrapper(reducer, GroupReduceFunction.class, 0), new FunctionTypeWrapper(reducer, GroupReduceFunction.class, 1), new GroupedWindowGroupReduceInvokable(reducer, http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDataStream.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDataStream.java new file mode 100755 index 0000000..4756050 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowDataStream.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.datastream; + +import org.apache.flink.api.common.functions.GroupReduceFunction; +import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.streaming.api.invokable.operator.BatchGroupReduceInvokable; +import org.apache.flink.streaming.api.invokable.operator.BatchReduceInvokable; +import org.apache.flink.streaming.api.invokable.operator.GroupedWindowGroupReduceInvokable; +import org.apache.flink.streaming.api.invokable.operator.GroupedWindowReduceInvokable; +import org.apache.flink.streaming.api.invokable.operator.WindowGroupReduceInvokable; +import org.apache.flink.streaming.api.invokable.operator.WindowReduceInvokable; +import org.apache.flink.streaming.api.invokable.util.TimeStamp; + +/** + * A {@link WindowDataStream} represents a data stream whose elements are + * batched together in a sliding window. operations like + * {@link #reduce(ReduceFunction)} or {@link #reduceGroup(GroupReduceFunction)} + * are applied for each window and the window is slid afterwards. 
+ * + * @param + * The output type of the {@link WindowDataStream} + */ +public class WindowDataStream extends BatchedDataStream { + + TimeStamp timeStamp; + + protected WindowDataStream(DataStream dataStream, long windowSize, long slideInterval, + TimeStamp timeStamp) { + super(dataStream, windowSize, slideInterval); + this.timeStamp = timeStamp; + } + + protected WindowDataStream(WindowDataStream windowDataStream) { + super(windowDataStream); + this.timeStamp = windowDataStream.timeStamp; + } + + public WindowDataStream groupBy(int keyPosition) { + return new WindowDataStream(dataStream.groupBy(keyPosition), batchSize, slideSize, + timeStamp); + } + + protected BatchReduceInvokable getReduceInvokable(ReduceFunction reducer) { + BatchReduceInvokable invokable; + if (isGrouped) { + invokable = new GroupedWindowReduceInvokable(reducer, batchSize, slideSize, + keyPosition, timeStamp); + } else { + invokable = new WindowReduceInvokable(reducer, batchSize, slideSize, timeStamp); + } + return invokable; + } + + protected BatchGroupReduceInvokable getGroupReduceInvokable( + GroupReduceFunction reducer) { + BatchGroupReduceInvokable invokable; + if (isGrouped) { + invokable = new GroupedWindowGroupReduceInvokable(reducer, batchSize, + slideSize, keyPosition, timeStamp); + } else { + invokable = new WindowGroupReduceInvokable(reducer, batchSize, slideSize, + timeStamp); + } + return invokable; + } + + public WindowDataStream copy() { + return new WindowDataStream(this); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokable.java index 4027b78..865dced 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowGroupReduceInvokable.java @@ -22,7 +22,7 @@ import java.util.Iterator; import java.util.List; import org.apache.flink.api.common.functions.GroupReduceFunction; -import org.apache.flink.streaming.api.invokable.util.Timestamp; +import org.apache.flink.streaming.api.invokable.util.TimeStamp; import org.apache.flink.streaming.api.streamrecord.StreamRecord; import org.apache.flink.streaming.state.MutableTableState; @@ -33,7 +33,7 @@ public class GroupedWindowGroupReduceInvokable extends WindowGroupReduc private MutableTableState> values; public GroupedWindowGroupReduceInvokable(GroupReduceFunction reduceFunction, long windowSize, - long slideInterval, int keyPosition, Timestamp timestamp) { + long slideInterval, int keyPosition, TimeStamp timestamp) { super(reduceFunction, windowSize, slideInterval, timestamp); this.keyPosition = keyPosition; this.reducer = reduceFunction; http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowReduceInvokable.java 
---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowReduceInvokable.java index e202e86..df94843 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowReduceInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowReduceInvokable.java @@ -18,33 +18,26 @@ package org.apache.flink.streaming.api.invokable.operator; import java.io.IOException; -import java.util.Map; import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.streaming.api.invokable.util.Timestamp; +import org.apache.flink.streaming.api.invokable.util.TimeStamp; import org.apache.flink.streaming.api.streamrecord.StreamRecord; -import org.apache.flink.streaming.state.SlidingWindowState; public class GroupedWindowReduceInvokable extends GroupedBatchReduceInvokable { private static final long serialVersionUID = 1L; - protected transient SlidingWindowState> state; - private Timestamp timestamp; + private TimeStamp timestamp; private long startTime; private long nextRecordTime; public GroupedWindowReduceInvokable(ReduceFunction reduceFunction, long windowSize, - long slideInterval, Timestamp timestamp, int keyPosition) { + long slideInterval, int keyPosition, TimeStamp timestamp) { super(reduceFunction, windowSize, slideInterval, keyPosition); this.timestamp = timestamp; + this.startTime = timestamp.getStartTime(); } - - @Override - protected void initializeAtFirstRecord() { - startTime = nextRecordTime - (nextRecordTime % granularity); - } - + @Override protected StreamRecord getNextRecord() throws IOException { reuse = recordIterator.next(reuse); @@ -53,7 +46,7 @@ public class GroupedWindowReduceInvokable extends GroupedBatchReduceInvokab } return reuse; } - + @Override protected boolean batchNotFull() { if (nextRecordTime < startTime + granularity) { http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java index 7b4317a..03c19d4 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokable.java @@ -20,24 +20,20 @@ package org.apache.flink.streaming.api.invokable.operator; import java.io.IOException; import org.apache.flink.api.common.functions.GroupReduceFunction; -import org.apache.flink.streaming.api.invokable.util.Timestamp; +import org.apache.flink.streaming.api.invokable.util.TimeStamp; import org.apache.flink.streaming.api.streamrecord.StreamRecord; public 
class WindowGroupReduceInvokable extends BatchGroupReduceInvokable { private static final long serialVersionUID = 1L; private long startTime; private long nextRecordTime; - private Timestamp timestamp; + private TimeStamp timestamp; public WindowGroupReduceInvokable(GroupReduceFunction reduceFunction, long windowSize, - long slideInterval, Timestamp timestamp) { + long slideInterval, TimeStamp timestamp) { super(reduceFunction, windowSize, slideInterval); this.timestamp = timestamp; - } - - @Override - protected void initializeAtFirstRecord() { - startTime = nextRecordTime - (nextRecordTime % granularity); + this.startTime = timestamp.getStartTime(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java index 0f13397..bd51c65 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokable.java @@ -21,25 +21,21 @@ import java.io.IOException; import java.util.Iterator; import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.streaming.api.invokable.util.Timestamp; +import org.apache.flink.streaming.api.invokable.util.TimeStamp; import org.apache.flink.streaming.api.streamrecord.StreamRecord; public class WindowReduceInvokable extends BatchReduceInvokable { private static final long serialVersionUID = 1L; private long startTime; private long nextRecordTime; - private Timestamp timestamp; + private TimeStamp timestamp; private String nullElement = "nullElement"; public WindowReduceInvokable(ReduceFunction reduceFunction, long windowSize, - long slideInterval, Timestamp timestamp) { + long slideInterval, TimeStamp timestamp) { super(reduceFunction, windowSize, slideInterval); this.timestamp = timestamp; - } - - @Override - protected void initializeAtFirstRecord() { - startTime = nextRecordTime - (nextRecordTime % granularity); + this.startTime = timestamp.getStartTime(); } protected StreamRecord getNextRecord() throws IOException { @@ -59,7 +55,7 @@ public class WindowReduceInvokable extends BatchReduceInvokable { return false; } } - + @Override protected void collectOneUnit() throws Exception { OUT reduced = null; @@ -71,9 +67,9 @@ public class WindowReduceInvokable extends BatchReduceInvokable { resetReuse(); } } - if(reduced!=null){ + if (reduced != null) { state.pushBack(reduced); - }else{ + } else { state.pushBack(nullElement); } } http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimeStamp.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimeStamp.java 
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimeStamp.java new file mode 100644 index 0000000..b6186e1 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimeStamp.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.invokable.util; + +/** + * Default timestamp function that uses the Java System.currentTimeMillis() + * method to retrieve a timestamp. + * + * @param + * Type of the inputs of the reducing function. + */ +public class DefaultTimeStamp implements TimeStamp { + private static final long serialVersionUID = 1L; + + @Override + public long getTimestamp(T value) { + return System.currentTimeMillis(); + } + + @Override + public long getStartTime() { + return System.currentTimeMillis(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimestamp.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimestamp.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimestamp.java deleted file mode 100644 index 8276a01..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimestamp.java +++ /dev/null @@ -1,34 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.invokable.util; - -/** - * Default timestamp function that uses the Java System.currentTimeMillis() - * method to retrieve a timestamp. - * - * @param - * Type of the inputs of the reducing function. 
- */ -public class DefaultTimestamp implements Timestamp { - private static final long serialVersionUID = 1L; - - @Override - public long getTimestamp(T value) { - return System.currentTimeMillis(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/TimeStamp.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/TimeStamp.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/TimeStamp.java new file mode 100644 index 0000000..27447d7 --- /dev/null +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/TimeStamp.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.invokable.util; + +import java.io.Serializable; + +/** + * Interface for getting a timestamp from a custom value. Used in window + * reduces. In order to work properly, the timestamps must be non-decreasing. + * + * @param + * Type of the value to create the timestamp from. + */ +public interface TimeStamp extends Serializable { + + /** + * Values + * + * @param value + * The value to create the timestamp from + * @return The timestamp + */ + public long getTimestamp(T value); + + /** + * Function to define the starting time for reference + * + * @return The starting timestamp + */ + public long getStartTime(); +} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/Timestamp.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/Timestamp.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/Timestamp.java deleted file mode 100644 index 91758e8..0000000 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/Timestamp.java +++ /dev/null @@ -1,38 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.invokable.util; - -import java.io.Serializable; - -/** - * Interface for getting a timestamp from a custom value. Used in window - * reduces. In order to work properly, the timestamps must be non-decreasing. - * - * @param - * Type of the value to create the timestamp from. - */ -public interface Timestamp extends Serializable { - - /** - * Values - * @param value - * The value to create the timestamp from - * @return The timestamp - */ - public long getTimestamp(T value); -} http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokableTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokableTest.java index 7437bec..097e391 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokableTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowGroupReduceInvokableTest.java @@ -25,7 +25,7 @@ import java.util.Iterator; import java.util.List; import org.apache.flink.api.common.functions.GroupReduceFunction; -import org.apache.flink.streaming.api.invokable.util.Timestamp; +import org.apache.flink.streaming.api.invokable.util.TimeStamp; import org.apache.flink.streaming.util.MockInvokable; import org.apache.flink.util.Collector; import org.junit.Before; @@ -45,13 +45,15 @@ public class WindowGroupReduceInvokableTest { } } - public static final class MyTimestamp implements Timestamp { + public static final class MyTimestamp implements TimeStamp { private static final long serialVersionUID = 1L; private Iterator timestamps; + private long start; public MyTimestamp(List timestamps) { this.timestamps = timestamps.iterator(); + this.start = timestamps.get(0); } @Override @@ -59,6 +61,11 @@ public class WindowGroupReduceInvokableTest { long ts = timestamps.next(); return ts; } + + @Override + public long getStartTime() { + return start; + } } private final static String EOW = "|"; @@ -81,16 +88,16 @@ public class WindowGroupReduceInvokableTest { slideSize = 5; timestamps = Arrays.asList(101L, 103L, 121L, 122L, 123L, 124L, 180L, 181L, 185L, 190L); expectedResults.add(Arrays.asList("1", "2", EOW, EOW, EOW, "3", "4", "5", "6", EOW, "3", - "4", "5", "6", EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, "7", "8", - EOW, "7", "8", "9", EOW, "9", "10", EOW)); + "4", "5", "6", EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, EOW, "7", EOW, "7", + "8", "9", EOW, "8", "9", "10", EOW)); invokables.add(new WindowGroupReduceInvokable(new MySlidingWindowReduce(), windowSize, slideSize, new MyTimestamp(timestamps))); 
windowSize = 10; slideSize = 4; timestamps = Arrays.asList(101L, 103L, 110L, 112L, 113L, 114L, 120L, 121L, 125L, 130L); - expectedResults.add(Arrays.asList("1", "2", EOW, "3", "4", "5", EOW, "3", "4", "5", "6", - EOW, "4", "5", "6", "7", "8", EOW, "7", "8", "9", EOW, "7", "8", "9", EOW, "9", + expectedResults.add(Arrays.asList("1", "2","3" ,EOW, "3", "4", "5","6", EOW, "3", "4", "5", "6", + EOW, "5", "6", "7", "8", EOW, "7", "8", "9", EOW, "8","9", "10", EOW)); invokables.add(new WindowGroupReduceInvokable(new MySlidingWindowReduce(), windowSize, slideSize, new MyTimestamp(timestamps))); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java index 1aed25f..ff0951d 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowReduceInvokableTest.java @@ -25,7 +25,7 @@ import java.util.List; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.invokable.util.Timestamp; +import org.apache.flink.streaming.api.invokable.util.TimeStamp; import org.apache.flink.streaming.util.MockInvokable; import org.junit.Test; @@ -52,13 +52,18 @@ public class WindowReduceInvokableTest { public Integer reduce(Integer value1, Integer value2) throws Exception { return value1 + value2; } - }, 4, 2, new Timestamp() { + }, 4, 2, new TimeStamp() { private static final long serialVersionUID = 1L; @Override public long getTimestamp(Integer value) { return value; } + + @Override + public long getStartTime() { + return 0; + } }); List expected = new ArrayList(); @@ -86,14 +91,19 @@ public class WindowReduceInvokableTest { Tuple2 value2) throws Exception { return new Tuple2(value1.f0, value1.f1 + value2.f1); } - }, 3, 2, new Timestamp>() { + }, 3, 2, 0, new TimeStamp>() { private static final long serialVersionUID = 1L; @Override public long getTimestamp(Tuple2 value) { return value.f1; } - }, 0); + + @Override + public long getStartTime() { + return 1; + } + }); List> expected2 = new ArrayList>(); expected2.add(new Tuple2("a", 6)); http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/5f601cf9/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java ---------------------------------------------------------------------- diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java index d80b937..a433fd0 100755 --- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java +++ 
b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -119,12 +119,12 @@ public class IncrementalLearningSkeleton {
 
 	public static void main(String[] args) {
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment
-				.createLocalEnvironment(PARALLELISM).setBufferTimeout(1000);
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(
+				PARALLELISM).setBufferTimeout(1000);
 
 		// Build new model on every second of new data
 		DataStream model = env.addSource(new TrainingDataSource(), SOURCE_PARALLELISM)
-				.windowReduce(new PartialModelBuilder(), 5000);
+				.window(5000).reduceGroup(new PartialModelBuilder());
 
 		// Use partial model for prediction
 		DataStream prediction = env.addSource(new NewDataSource(), SOURCE_PARALLELISM)
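For readers skimming the archive, the sketch below pulls together the API this commit introduces, in the way the updated guide describes it: `window()`/`batch()` create a `WindowDataStream`/`BatchedDataStream`, which can then be grouped, aggregated with `sum`/`min`/`max`, or reduced, optionally with a user-supplied `TimeStamp`. It is a minimal usage sketch and not part of the patch: the input data, the `Tuple2` element type, and the surrounding boilerplate (`createLocalEnvironment`, `fromElements`, `print`, `execute`) are illustrative assumptions about the streaming API of that era; generic parameters are written out even though they appear stripped in the archived diff above.

```java
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.invokable.util.TimeStamp;

public class WindowBatchSketch {

	public static void main(String[] args) throws Exception {
		// Local environment and element source are illustrative setup only.
		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);

		DataStream<Tuple2<String, Integer>> source = env.fromElements(
				new Tuple2<String, Integer>("a", 1),
				new Tuple2<String, Integer>("b", 2),
				new Tuple2<String, Integer>("a", 3));

		// Sliding time window: 1 minute window, slid by 10 seconds, summing field 1.
		source.window(60000, 10000).sum(1).print();

		// Sliding count-based batch: 1000 elements, slid by 100, reduced per key (field 0).
		source.batch(1000, 100).groupBy(0)
				.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
					@Override
					public Tuple2<String, Integer> reduce(Tuple2<String, Integer> v1,
							Tuple2<String, Integer> v2) {
						return new Tuple2<String, Integer>(v1.f0, v1.f1 + v2.f1);
					}
				}).print();

		// Window driven by a user-defined, non-decreasing timestamp instead of system time.
		source.window(60000, 10000, new TimeStamp<Tuple2<String, Integer>>() {
			private static final long serialVersionUID = 1L;

			@Override
			public long getTimestamp(Tuple2<String, Integer> value) {
				return value.f1; // treat the integer field as the event time
			}

			@Override
			public long getStartTime() {
				return 0;
			}
		}).sum(1).print();

		env.execute();
	}
}
```

As the updated guide section notes, the aggregations (`sum`, `min`, `max`) replace the selected field with the current aggregated value within each window or batch, and grouped variants apply the user function per key.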