flink-commits mailing list archives

From mbala...@apache.org
Subject [33/34] incubator-flink git commit: [streaming] Updated javadocs for new windowing semantics
Date Fri, 05 Dec 2014 17:26:38 GMT
[streaming] Updated javadocs for new windowing semantics

[streaming] Updated streaming-guide to match recent api changes


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/4e046a9b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/4e046a9b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/4e046a9b

Branch: refs/heads/master
Commit: 4e046a9bd43428820a409a0777ab281e3c366af5
Parents: 3f74e05
Author: Gyula Fora <gyfora@apache.org>
Authored: Tue Dec 2 23:39:47 2014 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Fri Dec 5 16:47:18 2014 +0100

----------------------------------------------------------------------
 docs/streaming_guide.md                         | 189 ++++++++----
 .../streaming/api/datastream/DataStream.java    |  63 ++--
 .../datastream/SingleOutputStreamOperator.java  |  40 ++-
 .../api/datastream/WindowedDataStream.java      | 289 ++++++++++---------
 .../streaming/api/windowing/helper/Count.java   |   6 +-
 .../streaming/api/windowing/helper/Delta.java   |   9 +-
 .../streaming/api/windowing/helper/Time.java    |  10 +-
 .../windowing/TimeWindowingExample.java         |   2 +-
 8 files changed, 360 insertions(+), 248 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4e046a9b/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index a0bc1d6..6038cd3 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -11,12 +11,12 @@ Introduction
 ------------
 
 
-Flink Streaming is an extension of the core Flink API for high-throughput, low-latency data stream processing. The system can connect to and process data streams from many data sources like Flume, Twitter, ZeroMQ and also from any user defined data source. Data streams can be transformed and modified using high-level functions similar to the ones provided by the batch processing API. Flink Streaming provides native support for iterative stream processing. The processed data can be pushed to different output types.
+Flink Streaming is an extension of the core Flink API for high-throughput, low-latency data stream processing. The system can connect to and process data streams from many data sources like RabbitMQ, Flume, Twitter, ZeroMQ and also from any user defined data source. Data streams can be transformed and modified using high-level functions similar to the ones provided by the batch processing API. Flink Streaming provides native support for iterative stream processing. The processed data can be pushed to different output types.
 
 Flink Streaming API
 -----------
 
-The Streaming API is part of the *addons* Maven project. All relevant classes are located in the *org.apache.flink.streaming* package.
+The Streaming API is currently part of the *addons* Maven project. All relevant classes are located in the *org.apache.flink.streaming* package.
 
 Add the following dependency to your `pom.xml` to use the Flink Streaming.
 
@@ -91,14 +91,16 @@ As these steps are basically the same as in the core API we will only note the i
 For stream processing jobs, the user needs to obtain a `StreamExecutionEnvironment` in contrast with the batch API where one would need an `ExecutionEnvironment`. The process otherwise is essentially the same:
 
 ~~~java 
-StreamExecutionEnvironment.createLocalEnvironment(params…)
-StreamExecutionEnvironment.createRemoteEnvironment(params…)
+StreamExecutionEnvironment.getExecutionEnvironment()
+StreamExecutionEnvironment.createLocalEnvironment(parallelism)
+StreamExecutionEnvironment.createRemoteEnvironment(…)
 ~~~
 
 For connecting to data streams the `StreamExecutionEnvironment` has many different methods, from basic file sources to completely general user defined data sources. We will go into details in the [basics](#basics) section.
 
 ~~~java
 env.socketTextStream(host, port)
+env.fromElements(elements…)
 ~~~
 
 After defining the data stream sources, the user can specify transformations on the data streams to create a new data stream. Different data streams can be also combined together for joint transformations which are being showcased in the [operations](#operations) section.
@@ -111,6 +113,7 @@ The processed data can be pushed to different outputs called sinks. The user can
 
 ~~~java
 dataStream.writeAsCsv(path)
+dataStream.print()
 ~~~
 
 Once the complete program is specified `execute(programName)` is to be called on the `StreamExecutionEnvironment`. This will either execute on the local machine or submit the program for execution on a cluster, depending on the chosen execution environment.
@@ -126,22 +129,22 @@ Basics
 
 ### DataStream
 
-The `DataStream` is the basic abstraction provided by the Flink Streaming API. It represents a continuous stream of data of a certain type from either a data source or a transformed data stream. Operations will be applied on individual data points or windows of the `DataStream` based on the type of the operation. For example the map operator transforms each data point individually while window or batch aggregations work on an interval of data points at the same time.
+The `DataStream` is the basic abstraction provided by the Flink Streaming API. It represents a continuous stream of data of a certain type from either a data source or a transformed data stream. Operations will be applied on individual data points or windows of the `DataStream` based on the type of the operation. For example the map operator transforms each data point individually while window operations work on an interval of data points at the same time.
  
-The operations may return different `DataStream` types allowing more elaborate transformations, for example the `groupBy()` method returns a `GroupedDataStream` which can be used for group operations.
+The operations may return different `DataStream` types allowing more elaborate transformations, for example the `groupBy(…)` method returns a `GroupedDataStream` which can be used for grouped operations such as aggregating by key.
 
 ### Partitioning
 
 Partitioning controls how individual data points are distributed among the parallel instances of the transformation operators. By default *Forward* partitioning is used. There are several partitioning types supported in Flink Streaming:
 
- * *Forward*: Forward partitioning directs the output data to the next operator on the same machine (if possible) avoiding expensive network I/O. This is the default partitioner.
+ * *Forward*: Forward partitioning directs the output data to the next operator on the same machine (if possible) avoiding expensive network I/O. If there are more processing nodes than inputs or vice versa the load is distributed among the extra nodes in a round-robin fashion. This is the default partitioner.
 Usage: `dataStream.forward()`
- * *Shuffle*: Shuffle partitioning randomly partitions the output data stream to the next operator using uniform distribution.
+ * *Shuffle*: Shuffle partitioning randomly partitions the output data stream to the next operator using uniform distribution. Use this only when it is important that the partitioning is randomised. If you only care about an even load use *Distribute*.
 Usage: `dataStream.shuffle()`
  * *Distribute*: Distribute partitioning directs the output data stream to the next operator in a round-robin fashion, achieving a balanced distribution.
 Usage: `dataStream.distribute()`
- * *Field*: Field partitioning partitions the output data stream based on the hash code of a selected key field. Data points with the same key are directed to the same operator instance.
-Usage: `dataStream.partitionBy(keyposition)`
+ * *Field/Key*: Field/Key partitioning partitions the output data stream based on the hash code of a selected key of the tuples. Data points with the same key are directed to the same operator instance. The user can define keys by field positions (for tuple and array types), field expressions (for Pojo types) and custom keys using the `KeySelector` interface. 
+Usage: `dataStream.partitionBy(keys)`
  * *Broadcast*: Broadcast partitioning sends the output data stream to all parallel instances of the next operator.
 Usage: `dataStream.broadcast()`
  * *Global*: All data points end up at the same operator instance. To achieve this use the parallelism setting of the corresponding operator.
@@ -149,22 +152,23 @@ Usage: `operator.setParallelism(1)`
 
 ### Sources
 
-The user can connect to data streams by the different implementations of `DataStreamSource` using methods provided by the `StreamExecutionEnvironment`. There are several predefined ones similar to the ones of the batch API and some streaming specific ones like:
+The user can connect to data streams by the different implementations of `SourceFunction` using `StreamExecutionEnvironment.addSource(SourceFunction)`. In contrast with other operators, DataStreamSources have a default operator parallelism of 1 which can be increased using the `.setParallelism(dop)` method as we will see later at the operator settings.
+
+There are several predefined ones similar to the ones of the batch API and some streaming specific ones like:
 
  * `socketTextStream(hostname, port)`
  * `readTextStream(filepath)`
- * `genereateSequence(from, to)`
+ * `generateSequence(from, to)`
  * `fromElements(elements…)`
  * `fromCollection(collection)`
  * `readTextFile(filepath)`
 
 These can be used to easily test and debug streaming programs.
 There are pre-implemented connectors for a number of the most popular message queue services, please refer to the section on [connectors](#stream-connectors) for more detail.
-Besides the pre-defined solutions the user can implement their own source by implementing the `SourceFunction` interface and using the `addSource(sourceFunction)` method of the `StreamExecutionEnvironment`.
 
 ### Sinks
 
-`DataStreamSink` represents the different outputs of a Flink Streaming program. There are several pre-defined implementations `DataStreamSink` available right away:
+`DataStreamSink` represents the different outputs of a Flink Streaming program. There are several pre-defined implementations available right away:
 
  * `dataStream.print()` – Writes the DataStream to the standard output, practical for testing purposes
  * `dataStream.writeAsText(parameters)` – Writes the DataStream to a text file
@@ -238,7 +242,7 @@ dataStream.reduce(new ReduceFunction<Integer>() {
 ~~~
 
 #### Merge
-Merges two or more `DataStream` instances creating a new DataStream containing all the elements from all the streams.
+Merges two or more `DataStream` outputs, creating a new DataStream containing all the elements from all the streams.
 
 ~~~java
 dataStream.merge(otherStream1, otherStream2…)
@@ -246,67 +250,141 @@ dataStream.merge(otherStream1, otherStream2…)
 
 ### Grouped operators
 
-Some transformations require that the `DataStream` is grouped on some key value. The user can create a `GroupedDataStream` by calling the `groupBy(keyPosition)` method of a non-grouped `DataStream`. The user can apply different reduce transformations on the obtained `GroupedDataStream`:
+Some transformations require that the elements of a `DataStream` are grouped on some key. The user can create a `GroupedDataStream` by calling the `groupBy(key)` method of a non-grouped `DataStream`. 
+Keys can be of three types: field positions (applicable for tuple/array types), field expressions (applicable for pojo types) and `KeySelector` instances.
+
+The user can apply different reduce transformations on the obtained `GroupedDataStream`:
 
 #### Reduce on GroupedDataStream
 When the reduce operator is applied on a grouped data stream, the user-defined `ReduceFunction` will combine subsequent pairs of elements having the same key value. The combined results are sent to the output stream.
 
 ### Aggregations
 
-The Flink Streaming API supports different types of aggregation operators similarly to the core API. For grouped data streams the aggregations work in a grouped fashion.
+The Flink Streaming API supports different types of pre-defined aggregation operators similarly to the core API.
+
+Types of aggregations: `sum(field)`, `min(field)`, `max(field)`, `minBy(field, first)`, `maxBy(field, first)`
+
+With `sum`, `min`, and `max` for every incoming tuple the selected field is replaced with the current aggregated value. Fields can be selected using either field positions or field expressions (similarly to grouping).  
+
+With `minBy` and `maxBy` the output of the operator is the element with the current minimal or maximal value at the given field. If more components share the minimum or maximum value, the user can decide if the operator should return the first or last element. This can be set by the `first` boolean parameter.
+
+### Window operators
+
+Flink streaming provides very flexible windowing semantics to create arbitrary windows (also referred to as discretizations or slices) of the DataStreams and apply reduce or aggregation operations on them afterwards. Windowing can be used for instance to create rolling aggregations of the most recent N elements, where N could be defined by Time, Count or any arbitrary measure.
+
+The user can control the size (eviction) of the windows and the frequency of reduce/aggregation calls (triggers) on them in an intuitive api:
 
-Types of aggregations: `sum(fieldPosition)`, `min(fieldPosition)`, `max(fieldPosition)`, `minBy(fieldPosition, first)`, `maxBy(fieldPosition, first)`
+~~~java
+dataStream.window(…).every(…).reduce/reduceGroup/aggregation
+~~~
+
+The next example would create windows that hold elements of the last 5 seconds, and the user defined aggregation/reduce is executed on the windows every second (sliding the window by 1 second):
+
+~~~java
+dataStream.window(Time.of(5, TimeUnit.SECONDS)).every(Time.of(1, TimeUnit.SECONDS))
+~~~
 
-With `sum`, `min`, and `max` for every incoming tuple the selected field is replaced with the current aggregated value. 
+This approach is often referred to as policy based windowing. Different policies (count, time, etc.) can be mixed as well; for example to downsample our stream, we can create a window that will take the latest 100 elements of our stream every minute:
 
-With `minBy` and `maxBy` the output of the operator is the element with the current minimal or maximal value at the given fieldposition. If more components share the minimum or maximum value, the user can decide if the operator should return the first or last element. This can be set by the `first` boolean parameter.
+~~~java
+dataStream.window(Count.of(100)).every(Time.of(1, TimeUnit.MINUTES))
+~~~
 
-### Window/Batch operators
+The user can also omit the `.every(…)` call which results in a tumbling window, which empties the window after every aggregation call.
 
-Window and batch operators allow the user to execute function on slices or windows of the DataStream in a sliding fashion. If the stepsize for the slide is not defined then the window/batchsize is used as stepsize by default. The user can also use user defined timestamps for calculating time windows.
+Several predefined policies are provided in the API, including delta-based, count-based and time-based policies. These can be accessed through the static methods provided by the PolicyHelper classes:
+
+`Time.of(…)`
+`Count.of(…)`
+`Delta.of(…)`
+
+For detailed description of these policies please refer to the javadocs.
+
+Now we are going to take a deeper look at the policies. 
+
+#### Policy based windowing
+The policy based windowing is a highly flexible way to specify your stream discretisation also called windowing semantics. Two types of policies are used for such a specification:
+
+1) `TriggerPolicy` This policy defines when to trigger the reduce UDF on the current window and emit the result. In the API it completes a window statement such as: `.window(..).every(...)`, while we pass the triggering policy within `every`. 
+
+When multiple triggers are used, the reduce/aggregation will be triggered at every trigger.
+
+Several predefined policies are provided in the API, including delta-based, punctuation based, count-based and time-based policies. Policies are in general UDFs and can implement any custom behaviour as well.
+
+2) `Eviction Policy` This policy defines the length of a window as a means of a predicate for evicting tuples when they are no longer needed. In the API this can be defined by the `.window(..)` operation on a stream. There are mostly the same predefined policy types provided as for trigger policies.
+
+When multiple evictions are used the strictest one will control the elements in the window. For instance in the call `ds.window(Count.of(5), Time.of(1,TimeUnit.SECONDS)).every(..)` we will get a window of max 5 elements which have arrived in the last second.
+
+
+In addition to the `ds.window().every()` style users can specifically pass the list of trigger and eviction policies during the window call:
+~~~java
+myStream.window(ListOfTriggerPolicies,ListOfEvictionPolicies)
+~~~
 
-When applied to grouped data streams the data stream is batched/windowed for different key values separately. 
+By default most triggers can only trigger when a new element arrives. This might not be suitable for all the use-cases, especially when time based windowing is used. You may want to trigger not only when an element arrives but also in between. Active policies provide this functionality. The predefined time-based policies are already implemented in such an active way and can hold as an example in case you want to implement your own user defined active policy. 
 
-For example a `dataStream.groupBy(0).batch(100, 10)` produces batches of the last 100 elements for each key value with 10 record step size.
+Time-based trigger and eviction policies can work with user defined `TimeStamp` implementations, these policies already cover most use cases.
  
-#### Reduce on windowed/batched data streams
-The transformation calls a user-defined `ReduceFunction` on records received in the batch or during the predefined time window. The window is shifted after each reduce call. The user can also use the different streaming aggregations.
+#### Reduce on windowed data streams
+The transformation calls a user-defined `ReduceFunction` at every trigger on the records currently in the window. The user can also use the different streaming aggregations.
 
 A window reduce that sums the elements in the last minute with 10 seconds slide interval:
 
 ~~~java
-dataStream.window(60000, 10000).sum();
+dataStream.window(Time.of(1, TimeUnit.MINUTES)).every(Time.of(10,TimeUnit.SECONDS)).sum(field);
 ~~~
 
-#### ReduceGroup on windowed/batched data streams
-The transformation calls a `GroupReduceFunction` for each data batch or data window. The batch/window slides by the predefined number of elements/time after each call.
+#### ReduceGroup on windowed data streams
+The transformation calls a `GroupReduceFunction` for each data batch or data window similarly as a reduce, but providing access to all elements in the window.
 
 ~~~java
-dataStream.batch(1000, 100).reduceGroup(reducer);
+dataStream.window(…).every(…).reduceGroup(reducer);
 ~~~
 
-#### Policy based windowing
-The policy based windowing is an highly flexible way to specify your stream discretisation also called windowing semantics. Two types of policies are used for such a specification:
+#### Grouped operations on windowed data streams
+Calling the `.groupBy(fields)` method on a windowed stream groups the elements by the given fields inside the windows. The window sizes (evictions) and slide sizes (triggers) will be calculated on the whole stream (in a central fashion), but the user defined functions will be applied on a per group basis.
 
-1) `TriggerPolicy` This policy defines when to trigger the reduce UDF on the current window and emit the result. In the API it completes a window statement such as: `.window(..).every(...)`, while we pass the triggering policy within `every`. Several predefind policies are provided in the API, including delta-based, punctuation based, count-based and time-based policies. Anyhow, policies are in general UDFs and can implement any custom behaviour as well.
+The user can also create windows and triggers on a per group basis calling `.window(…).every(…)` on an already grouped data stream. To highlight the differences let us look at two examples.
 
-2) `Eviction Policy` This policy defines the length of a window as a means of a predicate for evicting tuples when they are no longer needed. In the API this can be defined by the `.window(..)` operation on a stream. There are mostely the same predefined policy types provided as for trigger policies.
+To get the maximal value by key on the last 100 elements we use the first approach:
+~~~java
+dataStream.window(Count.of(100)).every(…).groupBy(groupingField).max(field);
+~~~
+Using this approach we took the last 100 elements, divided it into groups by key then applied the aggregation.
 
-For example, if we want a fife tuples long sliding window that triggers every second we can write the following:
+To create fixed size windows for every key we need to reverse the order of the groupBy call. So to take the max for the last 100 elements in each group:
 ~~~java
-myStream.window(Count.of(5)).every(Time.of(1,TimeUnit.SECOND))
+dataStream.groupBy(groupingField).window(Count.of(100)).every(…).max(field);
 ~~~
+This will create separate windows for different keys and apply the trigger and eviction policies on a per group basis.
+
+### Temporal database style operators
+
+While database style operators like joins (on key) and crosses are hard to define properly on data streams, a straightforward implementation is to apply these operators on windows of the data streams.
+
+Currently join and cross operators are supported on time windows.
+
+The Join transformation produces a new Tuple DataStream with two fields. Each tuple holds a joined element of the first input DataStream in the first tuple field and a matching element of the second input DataStream in the second field for the current window.
+
+The following code shows a default Join transformation using field position keys:
 
-In addition to this, multiple policies can be added to a `LinkedList` and then be used in parallel by writing the following:
 ~~~java
-myStream.window(ListOfTriggerPolicies,ListOfEvictionPolicies)
+dataStream1.join(dataStream2)
+		.onWindow(windowing_params)
+		.where(key_in_first)
+		.equalTo(key_in_second);
+~~~
+
+The Cross transformation combines two DataStreams into one DataStream. It builds all pairwise combinations of the elements of both input DataStreams in the current window, i.e., it builds a temporal Cartesian product.
+
+~~~java
+dataStream1.cross(dataStream2).onWindow(windowing_params);
 ~~~
 
-Especially when time based windowing is used, you may want to trigger not only when an element arrives but also in between. Active policies provide this functionality. The predefined time-based policies are already implemented in such an active way and can hold as an example in case you want to implement your own user defined active policy. Anyhow, as our time-based trigger and eviction policies can work with user defined `TimeStamp` implementations, this policies already cover most use cases.
 
 ### Co operators
 
-Co operators allow the users to jointly transform two `DataStreams` of different types providing a simple way to jointly manipulate a shared state. It is designed to support joint stream transformations where merging is not appropriate due to different data types or the in cases when user needs explicit track of the datas origin.
+Co operators allow the users to jointly transform two `DataStreams` of different types providing a simple way to jointly manipulate a shared state. It is designed to support joint stream transformations where merging is not appropriate due to different data types or in case the user needs explicit tracking of the joined stream origin.
 Co operators can be applied to `ConnectedDataStreams` which represent two `DataStreams` of possibly different types. A `ConnectedDataStream` can be created by calling the `connect(otherDataStream)` method of a `DataStream`. Please note that the two connected `DataStreams` can also be merged data streams.
 
 #### Map on ConnectedDataStream
@@ -354,26 +432,8 @@ dataStream1.connect(dataStream2)
         })
 ~~~
 
-#### windowReduceGroup on ConnectedDataStream
-The windowReduceGroup operator applies a user defined `CoGroupFunction` to time aligned windows of the two data streams and return zero or more elements of an arbitrary type. The user can define the window and slide intervals and can also implement custom timestamps to be used for calculating windows.
-
-~~~java
-DataStream<Integer> dataStream1 = ...
-DataStream<String> dataStream2 = ...
-
-dataStream1.connect(dataStream2)
-    .windowReduceGroup(new CoGroupFunction<Integer, String, String>() {
-
-        @Override
-        public void coGroup(Iterable<Integer> first, Iterable<String> second,
-            Collector<String> out) throws Exception {
-
-            //Do something here
-
-        }
-    }, 10000, 5000);
-~~~
-
+#### windowReduce on ConnectedDataStream
+The windowReduce operator applies a user defined `CoWindowFunction` to time aligned windows of the two data streams and returns zero or more elements of an arbitrary type. The user can define the window and slide intervals and can also implement custom timestamps to be used for calculating windows.
 
 #### Reduce on ConnectedDataStream
 The Reduce operator for the `ConnectedDataStream` applies a simple reduce transformation on the joined data streams and then maps the reduced elements to a common output type.
@@ -465,7 +525,9 @@ Setting parallelism for operators works exactly the same way as in the core Flin
 ### Buffer timeout
 
 By default data points are not transferred on the network one-by-one, which would cause unnecessary network traffic, but are buffered in the output buffers. The size of the output buffers can be set in the Flink config files. While this method is good for optimizing throughput, it can cause latency issues when the incoming stream is not fast enough.
-To tackle this issue the user can call `env.setBufferTimeout(timeoutMillis)` on the execution environment (or on individual operators) to set a maximum wait time for the buffers to fill up. After this time the buffers are flushed automatically even if they are not full. Usage:
+To tackle this issue the user can call `env.setBufferTimeout(timeoutMillis)` on the execution environment (or on individual operators) to set a maximum wait time for the buffers to fill up. After this time the buffers are flushed automatically even if they are not full. The default value for this timeout is 100ms which should be appropriate for most use-cases. 
+
+Usage:
 
 ~~~java
 LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
@@ -474,8 +536,13 @@ env.setBufferTimeout(timeoutMillis);
 env.genereateSequence(1,10).map(new MyMapper()).setBufferTimeout(timeoutMillis);
 ~~~
 
+To maximise the throughput the user can call `.setBufferTimeout(-1)` which will remove the timeout and buffers will only be flushed when they are full.
+To minimise latency, set the timeout to a value close to 0 (for example 5 or 10 ms). Theoretically a buffer timeout of 0 will cause all outputs to be flushed when produced, but this setting should be avoided because it can cause severe performance degradation.
+
 ### Mutability
 
+This is currently a beta feature and it is only supported for a subset of the available operators.
+
 Most operators allow setting mutability for reading input data. If the operator is set mutable then the variable used to store input data for operators will be reused in a mutable fashion to avoid excessive object creation. By default, all operators are set to immutable.
 Usage:
 

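Editorial note on the windowing semantics documented in the streaming guide diff above: the split between eviction (how much the window holds) and trigger (when the reduce runs) can be illustrated without Flink. The following is only a minimal plain-Java sketch, not the Flink implementation. The class `WindowPolicySketch` and the method `slidingSums` are hypothetical names, and the choice to evict before checking the trigger is an assumption. It mimics `window(Count.of(windowSize)).every(Count.of(slide))` followed by a sum.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Hypothetical illustration only; none of these names exist in Flink.
final class WindowPolicySketch {

    /**
     * Count-based eviction keeps at most windowSize recent elements;
     * a count-based trigger emits the sum of the buffer every `slide` arrivals.
     */
    static List<Integer> slidingSums(List<Integer> input, int windowSize, int slide) {
        Deque<Integer> window = new ArrayDeque<>();
        List<Integer> emitted = new ArrayList<>();
        int sinceLastTrigger = 0;
        for (int value : input) {
            window.addLast(value);
            // Eviction: drop the oldest elements beyond the window size.
            while (window.size() > windowSize) {
                window.removeFirst();
            }
            // Trigger: reduce the current buffer after every `slide` arrivals.
            sinceLastTrigger++;
            if (sinceLastTrigger == slide) {
                sinceLastTrigger = 0;
                int sum = 0;
                for (int v : window) {
                    sum += v;
                }
                emitted.add(sum);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6);
        // Window of 3 elements, reduced every 2 arrivals (sliding).
        System.out.println(slidingSums(input, 3, 2)); // prints [3, 9, 15]
        // Slide equal to the window size behaves like a tumbling window.
        System.out.println(slidingSums(input, 2, 2)); // prints [3, 7, 11]
    }
}
```

When the slide equals the window size, every element is reduced exactly once, which matches the tumbling behaviour the guide describes for a `window(…)` call without `every(…)`.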
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4e046a9b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 55f5e71..f0e4309 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -595,6 +595,20 @@ public class DataStream<OUT> {
 	}
 
 	/**
+	 * Applies an aggregation that gives the maximum of the data stream at the
+	 * given position.
+	 * 
+	 * @param positionToMax
+	 *            The position in the data point to maximize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> max(int positionToMax) {
+		checkFieldRange(positionToMax);
+		return aggregate(ComparableAggregator.getAggregator(positionToMax, getOutputType(),
+				AggregationType.MAX));
+	}
+
+	/**
 	 * Applies an aggregation that that gives the maximum of the pojo data
 	 * stream at the given field expression. A field expression is either the
 	 * name of a public field or a getter method with parentheses of the
@@ -685,20 +699,6 @@ public class DataStream<OUT> {
 	}
 
 	/**
-	 * Applies an aggregation that gives the maximum of the data stream at the
-	 * given position.
-	 * 
-	 * @param positionToMax
-	 *            The position in the data point to maximize
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<OUT, ?> max(int positionToMax) {
-		checkFieldRange(positionToMax);
-		return aggregate(ComparableAggregator.getAggregator(positionToMax, getOutputType(),
-				AggregationType.MAX));
-	}
-
-	/**
 	 * Applies an aggregation that that gives the current element with the
 	 * maximum value at the given position, if more elements have the maximum
 	 * value at the given position, the operator returns the first one by
@@ -745,16 +745,27 @@ public class DataStream<OUT> {
 	}
 
 	/**
-	 * This allows you to set up windowing through a nice API using
-	 * {@link WindowingHelper} such as {@link Time}, {@link Count} and
-	 * {@link Delta}. Windowing allows the user to apply different user defined
-	 * functions on predefined chunks of the data stream. For example a reducer
-	 * could be applied on every 5 seconds of data to count the elements in that
-	 * time window.
+	 * Create a {@link WindowedDataStream} that can be used to apply
+	 * transformation like {@link WindowedDataStream#reduce} or aggregations on
+	 * preset chunks(windows) of the data stream. To define the windows one or
+	 * more {@link WindowingHelper} such as {@link Time}, {@link Count} and
+	 * {@link Delta} can be used.</br></br> When applied to a grouped data
+	 * stream, the windows (evictions) and slide sizes (triggers) will be
+	 * computed on a per group basis. </br></br> For more advanced control over
+	 * the trigger and eviction policies please refer to
+	 * {@link #window(triggers, evicters)} </br> </br> For example to create a
+	 * sum every 5 seconds in a tumbling fashion:</br>
+	 * {@code ds.window(Time.of(5, TimeUnit.SECONDS)).sum(field)} </br></br> To
+	 * create sliding windows use the
+	 * {@link WindowedDataStream#every(WindowingHelper...)} </br></br> The same
+	 * example with 3 second slides:</br>
+	 * 
+	 * {@code ds.window(Time.of(5, TimeUnit.SECONDS)).every(Time.of(3,
+	 *       TimeUnit.SECONDS)).sum(field)}
 	 * 
 	 * @param policyHelpers
 	 *            Any {@link WindowingHelper} such as {@link Time},
-	 *            {@link Count} and {@link Delta}.
+	 *            {@link Count} and {@link Delta} to define the window.
 	 * @return A {@link WindowedDataStream} providing further operations.
 	 */
 	@SuppressWarnings({ "rawtypes", "unchecked" })
@@ -763,11 +774,11 @@ public class DataStream<OUT> {
 	}
 
 	/**
-	 * Set up a windowed data stream using the given {@link TriggerPolicy}s and
-	 * {@link EvictionPolicy}s. Windowing allows the user to apply different
-	 * user defined functions on predefined chunks of the data stream. For
-	 * example a reducer could be applied on every 5 seconds of data to count
-	 * the elements in that time window.
+	 * Create a {@link WindowedDataStream} using the given {@link TriggerPolicy}
+	 * s and {@link EvictionPolicy}s. Windowing can be used to apply
+	 * transformations like {@link WindowedDataStream#reduce} or aggregations on
+	 * preset chunks (windows) of the data stream.</br></br>For most common
+	 * use-cases please refer to {@link #window(WindowingHelper...)}
 	 * 
 	 * @param triggers
 	 *            The list of {@link TriggerPolicy}s that will determine how
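
The tumbling and sliding examples in the window javadoc above (a 5 second window, with or without a 3 second slide) can be modelled in plain Java. This is only a simplified sketch and not Flink code: the class and method names are hypothetical, timestamps are plain numbers, and a window is assumed to cover the interval (t - size, t] at each trigger time t.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of ds.window(Time.of(size)).every(Time.of(slide)).sum(field).
// Hypothetical names; the real trigger and eviction policies may differ in detail.
class SlidingTimeSumSketch {

    /**
     * timestamps must be sorted ascending; values[i] belongs to timestamps[i].
     * At every trigger time t = slide, 2 * slide, ... up to the last timestamp,
     * sums the values whose timestamp lies in (t - size, t].
     */
    static List<Long> windowSums(long[] timestamps, long[] values, long size, long slide) {
        if (slide <= 0) {
            throw new IllegalArgumentException("slide must be positive");
        }
        List<Long> sums = new ArrayList<>();
        if (timestamps.length == 0) {
            return sums;
        }
        long last = timestamps[timestamps.length - 1];
        for (long t = slide; t <= last; t += slide) {
            long sum = 0;
            for (int i = 0; i < timestamps.length; i++) {
                // Keep only elements that are not older than the window size.
                if (timestamps[i] > t - size && timestamps[i] <= t) {
                    sum += values[i];
                }
            }
            sums.add(sum);
        }
        return sums;
    }

    public static void main(String[] args) {
        long[] ts = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
        long[] ones = {1, 1, 1, 1, 1, 1, 1, 1, 1, 1};
        System.out.println(windowSums(ts, ones, 5, 5)); // tumbling: [5, 5]
        System.out.println(windowSums(ts, ones, 5, 3)); // sliding:  [3, 5, 5]
    }
}
```

With a slide equal to the window size every element is counted once; with a smaller slide consecutive windows overlap, so the same element contributes to more than one result.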

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4e046a9b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 294a56f..714807c 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -23,6 +23,7 @@ import java.util.Map.Entry;
 import org.apache.commons.lang3.SerializationException;
 import org.apache.commons.lang3.SerializationUtils;
 import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.streamvertex.StreamingRuntimeContext;
@@ -71,6 +72,7 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 	}
 
 	/**
+	 * This is a beta feature, use with care </br><br/>
 	 * Sets the mutability of the operator. If the operator is set to mutable,
 	 * the tuples received in the user defined functions, will be reused after
 	 * the function call. Setting an operator to mutable reduces garbage
@@ -123,11 +125,11 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 	}
 
 	/**
-	 * Register an operator state for this operator by the given name. This name
-	 * can be used to retrieve the state during runtime using
-	 * {@link StreamingRuntimeContext#getState(String)}. To obtain the
-	 * {@link StreamingRuntimeContext} from the user-defined function use the
-	 * {@link RichFunction#getRuntimeContext()} method.
+	 * This is a beta feature </br></br> Register an operator state for this
+	 * operator by the given name. This name can be used to retrieve the state
+	 * during runtime using {@link StreamingRuntimeContext#getState(String)}. To
+	 * obtain the {@link StreamingRuntimeContext} from the user-defined function
+	 * use the {@link RichFunction#getRuntimeContext()} method.
 	 * 
 	 * @param name
 	 *            The name of the operator state.
@@ -135,23 +137,23 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 	 *            The state to be registered for this name.
 	 * @return The data stream with state registered.
 	 */
-	public SingleOutputStreamOperator<OUT, O> registerState(String name, OperatorState<?> state) {
+	protected SingleOutputStreamOperator<OUT, O> registerState(String name, OperatorState<?> state) {
 		jobGraphBuilder.addOperatorState(getId(), name, state);
 		return this;
 	}
 
 	/**
-	 * Register operator states for this operator provided in a map. The
-	 * registered states can be retrieved during runtime using
-	 * {@link StreamingRuntimeContext#getState(String)}. To obtain the
-	 * {@link StreamingRuntimeContext} from the user-defined function use the
-	 * {@link RichFunction#getRuntimeContext()} method.
+	 * This is a beta feature </br></br> Register operator states for this
+	 * operator provided in a map. The registered states can be retrieved during
+	 * runtime using {@link StreamingRuntimeContext#getState(String)}. To obtain
+	 * the {@link StreamingRuntimeContext} from the user-defined function use
+	 * the {@link RichFunction#getRuntimeContext()} method.
 	 * 
 	 * @param states
 	 *            The map containing the states that will be registered.
 	 * @return The data stream with states registered.
 	 */
-	public SingleOutputStreamOperator<OUT, O> registerState(Map<String, OperatorState<?>> states) {
+	protected SingleOutputStreamOperator<OUT, O> registerState(Map<String, OperatorState<?>> states) {
 		for (Entry<String, OperatorState<?>> entry : states.entrySet()) {
 			jobGraphBuilder.addOperatorState(getId(), entry.getKey(), entry.getValue());
 		}
@@ -160,8 +162,18 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 	}
 
 	@SuppressWarnings("unchecked")
-	public SingleOutputStreamOperator<OUT, O> partitionBy(int keyposition) {
-		return (SingleOutputStreamOperator<OUT, O>) super.partitionBy(keyposition);
+	public SingleOutputStreamOperator<OUT, O> partitionBy(int... keypositions) {
+		return (SingleOutputStreamOperator<OUT, O>) super.partitionBy(keypositions);
+	}
+
+	@SuppressWarnings("unchecked")
+	public SingleOutputStreamOperator<OUT, O> partitionBy(String... fields) {
+		return (SingleOutputStreamOperator<OUT, O>) super.partitionBy(fields);
+	}
+
+	@SuppressWarnings("unchecked")
+	public SingleOutputStreamOperator<OUT, O> partitionBy(KeySelector<OUT, ?> keySelector) {
+		return (SingleOutputStreamOperator<OUT, O>) super.partitionBy(keySelector);
 	}
 
 	@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4e046a9b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 15e5110..43b3993 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -23,6 +23,7 @@ import java.util.List;
 
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
@@ -43,10 +44,10 @@ import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
 import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
 
 /**
- * A {@link WindowedDataStream} represents a data stream whose elements are
- * batched together in a sliding batch. operations like
- * {@link #reduce(ReduceFunction)} or {@link #reduceGroup(GroupReduceFunction)}
- * are applied for each batch and the batch is slid afterwards.
+ * A {@link WindowedDataStream} represents a data stream that has been divided
+ * into windows (predefined chunks). User-defined functions such as
+ * {@link #reduce(ReduceFunction)}, {@link #reduceGroup(GroupReduceFunction)} or
+ * aggregations can be applied to the windows.
  *
  * @param <OUT>
  *            The output type of the {@link WindowedDataStream}
@@ -122,6 +123,20 @@ public class WindowedDataStream<OUT> {
 		this.allCentral = windowedDataStream.allCentral;
 	}
 
+	/**
+	 * Defines the slide size (trigger frequency) for the windowed data stream.
+	 * This controls how often the user defined function will be triggered on
+	 * the window. </br></br> For example to get a window of 5 elements with a
+	 * slide of 2 seconds use: </br></br>
+	 * {@code ds.window(Count.of(5)).every(Time.of(2,TimeUnit.SECONDS))}
+	 * </br></br> The user function in this case will be called on the 5 most
+	 * recent elements every 2 seconds
+	 * 
+	 * @param policyHelpers
+	 *            The policies that define the triggering frequency
+	 * 
+	 * @return The windowed data stream with triggering set
+	 */
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	public WindowedDataStream<OUT> every(WindowingHelper... policyHelpers) {
 		WindowedDataStream<OUT> ret = this.copy();
@@ -137,11 +152,15 @@ public class WindowedDataStream<OUT> {
 
 	/**
 	 * Groups the elements of the {@link WindowedDataStream} by the given key
-	 * positions to be used with grouped operators.
+	 * positions. The window sizes (evictions) and slide sizes (triggers) will
+	 * be calculated on the whole stream (in a central fashion), but the user
+	 * defined functions will be applied on a per group basis. </br></br> To get
+	 * windows and triggers on a per group basis apply the
+	 * {@link DataStream#window} operator on an already grouped data stream.
 	 * 
 	 * @param fields
 	 *            The position of the fields to group by.
-	 * @return The transformed {@link WindowedDataStream}
+	 * @return The grouped {@link WindowedDataStream}
 	 */
 	public WindowedDataStream<OUT> groupBy(int... fields) {
 		WindowedDataStream<OUT> ret = this.copy();
@@ -153,11 +172,19 @@ public class WindowedDataStream<OUT> {
 
 	/**
 	 * Groups the elements of the {@link WindowedDataStream} by the given field
-	 * expressions to be used with grouped operators.
+	 * expressions. The window sizes (evictions) and slide sizes (triggers) will
+	 * be calculated on the whole stream (in a central fashion), but the user
+	 * defined functions will be applied on a per group basis. </br></br> To get
+	 * windows and triggers on a per group basis apply the
+	 * {@link DataStream#window} operator on an already grouped data stream.
+	 * </br></br> A field expression is either the name of a public field or a
+	 * getter method with parentheses of the stream's underlying type. A dot can
+	 * be used to drill down into objects, as in
+	 * {@code "field1.getInnerField2()" }.
 	 * 
 	 * @param fields
-	 *            The position of the fields to group by.
-	 * @return The transformed {@link WindowedDataStream}
+	 *            The fields to group by
+	 * @return The grouped {@link WindowedDataStream}
 	 */
 	public WindowedDataStream<OUT> groupBy(String... fields) {
 		WindowedDataStream<OUT> ret = this.copy();
@@ -168,13 +195,16 @@ public class WindowedDataStream<OUT> {
 	}
 
 	/**
-	 * Groups the elements of the {@link WindowedDataStream} by the given
-	 * {@link KeySelector} to be used with grouped operators.
+	 * Groups the elements of the {@link WindowedDataStream} using the given
+	 * {@link KeySelector}. The window sizes (evictions) and slide sizes
+	 * (triggers) will be calculated on the whole stream (in a central fashion),
+	 * but the user defined functions will be applied on a per group basis.
+	 * </br></br> To get windows and triggers on a per group basis apply the
+	 * {@link DataStream#window} operator on an already grouped data stream.
 	 * 
 	 * @param keySelector
-	 *            The specification of the key on which the
-	 *            {@link WindowedDataStream} will be grouped.
-	 * @return The transformed {@link WindowedDataStream}
+	 *            The keySelector used to extract the key for grouping.
+	 * @return The grouped {@link WindowedDataStream}
 	 */
 	public WindowedDataStream<OUT> groupBy(KeySelector<OUT, ?> keySelector) {
 		WindowedDataStream<OUT> ret = this.copy();
@@ -185,17 +215,14 @@ public class WindowedDataStream<OUT> {
 	}
 
 	/**
-	 * This is a prototype implementation for new windowing features based on
-	 * trigger and eviction policies
+	 * Applies a reduce transformation on the windowed data stream by reducing
+	 * the current window at every trigger. The user can also extend the
+	 * {@link RichReduceFunction} to gain access to other features provided by
+	 * the {@link org.apache.flink.api.common.functions.RichFunction} interface.
 	 * 
-	 * @param triggerHelpers
-	 *            A list of trigger policies
-	 * @param evictionHelpers
-	 *            A list of eviction policies
-	 * @param sample
-	 *            A sample of the OUT data type required to gather type
-	 *            information
-	 * @return The single output operator
+	 * @param reduceFunction
+	 *            The reduce function that will be applied to the windows.
+	 * @return The transformed DataStream
 	 */
 	public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reduceFunction) {
 		return dataStream.addFunction("NextGenWindowReduce", reduceFunction,
@@ -203,6 +230,19 @@ public class WindowedDataStream<OUT> {
 				getReduceInvokable(reduceFunction));
 	}
 
+	/**
+	 * Applies a reduceGroup transformation on the windowed data stream by
+	 * reducing the current window at every trigger. In contrast with the
+	 * standard binary reducer, with reduceGroup the user can access all
+	 * elements of the window at the same time through the iterable interface.
+	 * The user can also extend the {@link RichGroupReduceFunction} to gain
+	 * access to other features provided by the
+	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
+	 * 
+	 * @param reduceFunction
+	 *            The reduce function that will be applied to the windows.
+	 * @return The transformed DataStream
+	 */
 	public <R> SingleOutputStreamOperator<R, ?> reduceGroup(
 			GroupReduceFunction<OUT, R> reduceFunction) {
 		return dataStream.addFunction("NextGenWindowReduce", reduceFunction,
@@ -211,11 +251,11 @@ public class WindowedDataStream<OUT> {
 	}
 
 	/**
-	 * Applies an aggregation that sums every sliding batch/window of the data
-	 * stream at the given position.
+	 * Applies an aggregation that sums every window of the data stream at the
+	 * given position.
 	 * 
 	 * @param positionToSum
-	 *            The position in the data point to sum
+	 *            The position in the tuple/array to sum
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> sum(int positionToSum) {
@@ -225,24 +265,14 @@ public class WindowedDataStream<OUT> {
 	}
 
 	/**
-	 * Syntactic sugar for sum(0)
-	 * 
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<OUT, ?> sum() {
-		return sum(0);
-	}
-
-	/**
-	 * Applies an aggregation that that gives the sum of the pojo data stream at
-	 * the given field expression. A field expression is either the name of a
-	 * public field or a getter method with parentheses of the
-	 * {@link DataStream}S underlying type. A dot can be used to drill down into
-	 * objects, as in {@code "field1.getInnerField2()" }.
+	 * Applies an aggregation that sums the pojo data stream at the given field
+	 * expression for every window. </br></br> A field expression is either
+	 * the name of a public field or a getter method with parentheses of the
+	 * stream's underlying type. A dot can be used to drill down into objects,
+	 * as in {@code "field1.getInnerField2()" }.
 	 * 
-	 * @param field
-	 *            The field expression based on which the aggregation will be
-	 *            applied.
+	 * @param field
+	 *            The field to sum
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> sum(String field) {
@@ -251,11 +281,11 @@ public class WindowedDataStream<OUT> {
 	}
 
 	/**
-	 * Applies an aggregation that that gives the minimum of every sliding
-	 * batch/window of the data stream at the given position.
+	 * Applies an aggregation that gives the minimum value of every window
+	 * of the data stream at the given position.
 	 * 
 	 * @param positionToMin
-	 *            The position in the data point to minimize
+	 *            The position to minimize
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> min(int positionToMin) {
@@ -265,13 +295,29 @@ public class WindowedDataStream<OUT> {
 	}
 
 	/**
-	 * Applies an aggregation that gives the minimum element of every sliding
-	 * batch/window of the data stream by the given position. If more elements
-	 * have the same minimum value the operator returns the first element by
-	 * default.
+	 * Applies an aggregation that gives the minimum value of the pojo data
+	 * stream at the given field expression for every window. </br></br>A field
+	 * expression is either the name of a public field or a getter method with
+	 * parentheses of the {@link DataStream}'s underlying type. A dot can be used
+	 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+	 * 
+	 * @param field
+	 *            The field expression based on which the aggregation will be
+	 *            applied.
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> min(String field) {
+		return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
+				AggregationType.MIN, false));
+	}
+
+	/**
+	 * Applies an aggregation that gives the minimum element of every window of
+	 * the data stream by the given position. If more elements have the same
+	 * minimum value the operator returns the first element by default.
 	 * 
 	 * @param positionToMinBy
-	 *            The position in the data point to minimize
+	 *            The position to minimize by
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy) {
@@ -279,13 +325,13 @@ public class WindowedDataStream<OUT> {
 	}
 
 	/**
-	 * Applies an aggregation that gives the minimum element of every sliding
-	 * batch/window of the data stream by the given position. If more elements
-	 * have the same minimum value the operator returns either the first or last
-	 * one depending on the parameter setting.
+	 * Applies an aggregation that gives the minimum element of every window of
+	 * the data stream by the given position. If more elements have the same
+	 * minimum value the operator returns either the first or last one depending
+	 * on the parameter setting.
 	 * 
 	 * @param positionToMinBy
-	 *            The position in the data point to minimize
+	 *            The position to minimize
 	 * @param first
 	 *            If true, then the operator return the first element with the
 	 *            minimum value, otherwise returns the last
@@ -298,20 +344,31 @@ public class WindowedDataStream<OUT> {
 	}
 
 	/**
-	 * Syntactic sugar for min(0)
+	 * Applies an aggregation that gives the minimum element of the pojo
+	 * data stream by the given field expression for every window. A field
+	 * expression is either the name of a public field or a getter method with
+	 * parentheses of the {@link DataStream}'s underlying type. A dot can be used
+	 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
 	 * 
+	 * @param field
+	 *            The field expression based on which the aggregation will be
+	 *            applied.
+	 * @param first
+	 *            If True then in case of field equality the first object will
+	 *            be returned
 	 * @return The transformed DataStream.
 	 */
-	public SingleOutputStreamOperator<OUT, ?> min() {
-		return min(0);
+	public SingleOutputStreamOperator<OUT, ?> minBy(String field, boolean first) {
+		return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
+				AggregationType.MINBY, first));
 	}
 
 	/**
-	 * Applies an aggregation that gives the maximum of every sliding
-	 * batch/window of the data stream at the given position.
+	 * Applies an aggregation that gives the maximum value of every window of
+	 * the data stream at the given position.
 	 * 
 	 * @param positionToMax
-	 *            The position in the data point to maximize
+	 *            The position to maximize
 	 * @return The transformed DataStream.
 	 */
 	public SingleOutputStreamOperator<OUT, ?> max(int positionToMax) {
@@ -321,106 +378,60 @@ public class WindowedDataStream<OUT> {
 	}
 
 	/**
-	 * Applies an aggregation that gives the maximum element of every sliding
-	 * batch/window of the data stream by the given position. If more elements
-	 * have the same maximum value the operator returns the first by default.
-	 * 
-	 * @param positionToMaxBy
-	 *            The position in the data point to maximize
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy) {
-		return this.maxBy(positionToMaxBy, true);
-	}
-
-	/**
-	 * Applies an aggregation that gives the maximum element of every sliding
-	 * batch/window of the data stream by the given position. If more elements
-	 * have the same maximum value the operator returns either the first or last
-	 * one depending on the parameter setting.
-	 * 
-	 * @param positionToMaxBy
-	 *            The position in the data point to maximize
-	 * @param first
-	 *            If true, then the operator return the first element with the
-	 *            maximum value, otherwise returns the last
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy, boolean first) {
-		dataStream.checkFieldRange(positionToMaxBy);
-		return aggregate(ComparableAggregator.getAggregator(positionToMaxBy, getOutputType(),
-				AggregationType.MAXBY, first));
-	}
-
-	/**
-	 * Syntactic sugar for max(0)
-	 * 
-	 * @return The transformed DataStream.
-	 */
-	public SingleOutputStreamOperator<OUT, ?> max() {
-		return max(0);
-	}
-
-	/**
-	 * Applies an aggregation that that gives the minimum of the pojo data
-	 * stream at the given field expression. A field expression is either the
-	 * name of a public field or a getter method with parentheses of the
-	 * {@link DataStream}S underlying type. A dot can be used to drill down into
-	 * objects, as in {@code "field1.getInnerField2()" }.
+	 * Applies an aggregation that gives the maximum value of the pojo data
+	 * stream at the given field expression for every window. A field expression
+	 * is either the name of a public field or a getter method with parentheses
+	 * of the {@link DataStream}'s underlying type. A dot can be used to drill
+	 * down into objects, as in {@code "field1.getInnerField2()" }.
 	 * 
 	 * @param field
 	 *            The field expression based on which the aggregation will be
 	 *            applied.
 	 * @return The transformed DataStream.
 	 */
-	public SingleOutputStreamOperator<OUT, ?> min(String field) {
+	public SingleOutputStreamOperator<OUT, ?> max(String field) {
 		return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
-				AggregationType.MIN, false));
+				AggregationType.MAX, false));
 	}
 
 	/**
-	 * Applies an aggregation that that gives the maximum of the pojo data
-	 * stream at the given field expression. A field expression is either the
-	 * name of a public field or a getter method with parentheses of the
-	 * {@link DataStream}S underlying type. A dot can be used to drill down into
-	 * objects, as in {@code "field1.getInnerField2()" }.
+	 * Applies an aggregation that gives the maximum element of every window of
+	 * the data stream by the given position. If more elements have the same
+	 * maximum value the operator returns the first by default.
 	 * 
-	 * @param field
-	 *            The field expression based on which the aggregation will be
-	 *            applied.
+	 * @param positionToMaxBy
+	 *            The position to maximize by
 	 * @return The transformed DataStream.
 	 */
-	public SingleOutputStreamOperator<OUT, ?> max(String field) {
-		return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
-				AggregationType.MAX, false));
+	public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy) {
+		return this.maxBy(positionToMaxBy, true);
 	}
 
 	/**
-	 * Applies an aggregation that that gives the minimum element of the pojo
-	 * data stream by the given field expression. A field expression is either
-	 * the name of a public field or a getter method with parentheses of the
-	 * {@link DataStream}S underlying type. A dot can be used to drill down into
-	 * objects, as in {@code "field1.getInnerField2()" }.
+	 * Applies an aggregation that gives the maximum element of every window of
+	 * the data stream by the given position. If more elements have the same
+	 * maximum value the operator returns either the first or last one depending
+	 * on the parameter setting.
 	 * 
-	 * @param field
-	 *            The field expression based on which the aggregation will be
-	 *            applied.
+	 * @param positionToMaxBy
+	 *            The position to maximize by
 	 * @param first
-	 *            If True then in case of field equality the first object will
-	 *            be returned
+	 *            If true, then the operator return the first element with the
+	 *            maximum value, otherwise returns the last
 	 * @return The transformed DataStream.
 	 */
-	public SingleOutputStreamOperator<OUT, ?> minBy(String field, boolean first) {
-		return aggregate(ComparableAggregator.getAggregator(field, getOutputType(),
-				AggregationType.MINBY, first));
+	public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy, boolean first) {
+		dataStream.checkFieldRange(positionToMaxBy);
+		return aggregate(ComparableAggregator.getAggregator(positionToMaxBy, getOutputType(),
+				AggregationType.MAXBY, first));
 	}
 
 	/**
 	 * Applies an aggregation that that gives the maximum element of the pojo
-	 * data stream by the given field expression. A field expression is either
-	 * the name of a public field or a getter method with parentheses of the
-	 * {@link DataStream}S underlying type. A dot can be used to drill down into
-	 * objects, as in {@code "field1.getInnerField2()" }.
+	 * data stream by the given field expression for every window. A field
+	 * expression is either the name of a public field or a getter method with
+	 * parentheses of the {@link DataStream}'s underlying type. A dot can be used
+	 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
 	 * 
 	 * @param field
 	 *            The field expression based on which the aggregation will be
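
The minBy javadoc above says that the whole element holding the minimum is returned and that the `first` flag decides between the first and the last element when several share that minimum. A minimal plain-Java sketch of that tie-breaking rule follows. It is not the Flink implementation: the class name, the use of `int[]` rows as stand-ins for tuples, and the requirement of a non-empty window are assumptions made for illustration.

```java
// Hypothetical stand-in for minBy(position, first) applied to one window.
class MinBySketch {

    /**
     * Returns the row of the (non-empty) window with the smallest value at the
     * given position. On ties, returns the first such row if first is true,
     * otherwise the last one.
     */
    static int[] minBy(int[][] window, int position, boolean first) {
        int[] best = window[0];
        for (int i = 1; i < window.length; i++) {
            int cmp = Integer.compare(window[i][position], best[position]);
            // A strictly smaller value always wins; an equal value only
            // replaces the current best when the last element is requested.
            if (cmp < 0 || (cmp == 0 && !first)) {
                best = window[i];
            }
        }
        return best;
    }

    public static void main(String[] args) {
        int[][] window = {{3, 10}, {1, 20}, {1, 30}};
        System.out.println(java.util.Arrays.toString(minBy(window, 0, true)));  // [1, 20]
        System.out.println(java.util.Arrays.toString(minBy(window, 0, false))); // [1, 30]
    }
}
```

By contrast, min as documented above yields only the minimum value at the position, so the distinction between first and last does not arise there.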

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4e046a9b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
index 73f7daf..840546f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
@@ -85,8 +85,10 @@ public class Count implements WindowingHelper {
 	}
 
 	/**
-	 * Specifies on which element a trigger or an eviction should happen (based
-	 * on the count of the elements)
+	 * Specifies a count based eviction (window size) or trigger policy (slide
+	 * size). For eviction 'count' defines the number of elements in each
+	 * window. For trigger 'count' defines how often the user function is
+	 * called in terms of number of elements received.
 	 * 
 	 * @param count
 	 *            the number of elements to count before trigger/evict
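
The two roles of a count described above, eviction (window size) and trigger (slide size), can be illustrated with a small buffer simulation. This is a sketch under assumptions rather than the actual policy code: the names are hypothetical, and eviction is assumed to run before the trigger check for each arriving element.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical model of window(Count.of(windowSize)).every(Count.of(slideSize)).
class CountWindowSketch {

    /** Returns a snapshot of the buffer each time the count trigger fires. */
    static List<List<Integer>> windows(int[] input, int windowSize, int slideSize) {
        if (windowSize < 1 || slideSize < 1) {
            throw new IllegalArgumentException("sizes must be at least 1");
        }
        Deque<Integer> buffer = new ArrayDeque<>();
        List<List<Integer>> emitted = new ArrayList<>();
        int sinceLastTrigger = 0;
        for (int element : input) {
            buffer.addLast(element);
            // Eviction: keep at most windowSize elements.
            if (buffer.size() > windowSize) {
                buffer.removeFirst();
            }
            // Trigger: fire once every slideSize elements received.
            sinceLastTrigger++;
            if (sinceLastTrigger == slideSize) {
                emitted.add(new ArrayList<>(buffer));
                sinceLastTrigger = 0;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // prints [[1, 2], [2, 3, 4], [4, 5, 6]]
        System.out.println(windows(new int[] {1, 2, 3, 4, 5, 6}, 3, 2));
    }
}
```

The first emitted window is shorter than the window size because the trigger fires before the buffer has filled.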

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4e046a9b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
index ce75fc5..9c8c5ca 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
@@ -65,7 +65,14 @@ public class Delta<DATA> implements WindowingHelper<DATA> {
 	}
 
 	/**
-	 * Creates a delta helper representing a delta count or eviction policy
+	 * Creates a delta helper representing a delta trigger or eviction policy.
+	 * </br></br> This policy calculates a delta between the data point which
+	 * triggered last and the currently arrived data point. It triggers if the
+	 * delta is higher than a specified threshold. </br></br> In case it gets
+	 * used for eviction, this policy starts from the first element of the
+	 * buffer and removes all elements from the buffer which have a higher delta
+	 * than the threshold. As soon as there is an element with a lower delta,
+	 * the eviction stops.
 	 * 
 	 * @param deltaFunction
 	 *            The delta function which should be used to calculate the delta
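
The delta trigger described above compares each arriving data point with the one that triggered last and fires when the delta exceeds the threshold. The following plain-Java sketch shows only that trigger rule. It is an illustration, not the policy implementation: the names are hypothetical, the delta function is assumed to be the absolute difference of two doubles, and the initial reference value is passed in explicitly.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of a delta trigger over a stream of doubles.
class DeltaTriggerSketch {

    /** Returns the data points at which the trigger fires. */
    static List<Double> triggeringPoints(double[] input, double initial, double threshold) {
        List<Double> triggers = new ArrayList<>();
        double lastTriggered = initial;
        for (double current : input) {
            // Assumed delta function: absolute difference to the last trigger point.
            double delta = Math.abs(current - lastTriggered);
            if (delta > threshold) {
                triggers.add(current);
                lastTriggered = current; // the new point becomes the reference
            }
        }
        return triggers;
    }

    public static void main(String[] args) {
        double[] input = {1.0, 1.5, 3.5, 4.0, 6.0};
        // prints [3.5, 6.0]
        System.out.println(triggeringPoints(input, 0.0, 2.0));
    }
}
```

Note that 4.0 does not fire even though it is more than 2.0 away from the initial value, because the reference has already moved to 3.5.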

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4e046a9b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
index 908d0cf..d987e32 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
@@ -75,8 +75,9 @@ public class Time<DATA> implements WindowingHelper<DATA> {
 	}
 
 	/**
-	 * Creates an helper representing a trigger which triggers every given
-	 * length or an eviction which evicts all elements older than length.
+	 * Creates a helper representing a time trigger which triggers every given
+	 * length (slide size) or a time eviction which evicts all elements older
+	 * than length (window size).
 	 * 
 	 * @param length
 	 *            The number of time units
@@ -92,8 +93,9 @@ public class Time<DATA> implements WindowingHelper<DATA> {
 	}
 
 	/**
-	 * Creates an helper representing a trigger which triggers every given
-	 * length or an eviction which evicts all elements older than length.
+	 * Creates a helper representing a time trigger which triggers every given
+	 * length (slide size) or a time eviction which evicts all elements older
+	 * than length (window size) using a user defined timestamp extractor.
 	 * 
 	 * @param length
 	 *            The number of time units

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4e046a9b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
index 128d3bf..8c26e4a 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/TimeWindowingExample.java
@@ -64,7 +64,7 @@ public class TimeWindowingExample {
 				.window(Count.of(100))
 				.every(Time.of(1000, TimeUnit.MILLISECONDS))
 				.groupBy(myKey)
-				.sum();
+				.sum(0);
 
 		stream.print();
 

