flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [17/36] flink git commit: [scala] [streaming] Added scala window helpers + timestamp rework for lambda support
Date Wed, 07 Jan 2015 14:12:56 GMT
[scala] [streaming] Added scala window helpers + timestamp rework for lambda support

Conflicts:
	flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1492e966
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1492e966
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1492e966

Branch: refs/heads/release-0.8
Commit: 1492e96670b6450e0c24ea31fe66d77c7d6a9c64
Parents: 5995248
Author: Gyula Fora <gyfora@apache.org>
Authored: Fri Jan 2 20:04:51 2015 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Mon Jan 5 18:05:58 2015 +0100

----------------------------------------------------------------------
 .../api/datastream/CoWindowDataStream.java      | 20 +++---
 .../api/datastream/ConnectedDataStream.java     | 30 ++++----
 .../api/datastream/TemporalOperator.java        | 39 ++++++-----
 .../co/CoGroupedWindowReduceInvokable.java      |  4 +-
 .../operator/co/CoWindowInvokable.java          |  8 +--
 .../operator/co/CoWindowReduceInvokable.java    | 15 ++--
 .../api/invokable/util/DefaultTimeStamp.java    | 39 -----------
 .../streaming/api/invokable/util/TimeStamp.java | 46 -------------
 .../streaming/api/windowing/helper/Delta.java   |  4 +-
 .../api/windowing/helper/SystemTimestamp.java   | 37 ++++++++++
 .../streaming/api/windowing/helper/Time.java    | 72 ++++++++++++++++----
 .../api/windowing/helper/Timestamp.java         | 39 +++++++++++
 .../api/windowing/helper/TimestampWrapper.java  | 44 ++++++++++++
 .../windowing/policy/ActiveTriggerPolicy.java   |  6 +-
 .../windowing/policy/TimeEvictionPolicy.java    | 31 +++++----
 .../api/windowing/policy/TimeTriggerPolicy.java | 42 ++++++------
 .../streaming/api/WindowCrossJoinTest.java      | 30 ++++----
 .../operator/CoGroupedWindowReduceTest.java     |  5 +-
 .../invokable/operator/CoWindowReduceTest.java  |  5 +-
 .../api/invokable/operator/CoWindowTest.java    | 23 +++----
 .../operator/GroupedWindowInvokableTest.java    | 21 +++---
 .../invokable/operator/WindowInvokableTest.java | 16 ++---
 .../policy/TimeEvictionPolicyTest.java          | 12 ++--
 .../windowing/policy/TimeTriggerPolicyTest.java | 28 +++-----
 .../examples/windowing/DeltaExtractExample.java |  6 +-
 .../flink/api/scala/streaming/DataStream.scala  | 19 +-----
 .../scala/streaming/StreamCrossOperator.scala   |  2 +-
 .../streaming/StreamExecutionEnvironment.scala  | 12 +++-
 .../scala/streaming/StreamJoinOperator.scala    |  1 +
 .../scala/streaming/WindowedDataStream.scala    |  6 +-
 .../api/scala/streaming/windowing/Delta.scala   | 47 +++++++++++++
 .../api/scala/streaming/windowing/Time.scala    | 55 +++++++++++++++
 32 files changed, 454 insertions(+), 310 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1492e966/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java
index c8c634a..9129f9e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoWindowDataStream.java
@@ -22,33 +22,33 @@ import org.apache.flink.streaming.api.function.co.CoReduceFunction;
 import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedWindowReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoWindowReduceInvokable;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 
 /**
  * A {@link CoWindowDataStream} represents two data streams whose elements are
  * batched together into sliding windows. Operation
  * {@link #reduce(CoReduceFunction)} can be applied for each window.
- *
+ * 
  * @param <IN1>
  *            The type of the first input data stream
  * @param <IN2>
  *            The type of the second input data stream
  */
 public class CoWindowDataStream<IN1, IN2> extends CoBatchedDataStream<IN1, IN2> {
-	TimeStamp<IN1> timeStamp1;
-	TimeStamp<IN2> timeStamp2;
+	TimestampWrapper<IN1> timeStamp1;
+	TimestampWrapper<IN2> timeStamp2;
 
 	protected CoWindowDataStream(DataStream<IN1> dataStream1, DataStream<IN2> dataStream2,
 			long windowSize1, long windowSize2, long slideInterval1, long slideInterval2,
-			TimeStamp<IN1> timeStamp1, TimeStamp<IN2> timeStamp2) {
+			TimestampWrapper<IN1> timeStamp1, TimestampWrapper<IN2> timeStamp2) {
 		super(dataStream1, dataStream2, windowSize1, windowSize2, slideInterval1, slideInterval2);
 		this.timeStamp1 = timeStamp1;
 		this.timeStamp2 = timeStamp2;
 	}
 
 	protected CoWindowDataStream(ConnectedDataStream<IN1, IN2> coDataStream, long windowSize1,
-			long windowSize2, long slideInterval1, long slideInterval2, TimeStamp<IN1> timeStamp1,
-			TimeStamp<IN2> timeStamp2) {
+			long windowSize2, long slideInterval1, long slideInterval2,
+			TimestampWrapper<IN1> timeStamp1, TimestampWrapper<IN2> timeStamp2) {
 		super(coDataStream, windowSize1, windowSize2, slideInterval1, slideInterval2);
 		this.timeStamp1 = timeStamp1;
 		this.timeStamp2 = timeStamp2;
@@ -96,9 +96,9 @@ public class CoWindowDataStream<IN1, IN2> extends CoBatchedDataStream<IN1, IN2>
 			CoReduceFunction<IN1, IN2, OUT> coReducer) {
 		CoWindowReduceInvokable<IN1, IN2, OUT> invokable;
 		if (isGrouped) {
-			invokable = new CoGroupedWindowReduceInvokable<IN1, IN2, OUT>(clean(coReducer), batchSize1,
-					batchSize2, slideSize1, slideSize2, keySelector1, keySelector2, timeStamp1,
-					timeStamp2);
+			invokable = new CoGroupedWindowReduceInvokable<IN1, IN2, OUT>(clean(coReducer),
+					batchSize1, batchSize2, slideSize1, slideSize2, keySelector1, keySelector2,
+					timeStamp1, timeStamp2);
 		} else {
 			invokable = new CoWindowReduceInvokable<IN1, IN2, OUT>(clean(coReducer), batchSize1,
 					batchSize2, slideSize1, slideSize2, timeStamp1, timeStamp2);

http://git-wip-us.apache.org/repos/asf/flink/blob/1492e966/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index 39b6460..efd9531 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -36,8 +36,8 @@ import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
 import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
-import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.api.windowing.helper.SystemTimestamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 
 /**
  * The ConnectedDataStream represents a stream for two different data types. It
@@ -305,8 +305,8 @@ public class ConnectedDataStream<IN1, IN2> {
 	 * @return The transformed {@link ConnectedDataStream}
 	 */
 	public CoWindowDataStream<IN1, IN2> window(long windowSize1, long windowSize2,
-			long slideInterval1, long slideInterval2, TimeStamp<IN1> timeStamp1,
-			TimeStamp<IN2> timeStamp2) {
+			long slideInterval1, long slideInterval2, TimestampWrapper<IN1> timeStamp1,
+			TimestampWrapper<IN2> timeStamp2) {
 		if (windowSize1 < 1 || windowSize2 < 1) {
 			throw new IllegalArgumentException("Window size must be positive");
 		}
@@ -338,10 +338,12 @@ public class ConnectedDataStream<IN1, IN2> {
 	 *            second input data stream are slid by after each transformation
 	 * @return The transformed {@link ConnectedDataStream}
 	 */
+	@SuppressWarnings("unchecked")
 	public CoWindowDataStream<IN1, IN2> window(long windowSize1, long windowSize2,
 			long slideInterval1, long slideInterval2) {
 		return window(windowSize1, windowSize2, slideInterval1, slideInterval2,
-				new DefaultTimeStamp<IN1>(), new DefaultTimeStamp<IN2>());
+				(TimestampWrapper<IN1>) SystemTimestamp.getWrapper(),
+				(TimestampWrapper<IN2>) SystemTimestamp.getWrapper());
 	}
 
 	/**
@@ -365,7 +367,7 @@ public class ConnectedDataStream<IN1, IN2> {
 	 * @return The transformed {@link ConnectedDataStream}
 	 */
 	public CoWindowDataStream<IN1, IN2> window(long windowSize1, long windowSize2,
-			TimeStamp<IN1> timeStamp1, TimeStamp<IN2> timeStamp2) {
+			TimestampWrapper<IN1> timeStamp1, TimestampWrapper<IN2> timeStamp2) {
 		return window(windowSize1, windowSize2, windowSize1, windowSize2, timeStamp1, timeStamp2);
 	}
 
@@ -384,9 +386,11 @@ public class ConnectedDataStream<IN1, IN2> {
 	 *            milliseconds
 	 * @return The transformed {@link ConnectedDataStream}
 	 */
+	@SuppressWarnings("unchecked")
 	public CoWindowDataStream<IN1, IN2> window(long windowSize1, long windowSize2) {
 		return window(windowSize1, windowSize2, windowSize1, windowSize2,
-				new DefaultTimeStamp<IN1>(), new DefaultTimeStamp<IN2>());
+				(TimestampWrapper<IN1>) SystemTimestamp.getWrapper(),
+				(TimestampWrapper<IN2>) SystemTimestamp.getWrapper());
 	}
 
 	/**
@@ -479,10 +483,12 @@ public class ConnectedDataStream<IN1, IN2> {
 	 * 
 	 * @return The transformed {@link DataStream}.
 	 */
+	@SuppressWarnings("unchecked")
 	public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduce(
 			CoWindowFunction<IN1, IN2, OUT> coWindowFunction, long windowSize, long slideInterval) {
 		return windowReduce(coWindowFunction, windowSize, slideInterval,
-				new DefaultTimeStamp<IN1>(), new DefaultTimeStamp<IN2>());
+				(TimestampWrapper<IN1>) SystemTimestamp.getWrapper(),
+				(TimestampWrapper<IN2>) SystemTimestamp.getWrapper());
 	}
 
 	/**
@@ -510,7 +516,7 @@ public class ConnectedDataStream<IN1, IN2> {
 	 */
 	public <OUT> SingleOutputStreamOperator<OUT, ?> windowReduce(
 			CoWindowFunction<IN1, IN2, OUT> coWindowFunction, long windowSize, long slideInterval,
-			TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
+			TimestampWrapper<IN1> timestamp1, TimestampWrapper<IN2> timestamp2) {
 
 		if (windowSize < 1) {
 			throw new IllegalArgumentException("Window size must be positive");
@@ -541,8 +547,8 @@ public class ConnectedDataStream<IN1, IN2> {
 
 	public <OUT> SingleOutputStreamOperator<OUT, ?> addGeneralWindowCombine(
 			CoWindowFunction<IN1, IN2, OUT> coWindowFunction, TypeInformation<OUT> outTypeInfo,
-			long windowSize, long slideInterval, TimeStamp<IN1> timestamp1,
-			TimeStamp<IN2> timestamp2) {
+			long windowSize, long slideInterval, TimestampWrapper<IN1> timestamp1,
+			TimestampWrapper<IN2> timestamp2) {
 
 		if (windowSize < 1) {
 			throw new IllegalArgumentException("Window size must be positive");
@@ -550,7 +556,7 @@ public class ConnectedDataStream<IN1, IN2> {
 		if (slideInterval < 1) {
 			throw new IllegalArgumentException("Slide interval must be positive");
 		}
-		
+
 		return addCoFunction("coWindowReduce", outTypeInfo, new CoWindowInvokable<IN1, IN2, OUT>(
 				clean(coWindowFunction), windowSize, slideInterval, timestamp1, timestamp2));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1492e966/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java
index cd8aabd..e5385f0 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/TemporalOperator.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.streaming.api.datastream;
 
-import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.api.windowing.helper.SystemTimestamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 
 public abstract class TemporalOperator<I1, I2, OP> {
 
@@ -29,8 +29,8 @@ public abstract class TemporalOperator<I1, I2, OP> {
 	public long windowSize;
 	public long slideInterval;
 
-	public TimeStamp<I1> timeStamp1;
-	public TimeStamp<I2> timeStamp2;
+	public TimestampWrapper<I1> timeStamp1;
+	public TimestampWrapper<I2> timeStamp2;
 
 	public TemporalOperator(DataStream<I1> input1, DataStream<I2> input2) {
 		if (input1 == null || input2 == null) {
@@ -41,37 +41,37 @@ public abstract class TemporalOperator<I1, I2, OP> {
 	}
 
 	/**
-	 * Continues a temporal Join transformation.<br/>
-	 * Defines the window size on which the two DataStreams will be joined.
+	 * Continues a temporal transformation.<br/>
+	 * Defines the window size on which the two DataStreams will be transformed.
 	 * 
 	 * @param windowSize
 	 *            The size of the window in milliseconds.
-	 * @return An incomplete Join transformation. Call {@link JoinWindow#where}
-	 *         to continue the Join.
+	 * @return An incomplete temporal transformation.
 	 */
 	public OP onWindow(long windowSize) {
 		return onWindow(windowSize, windowSize);
 	}
 
 	/**
-	 * Continues a temporal Join transformation.<br/>
-	 * Defines the window size on which the two DataStreams will be joined.
+	 * Continues a temporal transformation.<br/>
+	 * Defines the window size on which the two DataStreams will be transformed.
 	 * 
 	 * @param windowSize
 	 *            The size of the window in milliseconds.
 	 * @param slideInterval
 	 *            The slide size of the window.
-	 * @return An incomplete Join transformation. Call {@link JoinWindow#where}
-	 *         to continue the Join.
+	 * @return An incomplete temporal transformation.
 	 */
+	@SuppressWarnings("unchecked")
 	public OP onWindow(long windowSize, long slideInterval) {
-		return onWindow(windowSize, slideInterval, new DefaultTimeStamp<I1>(),
-				new DefaultTimeStamp<I2>());
+		return onWindow(windowSize, slideInterval,
+				(TimestampWrapper<I1>) SystemTimestamp.getWrapper(),
+				(TimestampWrapper<I2>) SystemTimestamp.getWrapper());
 	}
 
 	/**
-	 * Continues a temporal Join transformation.<br/>
-	 * Defines the window size on which the two DataStreams will be joined.
+	 * Continues a temporal transformation.<br/>
+	 * Defines the window size on which the two DataStreams will be transformed.
 	 * 
 	 * @param windowSize
 	 *            The size of the window in milliseconds.
@@ -83,11 +83,10 @@ public abstract class TemporalOperator<I1, I2, OP> {
 	 * @param timeStamp2
 	 *            The timestamp used to extract time from the elements of the
 	 *            second data stream.
-	 * @return An incomplete Join transformation. Call {@link JoinWindow#where}
-	 *         to continue the Join.
+	 * @return An incomplete temporal transformation.
 	 */
-	public OP onWindow(long windowSize, long slideInterval, TimeStamp<I1> timeStamp1,
-			TimeStamp<I2> timeStamp2) {
+	public OP onWindow(long windowSize, long slideInterval, TimestampWrapper<I1> timeStamp1,
+			TimestampWrapper<I2> timeStamp2) {
 
 		this.windowSize = windowSize;
 		this.slideInterval = slideInterval;

http://git-wip-us.apache.org/repos/asf/flink/blob/1492e966/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowReduceInvokable.java
index 4905566..736239f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedWindowReduceInvokable.java
@@ -22,8 +22,8 @@ import java.util.Map;
 
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 
 public class CoGroupedWindowReduceInvokable<IN1, IN2, OUT> extends
 		CoWindowReduceInvokable<IN1, IN2, OUT> {
@@ -38,7 +38,7 @@ public class CoGroupedWindowReduceInvokable<IN1, IN2, OUT> extends
 	public CoGroupedWindowReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer,
 			long windowSize1, long windowSize2, long slideInterval1, long slideInterval2,
 			KeySelector<IN1, ?> keySelector1, KeySelector<IN2, ?> keySelector2,
-			TimeStamp<IN1> timestamp1, TimeStamp<IN2> timestamp2) {
+			TimestampWrapper<IN1> timestamp1, TimestampWrapper<IN2> timestamp2) {
 		super(coReducer, windowSize1, windowSize2, slideInterval1, slideInterval2, timestamp1,
 				timestamp2);
 		this.keySelector1 = keySelector1;

http://git-wip-us.apache.org/repos/asf/flink/blob/1492e966/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
index 7df5668..59552f4 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowInvokable.java
@@ -23,8 +23,8 @@ import java.util.List;
 
 import org.apache.commons.math.util.MathUtils;
 import org.apache.flink.streaming.api.function.co.CoWindowFunction;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 import org.apache.flink.streaming.state.CircularFifoList;
 
 public class CoWindowInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
@@ -35,8 +35,8 @@ public class CoWindowInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT>
 	protected long slideSize;
 	protected CircularFifoList<StreamRecord<IN1>> circularList1;
 	protected CircularFifoList<StreamRecord<IN2>> circularList2;
-	protected TimeStamp<IN1> timeStamp1;
-	protected TimeStamp<IN2> timeStamp2;
+	protected TimestampWrapper<IN1> timeStamp1;
+	protected TimestampWrapper<IN2> timeStamp2;
 
 	protected StreamWindow window;
 
@@ -44,7 +44,7 @@ public class CoWindowInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT>
 	protected long nextRecordTime;
 
 	public CoWindowInvokable(CoWindowFunction<IN1, IN2, OUT> coWindowFunction, long windowSize,
-			long slideInterval, TimeStamp<IN1> timeStamp1, TimeStamp<IN2> timeStamp2) {
+			long slideInterval, TimestampWrapper<IN1> timeStamp1, TimestampWrapper<IN2> timeStamp2) {
 		super(coWindowFunction);
 		this.coWindowFunction = coWindowFunction;
 		this.windowSize = windowSize;

http://git-wip-us.apache.org/repos/asf/flink/blob/1492e966/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowReduceInvokable.java
index fa47761..0c8598f 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowReduceInvokable.java
@@ -19,8 +19,7 @@ package org.apache.flink.streaming.api.invokable.operator.co;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 
 public class CoWindowReduceInvokable<IN1, IN2, OUT> extends CoBatchReduceInvokable<IN1, IN2, OUT> {
 	private static final long serialVersionUID = 1L;
@@ -28,14 +27,14 @@ public class CoWindowReduceInvokable<IN1, IN2, OUT> extends CoBatchReduceInvokab
 	protected long startTime2;
 	protected long nextRecordTime1;
 	protected long nextRecordTime2;
-	protected TimeStamp<IN1> timestamp1;
-	protected TimeStamp<IN2> timestamp2;
+	protected TimestampWrapper<IN1> timestamp1;
+	protected TimestampWrapper<IN2> timestamp2;
 	protected StreamWindow<IN1> window1;
 	protected StreamWindow<IN2> window2;
 
 	public CoWindowReduceInvokable(CoReduceFunction<IN1, IN2, OUT> coReducer, long windowSize1,
-			long windowSize2, long slideInterval1, long slideInterval2, TimeStamp<IN1> timestamp1,
-			TimeStamp<IN2> timestamp2) {
+			long windowSize2, long slideInterval1, long slideInterval2,
+			TimestampWrapper<IN1> timestamp1, TimestampWrapper<IN2> timestamp2) {
 		super(coReducer, windowSize1, windowSize2, slideInterval1, slideInterval2);
 		this.timestamp1 = timestamp1;
 		this.timestamp2 = timestamp2;
@@ -51,10 +50,10 @@ public class CoWindowReduceInvokable<IN1, IN2, OUT> extends CoBatchReduceInvokab
 		this.window2 = new StreamWindow<IN2>(batchSize2, slideSize2);
 		this.batch1 = this.window1;
 		this.batch2 = this.window2;
-		if (timestamp1 instanceof DefaultTimeStamp) {
+		if (timestamp1.isDefaultTimestamp()) {
 			(new TimeCheck1()).start();
 		}
-		if (timestamp2 instanceof DefaultTimeStamp) {
+		if (timestamp2.isDefaultTimestamp()) {
 			(new TimeCheck2()).start();
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/1492e966/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimeStamp.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimeStamp.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimeStamp.java
deleted file mode 100644
index 2f22e8e..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/DefaultTimeStamp.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.util;
-
-/**
- * Default timestamp function that uses the Java System.currentTimeMillis()
- * method to retrieve a timestamp.
- *
- * @param <T>
- *            Type of the inputs of the reducing function.
- */
-public class DefaultTimeStamp<T> implements TimeStamp<T> {
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public long getTimestamp(T value) {
-		return System.currentTimeMillis();
-	}
-
-	@Override
-	public long getStartTime() {
-		return System.currentTimeMillis();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1492e966/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/TimeStamp.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/TimeStamp.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/TimeStamp.java
deleted file mode 100644
index 86fa101..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/util/TimeStamp.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.util;
-
-import java.io.Serializable;
-
-/**
- * Interface for getting a timestamp from a custom value. Used in window
- * reduces. In order to work properly, the timestamps must be non-decreasing.
- *
- * @param <T>
- *            Type of the value to create the timestamp from.
- */
-public interface TimeStamp<T> extends Serializable {
-
-	/**
-	 * Values
-	 * 
-	 * @param value
-	 *            The value to create the timestamp from
-	 * @return The timestamp
-	 */
-	public long getTimestamp(T value);
-
-	/**
-	 * Function to define the starting time for reference
-	 * 
-	 * @return The starting timestamp
-	 */
-	public long getStartTime();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/1492e966/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
index 9c8c5ca..5434a4e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
@@ -84,8 +84,8 @@ public class Delta<DATA> implements WindowingHelper<DATA> {
 	 *            The threshold used by the delta function.
 	 * @return Helper representing a delta trigger or eviction policy
 	 */
-	public static <DATA> Delta<DATA> of(DeltaFunction<DATA> deltaFunction, DATA initVal,
-			double threshold) {
+	public static <DATA> Delta<DATA> of(double threshold, DeltaFunction<DATA> deltaFunction,
+			DATA initVal) {
 		return new Delta<DATA>(deltaFunction, initVal, threshold);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1492e966/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/SystemTimestamp.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/SystemTimestamp.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/SystemTimestamp.java
new file mode 100644
index 0000000..8581ac5
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/SystemTimestamp.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.helper;
+
+/**
+ * {@link Timestamp} implementation to be used when system time is needed to
+ * determine windows
+ */
+public class SystemTimestamp<T> implements Timestamp<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	@Override
+	public long getTimestamp(T value) {
+		return System.currentTimeMillis();
+	}
+
+	public static <R> TimestampWrapper<R> getWrapper() {
+		return new TimestampWrapper<R>(new SystemTimestamp<R>(), System.currentTimeMillis());
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1492e966/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
index d987e32..9dc1c8c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
@@ -19,8 +19,6 @@ package org.apache.flink.streaming.api.windowing.helper;
 
 import java.util.concurrent.TimeUnit;
 
-import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
@@ -39,7 +37,7 @@ public class Time<DATA> implements WindowingHelper<DATA> {
 
 	private long length;
 	private TimeUnit granularity;
-	private TimeStamp<DATA> timeStamp;
+	private TimestampWrapper<DATA> timestampWrapper;
 	private long delay;
 
 	/**
@@ -53,31 +51,52 @@ public class Time<DATA> implements WindowingHelper<DATA> {
 	 *            the smallest possible granularity is milliseconds. Any smaller
 	 *            time unit might cause an error at runtime due to conversion
 	 *            problems.
-	 * @param timeStamp
+	 * @param timestamp
 	 *            The user defined timestamp that will be used to extract time
 	 *            information from the incoming elements
+	 * @param startTime
+	 *            The startTime of the stream for computing the first window
 	 */
-	private Time(long length, TimeUnit timeUnit, TimeStamp<DATA> timeStamp) {
+	private Time(long length, TimeUnit timeUnit, Timestamp<DATA> timestamp, long startTime) {
+		this(length, timeUnit, new TimestampWrapper<DATA>(timestamp, startTime));
+	}
+
+	/**
+	 * Creates a helper representing a trigger which triggers every given
+	 * length or an eviction which evicts all elements older than length.
+	 * 
+	 * @param length
+	 *            The number of time units
+	 * @param timeUnit
+	 *            The unit of time such as minute or millisecond. Note that
+	 *            the smallest possible granularity is milliseconds. Any smaller
+	 *            time unit might cause an error at runtime due to conversion
+	 *            problems.
+	 * @param timestampWrapper
+	 *            The user defined {@link TimestampWrapper} that will be used to
+	 *            extract time information from the incoming elements
+	 */
+	private Time(long length, TimeUnit timeUnit, TimestampWrapper<DATA> timestampWrapper) {
 		this.length = length;
 		this.granularity = timeUnit;
-		this.timeStamp = timeStamp;
+		this.timestampWrapper = timestampWrapper;
 		this.delay = 0;
 	}
 
 	@Override
 	public EvictionPolicy<DATA> toEvict() {
-		return new TimeEvictionPolicy<DATA>(granularityInMillis(), timeStamp);
+		return new TimeEvictionPolicy<DATA>(granularityInMillis(), timestampWrapper);
 	}
 
 	@Override
 	public TriggerPolicy<DATA> toTrigger() {
-		return new TimeTriggerPolicy<DATA>(granularityInMillis(), timeStamp, delay);
+		return new TimeTriggerPolicy<DATA>(granularityInMillis(), timestampWrapper, delay);
 	}
 
 	/**
 	 * Creates a helper representing a time trigger which triggers every given
 	 * length (slide size) or a time eviction which evicts all elements older
-	 * than length (window size).
+	 * than length (window size) using System time.
 	 * 
 	 * @param length
 	 *            The number of time units
@@ -88,8 +107,10 @@ public class Time<DATA> implements WindowingHelper<DATA> {
 	 *            problems.
 	 * @return Helper representing the time based trigger and eviction policy
 	 */
+	@SuppressWarnings("unchecked")
 	public static <DATA> Time<DATA> of(long length, TimeUnit timeUnit) {
-		return new Time<DATA>(length, timeUnit, new DefaultTimeStamp<DATA>());
+		return new Time<DATA>(length, timeUnit,
+				(TimestampWrapper<DATA>) SystemTimestamp.getWrapper());
 	}
 
 	/**
@@ -99,13 +120,32 @@ public class Time<DATA> implements WindowingHelper<DATA> {
 	 * 
 	 * @param length
 	 *            The number of time units
-	 * @param timeStamp
+	 * @param timestamp
+	 *            The user defined timestamp that will be used to extract time
+	 *            information from the incoming elements
+	 * @param startTime
+	 *            The startTime used to compute the first window
+	 * @return Helper representing the time based trigger and eviction policy
+	 */
+	public static <DATA> Time<DATA> of(long length, Timestamp<DATA> timestamp, long startTime) {
+		return new Time<DATA>(length, null, timestamp, startTime);
+	}
+
+	/**
+	 * Creates a helper representing a time trigger which triggers every given
+	 * length (slide size) or a time eviction which evicts all elements older
+	 * than length (window size) using a user defined timestamp extractor. By
+	 * default the start time is set to 0.
+	 * 
+	 * @param length
+	 *            The number of time units
+	 * @param timestamp
 	 *            The user defined timestamp that will be used to extract time
 	 *            information from the incoming elements
 	 * @return Helper representing the time based trigger and eviction policy
 	 */
-	public static <DATA> Time<DATA> of(long length, TimeStamp<DATA> timeStamp) {
-		return new Time<DATA>(length, TimeUnit.MILLISECONDS, timeStamp);
+	public static <DATA> Time<DATA> of(long length, Timestamp<DATA> timestamp) {
+		return of(length, timestamp, 0);
 	}
 
 	/**
@@ -121,6 +161,10 @@ public class Time<DATA> implements WindowingHelper<DATA> {
 	}
 
 	private long granularityInMillis() {
-		return this.granularity.toMillis(this.length);
+		if (granularity != null) {
+			return this.granularity.toMillis(this.length);
+		} else {
+			return this.length;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1492e966/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Timestamp.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Timestamp.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Timestamp.java
new file mode 100644
index 0000000..fea6020
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Timestamp.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.helper;
+
+import java.io.Serializable;
+
+/**
+ * Interface for getting a timestamp from a custom value. Used in window
+ * reduces. In order to work properly, the timestamps must be non-decreasing.
+ *
+ * @param <T>
+ *            Type of the value to create the timestamp from.
+ */
+public interface Timestamp<T> extends Serializable {
+
+	/**
+	 * Extracts the timestamp from the given value.
+	 * 
+	 * @param value
+	 *            The value to create the timestamp from
+	 * @return The timestamp
+	 */
+	public long getTimestamp(T value);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1492e966/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java
new file mode 100644
index 0000000..8c3a09d
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/TimestampWrapper.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.helper;
+
+import java.io.Serializable;
+
+public class TimestampWrapper<T> implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+	private long startTime;
+	private Timestamp<T> timestamp;
+
+	public TimestampWrapper(Timestamp<T> timeStamp, long startTime) {
+		this.timestamp = timeStamp;
+		this.startTime = startTime;
+	}
+
+	public long getTimestamp(T in) {
+		return timestamp.getTimestamp(in);
+	}
+
+	public long getStartTime() {
+		return startTime;
+	}
+
+	public boolean isDefaultTimestamp() {
+		return timestamp instanceof SystemTimestamp;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1492e966/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
index a8a704d..414250c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/ActiveTriggerPolicy.java
@@ -17,7 +17,7 @@
 
 package org.apache.flink.streaming.api.windowing.policy;
 
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
 
 /**
  * This interface extends the {@link TriggerPolicy} interface with functionality
@@ -28,7 +28,7 @@ import org.apache.flink.streaming.api.invokable.util.TimeStamp;
  * first. It can return zero ore more fake data points which will be added
  * before the the currently arrived real element gets processed. This allows to
  * handle empty windows in time based windowing with an user defined
- * {@link TimeStamp}. Triggers are not called on fake datapoint. A fake
+ * {@link Timestamp}. Triggers are not called on fake datapoint. A fake
  * datapoint is always considered as triggered.
  * 
  * 2) An active trigger has a factory method for a runnable. This factory method
@@ -49,7 +49,7 @@ public interface ActiveTriggerPolicy<DATA> extends TriggerPolicy<DATA> {
 	 * first. It can return zero ore more fake data points which will be added
 	 * before the the currently arrived real element gets processed. This allows
 	 * to handle empty windows in time based windowing with an user defined
-	 * {@link TimeStamp}. Triggers are not called on fake datapoints. A fake
+	 * {@link Timestamp}. Triggers are not called on fake datapoints. A fake
 	 * datapoint is always considered as triggered.
 	 * 
 	 * @param datapoint

http://git-wip-us.apache.org/repos/asf/flink/blob/1492e966/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
index aca1dee..16c30fc 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
@@ -19,14 +19,15 @@ package org.apache.flink.streaming.api.windowing.policy;
 
 import java.util.LinkedList;
 
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 
 /**
  * This eviction policy evicts all elements which are older then a specified
- * time. The time is measured using a given {@link TimeStamp} implementation. A
+ * time. The time is measured using a given {@link Timestamp} implementation. A
  * point in time is always represented as long. Therefore, the granularity can
  * be set as long value as well.
- *
+ * 
  * @param <DATA>
  *            The type of the incoming data points which are processed by this
  *            policy.
@@ -40,12 +41,12 @@ public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>,
 	private static final long serialVersionUID = -1457476766124518220L;
 
 	private long granularity;
-	private TimeStamp<DATA> timestamp;
+	private TimestampWrapper<DATA> timestampWrapper;
 	private LinkedList<Long> buffer = new LinkedList<Long>();
 
 	/**
 	 * This eviction policy evicts all elements which are older than a specified
-	 * time. The time is measured using a given {@link TimeStamp}
+	 * time. The time is measured using a given {@link Timestamp}
 	 * implementation. A point in time is always represented as long. Therefore,
 	 * the granularity can be set as long value as well. If this value is set to
 	 * 2 the policy will evict all elements which are older as 2.
@@ -60,12 +61,12 @@ public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>,
 	 *            The granularity of the eviction. If this value is set to 2 the
 	 *            policy will evict all elements which are older as 2(if
 	 *            (time(X)<current time-granularity) evict X).
-	 * @param timestamp
-	 *            The {@link TimeStamp} to measure the time with. This can be
-	 *            either user defined of provided by the API.
+	 * @param timestampWrapper
+	 *            The {@link TimestampWrapper} to measure the time with. This
+	 *            can be either user defined or provided by the API.
 	 */
-	public TimeEvictionPolicy(long granularity, TimeStamp<DATA> timestamp) {
-		this.timestamp = timestamp;
+	public TimeEvictionPolicy(long granularity, TimestampWrapper<DATA> timestampWrapper) {
+		this.timestampWrapper = timestampWrapper;
 		this.granularity = granularity;
 	}
 
@@ -78,7 +79,7 @@ public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>,
 		try {
 			threshold = (Long) datapoint - granularity;
 		} catch (ClassCastException e) {
-			threshold = timestamp.getTimestamp((DATA) datapoint) - granularity;
+			threshold = timestampWrapper.getTimestamp((DATA) datapoint) - granularity;
 		}
 
 		// return result
@@ -91,9 +92,9 @@ public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>,
 
 		checkForDeleted(bufferSize);
 
-		//remember timestamp
-		long time=timestamp.getTimestamp(datapoint);
-		
+		// remember timestamp
+		long time = timestampWrapper.getTimestamp(datapoint);
+
 		// delete and count expired tuples
 		long threshold = time - granularity;
 		int counter = deleteAndCountExpired(threshold);
@@ -130,7 +131,7 @@ public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>,
 
 	@Override
 	public TimeEvictionPolicy<DATA> clone() {
-		return new TimeEvictionPolicy<DATA>(granularity, timestamp);
+		return new TimeEvictionPolicy<DATA>(granularity, timestampWrapper);
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1492e966/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
index 57bccf2..1e91b8e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
@@ -19,12 +19,12 @@ package org.apache.flink.streaming.api.windowing.policy;
 
 import java.util.LinkedList;
 
-import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 
 /**
  * This trigger policy triggers with regard to the time. The is measured using a
- * given {@link TimeStamp} implementation. A point in time is always represented
+ * given {@link Timestamp} implementation. A point in time is always represented
  * as long. Therefore, parameters such as granularity and delay can be set as
  * long value as well.
  * 
@@ -42,12 +42,12 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
 
 	protected long startTime;
 	protected long granularity;
-	protected TimeStamp<DATA> timestamp;
+	protected TimestampWrapper<DATA> timestampWrapper;
 	protected long delay;
 
 	/**
 	 * This trigger policy triggers with regard to the time. The is measured
-	 * using a given {@link TimeStamp} implementation. A point in time is always
+	 * using a given {@link Timestamp} implementation. A point in time is always
 	 * represented as long. Therefore, parameters such as granularity can be set
 	 * as long value as well. If this value for the granularity is set to 2 for
 	 * example, the policy will trigger at every second point in time.
@@ -55,22 +55,22 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
 	 * @param granularity
 	 *            The granularity of the trigger. If this value is set to x the
 	 *            policy will trigger at every x-th time point
-	 * @param timestamp
-	 *            The {@link TimeStamp} to measure the time with. This can be
-	 *            either user defined of provided by the API.
+	 * @param timestampWrapper
+	 *            The {@link TimestampWrapper} to measure the time with. This
+	 *            can be either user defined or provided by the API.
 	 * @param timeWrapper
 	 *            This policy creates fake elements to not miss windows in case
 	 *            no element arrived within the duration of the window. This
 	 *            extractor should wrap a long into such an element of type
 	 *            DATA.
 	 */
-	public TimeTriggerPolicy(long granularity, TimeStamp<DATA> timestamp) {
-		this(granularity, timestamp, 0);
+	public TimeTriggerPolicy(long granularity, TimestampWrapper<DATA> timestampWrapper) {
+		this(granularity, timestampWrapper, 0);
 	}
 
 	/**
 	 * This is mostly the same as
-	 * {@link TimeTriggerPolicy#TimeTriggerPolicy(long, TimeStamp)}. In addition
+	 * {@link TimeTriggerPolicy#TimeTriggerPolicy(long, TimestampWrapper)}. In addition
 	 * to granularity and timestamp a delay can be specified for the first
 	 * trigger. If the start time given by the timestamp is x, the delay is y,
 	 * and the granularity is z, the first trigger will happen at x+y+z.
@@ -78,9 +78,9 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
 	 * @param granularity
 	 *            The granularity of the trigger. If this value is set to 2 the
 	 *            policy will trigger at every second time point
-	 * @param timestamp
-	 *            The {@link TimeStamp} to measure the time with. This can be
-	 *            either user defined of provided by the API.
+	 * @param timestampWrapper
+	 *            The {@link TimestampWrapper} to measure the time with. This
+	 *            can be either user defined or provided by the API.
 	 * @param delay
 	 *            A delay for the first trigger. If the start time given by the
 	 *            timestamp is x, the delay is y, and the granularity is z, the
@@ -91,9 +91,9 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
 	 *            extractor should wrap a long into such an element of type
 	 *            DATA.
 	 */
-	public TimeTriggerPolicy(long granularity, TimeStamp<DATA> timestamp, long delay) {
-		this.startTime = timestamp.getStartTime() + delay;
-		this.timestamp = timestamp;
+	public TimeTriggerPolicy(long granularity, TimestampWrapper<DATA> timestampWrapper, long delay) {
+		this.startTime = timestampWrapper.getStartTime() + delay;
+		this.timestampWrapper = timestampWrapper;
 		this.granularity = granularity;
 		this.delay = delay;
 	}
@@ -107,7 +107,7 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
 		LinkedList<Object> fakeElements = new LinkedList<Object>();
 		// check if there is more then one window border missed
 		// use > here. In case >= would fit, the regular call will do the job.
-		while (timestamp.getTimestamp(datapoint) >= startTime + granularity) {
+		while (timestampWrapper.getTimestamp(datapoint) >= startTime + granularity) {
 			startTime += granularity;
 			fakeElements.add(startTime - 1);
 		}
@@ -127,7 +127,7 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
 	 */
 	@Override
 	public Runnable createActiveTriggerRunnable(ActiveTriggerCallback callback) {
-		if (this.timestamp instanceof DefaultTimeStamp) {
+		if (this.timestampWrapper.isDefaultTimestamp()) {
 			return new TimeCheck(callback);
 		} else {
 			return null;
@@ -177,7 +177,7 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
 
 	@Override
 	public synchronized boolean notifyTrigger(DATA datapoint) {
-		long recordTime = timestamp.getTimestamp(datapoint);
+		long recordTime = timestampWrapper.getTimestamp(datapoint);
 		if (recordTime >= startTime + granularity) {
 			if (granularity != 0) {
 				startTime = recordTime - ((recordTime - startTime) % granularity);
@@ -190,7 +190,7 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
 
 	@Override
 	public TimeTriggerPolicy<DATA> clone() {
-		return new TimeTriggerPolicy<DATA>(granularity, timestamp, delay);
+		return new TimeTriggerPolicy<DATA>(granularity, timestampWrapper, delay);
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/1492e966/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
index b71bb25..056d0a8 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WindowCrossJoinTest.java
@@ -29,7 +29,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 import org.junit.Test;
 
 public class WindowCrossJoinTest implements Serializable {
@@ -96,12 +96,16 @@ public class WindowCrossJoinTest implements Serializable {
 		DataStream<Tuple2<Integer, String>> inStream1 = env.fromCollection(in1);
 		DataStream<Tuple1<Integer>> inStream2 = env.fromCollection(in2);
 
-		inStream1.join(inStream2).onWindow(1000, 1000, new MyTimestamp1(), new MyTimestamp2())
-				.where(0).equalTo(0).addSink(new JoinResultSink());
+		inStream1
+				.join(inStream2)
+				.onWindow(1000, 1000, new MyTimestamp<Tuple2<Integer, String>>(),
+						new MyTimestamp<Tuple1<Integer>>()).where(0).equalTo(0)
+				.addSink(new JoinResultSink());
 
 		inStream1
 				.cross(inStream2)
-				.onWindow(1000, 1000, new MyTimestamp1(), new MyTimestamp2())
+				.onWindow(1000, 1000, new MyTimestamp<Tuple2<Integer, String>>(),
+						new MyTimestamp<Tuple1<Integer>>())
 				.with(new CrossFunction<Tuple2<Integer, String>, Tuple1<Integer>, Tuple2<Tuple2<Integer, String>, Tuple1<Integer>>>() {
 
 					private static final long serialVersionUID = 1L;
@@ -119,25 +123,15 @@ public class WindowCrossJoinTest implements Serializable {
 		assertEquals(crossExpectedResults, crossResults);
 	}
 
-	private static class MyTimestamp1 implements TimeStamp<Tuple2<Integer, String>> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public long getTimestamp(Tuple2<Integer, String> value) {
-			return 101L;
+	private static class MyTimestamp<T> extends TimestampWrapper<T> {
+		public MyTimestamp() {
+			super(null, 0);
 		}
 
-		@Override
-		public long getStartTime() {
-			return 100L;
-		}
-	}
-
-	private static class MyTimestamp2 implements TimeStamp<Tuple1<Integer>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public long getTimestamp(Tuple1<Integer> value) {
+		public long getTimestamp(T value) {
 			return 101L;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/1492e966/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
index e3f2a1b..508366c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupedWindowReduceTest.java
@@ -29,7 +29,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.function.co.CoReduceFunction;
 import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedWindowReduceInvokable;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 import org.apache.flink.streaming.util.MockCoContext;
 import org.junit.Test;
 
@@ -82,13 +82,14 @@ public class CoGroupedWindowReduceTest {
 		}
 	}
 
-	public static final class MyTimeStamp<T> implements TimeStamp<T> {
+	public static final class MyTimeStamp<T> extends TimestampWrapper<T> {
 		private static final long serialVersionUID = 1L;
 
 		private Iterator<Long> timestamps;
 		private long start;
 
 		public MyTimeStamp(List<Long> timestamps) {
+			super(null, 0);
 			this.timestamps = timestamps.iterator();
 			this.start = timestamps.get(0);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/1492e966/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java
index 90ad483..035a021 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowReduceTest.java
@@ -27,7 +27,7 @@ import java.util.List;
 
 import org.apache.flink.streaming.api.function.co.CoReduceFunction;
 import org.apache.flink.streaming.api.invokable.operator.co.CoWindowReduceInvokable;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 import org.apache.flink.streaming.util.MockCoContext;
 import org.junit.Test;
 
@@ -57,13 +57,14 @@ public class CoWindowReduceTest {
 		}
 	}
 
-	public static final class MyTimeStamp<T> implements TimeStamp<T> {
+	public static final class MyTimeStamp<T> extends TimestampWrapper<T> {
 		private static final long serialVersionUID = 1L;
 
 		private Iterator<Long> timestamps;
 		private long start;
 
 		public MyTimeStamp(List<Long> timestamps) {
+			super(null, 0);
 			this.timestamps = timestamps.iterator();
 			this.start = timestamps.get(0);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/1492e966/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java
index c6d446a..4ab3492 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoWindowTest.java
@@ -27,7 +27,8 @@ import java.util.Set;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.function.co.CoWindowFunction;
 import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 import org.apache.flink.streaming.util.MockCoContext;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
@@ -80,7 +81,7 @@ public class CoWindowTest {
 
 	}
 
-	private static final class MyTS1 implements TimeStamp<Integer> {
+	private static final class MyTS1 implements Timestamp<Integer> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -89,14 +90,9 @@ public class CoWindowTest {
 			return value;
 		}
 
-		@Override
-		public long getStartTime() {
-			return 1;
-		}
-
 	}
 
-	private static final class MyTS2 implements TimeStamp<Tuple2<Integer, Integer>> {
+	private static final class MyTS2 implements Timestamp<Tuple2<Integer, Integer>> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -105,18 +101,14 @@ public class CoWindowTest {
 			return value.f0;
 		}
 
-		@Override
-		public long getStartTime() {
-			return 1;
-		}
-
 	}
 
 	@Test
 	public void coWindowGroupReduceTest2() throws Exception {
 
 		CoWindowInvokable<Integer, Integer, Integer> invokable1 = new CoWindowInvokable<Integer, Integer, Integer>(
-				new MyCoGroup1(), 2, 1, new MyTS1(), new MyTS1());
+				new MyCoGroup1(), 2, 1, new TimestampWrapper<Integer>(new MyTS1(), 1),
+				new TimestampWrapper<Integer>(new MyTS1(), 1));
 
 		// Windowsize 2, slide 1
 		// 1,2|2,3|3,4|4,5
@@ -152,7 +144,8 @@ public class CoWindowTest {
 		assertEquals(expected1, actual1);
 
 		CoWindowInvokable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer> invokable2 = new CoWindowInvokable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer>(
-				new MyCoGroup2(), 2, 3, new MyTS2(), new MyTS2());
+				new MyCoGroup2(), 2, 3, new TimestampWrapper<Tuple2<Integer, Integer>>(new MyTS2(),
+						1), new TimestampWrapper<Tuple2<Integer, Integer>>(new MyTS2(), 1));
 
 		// WindowSize 2, slide 3
 		// 1,2|4,5|7,8|

http://git-wip-us.apache.org/repos/asf/flink/blob/1492e966/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
index d97cadc..f38d5c1 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
@@ -28,7 +28,8 @@ import java.util.List;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 import org.apache.flink.streaming.api.windowing.policy.ActiveCloneableEvictionPolicyWrapper;
 import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
@@ -360,20 +361,18 @@ public class GroupedWindowInvokableTest {
 		expected.add(new Tuple2<Integer, String>(32, "b"));
 		expected.add(new Tuple2<Integer, String>(32, "c"));
 
-		TimeStamp<Tuple2<Integer, String>> myTimeStamp = new TimeStamp<Tuple2<Integer, String>>() {
+		Timestamp<Tuple2<Integer, String>> myTimeStamp = new Timestamp<Tuple2<Integer, String>>() {
 			private static final long serialVersionUID = 1L;
 
 			@Override
 			public long getTimestamp(Tuple2<Integer, String> value) {
 				return value.f0;
 			}
-
-			@Override
-			public long getStartTime() {
-				return 1;
-			}
 		};
 
+		TimestampWrapper<Tuple2<Integer, String>> myTimeStampWrapper = new TimestampWrapper<Tuple2<Integer, String>>(
+				myTimeStamp, 1);
+
 		ReduceFunction<Tuple2<Integer, String>> myReduceFunction = new ReduceFunction<Tuple2<Integer, String>>() {
 			private static final long serialVersionUID = 1L;
 
@@ -387,11 +386,11 @@ public class GroupedWindowInvokableTest {
 		LinkedList<TriggerPolicy<Tuple2<Integer, String>>> triggers = new LinkedList<TriggerPolicy<Tuple2<Integer, String>>>();
 		// Trigger every 2 time units but delay the first trigger by 2 (First
 		// trigger after 4, then every 2)
-		triggers.add(new TimeTriggerPolicy<Tuple2<Integer, String>>(2L, myTimeStamp, 2L));
+		triggers.add(new TimeTriggerPolicy<Tuple2<Integer, String>>(2L, myTimeStampWrapper, 2L));
 
 		LinkedList<CloneableEvictionPolicy<Tuple2<Integer, String>>> evictions = new LinkedList<CloneableEvictionPolicy<Tuple2<Integer, String>>>();
 		// Always delete all elements older then 4
-		evictions.add(new TimeEvictionPolicy<Tuple2<Integer, String>>(4L, myTimeStamp));
+		evictions.add(new TimeEvictionPolicy<Tuple2<Integer, String>>(4L, myTimeStampWrapper));
 
 		LinkedList<CloneableTriggerPolicy<Tuple2<Integer, String>>> distributedTriggers = new LinkedList<CloneableTriggerPolicy<Tuple2<Integer, String>>>();
 
@@ -409,10 +408,10 @@ public class GroupedWindowInvokableTest {
 
 		// repeat the test with central eviction. The result should be the same.
 		triggers.clear();
-		triggers.add(new TimeTriggerPolicy<Tuple2<Integer, String>>(2L, myTimeStamp, 2L));
+		triggers.add(new TimeTriggerPolicy<Tuple2<Integer, String>>(2L, myTimeStampWrapper, 2L));
 		evictions.clear();
 		LinkedList<EvictionPolicy<Tuple2<Integer, String>>> centralEvictions = new LinkedList<EvictionPolicy<Tuple2<Integer, String>>>();
-		centralEvictions.add(new TimeEvictionPolicy<Tuple2<Integer, String>>(4L, myTimeStamp));
+		centralEvictions.add(new TimeEvictionPolicy<Tuple2<Integer, String>>(4L, myTimeStampWrapper));
 
 		invokable = new GroupedWindowInvokable<Tuple2<Integer, String>, Tuple2<Integer, String>>(
 				myReduceFunction, keySelector, distributedTriggers, evictions, triggers,

http://git-wip-us.apache.org/repos/asf/flink/blob/1492e966/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java
index 421a999..83b4596 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java
@@ -24,7 +24,8 @@ import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
@@ -62,18 +63,13 @@ public class WindowInvokableTest {
 		expected.add(10);
 		expected.add(32);
 
-		TimeStamp<Integer> myTimeStamp = new TimeStamp<Integer>() {
+		Timestamp<Integer> myTimeStamp = new Timestamp<Integer>() {
 			private static final long serialVersionUID = 1L;
 
 			@Override
 			public long getTimestamp(Integer value) {
 				return value;
 			}
-
-			@Override
-			public long getStartTime() {
-				return 1;
-			}
 		};
 
 		ReduceFunction<Integer> myReduceFunction = new ReduceFunction<Integer>() {
@@ -88,11 +84,13 @@ public class WindowInvokableTest {
 		LinkedList<TriggerPolicy<Integer>> triggers = new LinkedList<TriggerPolicy<Integer>>();
 		// Trigger every 2 time units but delay the first trigger by 2 (First
 		// trigger after 4, then every 2)
-		triggers.add(new TimeTriggerPolicy<Integer>(2L, myTimeStamp, 2L));
+		triggers.add(new TimeTriggerPolicy<Integer>(2L, new TimestampWrapper<Integer>(myTimeStamp,
+				1), 2L));
 
 		LinkedList<EvictionPolicy<Integer>> evictions = new LinkedList<EvictionPolicy<Integer>>();
 		// Always delete all elements older then 4
-		evictions.add(new TimeEvictionPolicy<Integer>(4L, myTimeStamp));
+		evictions.add(new TimeEvictionPolicy<Integer>(4L, new TimestampWrapper<Integer>(
+				myTimeStamp, 1)));
 
 		WindowInvokable<Integer, Integer> invokable = new WindowReduceInvokable<Integer>(
 				myReduceFunction, triggers, evictions);

http://git-wip-us.apache.org/repos/asf/flink/blob/1492e966/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java
index 82c8841..b5d502b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicyTest.java
@@ -19,7 +19,8 @@ package org.apache.flink.streaming.api.windowing.policy;
 
 import java.util.LinkedList;
 
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
@@ -35,25 +36,20 @@ public class TimeEvictionPolicyTest {
 
 		// create a timestamp
 		@SuppressWarnings("serial")
-		TimeStamp<Integer> timeStamp = new TimeStamp<Integer>() {
+		Timestamp<Integer> timeStamp = new Timestamp<Integer>() {
 
 			@Override
 			public long getTimestamp(Integer value) {
 				return value;
 			}
 
-			@Override
-			public long getStartTime() {
-				return 0;
-			}
-
 		};
 
 		// test different granularity
 		for (long granularity = 0; granularity < 40; granularity++) {
 			// create policy
 			TimeEvictionPolicy<Integer> policy = new TimeEvictionPolicy<Integer>(granularity,
-					timeStamp);
+					new TimestampWrapper<Integer>(timeStamp, 0));
 
 			// The trigger status should not effect the policy. Therefore, it's
 			// value is changed after each usage.

http://git-wip-us.apache.org/repos/asf/flink/blob/1492e966/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
index 9c77a55..2bdbd96 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicyTest.java
@@ -21,7 +21,8 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 import org.junit.Test;
 
 public class TimeTriggerPolicyTest {
@@ -33,29 +34,24 @@ public class TimeTriggerPolicyTest {
 
 		// create a timestamp
 		@SuppressWarnings("serial")
-		TimeStamp<Integer> timeStamp = new TimeStamp<Integer>() {
+		Timestamp<Integer> timeStamp = new Timestamp<Integer>() {
 
 			@Override
 			public long getTimestamp(Integer value) {
 				return value;
 			}
 
-			@Override
-			public long getStartTime() {
-				return 0;
-			}
-
 		};
 
 		// test different granularity
 		for (long granularity = 0; granularity < 31; granularity++) {
 			// create policy
-			TriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(granularity, timeStamp);
+
+			TriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(granularity,
+					new TimestampWrapper<Integer>(timeStamp, 0));
 
 			// remember window border
-			// Remark: This might NOT work in case the timeStamp uses
-			// System.getCurrentTimeMillis to determine the start time.
-			long currentTime = timeStamp.getStartTime();
+			long currentTime = 0;
 
 			// test by adding values
 			for (int i = 0; i < times.length; i++) {
@@ -85,22 +81,18 @@ public class TimeTriggerPolicyTest {
 
 		// create a timestamp
 		@SuppressWarnings("serial")
-		TimeStamp<Integer> timeStamp = new TimeStamp<Integer>() {
+		Timestamp<Integer> timeStamp = new Timestamp<Integer>() {
 
 			@Override
 			public long getTimestamp(Integer value) {
 				return value;
 			}
 
-			@Override
-			public long getStartTime() {
-				return 0;
-			}
-
 		};
 
 		// create policy
-		TimeTriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(5, timeStamp);
+		TimeTriggerPolicy<Integer> policy = new TimeTriggerPolicy<Integer>(5,
+				new TimestampWrapper<Integer>(timeStamp, 0));
 
 		// expected result
 		Long[][] result = { {}, {}, { 4L, 9L, 14L, 19L }, { 24L } };

http://git-wip-us.apache.org/repos/asf/flink/blob/1492e966/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java
index 1013e6f..d6a9ac0 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/windowing/DeltaExtractExample.java
@@ -49,10 +49,8 @@ public class DeltaExtractExample {
 		@SuppressWarnings({ "unchecked", "rawtypes" })
 		DataStream dstream = env
 				.addSource(new CountingSource())
-				.window(Delta.of(new EuclideanDistance(new FieldsFromTuple(0, 1)), new Tuple3(0d,
-						0d, "foo"), 1.2))
-				.every(Count.of(2))
-				.reduce(new ConcatStrings());
+				.window(Delta.of(1.2, new EuclideanDistance(new FieldsFromTuple(0, 1)), new Tuple3(
+						0d, 0d, "foo"))).every(Count.of(2)).reduce(new ConcatStrings());
 
 		// emit result
 		if (fileOutput) {

http://git-wip-us.apache.org/repos/asf/flink/blob/1492e966/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
index 38ad384..dc1e5b3 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/DataStream.scala
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.scala.streaming
 import org.apache.flink.api.scala._
+import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean
 import org.apache.flink.streaming.api.datastream.{ DataStream => JavaStream }
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import scala.reflect.ClassTag
@@ -53,24 +54,6 @@ import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
 
 class DataStream[T](javaStream: JavaStream[T]) {
 
-  /* This code is originally from the Apache Spark project. */
-  /**
-   * Clean a closure to make it ready to serialized and send to tasks
-   * (removes unreferenced variables in $outer's, updates REPL variables)
-   * If <tt>checkSerializable</tt> is set, <tt>clean</tt> will also proactively
-   * check to see if <tt>f</tt> is serializable and throw a <tt>SparkException</tt>
-   * if not.
-   *
-   * @param f the closure to clean
-   * @param checkSerializable whether or not to immediately check <tt>f</tt> for serializability
-   * @throws <tt>SparkException<tt> if <tt>checkSerializable</tt> is set but <tt>f</tt> is not
-   *   serializable
-   */
-  private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
-    ClosureCleaner.clean(f, checkSerializable)
-    f
-  }
-
   /**
    * Gets the underlying java DataStream object.
    */

http://git-wip-us.apache.org/repos/asf/flink/blob/1492e966/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala
index 72052b9..e9010c8 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamCrossOperator.scala
@@ -32,7 +32,7 @@ import org.apache.flink.streaming.api.function.co.CrossWindowFunction
 import org.apache.flink.api.common.functions.CrossFunction
 import org.apache.flink.api.scala.typeutils.CaseClassSerializer
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
-
+import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean
 
 class StreamCrossOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends
   TemporalOperator[I1, I2, StreamCrossOperator.CrossWindow[I1, I2]](i1, i2) {

http://git-wip-us.apache.org/repos/asf/flink/blob/1492e966/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
index 2489a64..a7a471f 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamExecutionEnvironment.scala
@@ -161,9 +161,9 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
    */
   def addSource[T: ClassTag: TypeInformation](function: SourceFunction[T]): DataStream[T] = {
     Validate.notNull(function, "Function must not be null.")
-    ClosureCleaner.clean(function, true)
+    val cleanFun = StreamExecutionEnvironment.clean(function)
     val typeInfo = implicitly[TypeInformation[T]]
-    new DataStream[T](javaEnv.addSource(function, typeInfo))
+    new DataStream[T](javaEnv.addSource(cleanFun, typeInfo))
   }
   
    /**
@@ -174,8 +174,9 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
   def addSource[T: ClassTag: TypeInformation](function: Collector[T] => Unit): DataStream[T] = {
     Validate.notNull(function, "Function must not be null.")
     val sourceFunction = new SourceFunction[T] {
+      val cleanFun = StreamExecutionEnvironment.clean(function)
       override def invoke(out: Collector[T]) {
-        function(out)
+        cleanFun(out)
       }
     }
     addSource(sourceFunction)
@@ -205,6 +206,11 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
 }
 
 object StreamExecutionEnvironment {
+  
+  private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
+    ClosureCleaner.clean(f, checkSerializable)
+    f
+  }
 
   /**
    * Creates an execution environment that represents the context in which the program is

http://git-wip-us.apache.org/repos/asf/flink/blob/1492e966/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
index f47d79e..4ed5082 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/StreamJoinOperator.scala
@@ -33,6 +33,7 @@ import org.apache.commons.lang.Validate
 import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable
 import org.apache.flink.streaming.util.keys.KeySelectorUtil
 import org.apache.flink.api.java.operators.Keys
+import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean
 
 class StreamJoinOperator[I1, I2](i1: JavaStream[I1], i2: JavaStream[I2]) extends
 TemporalOperator[I1, I2, StreamJoinOperator.JoinWindow[I1, I2]](i1, i2) {

http://git-wip-us.apache.org/repos/asf/flink/blob/1492e966/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
index 5346c4c..e33368c 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/streaming/WindowedDataStream.scala
@@ -40,14 +40,10 @@ import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.A
 import org.apache.flink.api.java.typeutils.TupleTypeInfoBase
 import org.apache.flink.streaming.api.function.aggregation.SumFunction
 import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
+import org.apache.flink.api.scala.streaming.StreamExecutionEnvironment.clean
 
 class WindowedDataStream[T](javaStream: JavaWStream[T]) {
 
-  private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
-    ClosureCleaner.clean(f, checkSerializable)
-    f
-  }
-
   /**
    * Defines the slide size (trigger frequency) for the windowed data stream.
    * This controls how often the user defined function will be triggered on


Mime
View raw message