flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [13/13] flink git commit: [FLINK-2807] Add Javadocs for new windowing semantics/internals
Date Mon, 05 Oct 2015 14:42:47 GMT
[FLINK-2807] Add Javadocs for new windowing semantics/internals


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/62df0a03
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/62df0a03
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/62df0a03

Branch: refs/heads/master
Commit: 62df0a0349b276d4a5b7d9954d2a07f367a61d2d
Parents: 8c2c769
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Sat Oct 3 16:47:28 2015 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Mon Oct 5 16:36:35 2015 +0200

----------------------------------------------------------------------
 .../api/windowing/assigners/GlobalWindows.java  |  8 +++
 .../assigners/SlidingProcessingTimeWindows.java | 13 ++++
 .../windowing/assigners/SlidingTimeWindows.java | 13 ++++
 .../TumblingProcessingTimeWindows.java          | 13 ++++
 .../assigners/TumblingTimeWindows.java          | 13 ++++
 .../api/windowing/assigners/WindowAssigner.java | 22 ++++++
 .../api/windowing/evictors/CountEvictor.java    | 10 +++
 .../api/windowing/evictors/DeltaEvictor.java    | 16 +++++
 .../api/windowing/evictors/Evictor.java         | 25 ++++++-
 .../api/windowing/evictors/TimeEvictor.java     | 11 +++
 .../ContinuousProcessingTimeTrigger.java        | 12 ++++
 .../triggers/ContinuousWatermarkTrigger.java    | 14 ++++
 .../api/windowing/triggers/CountTrigger.java    | 11 +++
 .../api/windowing/triggers/DeltaTrigger.java    | 20 ++++++
 .../triggers/ProcessingTimeTrigger.java         |  7 ++
 .../api/windowing/triggers/PurgingTrigger.java  | 15 +++++
 .../api/windowing/triggers/Trigger.java         | 70 ++++++++++++++++++--
 .../windowing/triggers/WatermarkTrigger.java    |  9 +++
 .../operators/BucketStreamSortOperator.java     | 16 ++++-
 .../runtime/operators/Triggerable.java          |  7 +-
 .../EvictingNonKeyedWindowOperator.java         | 10 ++-
 .../windowing/EvictingWindowOperator.java       | 12 ++++
 .../windowing/NonKeyedWindowOperator.java       |  9 +++
 .../operators/windowing/WindowOperator.java     | 60 +++++++++++++++--
 .../windowing/buffers/EvictingWindowBuffer.java | 15 ++++-
 .../windowing/buffers/HeapWindowBuffer.java     |  8 ++-
 .../buffers/PreAggregatingHeapWindowBuffer.java |  9 ++-
 .../windowing/buffers/WindowBuffer.java         | 38 +++++++++--
 .../windowing/buffers/WindowBufferFactory.java  | 24 +++++++
 29 files changed, 483 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
index 391a6a4..52c8f55 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
@@ -23,6 +23,14 @@ import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
 import java.util.Collection;
 import java.util.Collections;
 
+/**
+ * A {@link WindowAssigner} that assigns all elements to the same global window.
+ *
+ * <p>
+ * Use this if you want to use a {@link Trigger} and
+ * {@link org.apache.flink.streaming.api.windowing.evictors.Evictor} to to flexible, policy based
+ * windows.
+ */
 public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
index 6fc79b0..65d7641 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingProcessingTimeWindows.java
@@ -28,6 +28,19 @@ import java.io.ObjectInputStream;
 import java.util.Collection;
 import java.util.List;
 
+/**
+ * A {@link WindowAssigner} that windows elements into sliding, time-based windows. The windowing
+ * is based on system time. Windows can possibly overlap.
+ *
+ * <p>
+ * For example, in order to window into windows of 1 minute, every 10 seconds:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
+ *   keyed.window(SlidingProcessingTimeWindows.of(Time.of(1, MINUTES), Time.of(10, SECONDS));
+ * } </pre>
+ */
 public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
index 49bff05..52ae356 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SlidingTimeWindows.java
@@ -26,6 +26,19 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
+/**
+ * A {@link WindowAssigner} that windows elements into sliding windows based on the timestamp of the
+ * elements. Windows can possibly overlap.
+ *
+ * <p>
+ * For example, in order to window into windows of 1 minute, every 10 seconds:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
+ *   keyed.window(SlidingTimeWindows.of(Time.of(1, MINUTES), Time.of(10, SECONDS));
+ * } </pre>
+ */
 public class SlidingTimeWindows extends WindowAssigner<Object, TimeWindow> {
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
index 1f2eebf..41f6362 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingProcessingTimeWindows.java
@@ -25,6 +25,19 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import java.util.Collection;
 import java.util.Collections;
 
+/**
+ * A {@link WindowAssigner} that windows elements into time-based windows. The windowing is
+ * based on system time. Windows cannot overlap.
+ *
+ * <p>
+ * For example, in order to window into windows of 1 minute, every 10 seconds:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
+ *   keyed.window(TumblingProcessingTimeWindows.of(Time.of(1, MINUTES), Time.of(10, SECONDS));
+ * } </pre>
+ */
 public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
index 019f45b..b6022b3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/TumblingTimeWindows.java
@@ -25,6 +25,19 @@ import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 import java.util.Collection;
 import java.util.Collections;
 
+/**
+ * A {@link WindowAssigner} that windows elements into windows based on the timestamp of the
+ * elements. Windows cannot overlap.
+ *
+ * <p>
+ * For example, in order to window into windows of 1 minute, every 10 seconds:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
+ *   keyed.window(TumblingTimeWindows.of(Time.of(1, MINUTES), Time.of(10, SECONDS));
+ * } </pre>
+ */
 public class TumblingTimeWindows extends WindowAssigner<Object, TimeWindow> {
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
index 5996426..105caa6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/assigners/WindowAssigner.java
@@ -23,10 +23,32 @@ import scala.Serializable;
 
 import java.util.Collection;
 
+/**
+ * A {@code WindowAssigner} assigns zero or more {@link Window Windows} to an element.
+ *
+ * <p>
+ * In a window operation, elements are grouped by their key (if available) and by the windows to
+ * which it was assigned. The set of elements with the same key and window is called a pane.
+ * When a {@link Trigger} decides that a certain pane should fire the
+ * {@link org.apache.flink.streaming.api.functions.windowing.WindowFunction} is applied
+ * to produce output elements for that pane.
+ *
+ * @param <T> The type of elements that this WindowAssigner can assign windows to.
+ * @param <W> The type of {@code Window} that this assigner assigns.
+ */
 public abstract class WindowAssigner<T, W extends Window> implements Serializable {
 	private static final long serialVersionUID = 1L;
 
+	/**
+	 * Returns a {@code Collection} of windows that should be assigned to the element.
+	 *
+	 * @param element The element to which windows should be assigned.
+	 * @param timestamp The timestamp of the element.
+	 */
 	public abstract Collection<W> assignWindows(T element, long timestamp);
 
+	/**
+	 * Returns the default trigger associated with this {@code WindowAssigner}.
+	 */
 	public abstract Trigger<T, W> getDefaultTrigger();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
index 04636ee..0a078e2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/CountEvictor.java
@@ -20,6 +20,11 @@ package org.apache.flink.streaming.api.windowing.evictors;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
+/**
+ * An {@link Evictor} that keeps only a certain amount of elements.
+ *
+ * @param <W> The type of {@link Window Windows} on which this {@code Evictor} can operate.
+ */
 public class CountEvictor<W extends Window> implements Evictor<Object, W> {
 	private static final long serialVersionUID = 1L;
 
@@ -38,6 +43,11 @@ public class CountEvictor<W extends Window> implements Evictor<Object, W> {
 		}
 	}
 
+	/**
+	 * Creates a {@code CountEvictor} that keeps the given number of elements.
+	 *
+	 * @param maxCount The number of elements to keep in the pane.
+	 */
 	public static <W extends Window> CountEvictor<W> of(long maxCount) {
 		return new CountEvictor<>(maxCount);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
index c7872ce..0083a04 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/DeltaEvictor.java
@@ -22,6 +22,16 @@ import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
+/**
+ * An {@link Evictor} that keeps elements based on a {@link DeltaFunction} and a threshold.
+ *
+ * <p>
+ * Eviction starts from the first element of the buffer and removes all elements from the buffer
+ * which have a higher delta then the threshold. As soon as there is an element with a lower delta,
+ * the eviction stops.
+ *
+ * @param <W> The type of {@link Window Windows} on which this {@code Evictor} can operate.
+ */
 public class DeltaEvictor<T, W extends Window> implements Evictor<T, W> {
 	private static final long serialVersionUID = 1L;
 
@@ -52,6 +62,12 @@ public class DeltaEvictor<T, W extends Window> implements Evictor<T, W> {
 		return "DeltaEvictor(" +  deltaFunction + ", " + threshold + ")";
 	}
 
+	/**
+	 * Creates a {@code DeltaEvictor} from the given threshold and {@code DeltaFunction}.
+	 *
+	 * @param threshold The threshold
+	 * @param deltaFunction The {@code DeltaFunction}
+	 */
 	public static <T, W extends Window> DeltaEvictor<T, W> of(double threshold, DeltaFunction<T> deltaFunction) {
 		return new DeltaEvictor<>(threshold, deltaFunction);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
index db04ac4..1a6c5c8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/Evictor.java
@@ -21,8 +21,31 @@ import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import scala.Serializable;
 
+/**
+ * An {@code Evictor} can remove elements from a pane before it is being processed and after
+ * window evaluation was triggered by a
+ * {@link org.apache.flink.streaming.api.windowing.triggers.Trigger}.
+ *
+ * <p>
+ * A pane is the bucket of elements that have the same key (assigned by the
+ * {@link org.apache.flink.api.java.functions.KeySelector}) and same {@link Window}. An element can
+ * be in multiple panes of it was assigned to multiple windows by the
+ * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. These panes all
+ * have their own instance of the {@code Evictor}.
+ *
+ * @param <T> The type of elements that this {@code Evictor} can evict.
+ * @param <W> The type of {@link Window Windows} on which this {@code Evictor} can operate.
+ */
 public interface Evictor<T, W extends Window> extends Serializable {
 
-	public abstract int evict(Iterable<StreamRecord<T>> elements, int size, W window);
+	/**
+	 * Computes how many elements should be removed from the pane. The result specifies how
+	 * many elements should be removed from the beginning.
+	 *
+	 * @param elements The elements currently in the pane.
+	 * @param size The current number of elements in the pane.
+	 * @param window The {@link Window}
+	 */
+	int evict(Iterable<StreamRecord<T>> elements, int size, W window);
 }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
index 2965214..5004c42 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/evictors/TimeEvictor.java
@@ -22,6 +22,12 @@ import org.apache.flink.streaming.api.windowing.time.AbstractTime;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
+/**
+ * An {@link Evictor} that keeps elements for a certain amount of time. Elements older
+ * than {@code current_time - keep_time} are evicted.
+ *
+ * @param <W> The type of {@link Window Windows} on which this {@code Evictor} can operate.
+ */
 public class TimeEvictor<W extends Window> implements Evictor<Object, W> {
 	private static final long serialVersionUID = 1L;
 
@@ -55,6 +61,11 @@ public class TimeEvictor<W extends Window> implements Evictor<Object, W> {
 		return windowSize;
 	}
 
+	/**
+	 * Creates a {@code TimeEvictor} that keeps the given number of elements.
+	 *
+	 * @param windowSize The amount of time for which to keep elements.
+	 */
 	public static <W extends Window> TimeEvictor<W> of(AbstractTime windowSize) {
 		return new TimeEvictor<>(windowSize.toMilliseconds());
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
index 24e8ce3..f23f6ee 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
@@ -21,6 +21,12 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.flink.streaming.api.windowing.time.AbstractTime;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
+/**
+ * A {@link Trigger} that continuously fires based on a given time interval. The time is the current
+ * system time.
+ *
+ * @param <W> The type of {@link Window Windows} on which this trigger can operate.
+ */
 public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigger<Object, W> {
 	private static final long serialVersionUID = 1L;
 
@@ -80,6 +86,12 @@ public class ContinuousProcessingTimeTrigger<W extends Window> implements Trigge
 		return "ContinuousProcessingTimeTrigger(" + interval + ")";
 	}
 
+	/**
+	 * Creates a trigger that continuously fires based on the given interval.
+	 *
+	 * @param interval The time interval at which to fire.
+	 * @param <W> The type of {@link Window Windows} on which this trigger can operate.
+	 */
 	public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(AbstractTime interval) {
 		return new ContinuousProcessingTimeTrigger<>(interval.toMilliseconds());
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
index e11ceba..02ea81d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousWatermarkTrigger.java
@@ -21,6 +21,14 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.flink.streaming.api.windowing.time.AbstractTime;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
+/**
+ * A {@link Trigger} that continuously fires based on a given time interval. This fires based
+ * on {@link org.apache.flink.streaming.api.watermark.Watermark Watermarks}.
+ *
+ * @see org.apache.flink.streaming.api.watermark.Watermark
+ *
+ * @param <W> The type of {@link Window Windows} on which this trigger can operate.
+ */
 public class ContinuousWatermarkTrigger<W extends Window> implements Trigger<Object, W> {
 	private static final long serialVersionUID = 1L;
 
@@ -66,6 +74,12 @@ public class ContinuousWatermarkTrigger<W extends Window> implements Trigger<Obj
 		return interval;
 	}
 
+	/**
+	 * Creates a trigger that continuously fires based on the given interval.
+	 *
+	 * @param interval The time interval at which to fire.
+	 * @param <W> The type of {@link Window Windows} on which this trigger can operate.
+	 */
 	public static <W extends Window> ContinuousWatermarkTrigger<W> of(AbstractTime interval) {
 		return new ContinuousWatermarkTrigger<>(interval.toMilliseconds());
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
index a51fae6..53480fe 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
@@ -19,6 +19,11 @@ package org.apache.flink.streaming.api.windowing.triggers;
 
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
+/**
+ * A {@link Trigger} that fires once the count of elements in a pane reaches the given count.
+ *
+ * @param <W> The type of {@link Window Windows} on which this trigger can operate.
+ */
 public class CountTrigger<W extends Window> implements Trigger<Object, W> {
 	private static final long serialVersionUID = 1L;
 
@@ -55,6 +60,12 @@ public class CountTrigger<W extends Window> implements Trigger<Object, W> {
 		return "CountTrigger(" +  maxCount + ")";
 	}
 
+	/**
+	 * Creates a trigger that fires once the number of elements in a pane reaches the given count.
+	 *
+	 * @param maxCount The count of elements at which to fire.
+	 * @param <W> The type of {@link Window Windows} on which this trigger can operate.
+	 */
 	public static <W extends Window> CountTrigger<W> of(long maxCount) {
 		return new CountTrigger<>(maxCount);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
index ecd7ed0..cf4cf0c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/DeltaTrigger.java
@@ -20,6 +20,16 @@ package org.apache.flink.streaming.api.windowing.triggers;
 import org.apache.flink.streaming.api.functions.windowing.delta.DeltaFunction;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
+/**
+ * A {@link Trigger} that fires based on a {@link DeltaFunction} and a threshold.
+ *
+ * <p>
+ * This trigger calculates a delta between the data point which triggered last
+ * and the currently arrived data point. It triggers if the delta is higher than
+ * a specified threshold.
+ *
+ * @param <W> The type of {@link Window Windows} on which this trigger can operate.
+ */
 public class DeltaTrigger<T, W extends Window> implements Trigger<T, W> {
 	private static final long serialVersionUID = 1L;
 
@@ -60,6 +70,16 @@ public class DeltaTrigger<T, W extends Window> implements Trigger<T, W> {
 		return "DeltaTrigger(" +  deltaFunction + ", " + threshold + ")";
 	}
 
+	/**
+	 * Creates a delta trigger from the given threshold and {@code DeltaFunction}.
+	 *
+	 * @param threshold The threshold at which to trigger.
+	 * @param deltaFunction The delta function to use
+	 *
+	 * @param <T> The type of elements on which this trigger can operate.
+	 * @param <W> The type of {@link Window Windows} on which this trigger can operate.
+	 * @return
+	 */
 	public static <T, W extends Window> DeltaTrigger<T, W> of(double threshold, DeltaFunction<T> deltaFunction) {
 		return new DeltaTrigger<>(threshold, deltaFunction);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
index f693a67..cc3440c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
@@ -19,6 +19,10 @@ package org.apache.flink.streaming.api.windowing.triggers;
 
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 
+/**
+ * A {@link Trigger} that fires once the current system time passes the end of the window
+ * to which a pane belongs.
+ */
 public class ProcessingTimeTrigger implements Trigger<Object, TimeWindow> {
 	private static final long serialVersionUID = 1L;
 
@@ -50,6 +54,9 @@ public class ProcessingTimeTrigger implements Trigger<Object, TimeWindow> {
 		return "ProcessingTimeTrigger()";
 	}
 
+	/**
+	 * Creates a new trigger that fires once system time passes the end of the window.
+	 */
 	public static ProcessingTimeTrigger create() {
 		return new ProcessingTimeTrigger();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
index 88e22cd..1c896a7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
@@ -20,6 +20,16 @@ package org.apache.flink.streaming.api.windowing.triggers;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
+/**
+ * A trigger that can turn any {@link Trigger} into a purging {@code Trigger}.
+ *
+ * <p>
+ * When the nested trigger fires, this will return a {@code FIRE_AND_PURGE}
+ * {@link org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerResult}
+ *
+ * @param <T> The type of elements on which this trigger can operate.
+ * @param <W> The type of {@link Window Windows} on which this trigger can operate.
+ */
 public class PurgingTrigger<T, W extends Window> implements Trigger<T, W> {
 	private static final long serialVersionUID = 1L;
 
@@ -65,6 +75,11 @@ public class PurgingTrigger<T, W extends Window> implements Trigger<T, W> {
 		return "PurgingTrigger(" + nestedTrigger.toString() + ")";
 	}
 
+	/**
+	 * Creates a new purging trigger from the given {@code Trigger}.
+	 *
+	 * @param nestedTrigger The trigger that is wrapped by this purging trigger
+	 */
 	public static <T, W extends Window> PurgingTrigger<T, W> of(Trigger<T, W> nestedTrigger) {
 		return new PurgingTrigger<>(nestedTrigger);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
index b04aacf..97d9ba5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
@@ -20,21 +20,81 @@ package org.apache.flink.streaming.api.windowing.triggers;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 import scala.Serializable;
 
+/**
+ * A {@code Trigger} determines when a pane of a window should be evaluated to emit the
+ * results for that part of the window.
+ *
+ * <p>
+ * A pane is the bucket of elements that have the same key (assigned by the
+ * {@link org.apache.flink.api.java.functions.KeySelector}) and same {@link Window}. An element can
+ * be in multiple panes of it was assigned to multiple windows by the
+ * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. These panes all
+ * have their own instance of the {@code Trigger}.
+ *
+ * @param <T> The type of elements on which this {@code Trigger} works.
+ * @param <W> The type of {@link Window Windows} on which this {@code Trigger} can operate.
+ */
 public interface Trigger<T, W extends Window> extends Serializable {
 
-	public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx);
+	/**
+	 * Called for every element that gets added to a pane. The result of this will determine
+	 * whether the pane is evaluated to emit results.
+	 *
+	 * @param element The element that arrived.
+	 * @param timestamp The timestamp of the element that arrived.
+	 * @param window The window to which this pane belongs.
+	 * @param ctx A context object that can be used to register timer callbacks.
+	 */
+	TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx);
 
-	public TriggerResult onTime(long time, TriggerContext ctx);
+	/**
+	 * Called when a timer that was set using the trigger context fires.
+	 *
+	 * @param time The timestamp at which the timer fired.
+	 * @param ctx A context object that can be used to register timer callbacks.
+	 */
+	TriggerResult onTime(long time, TriggerContext ctx);
 
-	public Trigger<T, W> duplicate();
+	/**
+	 * Creates a duplicate of the {@code Trigger} without the state of the original {@code Trigger}.
+	 * @return The duplicate {@code Trigger} object.
+	 */
+	Trigger<T, W> duplicate();
 
-	public static enum TriggerResult {
+	/**
+	 * Result type for trigger methods. This determines what happens which the window.
+	 *
+	 * <p>
+	 * On {@code FIRE} the pane is evaluated and results are emitted. The contents of the window
+	 * are kept. {@code FIRE_AND_PURGE} acts like {@code FIRE} but the contents of the pane
+	 * are purged. On {@code CONTINUE} nothing happens, processing continues.
+	 */
+	enum TriggerResult {
 		CONTINUE, FIRE_AND_PURGE, FIRE
 	}
 
-	public interface TriggerContext {
+	/**
+	 * A context object that is given to {@code Trigger} methods to allow them to register timer
+	 * callbacks.
+	 */
+	interface TriggerContext {
+
+		/**
+		 * Register a system time callback. When the current system time passes the specified
+		 * time {@link #onTime(long, TriggerContext)} is called.
+		 *
+		 * @param time The time at which to invoke {@link #onTime(long, TriggerContext)}
+		 */
 		void registerProcessingTimeTimer(long time);
 
+		/**
+		 * Register a watermark callback. When the current watermark passes the specified
+		 * time {@link #onTime(long, TriggerContext)} is called.
+		 *
+		 * @see org.apache.flink.streaming.api.watermark.Watermark
+		 *
+		 * @param time The watermark at which to invoke {@link #onTime(long, TriggerContext)}
+		 */
 		void registerWatermarkTimer(long time);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java
index 6ba8890..5d66ba3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/triggers/WatermarkTrigger.java
@@ -19,6 +19,12 @@ package org.apache.flink.streaming.api.windowing.triggers;
 
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
 
+/**
+ * A {@link Trigger} that fires once the watermark passes the end of the window
+ * to which a pane belongs.
+ *
+ * @see org.apache.flink.streaming.api.watermark.Watermark
+ */
 public class WatermarkTrigger implements Trigger<Object, TimeWindow> {
 	private static final long serialVersionUID = 1L;
 
@@ -50,6 +56,9 @@ public class WatermarkTrigger implements Trigger<Object, TimeWindow> {
 		return "WatermarkTrigger()";
 	}
 
+	/**
+	 * Creates trigger that fires once the watermark passes the end of the window.
+	 */
 	public static WatermarkTrigger create() {
 		return new WatermarkTrigger();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java
index 145ad25..017c8ea 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.java
@@ -33,6 +33,13 @@ import java.util.Map;
 import java.util.Set;
 
 
+/**
+ * An operator that can sort a stream based on timestamps. Arriving elements will be put into
+ * buckets based on their timestamp. Sorting and emission of sorted elements happens once
+ * the watermark passes the end of a bucket.
+ *
+ * @param <T> The type of the elements on which this operator works.
+ */
 public class BucketStreamSortOperator<T> extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
 	private static final long serialVersionUID = 1L;
 
@@ -40,8 +47,13 @@ public class BucketStreamSortOperator<T> extends AbstractStreamOperator<T> imple
 
 	private transient Map<Long, List<StreamRecord<T>>> buckets;
 
-	public BucketStreamSortOperator(long granularity) {
-		this.granularity = granularity;
+	/**
+	 * Creates a new sorting operator that creates buckets with the given interval.
+	 *
+	 * @param interval The size (in time) of one bucket.
+	 */
+	public BucketStreamSortOperator(long interval) {
+		this.granularity = interval;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java
index ac1a543..50d1cb6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/Triggerable.java
@@ -19,14 +19,13 @@
 package org.apache.flink.streaming.runtime.operators;
 
 /**
- * This interface must be implemented by objects that are triggered by a
- * {@link TriggerTimer}.
+ * This interface must be implemented by objects that are triggered by the timer service available
+ * to stream operators in {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment}.
  */
 public interface Triggerable {
 
 	/**
-	 * This method is invoked by the {@link TriggerTimer}
-	 * and given the timestamp for which the trigger was scheduled.
+	 * This method is invoked with the timestamp for which the trigger was scheduled.
 	 * <p>
 	 * If the triggering is delayed for whatever reason (trigger timer was blocked, JVM stalled due
 	 * to a garbage collection), the timestamp supplied to this function will still be the original

http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
index 53df838..31c7fed 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingNonKeyedWindowOperator.java
@@ -30,7 +30,15 @@ import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuff
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-
+/**
+ * Evicting window operator for non-keyed windows.
+ *
+ * @see org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator
+ *
+ * @param <IN> The type of the incoming elements.
+ * @param <OUT> The type of elements emitted by the {@code WindowFunction}.
+ * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
+ */
 public class EvictingNonKeyedWindowOperator<IN, OUT, W extends Window> extends NonKeyedWindowOperator<IN, OUT, W> {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 334eb54..49d58e4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -33,6 +33,18 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Map;
 
+/**
+ * A {@link WindowOperator} that also allows an {@link Evictor} to be used.
+ *
+ * <p>
+ * The {@code Evictor} is used to evict elements from panes before processing a window and after
+ * a {@link Trigger} has fired.
+ *
+ * @param <K> The type of key returned by the {@code KeySelector}.
+ * @param <IN> The type of the incoming elements.
+ * @param <OUT> The type of elements emitted by the {@code WindowFunction}.
+ * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
+ */
 public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends WindowOperator<K, IN, OUT, W> {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index d48643d..a80242d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -45,6 +45,15 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 
+/**
+ * Window operator for non-keyed windows.
+ *
+ * @see org.apache.flink.streaming.runtime.operators.windowing.WindowOperator
+ *
+ * @param <IN> The type of the incoming elements.
+ * @param <OUT> The type of elements emitted by the {@code WindowFunction}.
+ * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
+ */
 public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 		extends AbstractUdfStreamOperator<OUT, AllWindowFunction<IN, OUT, W>>
 		implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable {

http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 2d4635f..548afb3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -46,6 +46,32 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 
+/**
+ * An operator that implements the logic for windowing based on a {@link WindowAssigner} and
+ * {@link Trigger}.
+ *
+ * <p>
+ * When an element arrives it gets assigned a key using a {@link KeySelector} and it get's
+ * assigned to zero or more windows using a {@link WindowAssigner}. Based on this the element
+ * is put into panes. A pane is the bucket of elements that have the same key and same
+ * {@code Window}. An element can be in multiple panes of it was assigned to multiple windows by the
+ * {@code WindowAssigner}.
+ *
+ * <p>
+ * Each pane gets its own instance of the provided {@code Trigger}. This trigger determines when
+ * the contents of the pane should be processed to emit results. When a trigger fires,
+ * the given {@link WindowFunction} is invoked to produce the results that are emitted for
+ * the pane to which the {@code Trigger} belongs.
+ *
+ * <p>
+ * This operator also needs a {@link WindowBufferFactory} to create a buffer for storing the
+ * elements of each pane.
+ *
+ * @param <K> The type of key returned by the {@code KeySelector}.
+ * @param <IN> The type of the incoming elements.
+ * @param <OUT> The type of elements emitted by the {@code WindowFunction}.
+ * @param <W> The type of {@code Window} that the {@code WindowAssigner} assigns.
+ */
 public class WindowOperator<K, IN, OUT, W extends Window>
 		extends AbstractUdfStreamOperator<OUT, WindowFunction<IN, OUT, K, W>>
 		implements OneInputStreamOperator<IN, OUT>, Triggerable, InputTypeConfigurable {
@@ -54,24 +80,47 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 
 	private static final Logger LOG = LoggerFactory.getLogger(WindowOperator.class);
 
-
 	private final WindowAssigner<? super IN, W> windowAssigner;
+
 	private final KeySelector<IN, K> keySelector;
 
 	private final Trigger<? super IN, ? super W> triggerTemplate;
+
 	private final WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory;
 
+	/**
+	 * The windows (panes) that are currently in-flight. Each pane has a {@code WindowBuffer}
+	 * and a {@code TriggerContext} that stores the {@code Trigger} for that pane.
+	 */
 	protected transient Map<K, Map<W, Tuple2<WindowBuffer<IN>, TriggerContext>>> windows;
 
+	/**
+	 * Processing time timers that are currently in-flight.
+	 */
 	private transient Map<Long, Set<TriggerContext>> processingTimeTimers;
+
+	/**
+	 * Current waiting watermark callbacks.
+	 */
 	private transient Map<Long, Set<TriggerContext>> watermarkTimers;
 
+	/**
+	 * This is given to the {@code WindowFunction} for emitting elements with a given timestamp.
+	 */
 	protected transient TimestampedCollector<OUT> timestampedCollector;
 
+	/**
+	 * If this is true. The current processing time is set as the timestamp of incoming elements.
+	 * This for use with a {@link org.apache.flink.streaming.api.windowing.evictors.TimeEvictor}
+	 * if eviction should happen based on processing time.
+	 */
 	private boolean setProcessingTime = false;
 
 	private TypeSerializer<IN> inputSerializer;
 
+	/**
+	 * Creates a new {@code WindowOperator} based on the given policies and user functions.
+	 */
 	public WindowOperator(WindowAssigner<? super IN, W> windowAssigner,
 			KeySelector<IN, K> keySelector,
 			WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory,
@@ -245,6 +294,10 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 		}
 	}
 
+	/**
+	 * A context object that is given to {@code Trigger} functions to allow them to register
+	 * timer/watermark callbacks.
+	 */
 	protected class TriggerContext implements Trigger.TriggerContext {
 		Trigger<? super IN, ? super W> trigger;
 		K key;
@@ -312,9 +365,4 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 	public WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> getWindowBufferFactory() {
 		return windowBufferFactory;
 	}
-
-	@VisibleForTesting
-	public boolean isSetProcessingTime() {
-		return setProcessingTime;
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
index 50e392b..28365e1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/EvictingWindowBuffer.java
@@ -17,6 +17,19 @@
  */
 package org.apache.flink.streaming.runtime.operators.windowing.buffers;
 
+/**
+ * A {@code WindowBuffer} that can also evict elements from the buffer. The order in which
+ * the elements are added is preserved. Elements can only be evicted started from the beginning of
+ * the buffer.
+ *
+ * @param <T> The type of elements that this {@code WindowBuffer} can store.
+ */
+
 public interface EvictingWindowBuffer<T> extends WindowBuffer<T> {
-	public boolean removeElements(int count);
+
+	/**
+	 * Removes the given number of elements, starting from the beginning.
+	 * @param count The number of elements to remove.
+	 */
+	void removeElements(int count);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java
index 092718a..f9f8b26 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/HeapWindowBuffer.java
@@ -25,6 +25,11 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import java.util.ArrayDeque;
 
+/**
+ * An {@link EvictingWindowBuffer} that stores elements on the Java Heap.
+ *
+ * @param <T> The type of elements that this {@code WindowBuffer} can store.
+ */
 public class HeapWindowBuffer<T> implements EvictingWindowBuffer<T> {
 	private static final long serialVersionUID = 1L;
 
@@ -40,12 +45,11 @@ public class HeapWindowBuffer<T> implements EvictingWindowBuffer<T> {
 	}
 
 	@Override
-	public boolean removeElements(int count) {
+	public void removeElements(int count) {
 		// TODO determine if this can be done in a better way
 		for (int i = 0; i < count; i++) {
 			elements.removeFirst();
 		}
-		return false;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java
index 85f90b0..37be8f0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/PreAggregatingHeapWindowBuffer.java
@@ -25,6 +25,13 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import java.util.Collections;
 
+/**
+ * An {@link WindowBuffer} that stores elements on the Java Heap. This buffer uses a
+ * {@link ReduceFunction} to pre-aggregate elements that are added to the buffer.
+ *
+ * @param <T> The type of elements that this {@code WindowBuffer} can store.
+ */
+
 public class PreAggregatingHeapWindowBuffer<T> implements WindowBuffer<T> {
 	private static final long serialVersionUID = 1L;
 
@@ -85,7 +92,7 @@ public class PreAggregatingHeapWindowBuffer<T> implements WindowBuffer<T> {
 
 		@Override
 		public PreAggregatingHeapWindowBuffer<T> create() {
-			return new PreAggregatingHeapWindowBuffer<T>(reduceFunction);
+			return new PreAggregatingHeapWindowBuffer<>(reduceFunction);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java
index 8c891d5..b111667 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBuffer.java
@@ -18,17 +18,47 @@
 package org.apache.flink.streaming.runtime.operators.windowing.buffers;
 
 
+import org.apache.flink.streaming.api.windowing.windows.Window;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 import java.io.Serializable;
 
+/**
+ * A {@code WindowBuffer} is used by
+ * {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator} to store
+ * the elements of one pane.
+ *
+ * <p>
+ * A pane is the bucket of elements that have the same key (assigned by the
+ * {@link org.apache.flink.api.java.functions.KeySelector}) and same {@link Window}. An element can
+ * be in multiple panes of it was assigned to multiple windows by the
+ * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. These panes all
+ * have their own instance of the {@code Evictor}.
+ *
+ * @param <T> The type of elements that this {@code WindowBuffer} can store.
+ */
 public interface WindowBuffer<T> extends Serializable {
 
-	public void storeElement(StreamRecord<T> element) throws Exception;
+	/**
+	 * Adds the element to the buffer.
+	 *
+	 * @param element The element to add.
+	 */
+	void storeElement(StreamRecord<T> element) throws Exception;
 
-	public Iterable<StreamRecord<T>> getElements();
+	/**
+	 * Returns all elements that are currently in the buffer.
+	 */
+	Iterable<StreamRecord<T>> getElements();
 
-	public Iterable<T> getUnpackedElements();
+	/**
+	 * Returns all elements that are currently in the buffer. This will unwrap the contained
+	 * elements from their {@link StreamRecord}.
+	 */
+	Iterable<T> getUnpackedElements();
 
-	public int size();
+	/**
+	 * Returns the number of elements that are currently in the buffer.
+	 */
+	int size();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/62df0a03/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java
index 4a7f6df..4bcdf09 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/buffers/WindowBufferFactory.java
@@ -22,9 +22,33 @@ import org.apache.flink.configuration.Configuration;
 
 import java.io.Serializable;
 
+/**
+ * A factory for {@link WindowBuffer WindowBuffers}.
+ *
+ * @param <T> The type of elements that the created {@code WindowBuffer} can store.
+ * @param <B> The type of the created {@code WindowBuffer}
+ */
 public interface WindowBufferFactory<T, B extends WindowBuffer<T>> extends Serializable {
+
+	/**
+	 * Sets the {@link RuntimeContext} that is used to initialize eventual user functions
+	 * inside the created buffers.
+	 */
 	void setRuntimeContext(RuntimeContext ctx);
+
+	/**
+	 * Calls {@code open()} on eventual user functions inside the buffer.
+	 */
 	void open(Configuration config) throws Exception;
+
+	/**
+	 * Calls {@code close()} on eventual user functions inside the buffer.
+	 */
+
 	void close() throws Exception;
+
+	/**
+	 * Creates a new {@code WindowBuffer}.
+	 */
 	B create();
 }


Mime
View raw message