flink-commits mailing list archives

From mbala...@apache.org
Subject [11/20] flink git commit: [streaming] Documentations updated to match the reworked windowing semantics
Date Mon, 16 Feb 2015 14:25:37 GMT
[streaming] Documentations updated to match the reworked windowing semantics


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f571ece6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f571ece6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f571ece6

Branch: refs/heads/master
Commit: f571ece6c2a1728bdd90195309546771686e0277
Parents: 4470207
Author: Gyula Fora <gyfora@apache.org>
Authored: Sat Feb 14 11:41:09 2015 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Mon Feb 16 13:06:08 2015 +0100

----------------------------------------------------------------------
 docs/streaming_guide.md                         | 100 +++++++++++++------
 .../streaming/api/datastream/DataStream.java    |  44 ++++----
 .../api/datastream/DiscretizedStream.java       |  29 ++----
 .../api/datastream/WindowedDataStream.java      |  74 ++++++++------
 .../api/function/RichWindowMapFunction.java     |   9 ++
 .../api/function/WindowMapFunction.java         |   7 ++
 .../windowing/GroupedStreamDiscretizer.java     |  10 +-
 .../windowing/GroupedTimeDiscretizer.java       |   4 +
 .../operator/windowing/StreamDiscretizer.java   |   6 ++
 .../operator/windowing/WindowFlattener.java     |   4 +
 .../operator/windowing/WindowMapper.java        |   5 +
 .../operator/windowing/WindowMerger.java        |   5 +
 .../operator/windowing/WindowPartitioner.java   |   7 +-
 .../operator/windowing/WindowReducer.java       |   5 +
 .../streaming/api/windowing/StreamWindow.java   |  81 +++++++++++++++
 .../windowbuffer/BasicWindowBuffer.java         |   4 +
 .../windowbuffer/CompletePreAggregator.java     |   4 +
 .../windowbuffer/TumblingGroupedPreReducer.java |   3 +
 .../windowbuffer/TumblingPreReducer.java        |   3 +
 .../windowing/windowbuffer/WindowBuffer.java    |   4 +
 .../flink/streaming/api/scala/DataStream.scala  |  18 ++--
 .../api/scala/WindowedDataStream.scala          |  29 +++---
 22 files changed, 322 insertions(+), 133 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f571ece6/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index 3476390..e409b49 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -42,7 +42,7 @@ Add the following dependency to your `pom.xml` to use the Flink Streaming.
 <dependency>
     <groupId>org.apache.flink</groupId>
     <artifactId>flink-streaming-core</artifactId>
-    <version>{{site.FLINK_VERSION_SHORT}}</version>
+    <version>{{site.FLINK_VERSION_STABLE}}</version>
 </dependency>
 ~~~
 
@@ -162,11 +162,11 @@ Usage: `dataStream.shuffle()`
  * *Distribute*: Distribute partitioning directs the output data stream to the next operator in a round-robin fashion, achieving a balanced distribution.
 Usage: `dataStream.distribute()`
  * *Field/Key*: Field/Key partitioning partitions the output data stream based on the hash code of a selected key of the tuples. Data points with the same key are directed to the same operator instance. The user can define keys by field positions (for tuple and array types), field expressions (for Pojo types) and custom keys using the `KeySelector` interface. 
-Usage: `dataStream.groupBy(keys)`
+Usage: `dataStream.partitionBy(keys)`
  * *Broadcast*: Broadcast partitioning sends the output data stream to all parallel instances of the next operator.
 Usage: `dataStream.broadcast()`
- * *Global*: All data are sent to the first instance of the next processing operator. Use this option with care to avoid serious performance bottlenecks.
-Usage: `dataStream.global()`
+ * *Global*: All data points end up at the same operator instance. To achieve this, use the parallelism setting of the corresponding operator.
+Usage: `operator.setParallelism(1)`
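
As a rough sketch of how these fit together (assuming a `DataStream<Tuple2<String, Integer>>` named `pairs`; only the method names from the usage lines above are relied on):

~~~java
// Field/Key partitioning: tuples with the same first field go to the same instance
pairs.partitionBy(0);

// Broadcast: every parallel instance of the next operator receives every tuple
pairs.broadcast();

// "Global": give the next operator a parallelism of 1 so that all tuples meet there
pairs.sum(1).setParallelism(1);
~~~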
 
 ### Sources
 
@@ -292,16 +292,24 @@ There is also an option to apply user defined aggregations with the usage of the
 
 ### Window operators
 
-Flink streaming provides very flexible windowing semantics to create arbitrary windows (also referred to as discretizations or slices) of the data streams and apply reduction or aggregation operations on the windows acquired. Windowing can be used for instance to create rolling aggregations of the most recent N elements, where N could be defined by Time, Count or any arbitrary user defined measure.
+Flink streaming provides very flexible windowing semantics to create arbitrary windows (also referred to as discretizations or slices) of the data streams and apply reduce, map or aggregation operations on the windows acquired. Windowing can be used for instance to create rolling aggregations of the most recent N elements, where N could be defined by Time, Count or any arbitrary user defined measure.
 
-The user can control the size (eviction) of the windows and the frequency of reduction or aggregation calls (triggers) on them in an intuitive API:
+The user can control the size (eviction) of the windows and the frequency of reduction or aggregation calls (trigger) on them in an intuitive API (some examples):
 
+ * `dataStream.window(eviction).every(trigger).reduceWindow(…)`
+ * `dataStream.window(…).every(…).mapWindow(…).flatten()`
+ * `dataStream.window(…).every(…).groupBy(…).aggregate(…).getDiscretizedStream()`
 
- * `dataStream.window(…).every(…).reduce(…)`
- * `dataStream.window(…).every(…).reduceGroup(…)`
- * `dataStream.window(…).every(…).aggregate(…)`
+The core abstractions of the windowing semantics are the `WindowedDataStream` and the `StreamWindow`. The WindowedDataStream, which is created when we call the `.window(…)` method of the DataStream, represents the windowed discretisation of the underlying stream. The user can think of it simply as a `DataStream<StreamWindow<T>>` where additional API functions are supplied to provide efficient transformations of individual windows.
 
-The next example would create windows that hold elements of the last 5 seconds, and the user defined aggregation/reduce is executed on the windows every second (sliding the window by 1 second):
+The result of a window transformation is again a `WindowedDataStream`, which can also be used to further transform the resulting windows. In this sense, window transformations define a mapping from `StreamWindow -> StreamWindow’`.
+
+The user has several ways of working further with the result of a window operation:
+ * `windowedDataStream.flatten()` - Streams out the results element-wise and returns a `DataStream<T>`, where T is the type of the underlying windowed stream
+ * `windowedDataStream.getDiscretizedStream()` - Returns a `DataStream<StreamWindow<T>>` for applying some advanced logic on the stream windows themselves
+ * Calling any window transformation further transforms the windows
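
As a minimal sketch of these options (assuming a `DataStream<Integer>` named `source`; the window and slide sizes are arbitrary):

~~~java
WindowedDataStream<Integer> windowed = source.window(Count.of(100)).every(Count.of(10));

// Stream the window contents out element-wise
DataStream<Integer> elements = windowed.flatten();

// Or keep whole windows for more advanced processing
DataStream<StreamWindow<Integer>> windows = windowed.getDiscretizedStream();
~~~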
+
+The next example would create windows that hold elements of the last 5 seconds, and the user defined transformation would be executed on the windows every second (sliding the window by 1 second):
 
 ~~~java
 dataStream.window(Time.of(5, TimeUnit.SECONDS)).every(Time.of(1, TimeUnit.SECONDS))
@@ -313,7 +321,7 @@ This approach is often referred to as policy based windowing. Different policies
 dataStream.window(Count.of(100)).every(Time.of(1, TimeUnit.MINUTES))
 ~~~
 
-The user can also omit the `.every(…)` call which results in a tumbling window emptying the window after every aggregation call.
+The user can also omit the `.every(…)` call, which results in a tumbling window that is emptied after every transformation call.
 
 Several predefined policies are provided in the API, including delta-based, count-based and time-based policies. These can be accessed through the static methods provided by the `PolicyHelper` classes:
 
@@ -328,26 +336,22 @@ The policy based windowing is a highly flexible way to specify stream discretisa
 
  * `TriggerPolicy` defines when to trigger the reduce UDF on the current window and emit the result. In the API it completes a window statement such as: `.window(…).every(…)`, while the triggering policy is passed within `every`. 
 
-When multiple triggers are used, the reduction or aggregation is executed at every trigger.
-
 Several predefined policies are provided in the API, including delta-based, punctuation based, count-based and time-based policies. Policies are in general UDFs and can implement any custom behaviour.
 
  * `EvictionPolicy` defines the length of a window as a means of a predicate for evicting tuples when they are no longer needed. In the API this can be defined by the `.window(…)` operation on a stream. There are mostly the same predefined policy types provided as for trigger policies.
 
-When multiple evictions are used the strictest one controls the elements in the window. For instance in the call `dataStream.window(Count.of(5), Time.of(1,TimeUnit.SECONDS)).every(…)` produces a window of maximum 5 elements which have arrived in the last second.
-
-In addition to the `dataStream.window(…).every(…)` style users can specifically pass the list of trigger and eviction policies during the window call:
+In addition to the `dataStream.window(…).every(…)` style, users can also pass the trigger and eviction policies explicitly in the window call:
 
 ~~~java
-dataStream.window(ListOfTriggerPolicies, ListOfEvictionPolicies)
+dataStream.window(TriggerPolicy, EvictionPolicy)
 ~~~
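
For instance, with count-based policies this might look like the following sketch (the class names `CountTriggerPolicy` and `CountEvictionPolicy` and their constructors are assumptions standing in for any concrete `TriggerPolicy`/`EvictionPolicy` pair):

~~~java
// Assumed policies: keep at most 100 elements, call the user function after every 10 new elements
TriggerPolicy<Integer> trigger = new CountTriggerPolicy<Integer>(10);
EvictionPolicy<Integer> eviction = new CountEvictionPolicy<Integer>(100);

dataStream.window(trigger, eviction).reduceWindow(new ReduceFunction<Integer>() {
	@Override
	public Integer reduce(Integer a, Integer b) {
		return a + b; // sum of the current window
	}
});
~~~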
 
-By default most triggers can only trigger when a new element arrives. This might not be suitable for all the use-cases, especially when time based windowing is applied. To also provide trigering between elements so called active policies can be used. The predefined time-based policies are already implemented in such a way and can hold as an example for user defined active policy implementations. 
+By default, most triggers can only trigger when a new element arrives. This might not be suitable for all use cases, especially when time-based windowing is applied. To also provide triggering between elements, so-called active policies can be used. The predefined time-based policies are already implemented in such a way and can serve as an example for user-defined active policy implementations.
 
 Time-based trigger and eviction policies can work with user defined `TimeStamp` implementations, these policies already cover most use cases.
  
 #### Reduce on windowed data streams
-The transformation calls a user-defined `ReduceFunction` at every trigger on the records currently in the window. The user can also use the different streaming aggregations.
+The `WindowedDataStream<T>.reduceWindow(ReduceFunction<T>)` transformation calls the user-defined `ReduceFunction` at every trigger on the records currently in the window. The user can also use the pre-implemented streaming aggregations: `sum, min, max, minBy, maxBy`.
 
 A window reduce that sums the elements in the last minute with 10 seconds slide interval:
 
@@ -355,19 +359,21 @@ A window reduce that sums the elements in the last minute with 10 seconds slide
 dataStream.window(Time.of(1, TimeUnit.MINUTES)).every(Time.of(10,TimeUnit.SECONDS)).sum(field);
 ~~~
 
-#### ReduceGroup on windowed data streams
-The transformation calls a `GroupReduceFunction` for each data batch or data window similarly as a reduce, but providing access to all elements in the window.
+#### Map on windowed data streams
+The `WindowedDataStream<T>.mapWindow(WindowMapFunction<T,O>)` transformation calls `WindowMapFunction.mapWindow(…)` for each `StreamWindow` in the discretised stream, providing access to all elements in the window through the iterable interface. At each function call the output `StreamWindow<O>` will consist of all the elements emitted to the collector. This gives a straightforward way of mapping one stream window to another.
 
 ~~~java
-dataStream.window(…).every(…).reduceGroup(reducer);
+windowedDataStream.mapWindow(windowMapFunction)
 ~~~
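
For example, a `WindowMapFunction` that maps each window to a single element holding its size could be sketched as follows (the class name `WindowCounter` is illustrative):

~~~java
public class WindowCounter implements WindowMapFunction<Integer, Integer> {

	@Override
	public void mapWindow(Iterable<Integer> values, Collector<Integer> out) throws Exception {
		// Count the elements of the current StreamWindow and emit a single result,
		// so each input window is mapped to a window containing one element
		int count = 0;
		for (Integer value : values) {
			count++;
		}
		out.collect(count);
	}
}
~~~

which could then be applied with, for example, `dataStream.window(Count.of(100)).mapWindow(new WindowCounter()).flatten()`.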
 
 #### Grouped operations on windowed data streams
-Calling the `.groupBy(fields)` method on a windowed stream groups the elements by the given fields inside the windows. The window sizes (evictions) and slide sizes (triggers) will be calculated on the whole stream (in a central fashion), but the user defined functions will be applied on a per group basis.
+Calling the `.groupBy(…)` method on a windowed stream groups the elements by the given fields inside the stream windows. The window sizes (evictions) and slide sizes (triggers) will be calculated on the whole stream (in a global fashion), but the user-defined functions will be applied on a per-group basis. This means that a call such as `windowedStream.groupBy(…).reduceWindow(…)` will transform each window into another window consisting of as many elements as there are groups, with the reduced values per key. Similarly, the `mapWindow` transformation is applied per group as well.
 
-The user can also create windows and triggers on a per group basis calling `.window(…).every(…)` on an already grouped data stream. To highlight the differences let us look at to examples.
+The user can also create the discretisation on a per-group basis by calling `.window(…).every(…)` on an already grouped data stream. This will apply the discretisation logic independently for each key.
 
-To get the maximal value by key on the last 100 elements we use the first approach:
+To highlight the differences, let us look at two examples.
+
+To get the maximal value for each key on the last 100 elements (global), we use the first approach:
 
 ~~~java
 dataStream.window(Count.of(100)).every(…).groupBy(groupingField).max(field);
@@ -383,9 +389,40 @@ dataStream.groupBy(groupingField).window(Count.of(100)).every(…).max(field);
 
 This will create separate windows for different keys and apply the trigger and eviction policies on a per group basis.
 
+#### Applying multiple transformations on a window
+Using the `WindowedDataStream` abstraction we can apply several transformations one after another on the discretised streams without having to re-discretise it:
+
+~~~java
+dataStream.window(Count.of(1000)).groupBy(firstKey).mapWindow(…).groupBy(secondKey).reduceWindow(…).flatten()
+~~~
+
+The above call would create global windows of 1000 elements, group them by the first key and then apply a mapWindow transformation. The resulting windowed stream will then be grouped by the second key and further reduced. The results of the reduce transformation are then flattened.
+
+Notice here that we only defined the window size once at the beginning of the transformation. This means that anything that happens afterwards (`.groupBy(firstKey).mapWindow(…).groupBy(secondKey).reduceWindow(…)`) happens inside the 1000-element windows. Of course, the mapWindow might reduce the number of elements, but the idea is that each transformation still corresponds to the same 1000 elements in the original stream.
+
+#### Global vs local discretisation
+By default, all window discretisation calls (`dataStream.window(…)`) define global windows, meaning that a global window of count 100 will contain the last 100 elements that arrived at the discretisation operator, in order. In most cases (except for Time) this means that the operator doing the actual discretisation needs a degree of parallelism of 1 to execute the discretisation logic correctly.
+
+Sometimes it is enough to create local discretisations, which allows the discretiser to run in parallel and apply the given discretisation logic at every discretiser instance. To enable local discretisation, use the `.local()` method of the windowed data stream.
+
+Example:
+
+~~~java
+dataStream.window(Count.of(100)).maxBy(field);
+~~~
+
+This would create global windows of 100 elements (Count discretises with parallelism of 1) and return the record with the max value by the selected field, while
+
+~~~java
+dataStream.window(Count.of(100)).local().maxBy(field);
+~~~
+
+this would create several count discretisers (as defined by the environment parallelism) and compute the max values accordingly.
+
+
 ### Temporal database style operators
 
-While database style operators like joins (on key) and crosses are hard to define properly on data streams, a straight forward implementation is to apply these operators on windows of the data streams.
+While database-style operators like joins (on key) and crosses are hard to define properly on data streams, a straightforward implementation is to apply these operators on windows of the data streams.
 
 Currently join and cross operators are supported on time windows.
 
@@ -406,6 +443,7 @@ The Cross transformation combines two DataStreams into one DataStreams. It build
 dataStream1.cross(dataStream2).onWindow(windowing_params);
 ~~~
 
+Please note that this is currently not integrated with the windowing semantics; integration is work in progress.
 
 ### Co operators
 
@@ -475,7 +513,7 @@ DataStream<Integer> odd = split.select("odd");
 
 In the above example the data stream named ‘even’ will only contain elements that are directed to the output named “even”. The user can of course further transform these new stream by for example squaring only the even elements.
 
-Data streams only receive the elements directed to selected output names. The user can also select multiple output names by `splitStream.select(“output1”, “output2”…)`. It is common that a stream listens to all the outputs, by simply applying the transformation on the split data stream without select provides this functionality.
+Data streams only receive the elements directed to selected output names. The user can also select multiple output names by `splitStream.select(“output1”, “output2”…)`. It is common that a stream listens to all the outputs, so `split.selectAll()` provides this functionality without having to select all names.
 
 The outputs of an operator are directed by implementing a selector function (implementing the `OutputSelector` interface):
 
@@ -857,7 +895,7 @@ After installing Docker an image can be pulled for each connector. Containers ca
 For the easiest set up create a jar with all the dependencies of the *flink-streaming-connectors* project.
 
 ~~~batch
-cd /PATH/TO/GIT/flink/flink-staging/flink-streaming-connectors
+cd /PATH/TO/GIT/incubator-flink/flink-addons/flink-streaming-connectors
 mvn assembly:assembly
 ~~~batch
 
@@ -974,14 +1012,14 @@ Now a terminal started running from the image with all the necessary configurati
 
 To have the latest version of Flink type:
 ~~~batch
-cd /git/flink/
+cd /git/incubator-flink/
 git pull
 ~~~
 
 Then build the code with:
 
 ~~~batch
-cd /git/flink/flink-staging/flink-streaming/flink-streaming-connectors/
+cd /git/incubator-flink/flink-addons/flink-streaming/flink-streaming-connectors/
 mvn install -DskipTests
 ~~~
 
@@ -1009,4 +1047,4 @@ In the example there are to connectors. One that sends messages to Flume and one
 <DATE> INFO flume.FlumeTopology: String: <five> arrived from Flume
 ~~~
 
-[Back to top](#top)
+[Back to top](#top)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/f571ece6/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index 71a97f8..f221e4b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -850,14 +850,15 @@ public class DataStream<OUT> {
 
 	/**
 	 * Create a {@link WindowedDataStream} that can be used to apply
-	 * transformation like {@link WindowedDataStream#reduce} or aggregations on
-	 * preset chunks(windows) of the data stream. To define the windows one or
-	 * more {@link WindowingHelper} such as {@link Time}, {@link Count} and
+	 * transformations like {@link WindowedDataStream#reduceWindow},
+	 * {@link WindowedDataStream#mapWindow} or aggregations on preset
+	 * chunks (windows) of the data stream. To define windows a
+	 * {@link WindowingHelper} such as {@link Time}, {@link Count} and
 	 * {@link Delta} can be used.</br></br> When applied to a grouped data
 	 * stream, the windows (evictions) and slide sizes (triggers) will be
 	 * computed on a per group basis. </br></br> For more advanced control over
 	 * the trigger and eviction policies please refer to
-	 * {@link #window(triggers, evicters)} </br> </br> For example to create a
+	 * {@link #window(trigger, eviction)} </br> </br> For example to create a
 	 * sum every 5 seconds in a tumbling fashion:</br>
 	 * {@code ds.window(Time.of(5, TimeUnit.SECONDS)).sum(field)} </br></br> To
 	 * create sliding windows use the
@@ -867,33 +868,34 @@ public class DataStream<OUT> {
 	 * {@code ds.window(Time.of(5, TimeUnit.SECONDS)).every(Time.of(3,
 	 *       TimeUnit.SECONDS)).sum(field)}
 	 * 
-	 * @param policyHelpers
+	 * @param policyHelper
 	 *            Any {@link WindowingHelper} such as {@link Time},
-	 *            {@link Count} and {@link Delta} to define the window.
+	 *            {@link Count} and {@link Delta} to define the window size.
 	 * @return A {@link WindowedDataStream} providing further operations.
 	 */
 	@SuppressWarnings({ "rawtypes", "unchecked" })
-	public WindowedDataStream<OUT> window(WindowingHelper policyHelpers) {
-		return new WindowedDataStream<OUT>(this, policyHelpers);
+	public WindowedDataStream<OUT> window(WindowingHelper policyHelper) {
+		return new WindowedDataStream<OUT>(this, policyHelper);
 	}
 
 	/**
 	 * Create a {@link WindowedDataStream} using the given {@link TriggerPolicy}
-	 * s and {@link EvictionPolicy}s. Windowing can be used to apply
-	 * transformation like {@link WindowedDataStream#reduce} or aggregations on
-	 * preset chunks(windows) of the data stream.</br></br>For most common
-	 * use-cases please refer to {@link #window(WindowingHelper...)}
-	 * 
-	 * @param triggers
-	 *            The list of {@link TriggerPolicy}s that will determine how
-	 *            often the user function is called on the window.
-	 * @param evicters
-	 *            The list of {@link EvictionPolicy}s that will determine the
-	 *            number of elements in each time window.
+	 * and {@link EvictionPolicy}. Windowing can be used to apply transformations
+	 * like {@link WindowedDataStream#reduceWindow},
+	 * {@link WindowedDataStream#mapWindow} or aggregations on preset
+	 * chunks (windows) of the data stream.</br></br>For most common use-cases
+	 * please refer to {@link #window(WindowingHelper)}
+	 * 
+	 * @param trigger
+	 *            The {@link TriggerPolicy} that will determine how often the
+	 *            user function is called on the window.
+	 * @param eviction
+	 *            The {@link EvictionPolicy} that will determine the number of
+	 *            elements in each time window.
 	 * @return A {@link WindowedDataStream} providing further operations.
 	 */
-	public WindowedDataStream<OUT> window(TriggerPolicy<OUT> trigger, EvictionPolicy<OUT> evicter) {
-		return new WindowedDataStream<OUT>(this, trigger, evicter);
+	public WindowedDataStream<OUT> window(TriggerPolicy<OUT> trigger, EvictionPolicy<OUT> eviction) {
+		return new WindowedDataStream<OUT>(this, trigger, eviction);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/f571ece6/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
index 7ab7c2c..bc9f5b9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.datastream;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -67,16 +66,6 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
 		return discretizedStream;
 	}
 
-	/**
-	 * Applies a reduce transformation on the windowed data stream by reducing
-	 * the current window at every trigger.The user can also extend the
-	 * {@link RichReduceFunction} to gain access to other features provided by
-	 * the {@link org.apache.flink.api.common.functions.RichFunction} interface.
-	 * 
-	 * @param reduceFunction
-	 *            The reduce function that will be applied to the windows.
-	 * @return The transformed DataStream
-	 */
 	@Override
 	public DiscretizedStream<OUT> reduceWindow(ReduceFunction<OUT> reduceFunction) {
 
@@ -92,18 +81,6 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
 		}
 	}
 
-	/**
-	 * Applies a reduceGroup transformation on the windowed data stream by
-	 * reducing the current window at every trigger. In contrast with the
-	 * standard binary reducer, with reduceGroup the user can access all
-	 * elements of the window at the same time through the iterable interface.
-	 * The user can also extend the to gain access to other features provided by
-	 * the {@link org.apache.flink.api.common.functions.RichFunction} interface.
-	 * 
-	 * @param windowMapFunction
-	 *            The reduce function that will be applied to the windows.
-	 * @return The transformed DataStream
-	 */
 	@Override
 	public <R> DiscretizedStream<R> mapWindow(WindowMapFunction<OUT, R> windowMapFunction) {
 
@@ -238,4 +215,10 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT> {
 		return new DiscretizedStream<OUT>(discretizedStream.copy(), groupByKey, transformation);
 	}
 
+	@Override
+	public WindowedDataStream<OUT> local() {
+		throw new UnsupportedOperationException(
+				"Local discretisation can only be applied after defining the discretisation logic");
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f571ece6/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index 4da12ac..b63cb96 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.streaming.api.function.RichWindowMapFunction;
 import org.apache.flink.streaming.api.function.WindowMapFunction;
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
 import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType;
@@ -53,10 +54,11 @@ import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
 import org.apache.flink.streaming.util.keys.KeySelectorUtil;
 
 /**
- * A {@link WindowedDataStream} represents a data stream that has been divided
- * into windows (predefined chunks). User defined function such as
+ * A {@link WindowedDataStream} represents a data stream that has been
+ * discretised into windows. User-defined functions such as
  * {@link #reduceWindow(ReduceFunction)}, {@link #mapWindow()} or aggregations
- * can be applied to the windows.
+ * can be applied to the windows. The results of these transformations are also
+ * WindowedDataStreams of the same discretisation unit.
  * 
  * @param <OUT>
  *            The output type of the {@link WindowedDataStream}
@@ -64,9 +66,7 @@ import org.apache.flink.streaming.util.keys.KeySelectorUtil;
 public class WindowedDataStream<OUT> {
 
 	protected enum WindowTransformation {
-
 		REDUCEWINDOW, MAPWINDOW, NONE;
-
 		private Function UDF;
 
 		public WindowTransformation with(Function UDF) {
@@ -132,8 +132,8 @@ public class WindowedDataStream<OUT> {
 	 * </br></br> The user function in this case will be called on the 5 most
 	 * recent elements every 2 seconds
 	 * 
-	 * @param policyHelpers
-	 *            The policies that define the triggering frequency
+	 * @param policyHelper
+	 *            The policy that defines the triggering frequency
 	 * 
 	 * @return The windowed data stream with triggering set
 	 */
@@ -151,7 +151,7 @@ public class WindowedDataStream<OUT> {
 	/**
 	 * Groups the elements of the {@link WindowedDataStream} by the given key
 	 * positions. The window sizes (evictions) and slide sizes (triggers) will
-	 * be calculated on the whole stream (in a central fashion), but the user
+	 * be calculated on the whole stream (in a global fashion), but the user
 	 * defined functions will be applied on a per group basis. </br></br> To get
 	 * windows and triggers on a per group basis apply the
 	 * {@link DataStream#window} operator on an already grouped data stream.
@@ -171,7 +171,7 @@ public class WindowedDataStream<OUT> {
 	/**
 	 * Groups the elements of the {@link WindowedDataStream} by the given field
 	 * expressions. The window sizes (evictions) and slide sizes (triggers) will
-	 * be calculated on the whole stream (in a central fashion), but the user
+	 * be calculated on the whole stream (in a global fashion), but the user
 	 * defined functions will be applied on a per group basis. </br></br> To get
 	 * windows and triggers on a per group basis apply the
 	 * {@link DataStream#window} operator on an already grouped data stream.
@@ -191,7 +191,7 @@ public class WindowedDataStream<OUT> {
 	/**
 	 * Groups the elements of the {@link WindowedDataStream} using the given
 	 * {@link KeySelector}. The window sizes (evictions) and slide sizes
-	 * (triggers) will be calculated on the whole stream (in a central fashion),
+	 * (triggers) will be calculated on the whole stream (in a global fashion),
 	 * but the user defined functions will be applied on a per group basis.
 	 * </br></br> To get windows and triggers on a per group basis apply the
 	 * {@link DataStream#window} operator on an already grouped data stream.
@@ -212,11 +212,10 @@ public class WindowedDataStream<OUT> {
 	}
 
 	/**
-	 * Sets the windowed computations local, so that the windowing and reduce or
-	 * aggregation logic will be computed for each parallel instance of this
-	 * operator
+	 * Sets the window discretisation to local, meaning that windows will be
+	 * created in parallel at the environment parallelism.
 	 * 
-	 * @return The local windowed data stream
+	 * @return The WindowedDataStream with local discretisation
 	 */
 	public WindowedDataStream<OUT> local() {
 		WindowedDataStream<OUT> out = copy();
@@ -224,11 +223,24 @@ public class WindowedDataStream<OUT> {
 		return out;
 	}
 
+	/**
+	 * Returns the {@link DataStream} of {@link StreamWindow}s which represent
+	 * the discretised stream. There is no ordering guarantee for the received
+	 * windows.
+	 * 
+	 * @return The discretised stream
+	 */
 	public DataStream<StreamWindow<OUT>> getDiscretizedStream() {
 		return discretize(WindowTransformation.NONE, new BasicWindowBuffer<OUT>())
 				.getDiscretizedStream();
 	}
 
+	/**
+	 * Flattens the results of the window computations and streams out the
+	 * window elements.
+	 * 
+	 * @return The data stream consisting of the individual records.
+	 */
 	public DataStream<OUT> flatten() {
 		return dataStream;
 	}
@@ -259,15 +271,16 @@ public class WindowedDataStream<OUT> {
 	}
 
 	/**
-	 * Applies a reduceGroup transformation on the windowed data stream by
-	 * reducing the current window at every trigger. In contrast with the
-	 * standard binary reducer, with reduceGroup the user can access all
+	 * Applies a mapWindow transformation on the windowed data stream by calling
+	 * the mapWindow function on the window at every trigger. In contrast with
+	 * the standard binary reducer, mapWindow allows the user to access all
 	 * elements of the window at the same time through the iterable interface.
-	 * The user can also extend the to gain access to other features provided by
-	 * the {@link org.apache.flink.api.common.functions.RichFunction} interface.
+	 * The user can also extend the {@link RichWindowMapFunction} to gain access
+	 * to other features provided by the
+	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
 	 * 
 	 * @param windowMapFunction
-	 *            The reduce function that will be applied to the windows.
+	 *            The function that will be applied to the windows.
 	 * @return The transformed DataStream
 	 */
 	public <R> WindowedDataStream<R> mapWindow(WindowMapFunction<OUT, R> windowMapFunction) {
@@ -276,18 +289,21 @@ public class WindowedDataStream<OUT> {
 	}
 
 	/**
-	 * Applies a reduceGroup transformation on the windowed data stream by
-	 * reducing the current window at every trigger. In contrast with the
-	 * standard binary reducer, with reduceGroup the user can access all
+	 * Applies a mapWindow transformation on the windowed data stream by calling
+	 * the mapWindow function on the window at every trigger. In contrast with
+	 * the standard binary reducer, mapWindow allows the user to access all
 	 * elements of the window at the same time through the iterable interface.
-	 * The user can also extend the to gain access to other features provided by
-	 * the {@link org.apache.flink.api.common.functions.RichFunction} interface.
-	 * </br> </br> This version of reduceGroup uses user supplied
-	 * typeinformation for serializaton. Use this only when the system is unable
-	 * to detect type information using: {@link #mapWindow()}
+	 * The user can also extend the {@link RichWindowMapFunction} to gain access
+	 * to other features provided by the
+	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
+	 * </br> </br> This version of mapWindow uses user-supplied type information
+	 * for serialization. Use this only when the system is unable to detect type
+	 * information.
 	 * 
 	 * @param windowMapFunction
-	 *            The reduce function that will be applied to the windows.
+	 *            The function that will be applied to the windows.
+	 * @param outType
+	 *            The output type of the operator.
 	 * @return The transformed DataStream
 	 */
 	public <R> WindowedDataStream<R> mapWindow(WindowMapFunction<OUT, R> windowMapFunction,

http://git-wip-us.apache.org/repos/asf/flink/blob/f571ece6/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/RichWindowMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/RichWindowMapFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/RichWindowMapFunction.java
index ac2a19e..e34a2f9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/RichWindowMapFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/RichWindowMapFunction.java
@@ -18,8 +18,17 @@
 package org.apache.flink.streaming.api.function;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
+import org.apache.flink.streaming.api.datastream.WindowedDataStream;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.util.Collector;
 
+/**
+ * Abstract class for defining rich mapWindow transformations to be applied on
+ * {@link WindowedDataStream}s. The mapWindow function will be called on each
+ * {@link StreamWindow}.</p> In addition the user can access the functionality
+ * provided by the {@link RichFunction} interface.
+ */
 public abstract class RichWindowMapFunction<IN, OUT> extends AbstractRichFunction implements
 		WindowMapFunction<IN, OUT> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f571ece6/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/WindowMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/WindowMapFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/WindowMapFunction.java
index 273d731..eb5f6c3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/WindowMapFunction.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/WindowMapFunction.java
@@ -20,8 +20,15 @@ package org.apache.flink.streaming.api.function;
 import java.io.Serializable;
 
 import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.streaming.api.datastream.WindowedDataStream;
+import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.util.Collector;
 
+/**
+ * Interface for defining mapWindow transformations to be applied on
+ * {@link WindowedDataStream}s. The mapWindow function will be called on each
+ * {@link StreamWindow}.
+ */
 public interface WindowMapFunction<T, O> extends Function, Serializable {
 
 	void mapWindow(Iterable<T> values, Collector<O> out) throws Exception;

http://git-wip-us.apache.org/repos/asf/flink/blob/f571ece6/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
index 58d7adb..d6bbb08 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
@@ -28,11 +28,15 @@ import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
 
+/**
+ * This invokable represents the grouped discretization step of a window
+ * transformation. The user-supplied eviction and trigger policies are applied
+ * on a per-group basis to create the {@link StreamWindow} that will be further
+ * transformed in the next stages. </p> To allow pre-aggregations supply an
+ * appropriate {@link WindowBuffer}.
+ */
 public class GroupedStreamDiscretizer<IN> extends StreamInvokable<IN, StreamWindow<IN>> {
 
-	/**
-	 * Auto-generated serial version UID
-	 */
 	private static final long serialVersionUID = -3469545957144404137L;
 
 	protected KeySelector<IN, ?> keySelector;

http://git-wip-us.apache.org/repos/asf/flink/blob/f571ece6/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedTimeDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedTimeDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedTimeDiscretizer.java
index 5363c10..a9f78b6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedTimeDiscretizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedTimeDiscretizer.java
@@ -22,6 +22,10 @@ import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
 
+/**
+ * A specialized {@link GroupedStreamDiscretizer} to be used with time-only
+ * policies.
+ */
 public class GroupedTimeDiscretizer<IN> extends GroupedStreamDiscretizer<IN> {
 
 	private static final long serialVersionUID = -3469545957144404137L;

http://git-wip-us.apache.org/repos/asf/flink/blob/f571ece6/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
index a14058a..d053dd9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
@@ -26,6 +26,12 @@ import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
 import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
 
+/**
+ * This invokable represents the discretization step of a window transformation.
+ * The user-supplied eviction and trigger policies are applied to create the
+ * {@link StreamWindow} that will be further transformed in the next stages.
+ * </p> To allow pre-aggregations supply an appropriate {@link WindowBuffer}.
+ */
 public class StreamDiscretizer<IN> extends StreamInvokable<IN, StreamWindow<IN>> {
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/f571ece6/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java
index 7eaea58..edefeef 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java
@@ -20,6 +20,10 @@ package org.apache.flink.streaming.api.invokable.operator.windowing;
 import org.apache.flink.streaming.api.invokable.ChainableInvokable;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 
+/**
+ * This invokable flattens the results of the window transformations by
+ * outputting the elements of the {@link StreamWindow} one by one
+ */
 public class WindowFlattener<T> extends ChainableInvokable<StreamWindow<T>, T> {
 
 	public WindowFlattener() {

http://git-wip-us.apache.org/repos/asf/flink/blob/f571ece6/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java
index de93fab..3dfd59d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java
@@ -18,10 +18,15 @@
 package org.apache.flink.streaming.api.invokable.operator.windowing;
 
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.WindowedDataStream;
 import org.apache.flink.streaming.api.function.WindowMapFunction;
 import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 
+/**
+ * This invokable is used to apply mapWindow transformations on
+ * {@link WindowedDataStream}s.
+ */
 public class WindowMapper<IN, OUT> extends MapInvokable<StreamWindow<IN>, StreamWindow<OUT>> {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/f571ece6/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java
index 8601d06..a58bb9f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java
@@ -23,6 +23,11 @@ import java.util.Map;
 import org.apache.flink.streaming.api.invokable.ChainableInvokable;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 
+/**
+ * This invokable merges together the different partitions of a
+ * {@link StreamWindow}. It is used to combine the results of parallel
+ * transformations that belong to the same window.
+ */
 public class WindowMerger<T> extends ChainableInvokable<StreamWindow<T>, StreamWindow<T>> {
 
 	private Map<Integer, StreamWindow<T>> windows;

http://git-wip-us.apache.org/repos/asf/flink/blob/f571ece6/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
index ea4451e..e010af4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
@@ -21,8 +21,11 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.invokable.ChainableInvokable;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 
-public class WindowPartitioner<T> extends
-		ChainableInvokable<StreamWindow<T>, StreamWindow<T>> {
+/**
+ * This invokable applies either split or key partitioning depending on the
+ * transformation.
+ */
+public class WindowPartitioner<T> extends ChainableInvokable<StreamWindow<T>, StreamWindow<T>> {
 
 	private KeySelector<T, ?> keySelector;
 	private int numberOfSplits;

http://git-wip-us.apache.org/repos/asf/flink/blob/f571ece6/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducer.java
index 3a4bb69..d7182de 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducer.java
@@ -19,9 +19,14 @@ package org.apache.flink.streaming.api.invokable.operator.windowing;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.streaming.api.datastream.WindowedDataStream;
 import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 
+/**
+ * This invokable is used to apply reduceWindow transformations on
+ * {@link WindowedDataStream}s.
+ */
 public class WindowReducer<IN> extends MapInvokable<StreamWindow<IN>, StreamWindow<IN>> {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/f571ece6/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java
index 988058c..74ed62e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/StreamWindow.java
@@ -25,8 +25,17 @@ import java.util.Random;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.datastream.WindowedDataStream;
 import org.apache.flink.util.Collector;
 
+/**
+ * Core abstraction for representing windows for {@link WindowedDataStream}s.
+ * The user can apply transformations on these windows with the appropriate
+ * {@link WindowedDataStream} methods. </p> Each stream window consists of a
+ * random ID, a number representing the number of partitions for this specific
+ * window (ID) and the elements themselves. The ID and number of parts will be used
+ * to merge the subwindows after distributed transformations.
+ */
 public class StreamWindow<T> extends ArrayList<T> implements Collector<T> {
 
 	private static final long serialVersionUID = -5150196421193988403L;
@@ -36,25 +45,54 @@ public class StreamWindow<T> extends ArrayList<T> implements Collector<T> {
 
 	public int numberOfParts;
 
+	/**
+	 * Creates a new window with a random id
+	 */
 	public StreamWindow() {
 		this(rnd.nextInt(), 1);
 	}
 
+	/**
+	 * Creates a new window with the specific id
+	 * 
+	 * @param windowID
+	 *            ID of the window
+	 */
 	public StreamWindow(int windowID) {
 		this(windowID, 1);
 	}
 
+	/**
+	 * Creates a new window with the given id and number of parts
+	 * 
+	 * @param windowID
+	 * @param numberOfParts
+	 */
 	public StreamWindow(int windowID, int numberOfParts) {
 		super();
 		this.windowID = windowID;
 		this.numberOfParts = numberOfParts;
 	}
 
+	/**
+	 * Creates a shallow copy of the window
+	 * 
+	 * @param window
+	 *            The window to be copied
+	 */
 	public StreamWindow(StreamWindow<T> window) {
 		this(window.windowID, window.numberOfParts);
 		addAll(window);
 	}
 
+	/**
+	 * Creates a deep copy of the window using the given serializer
+	 * 
+	 * @param window
+	 *            The window to be copied
+	 * @param serializer
+	 *            The serializer used for copying the records.
+	 */
 	public StreamWindow(StreamWindow<T> window, TypeSerializer<T> serializer) {
 		this(window.windowID, window.numberOfParts);
 		for (T element : window) {
@@ -62,6 +100,14 @@ public class StreamWindow<T> extends ArrayList<T> implements Collector<T> {
 		}
 	}
 
+	/**
+	 * Partitions the window using the given keyselector. A subwindow will be
+	 * created for each key.
+	 * 
+	 * @param keySelector
+	 *            The keyselector used for extracting keys.
+	 * @return A list of the subwindows
+	 */
 	public List<StreamWindow<T>> partitionBy(KeySelector<T, ?> keySelector) throws Exception {
 		Map<Object, StreamWindow<T>> partitions = new HashMap<Object, StreamWindow<T>>();
 
@@ -85,6 +131,13 @@ public class StreamWindow<T> extends ArrayList<T> implements Collector<T> {
 		return output;
 	}
 
+	/**
+	 * Splits the window into n parts of equal size (if possible).
+	 * 
+	 * @param n
+	 *            Number of desired partitions
+	 * @return The list of subwindows.
+	 */
 	public List<StreamWindow<T>> split(int n) {
 		int numElements = size();
 		if (n > numElements) {
@@ -116,10 +169,24 @@ public class StreamWindow<T> extends ArrayList<T> implements Collector<T> {
 		return this;
 	}
 
+	/**
+	 * Checks whether this window can be merged with the given one.
+	 * 
+	 * @param otherWindow
+	 *            The window to test
+	 * @return Window compatibility
+	 */
 	public boolean compatibleWith(StreamWindow<T> otherWindow) {
 		return this.windowID == otherWindow.windowID && this.numberOfParts > 1;
 	}
 
+	/**
+	 * Merges compatible windows together.
+	 * 
+	 * @param windows
+	 *            Windows to merge
+	 * @return Merged window
+	 */
 	public static <R> StreamWindow<R> merge(StreamWindow<R>... windows) {
 		StreamWindow<R> window = new StreamWindow<R>(windows[0]);
 		for (int i = 1; i < windows.length; i++) {
@@ -134,6 +201,13 @@ public class StreamWindow<T> extends ArrayList<T> implements Collector<T> {
 		return window;
 	}
 
+	/**
+	 * Merges compatible windows together.
+	 * 
+	 * @param windows
+	 *            Windows to merge
+	 * @return Merged window
+	 */
 	public static <R> StreamWindow<R> merge(List<StreamWindow<R>> windows) {
 		if (windows.isEmpty()) {
 			throw new RuntimeException("Need at least one window to merge");
@@ -171,6 +245,13 @@ public class StreamWindow<T> extends ArrayList<T> implements Collector<T> {
 		return super.toString();
 	}
 
+	/**
+	 * Creates a new {@link StreamWindow} with a random id from the given elements.
+	 * 
+	 * @param elements
+	 *            The elements contained in the resulting window
+	 * @return The window
+	 */
 	public static <R> StreamWindow<R> fromElements(R... elements) {
 		StreamWindow<R> window = new StreamWindow<R>();
 		for (R element : elements) {

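A minimal usage sketch (not part of this commit) of the StreamWindow helpers documented above. It assumes the subwindows produced by split() remain compatible (same window id, more than one part) so that merge() can reassemble them; the class name and element values are illustrative.

import java.util.List;

import org.apache.flink.streaming.api.windowing.StreamWindow;

public class StreamWindowSketch {
	public static void main(String[] args) {
		// build a window with a random id from some arbitrary elements
		StreamWindow<Integer> window = StreamWindow.fromElements(1, 2, 3, 4);

		// split it into two roughly equal subwindows
		List<StreamWindow<Integer>> parts = window.split(2);

		// merge the compatible subwindows back into a single window
		StreamWindow<Integer> merged = StreamWindow.merge(parts);

		System.out.println(merged);
	}
}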
http://git-wip-us.apache.org/repos/asf/flink/blob/f571ece6/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java
index 9d5676c..458de41 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java
@@ -23,6 +23,10 @@ import java.util.NoSuchElementException;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.util.Collector;
 
+/**
+ * Basic window buffer that stores the elements in a simple list without any
+ * pre-aggregation.
+ */
 public class BasicWindowBuffer<T> implements WindowBuffer<T> {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/f571ece6/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/CompletePreAggregator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/CompletePreAggregator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/CompletePreAggregator.java
index a01f97c..59bd974 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/CompletePreAggregator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/CompletePreAggregator.java
@@ -18,6 +18,10 @@
 
 package org.apache.flink.streaming.api.windowing.windowbuffer;
 
+/**
+ * Interface for marking window pre-aggregators that fully process the window so
+ * that no further reduce step is necessary afterwards.
+ */
 public interface CompletePreAggregator {
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f571ece6/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
index eef8bed..7403ffe 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
@@ -26,6 +26,9 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.util.Collector;
 
+/**
+ * Grouped pre-reducer for tumbling eviction policy.
+ */
 public class TumblingGroupedPreReducer<T> implements WindowBuffer<T>, CompletePreAggregator {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/f571ece6/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
index b755e73..35cf60e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
@@ -22,6 +22,9 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.util.Collector;
 
+/**
+ * Non-grouped pre-reducer for tumbling eviction policy.
+ */
 public class TumblingPreReducer<T> implements WindowBuffer<T>, CompletePreAggregator {
 
 	private static final long serialVersionUID = 1L;

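The difference between BasicWindowBuffer and the pre-reducers above is that a pre-reducer folds each arriving element into a running aggregate instead of keeping the full element list. A conceptual, self-contained sketch of that idea (not the actual TumblingPreReducer code; names and values are illustrative assumptions):

import java.util.Arrays;
import java.util.List;

public class PreReduceSketch {
	public static void main(String[] args) {
		List<Integer> arrivingElements = Arrays.asList(3, 1, 4, 1, 5);

		// store(): combine each new element into the running aggregate
		// instead of buffering it
		Integer currentReduced = null;
		for (Integer element : arrivingElements) {
			currentReduced = (currentReduced == null) ? element : currentReduced + element;
		}

		// when the window triggers, only the pre-aggregated value is emitted,
		// and the running state would be reset for the next tumbling window
		System.out.println("window result = " + currentReduced);
	}
}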
http://git-wip-us.apache.org/repos/asf/flink/blob/f571ece6/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
index 1a45194..e0429ab 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
@@ -22,6 +22,10 @@ import java.io.Serializable;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.util.Collector;
 
+/**
+ * Interface for defining specialized buffers to store/emit window data.
+ * Pre-aggregators should be implemented using this interface.
+ */
 public interface WindowBuffer<T> extends Serializable, Cloneable {
 
 	public void store(T element) throws Exception;

http://git-wip-us.apache.org/repos/asf/flink/blob/f571ece6/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index f6e92aa..3ab4ff1 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -442,9 +442,9 @@ class DataStream[T](javaStream: JavaStream[T]) {
 
   /**
    * Create a WindowedDataStream that can be used to apply
-   * transformation like .reduce(...) or aggregations on
-   * preset chunks(windows) of the data stream. To define the windows one or
-   * more WindowingHelper-s such as Time, Count and
+   * transformations like .reduceWindow(...) or aggregations on
+   * preset chunks (windows) of the data stream. To define the windows, a
+   * WindowingHelper such as Time, Count and
    * Delta can be used.</br></br> When applied to a grouped data
    * stream, the windows (evictions) and slide sizes (triggers) will be
    * computed on a per group basis. </br></br> For more advanced control over
@@ -455,14 +455,14 @@ class DataStream[T](javaStream: JavaStream[T]) {
     javaStream.window(windowingHelper)
 
   /**
-   * Create a WindowedDataStream using the given TriggerPolicy-s and EvictionPolicy-s.
-   * Windowing can be used to apply transformation like .reduce(...) or aggregations on
-   * preset chunks(windows) of the data stream.</br></br>For most common
-   * use-cases please refer to window(WindowingHelper[_]*)
+   * Create a WindowedDataStream using the given Trigger and Eviction policies.
+   * Windowing can be used to apply transformations like .reduceWindow(...) or
+   * aggregations on preset chunks (windows) of the data stream.</br></br>For most common
+   * use-cases, please refer to window(WindowingHelper[_])
    *
    */
-  def window(trigger: TriggerPolicy[T], evicter: EvictionPolicy[T]):
-    WindowedDataStream[T] = javaStream.window(trigger, evicter)
+  def window(trigger: TriggerPolicy[T], eviction: EvictionPolicy[T]):
+    WindowedDataStream[T] = javaStream.window(trigger, eviction)
 
   /**
    *

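For reference, a hedged sketch (not part of this commit) of the same reworked windowing semantics in the Java API: the Time/Count helpers, every(), reduceWindow() and flatten() are used as the updated streaming guide describes them, but the package locations and exact signatures shown here are assumptions for this version.

import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.helper.Count;
import org.apache.flink.streaming.api.windowing.helper.Time;

public class WindowingSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		DataStream<Integer> dataStream = env.fromElements(1, 2, 3, 4, 5);

		dataStream
			// eviction policy: keep the last 5 seconds of data
			.window(Time.of(5, TimeUnit.SECONDS))
			// trigger policy: emit a window every 100 elements
			.every(Count.of(100))
			// reduceWindow folds each window's contents into a single value
			.reduceWindow(new ReduceFunction<Integer>() {
				@Override
				public Integer reduce(Integer a, Integer b) {
					return a + b;
				}
			})
			// flatten returns the stream of window contents element-wise
			.flatten()
			.print();

		env.execute("Windowing sketch");
	}
}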
http://git-wip-us.apache.org/repos/asf/flink/blob/f571ece6/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
index 33d71be..b072197 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
@@ -49,7 +49,7 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
   /**
    * Groups the elements of the WindowedDataStream using the given
    * field positions. The window sizes (evictions) and slide sizes
-   * (triggers) will be calculated on the whole stream (in a central fashion),
+   * (triggers) will be calculated on the whole stream (in a global fashion),
    * but the user defined functions will be applied on a per group basis.
    * </br></br> To get windows and triggers on a per group basis apply the
    * DataStream.window(...) operator on an already grouped data stream.
@@ -60,7 +60,7 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
   /**
    * Groups the elements of the WindowedDataStream using the given
    * field expressions. The window sizes (evictions) and slide sizes
-   * (triggers) will be calculated on the whole stream (in a central fashion),
+   * (triggers) will be calculated on the whole stream (in a global fashion),
    * but the user defined functions will be applied on a per group basis.
    * </br></br> To get windows and triggers on a per group basis apply the
    * DataStream.window(...) operator on an already grouped data stream.
@@ -72,7 +72,7 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
   /**
    * Groups the elements of the WindowedDataStream using the given
    * KeySelector function. The window sizes (evictions) and slide sizes
-   * (triggers) will be calculated on the whole stream (in a central fashion),
+   * (triggers) will be calculated on the whole stream (in a global fashion),
    * but the user defined functions will be applied on a per group basis.
    * </br></br> To get windows and triggers on a per group basis apply the
    * DataStream.window(...) operator on an already grouped data stream.
@@ -88,16 +88,15 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
   }
   
   /**
-   * Sets the computations local meaning that the windowing and reduce or
-   * aggregation logic will be computed for each parallel instance of this
-   * operator
+   * Sets the window discretisation to local, meaning that windows are
+   * created in parallel by each operator instance, at the environment parallelism.
    * 
    */
   def local(): WindowedDataStream[T] = javaStream.local
  
   /**
    * Flattens the result of a window transformation returning the stream of window
-   * contents elementwise
+   * contents elementwise.
    */
   def flatten(): DataStream[T] = javaStream.flatten()
   
@@ -135,12 +134,12 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
   }
 
   /**
-   * Applies a reduceGroup transformation on the windowed data stream by reducing
-   * the current window at every trigger. In contrast with the simple binary reduce operator,
-   * groupReduce exposes the whole window through the Iterable interface.
+   * Applies a mapWindow transformation on the windowed data stream by calling the mapWindow
+   * method on the current window at every trigger. In contrast with the simple binary reduce
+   * operator, mapWindow exposes the whole window through the Iterable interface.
    * </br>
    * </br>
-   * Whenever possible try to use reduce instead of groupReduce for increased efficiency
+   * Whenever possible, try to use reduceWindow instead of mapWindow for increased efficiency.
    */
   def mapWindow[R: ClassTag: TypeInformation](reducer: WindowMapFunction[T, R]):
   WindowedDataStream[R] = {
@@ -151,12 +150,12 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
   }
 
   /**
-   * Applies a reduceGroup transformation on the windowed data stream by reducing
-   * the current window at every trigger. In contrast with the simple binary reduce operator,
-   * groupReduce exposes the whole window through the Iterable interface.
+   * Applies a mapWindow transformation on the windowed data stream by calling the mapWindow
+   * method on the current window at every trigger. In contrast with the simple binary reduce
+   * operator, mapWindow exposes the whole window through the Iterable interface.
    * </br>
    * </br>
-   * Whenever possible try to use reduce instead of groupReduce for increased efficiency
+   * Whenever possible, try to use reduceWindow instead of mapWindow for increased efficiency.
    */
   def mapWindow[R: ClassTag: TypeInformation](fun: (Iterable[T], Collector[R]) => Unit):
   WindowedDataStream[R] = {


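As a hedged illustration of the mapWindow contract described above (not part of this commit), the sketch below sums each window using the Java API; the element type, the Count helper and the exact method signatures are assumptions for this version.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.WindowMapFunction;
import org.apache.flink.streaming.api.windowing.helper.Count;
import org.apache.flink.util.Collector;

public class MapWindowSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		DataStream<Integer> dataStream = env.fromElements(1, 2, 3, 4, 5);

		dataStream
			.window(Count.of(3))
			// mapWindow exposes the whole window through an Iterable,
			// unlike the binary reduceWindow operator
			.mapWindow(new WindowMapFunction<Integer, Integer>() {
				@Override
				public void mapWindow(Iterable<Integer> values, Collector<Integer> out) {
					int sum = 0;
					for (int v : values) {
						sum += v;
					}
					out.collect(sum);
				}
			})
			.flatten()
			.print();

		env.execute("mapWindow sketch");
	}
}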