flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [7/8] flink git commit: [FLINK-1967] Introduce (Event)time in Streaming
Date Tue, 21 Jul 2015 10:45:17 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
index 97cebc1..52c07d0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamReduce.java
@@ -18,6 +18,8 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 public class StreamReduce<IN> extends AbstractUdfStreamOperator<IN, ReduceFunction<IN>>
 		implements OneInputStreamOperator<IN, IN> {
@@ -34,15 +36,19 @@ public class StreamReduce<IN> extends AbstractUdfStreamOperator<IN, ReduceFuncti
 	}
 
 	@Override
-	public void processElement(IN element) throws Exception {
+	public void processElement(StreamRecord<IN> element) throws Exception {
 
 		if (currentValue != null) {
-			// TODO: give operator a way to specify that elements should be copied
-			currentValue = userFunction.reduce(currentValue, element);
+			currentValue = userFunction.reduce(currentValue, element.getValue());
 		} else {
-			currentValue = element;
+			currentValue = element.getValue();
 
 		}
-		output.collect(currentValue);
+		output.collect(element.replace(currentValue));
+	}
+
+	@Override
+	public void processWatermark(Watermark mark) throws Exception {
+		output.emitWatermark(mark);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
index 5399302..6961a4d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
@@ -18,6 +18,8 @@
 package org.apache.flink.streaming.api.operators;
 
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 public class StreamSink<IN> extends AbstractUdfStreamOperator<Object, SinkFunction<IN>>
 		implements OneInputStreamOperator<IN, Object> {
@@ -31,7 +33,12 @@ public class StreamSink<IN> extends AbstractUdfStreamOperator<Object, SinkFuncti
 	}
 
 	@Override
-	public void processElement(IN element) throws Exception {
-		userFunction.invoke(element);
+	public void processElement(StreamRecord<IN> element) throws Exception {
+		userFunction.invoke(element.getValue());
+	}
+
+	@Override
+	public void processWatermark(Watermark mark) throws Exception {
+		// ignore it for now, we are a sink, after all
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index 907f93a..0cc46f5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -17,39 +17,274 @@
 
 package org.apache.flink.streaming.api.operators;
 
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.util.Collector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 
 /**
  * {@link StreamOperator} for streaming sources.
  */
-public class StreamSource<OUT> extends AbstractUdfStreamOperator<OUT, SourceFunction<OUT>> implements StreamOperator<OUT> {
+public class StreamSource<T> extends AbstractUdfStreamOperator<T, SourceFunction<T>> implements StreamOperator<T> {
 
 	private static final long serialVersionUID = 1L;
 
-	public StreamSource(SourceFunction<OUT> sourceFunction) {
+	public StreamSource(SourceFunction<T> sourceFunction) {
 		super(sourceFunction);
 
 		this.chainingStrategy = ChainingStrategy.HEAD;
 	}
 
-	public void run(final Object lockingObject, final Collector<OUT> collector) throws Exception {
-		SourceFunction.SourceContext<OUT> ctx = new SourceFunction.SourceContext<OUT>() {
-			@Override
-			public void collect(OUT element) {
-				collector.collect(element);
-			}
+	public void run(final Object lockingObject, final Output<StreamRecord<T>> collector) throws Exception {
 
-			@Override
-			public Object getCheckpointLock() {
-				return lockingObject;
-			}
-		};
+		SourceFunction.SourceContext<T> ctx = null;
+		if (userFunction instanceof EventTimeSourceFunction) {
+			ctx = new ManualWatermarkContext<T>(lockingObject, collector);
+		} else if (executionConfig.getAutoWatermarkInterval() > 0) {
+			ctx = new AutomaticWatermarkContext<T>(lockingObject, collector, executionConfig);
+		} else if (executionConfig.areTimestampsEnabled()) {
+			ctx = new NonTimestampContext<T>(lockingObject, collector);
+		} else {
+			ctx = new NonWatermarkContext<T>(lockingObject, collector);
+		}
 
 		userFunction.run(ctx);
 	}
 
 	public void cancel() {
+
 		userFunction.cancel();
 	}
+
+	/**
+	 * {@link SourceFunction.SourceContext} to be used for sources that don't emit watermarks.
+	 * In addition to {@link NonWatermarkContext} this will also not attach timestamps to sources.
+	 * (Technically it will always set the timestamp to 0).
+	 */
+	public static class NonTimestampContext<T> implements SourceFunction.SourceContext<T> {
+
+
+		private final Object lockingObject;
+		private final Output<StreamRecord<T>> output;
+		StreamRecord<T> reuse;
+
+		public NonTimestampContext(Object lockingObjectParam, Output<StreamRecord<T>> outputParam) {
+			this.lockingObject = lockingObjectParam;
+			this.output = outputParam;
+			this.reuse = new StreamRecord<T>(null);
+		}
+
+		@Override
+		public void collect(T element) {
+			output.collect(reuse.replace(element, 0));
+		}
+
+		@Override
+		public void collectWithTimestamp(T element, long timestamp) {
+			throw new UnsupportedOperationException("Automatic-Timestamp sources cannot emit" +
+					" elements with a timestamp. See interface ManualTimestampSourceFunction" +
+					" if you want to manually assign timestamps to elements.");
+		}
+
+		@Override
+		public void emitWatermark(Watermark mark) {
+			throw new UnsupportedOperationException("Automatic-Timestamp sources cannot emit" +
+					" elements with a timestamp. See interface ManualTimestampSourceFunction" +
+					" if you want to manually assign timestamps to elements.");
+		}
+
+		@Override
+		public Object getCheckpointLock() {
+			return lockingObject;
+		}
+
+		@Override
+		public void close() {
+		}
+	}
+
+	/**
+	 * {@link SourceFunction.SourceContext} to be used for sources that don't emit watermarks.
+	 */
+	public static class NonWatermarkContext<T> implements SourceFunction.SourceContext<T> {
+
+
+		private final Object lockingObject;
+		private final Output<StreamRecord<T>> output;
+		StreamRecord<T> reuse;
+
+		public NonWatermarkContext(Object lockingObjectParam, Output<StreamRecord<T>> outputParam) {
+			this.lockingObject = lockingObjectParam;
+			this.output = outputParam;
+			this.reuse = new StreamRecord<T>(null);
+		}
+
+		@Override
+		public void collect(T element) {
+			long currentTime = System.currentTimeMillis();
+			output.collect(reuse.replace(element, currentTime));
+		}
+
+		@Override
+		public void collectWithTimestamp(T element, long timestamp) {
+			throw new UnsupportedOperationException("Automatic-Timestamp sources cannot emit" +
+					" elements with a timestamp. See interface ManualTimestampSourceFunction" +
+					" if you want to manually assign timestamps to elements.");
+		}
+
+		@Override
+		public void emitWatermark(Watermark mark) {
+			throw new UnsupportedOperationException("Automatic-Timestamp sources cannot emit" +
+					" elements with a timestamp. See interface ManualTimestampSourceFunction" +
+					" if you want to manually assign timestamps to elements.");
+		}
+
+		@Override
+		public Object getCheckpointLock() {
+			return lockingObject;
+		}
+
+		@Override
+		public void close() {
+		}
+	}
+
+	/**
+	 * {@link SourceFunction.SourceContext} to be used for sources with automatic timestamps
+	 * and watermark emission.
+	 */
+	public static class AutomaticWatermarkContext<T> implements SourceFunction.SourceContext<T> {
+
+		private transient ScheduledFuture<?> watermarkTimer = null;
+		private final long watermarkInterval;
+
+		private final Object lockingObject;
+		private final Output<StreamRecord<T>> output;
+		StreamRecord<T> reuse;
+
+		private volatile long lastWatermarkTime;
+
+		public AutomaticWatermarkContext(Object lockingObjectParam,
+				Output<StreamRecord<T>> outputParam,
+				ExecutionConfig executionConfig) {
+			this.lockingObject = lockingObjectParam;
+			this.output = outputParam;
+			this.reuse = new StreamRecord<T>(null);
+
+			watermarkInterval = executionConfig.getAutoWatermarkInterval();
+
+			ScheduledExecutorService service = Executors.newScheduledThreadPool(2);
+
+			watermarkTimer = service.scheduleAtFixedRate(new Runnable() {
+				@Override
+				public void run() {
+					long currentTime = System.currentTimeMillis();
+					// align the watermarks across all machines. this will ensure that we
+					// don't have watermarks that creep along at different intervals because
+					// the machine clocks are out of sync
+					long watermarkTime = currentTime - (currentTime % watermarkInterval);
+					if (watermarkTime - lastWatermarkTime >= watermarkInterval) {
+						synchronized (lockingObject) {
+							if (watermarkTime - lastWatermarkTime >= watermarkInterval) {
+								output.emitWatermark(new Watermark(watermarkTime));
+								lastWatermarkTime = watermarkTime;
+							}
+						}
+					}
+				}
+			}, 0, watermarkInterval, TimeUnit.MILLISECONDS);
+
+		}
+
+		@Override
+		public void collect(T element) {
+			synchronized (lockingObject) {
+				long currentTime = System.currentTimeMillis();
+				output.collect(reuse.replace(element, currentTime));
+
+				long watermarkTime = currentTime - (currentTime % watermarkInterval);
+				if (watermarkTime - lastWatermarkTime >= watermarkInterval) {
+					output.emitWatermark(new Watermark(watermarkTime));
+					lastWatermarkTime = watermarkTime;
+				}
+			}
+		}
+
+		@Override
+		public void collectWithTimestamp(T element, long timestamp) {
+			throw new UnsupportedOperationException("Automatic-Timestamp sources cannot emit" +
+					" elements with a timestamp. See interface ManualTimestampSourceFunction" +
+					" if you want to manually assign timestamps to elements.");
+		}
+
+		@Override
+		public void emitWatermark(Watermark mark) {
+			throw new UnsupportedOperationException("Automatic-Timestamp sources cannot emit" +
+					" elements with a timestamp. See interface ManualTimestampSourceFunction" +
+					" if you want to manually assign timestamps to elements.");
+		}
+
+		@Override
+		public Object getCheckpointLock() {
+			return lockingObject;
+		}
+
+		@Override
+		public void close() {
+			if (watermarkTimer != null && !watermarkTimer.isDone()) {
+				watermarkTimer.cancel(true);
+			}
+		}
+	}
+
+	/**
+	 * {@link SourceFunction.SourceContext} to be used for sources with manual timestamp
+	 * assignment and manual watermark emission.
+	 */
+	public static class ManualWatermarkContext<T> implements SourceFunction.SourceContext<T> {
+
+		private final Object lockingObject;
+		private final Output<StreamRecord<T>> output;
+		StreamRecord<T> reuse;
+
+		public ManualWatermarkContext(Object lockingObject, Output<StreamRecord<T>> output) {
+			this.lockingObject = lockingObject;
+			this.output = output;
+			this.reuse = new StreamRecord<T>(null);
+		}
+
+		@Override
+		public void collect(T element) {
+			throw new UnsupportedOperationException("Manual-Timestamp sources can only emit" +
+					" elements with a timestamp. Please use collectWithTimestamp().");
+		}
+
+		@Override
+		public void collectWithTimestamp(T element, long timestamp) {
+			synchronized (lockingObject) {
+				output.collect(reuse.replace(element, timestamp));
+			}
+		}
+
+		@Override
+		public void emitWatermark(Watermark mark) {
+			output.emitWatermark(mark);
+		}
+
+		@Override
+		public Object getCheckpointLock() {
+			return lockingObject;
+		}
+
+		@Override
+		public void close() {
+
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java
new file mode 100644
index 0000000..0ff223c
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/TimestampedCollector.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+
+/**
+ * Wrapper around an {@link Output} for user functions that expect a {@link Collector}.
+ * Before giving the {@link TimestampedCollector} to a user function you must set
+ * the {@link Instant timestamp} that should be attached to emitted elements. Most operators
+ * would set the {@link Instant timestamp} of the incoming {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord} here.
+ *
+ * @param <T> The type of the elments that can be emitted.
+ */
+public class TimestampedCollector<T> implements Collector<T> {
+	private final Output<StreamRecord<T>> output;
+	private long timestamp;
+	private StreamRecord<T> reuse;
+
+	/**
+	 * Creates a new {@link TimestampedCollector} that wraps the given {@link Output}.
+	 */
+	public TimestampedCollector(Output<StreamRecord<T>> output) {
+		this.output = output;
+		this.reuse = new StreamRecord<T>(null);
+	}
+
+	@Override
+	public void collect(T record) {
+		output.collect(reuse.replace(record, timestamp));
+	}
+
+	/**
+	 * Sets the {@link Instant timestamp} that is attached to elements that get emitted using
+	 * {@link #collect}
+	 * @param timestamp The timestamp in milliseconds
+	 */
+	public void setTimestamp(long timestamp) {
+		this.timestamp = timestamp;
+	}
+
+	@Override
+	public void close() {
+		output.close();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/TwoInputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/TwoInputStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/TwoInputStreamOperator.java
index 2b3090b..afc6d1b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/TwoInputStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/TwoInputStreamOperator.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.streaming.api.operators;
 
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
 /**
  * Interface for stream operators with two inputs. Use
  * {@link org.apache.flink.streaming.api.operators.AbstractStreamOperator} as a base class if
@@ -29,7 +32,32 @@ package org.apache.flink.streaming.api.operators;
  */
 public interface TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OUT> {
 
-	public void processElement1(IN1 element) throws Exception;
+	/**
+	 * Processes one element that arrived on the first input of this two-input operator.
+	 * This method is guaranteed to not be called concurrently with other methods of the operator.
+	 */
+	public void processElement1(StreamRecord<IN1> element) throws Exception;
+
+	/**
+	 * Processes one element that arrived on the second input of this two-input operator.
+	 * This method is guaranteed to not be called concurrently with other methods of the operator.
+	 */
+	public void processElement2(StreamRecord<IN2> element) throws Exception;
+
+	/**
+	 * Processes a {@link Watermark} that arrived on the first input of this two-input operator.
+	 * This method is guaranteed to not be called concurrently with other methods of the operator.
+	 *
+	 * @see org.apache.flink.streaming.api.watermark.Watermark
+	 */
+	public void processWatermark1(Watermark mark) throws Exception;
+
+	/**
+	 * Processes a {@link Watermark} that arrived on the second input of this two-input operator.
+	 * This method is guaranteed to not be called concurrently with other methods of the operator.
+	 *
+	 * @see org.apache.flink.streaming.api.watermark.Watermark
+	 */
+	public void processWatermark2(Watermark mark) throws Exception;
 
-	public void processElement2(IN2 element) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
index e3662d6..d2bd107 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMap.java
@@ -17,9 +17,13 @@
 
 package org.apache.flink.streaming.api.operators.co;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 public class CoStreamFlatMap<IN1, IN2, OUT>
 		extends AbstractUdfStreamOperator<OUT, CoFlatMapFunction<IN1, IN2, OUT>>
@@ -27,18 +31,54 @@ public class CoStreamFlatMap<IN1, IN2, OUT>
 
 	private static final long serialVersionUID = 1L;
 
+	private transient TimestampedCollector<OUT> collector;
+
+	// We keep track of watermarks from both inputs, the combined input is the minimum
+	// Once the minimum advances we emit a new watermark for downstream operators
+	private long combinedWatermark = Long.MIN_VALUE;
+	private long input1Watermark = Long.MAX_VALUE;
+	private long input2Watermark = Long.MAX_VALUE;
+
 	public CoStreamFlatMap(CoFlatMapFunction<IN1, IN2, OUT> flatMapper) {
 		super(flatMapper);
 	}
 
 	@Override
-	public void processElement1(IN1 element) throws Exception {
-		userFunction.flatMap1(element, output);
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		collector = new TimestampedCollector<OUT>(output);
+	}
+
+	@Override
+	public void processElement1(StreamRecord<IN1> element) throws Exception {
+		collector.setTimestamp(element.getTimestamp());
+		userFunction.flatMap1(element.getValue(), collector);
 
 	}
 
 	@Override
-	public void processElement2(IN2 element) throws Exception {
-		userFunction.flatMap2(element, output);
+	public void processElement2(StreamRecord<IN2> element) throws Exception {
+		collector.setTimestamp(element.getTimestamp());
+		userFunction.flatMap2(element.getValue(), collector);
+	}
+
+	@Override
+	public void processWatermark1(Watermark mark) throws Exception {
+		input1Watermark = mark.getTimestamp();
+		long newMin = Math.min(input1Watermark, input2Watermark);
+		if (newMin > combinedWatermark && input1Watermark != Long.MAX_VALUE && input2Watermark != Long.MAX_VALUE) {
+			combinedWatermark = newMin;
+			output.emitWatermark(new Watermark(combinedWatermark));
+		}
+	}
+
+	@Override
+	public void processWatermark2(Watermark mark) throws Exception {
+		input2Watermark = mark.getTimestamp();
+		long newMin = Math.min(input1Watermark, input2Watermark);
+		if (newMin > combinedWatermark && input1Watermark != Long.MAX_VALUE && input2Watermark != Long.MAX_VALUE) {
+			combinedWatermark = newMin;
+			output.emitWatermark(new Watermark(combinedWatermark));
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java
index 3dc509a..b46a929 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamGroupedReduce.java
@@ -22,6 +22,7 @@ import java.util.Map;
 
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.functions.co.CoReduceFunction;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 public class CoStreamGroupedReduce<IN1, IN2, OUT> extends CoStreamReduce<IN1, IN2, OUT> {
 
@@ -44,30 +45,33 @@ public class CoStreamGroupedReduce<IN1, IN2, OUT> extends CoStreamReduce<IN1, IN
 	}
 
 	@Override
-	public void processElement1(IN1 element) throws Exception {
+	public void processElement1(StreamRecord<IN1> elementRecord) throws Exception {
+		IN1 element = elementRecord.getValue();
 		Object key = keySelector1.getKey(element);
 		currentValue1 = values1.get(key);
 		if (currentValue1 != null) {
 			reduced1 = userFunction.reduce1(currentValue1, element);
 			values1.put(key, reduced1);
-			output.collect(userFunction.map1(reduced1));
+			output.collect(elementRecord.replace(userFunction.map1(reduced1)));
 		} else {
 			values1.put(key, element);
-			output.collect(userFunction.map1(element));
+			output.collect(elementRecord.replace(userFunction.map1(element)));
 		}
 	}
 
 	@Override
-	public void processElement2(IN2 element) throws Exception {
+	public void processElement2(StreamRecord<IN2> elementRecord) throws Exception {
+		IN2 element = elementRecord.getValue();
+
 		Object key = keySelector2.getKey(element);
 		currentValue2 = values2.get(key);
 		if (currentValue2 != null) {
 			reduced2 = userFunction.reduce2(currentValue2, element);
 			values2.put(key, reduced2);
-			output.collect(userFunction.map2(reduced2));
+			output.collect(elementRecord.replace(userFunction.map2(reduced2)));
 		} else {
 			values2.put(key, element);
-			output.collect(userFunction.map2(element));
+			output.collect(elementRecord.replace(userFunction.map2(element)));
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
index a8e57e3..8d7c7c4 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamMap.java
@@ -20,6 +20,8 @@ package org.apache.flink.streaming.api.operators.co;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 public class CoStreamMap<IN1, IN2, OUT>
 		extends AbstractUdfStreamOperator<OUT, CoMapFunction<IN1, IN2, OUT>>
@@ -27,19 +29,43 @@ public class CoStreamMap<IN1, IN2, OUT>
 
 	private static final long serialVersionUID = 1L;
 
+	// We keep track of watermarks from both inputs, the combined input is the minimum
+	// Once the minimum advances we emit a new watermark for downstream operators
+	private long combinedWatermark = Long.MIN_VALUE;
+	private long input1Watermark = Long.MAX_VALUE;
+	private long input2Watermark = Long.MAX_VALUE;
+
 	public CoStreamMap(CoMapFunction<IN1, IN2, OUT> mapper) {
 		super(mapper);
 	}
 
 	@Override
-	public void processElement1(IN1 element) throws Exception {
-		output.collect(userFunction.map1(element));
+	public void processElement1(StreamRecord<IN1> element) throws Exception {
+		output.collect(element.replace(userFunction.map1(element.getValue())));
+	}
 
+	@Override
+	public void processElement2(StreamRecord<IN2> element) throws Exception {
+		output.collect(element.replace(userFunction.map2(element.getValue())));
 	}
 
 	@Override
-	public void processElement2(IN2 element) throws Exception {
-		output.collect(userFunction.map2(element));
+	public void processWatermark1(Watermark mark) throws Exception {
+		input1Watermark = mark.getTimestamp();
+		long newMin = Math.min(input1Watermark, input2Watermark);
+		if (newMin > combinedWatermark && input1Watermark != Long.MAX_VALUE && input2Watermark != Long.MAX_VALUE) {
+			combinedWatermark = newMin;
+			output.emitWatermark(new Watermark(combinedWatermark));
+		}
+	}
 
+	@Override
+	public void processWatermark2(Watermark mark) throws Exception {
+		input2Watermark = mark.getTimestamp();
+		long newMin = Math.min(input1Watermark, input2Watermark);
+		if (newMin > combinedWatermark && input1Watermark != Long.MAX_VALUE && input2Watermark != Long.MAX_VALUE) {
+			combinedWatermark = newMin;
+			output.emitWatermark(new Watermark(combinedWatermark));
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java
index 7157b1d..8609eab 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamReduce.java
@@ -20,6 +20,8 @@ package org.apache.flink.streaming.api.operators.co;
 import org.apache.flink.streaming.api.functions.co.CoReduceFunction;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 public class CoStreamReduce<IN1, IN2, OUT>
 		extends AbstractUdfStreamOperator<OUT, CoReduceFunction<IN1, IN2, OUT>>
@@ -30,6 +32,12 @@ public class CoStreamReduce<IN1, IN2, OUT>
 	protected IN1 currentValue1 = null;
 	protected IN2 currentValue2 = null;
 
+	// We keep track of watermarks from both inputs, the combined input is the minimum
+	// Once the minimum advances we emit a new watermark for downstream operators
+	private long combinedWatermark = Long.MIN_VALUE;
+	private long input1Watermark = Long.MAX_VALUE;
+	private long input2Watermark = Long.MAX_VALUE;
+
 	public CoStreamReduce(CoReduceFunction<IN1, IN2, OUT> coReducer) {
 		super(coReducer);
 		currentValue1 = null;
@@ -37,23 +45,42 @@ public class CoStreamReduce<IN1, IN2, OUT>
 	}
 
 	@Override
-	public void processElement1(IN1 element) throws Exception {
+	public void processElement1(StreamRecord<IN1> element) throws Exception {
 		if (currentValue1 != null) {
-			currentValue1 = userFunction.reduce1(currentValue1, element);
+			currentValue1 = userFunction.reduce1(currentValue1, element.getValue());
 		} else {
-			currentValue1 = element;
+			currentValue1 = element.getValue();
 		}
-		output.collect(userFunction.map1(currentValue1));
+		output.collect(element.replace(userFunction.map1(currentValue1)));
 	}
 
 	@Override
-	public void processElement2(IN2 element) throws Exception {
+	public void processElement2(StreamRecord<IN2> element) throws Exception {
 		if (currentValue2 != null) {
-			currentValue2 = userFunction.reduce2(currentValue2, element);
+			currentValue2 = userFunction.reduce2(currentValue2, element.getValue());
 		} else {
-			currentValue2 = element;
+			currentValue2 = element.getValue();
+		}
+		output.collect(element.replace(userFunction.map2(currentValue2)));
+	}
+
+	@Override
+	public void processWatermark1(Watermark mark) throws Exception {
+		input1Watermark = mark.getTimestamp();
+		long newMin = Math.min(input1Watermark, input2Watermark);
+		if (newMin > combinedWatermark && input1Watermark != Long.MAX_VALUE && input2Watermark != Long.MAX_VALUE) {
+			combinedWatermark = newMin;
+			output.emitWatermark(new Watermark(combinedWatermark));
 		}
-		output.collect(userFunction.map2(currentValue2));
 	}
 
+	@Override
+	public void processWatermark2(Watermark mark) throws Exception {
+		input2Watermark = mark.getTimestamp();
+		long newMin = Math.min(input1Watermark, input2Watermark);
+		if (newMin > combinedWatermark && input1Watermark != Long.MAX_VALUE && input2Watermark != Long.MAX_VALUE) {
+			combinedWatermark = newMin;
+			output.emitWatermark(new Watermark(combinedWatermark));
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java
index e7b069e..40d0a89 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/co/CoStreamWindow.java
@@ -24,8 +24,10 @@ import java.util.List;
 import org.apache.commons.math.util.MathUtils;
 import org.apache.flink.streaming.api.functions.co.CoWindowFunction;
 import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.TimestampedCollector;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.state.CircularFifoList;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
@@ -47,6 +49,12 @@ public class CoStreamWindow<IN1, IN2, OUT>
 	protected long startTime;
 	protected long nextRecordTime;
 
+	// We keep track of watermarks from both inputs, the combined input is the minimum
+	// Once the minimum advances we emit a new watermark for downstream operators
+	private long combinedWatermark = Long.MIN_VALUE;
+	private long input1Watermark = Long.MAX_VALUE;
+	private long input2Watermark = Long.MAX_VALUE;
+
 	public CoStreamWindow(CoWindowFunction<IN1, IN2, OUT> coWindowFunction, long windowSize,
 			long slideInterval, TimestampWrapper<IN1> timeStamp1, TimestampWrapper<IN2> timeStamp2) {
 		super(coWindowFunction);
@@ -62,13 +70,13 @@ public class CoStreamWindow<IN1, IN2, OUT>
 	}
 
 	@Override
-	public void processElement1(IN1 element) throws Exception {
-		window.addToBuffer1(element);
+	public void processElement1(StreamRecord<IN1> element) throws Exception {
+		window.addToBuffer1(element.getValue());
 	}
 
 	@Override
-	public void processElement2(IN2 element) throws Exception {
-		window.addToBuffer2(element);
+	public void processElement2(StreamRecord<IN2> element) throws Exception {
+		window.addToBuffer2(element.getValue());
 	}
 
 	@SuppressWarnings("unchecked")
@@ -86,8 +94,30 @@ public class CoStreamWindow<IN1, IN2, OUT>
 			second.add(element);
 		}
 
+		TimestampedCollector<OUT> timestampedCollector = new TimestampedCollector<OUT>(output);
+		timestampedCollector.setTimestamp(System.currentTimeMillis());
 		if (!window.circularList1.isEmpty() || !window.circularList2.isEmpty()) {
-			userFunction.coWindow(first, second, output);
+			userFunction.coWindow(first, second, timestampedCollector);
+		}
+	}
+
+	@Override
+	public void processWatermark1(Watermark mark) throws Exception {
+		input1Watermark = mark.getTimestamp();
+		long newMin = Math.min(input1Watermark, input2Watermark);
+		if (newMin > combinedWatermark && input1Watermark != Long.MAX_VALUE && input2Watermark != Long.MAX_VALUE) {
+			combinedWatermark = newMin;
+			output.emitWatermark(new Watermark(combinedWatermark));
+		}
+	}
+
+	@Override
+	public void processWatermark2(Watermark mark) throws Exception {
+		input2Watermark = mark.getTimestamp();
+		long newMin = Math.min(input1Watermark, input2Watermark);
+		if (newMin > combinedWatermark && input1Watermark != Long.MAX_VALUE && input2Watermark != Long.MAX_VALUE) {
+			combinedWatermark = newMin;
+			output.emitWatermark(new Watermark(combinedWatermark));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
index 0cdafd9..0de16b2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.operators.windowing;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.windowing.policy.CentralActiveTrigger;
 import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -51,9 +52,11 @@ public class GroupedActiveDiscretizer<IN> extends GroupedStreamDiscretizer<IN> {
 	}
 
 	@Override
-	public void processElement(IN element) throws Exception {
-			last = element;
-			Object key = keySelector.getKey(element);
+	public void processElement(StreamRecord<IN> element) throws Exception {
+
+//			last = copy(element);
+			last = element.getValue();
+			Object key = keySelector.getKey(element.getValue());
 
 			synchronized (groupedDiscretizers) {
 				StreamDiscretizer<IN> groupDiscretizer = groupedDiscretizers.get(key);

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java
index 64e8b04..e3cab5c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java
@@ -26,6 +26,7 @@ import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 /**
  * This operator represents the grouped discretization step of a window
@@ -67,9 +68,10 @@ public class GroupedStreamDiscretizer<IN> extends StreamDiscretizer<IN> {
 	}
 
 	@Override
-	public void processElement(IN element) throws Exception {
+	public void processElement(StreamRecord<IN> element) throws Exception {
 
-			Object key = keySelector.getKey(element);
+
+			Object key = keySelector.getKey(element.getValue());
 
 			StreamDiscretizer<IN> groupDiscretizer = groupedDiscretizers.get(key);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedWindowBuffer.java
index c6b2499..c74b96e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedWindowBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedWindowBuffer.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.streaming.api.windowing.WindowEvent;
 import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 /**
  * This operator flattens the results of the window transformations by
@@ -51,9 +52,9 @@ public class GroupedWindowBuffer<T> extends StreamWindowBuffer<T> {
 	}
 
 	@Override
-	public void processElement(WindowEvent<T> event) throws Exception {
-		if (event.getElement() != null) {
-			Object key = keySelector.getKey(event.getElement());
+	public void processElement(StreamRecord<WindowEvent<T>> event) throws Exception {
+		if (event.getValue().getElement() != null) {
+			Object key = keySelector.getKey(event.getValue().getElement());
 			WindowBuffer<T> currentWindow = windowMap.get(key);
 
 			if (currentWindow == null) {
@@ -61,7 +62,7 @@ public class GroupedWindowBuffer<T> extends StreamWindowBuffer<T> {
 				windowMap.put(key, currentWindow);
 			}
 
-			handleWindowEvent(event, currentWindow);
+			handleWindowEvent(event.getValue(), currentWindow);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java
index 4ab31cb..df84b62 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamDiscretizer.java
@@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.operators.windowing;
 
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.streaming.api.windowing.WindowEvent;
 import org.apache.flink.streaming.api.windowing.policy.ActiveEvictionPolicy;
@@ -26,6 +27,7 @@ import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerCallback;
 import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 /**
  * This operator represents the discretization step of a window transformation.
@@ -67,7 +69,7 @@ public class StreamDiscretizer<IN>
 	}
 
 	@Override
-	public void processElement(IN element) throws Exception {
+	public void processElement(StreamRecord<IN> element) throws Exception {
 		processRealElement(element);
 	}
 
@@ -80,13 +82,13 @@ public class StreamDiscretizer<IN>
 	 *            a real input element
 	 * @throws Exception
 	 */
-	protected synchronized void processRealElement(IN input) throws Exception {
+	protected synchronized void processRealElement(StreamRecord<IN> input) throws Exception {
 
 		// Setting the input element in order to avoid NullFieldException when triggering on fake element
-		windowEvent.setElement(input);
+		windowEvent.setElement(input.getValue());
 		if (isActiveTrigger) {
 			ActiveTriggerPolicy<IN> trigger = (ActiveTriggerPolicy<IN>) triggerPolicy;
-			Object[] result = trigger.preNotifyTrigger(input);
+			Object[] result = trigger.preNotifyTrigger(input.getValue());
 			for (Object in : result) {
 				triggerOnFakeElement(in);
 			}
@@ -94,14 +96,14 @@ public class StreamDiscretizer<IN>
 
 		boolean isTriggered = false;
 
-		if (triggerPolicy.notifyTrigger(input)) {
+		if (triggerPolicy.notifyTrigger(input.getValue())) {
 			emitWindow();
 			isTriggered = true;
 		}
 
-		evict(input, isTriggered);
+		evict(input.getValue(), isTriggered);
 
-		output.collect(windowEvent.setElement(input));
+		output.collect(input.replace(windowEvent.setElement(input.getValue())));
 		bufferSize++;
 
 	}
@@ -109,7 +111,7 @@ public class StreamDiscretizer<IN>
 	/**
 	 * This method triggers on an arrived fake element The method is
 	 * synchronized to ensure that it cannot interleave with
-	 * {@link StreamDiscretizer#processRealElement(Object)}
+	 * {@link StreamDiscretizer#processRealElement(org.apache.flink.streaming.runtime.streamrecord.StreamRecord)}
 	 * 
 	 * @param input
 	 *            a fake input element
@@ -130,7 +132,7 @@ public class StreamDiscretizer<IN>
 	 * if not empty
 	 */
 	protected void emitWindow() {
-		output.collect(windowEvent.setTrigger());
+		output.collect(new StreamRecord(windowEvent.setTrigger()));
 	}
 
 	private void activeEvict(Object input) {
@@ -142,7 +144,7 @@ public class StreamDiscretizer<IN>
 		}
 
 		if (numToEvict > 0) {
-			output.collect(windowEvent.setEviction(numToEvict));
+			output.collect(new StreamRecord(windowEvent.setEviction(numToEvict)));
 			bufferSize -= numToEvict;
 			bufferSize = bufferSize >= 0 ? bufferSize : 0;
 		}
@@ -152,7 +154,7 @@ public class StreamDiscretizer<IN>
 		int numToEvict = evictionPolicy.notifyEviction(input, isTriggered, bufferSize);
 
 		if (numToEvict > 0) {
-			output.collect(windowEvent.setEviction(numToEvict));
+			output.collect(new StreamRecord(windowEvent.setEviction(numToEvict)));
 			bufferSize -= numToEvict;
 			bufferSize = bufferSize >= 0 ? bufferSize : 0;
 		}
@@ -220,4 +222,9 @@ public class StreamDiscretizer<IN>
 		return "Discretizer(Trigger: " + triggerPolicy.toString() + ", Eviction: "
 				+ evictionPolicy.toString() + ")";
 	}
+
+	@Override
+	public void processWatermark(Watermark mark) throws Exception {
+		output.emitWatermark(mark);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java
index 074ff4b..c057f91 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/StreamWindowBuffer.java
@@ -19,9 +19,11 @@ package org.apache.flink.streaming.api.operators.windowing;
 
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.streaming.api.windowing.WindowEvent;
 import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 /**
  * This operator manages the window buffers attached to the discretizers.
@@ -41,8 +43,8 @@ public class StreamWindowBuffer<T>
 	}
 
 	@Override
-	public void processElement(WindowEvent<T> windowEvent) throws Exception {
-		handleWindowEvent(windowEvent);
+	public void processElement(StreamRecord<WindowEvent<T>> windowEvent) throws Exception {
+		handleWindowEvent(windowEvent.getValue());
 	}
 
 	protected void handleWindowEvent(WindowEvent<T> windowEvent, WindowBuffer<T> buffer)
@@ -60,4 +62,8 @@ public class StreamWindowBuffer<T>
 		handleWindowEvent(windowEvent, buffer);
 	}
 
+	@Override
+	public void processWatermark(Watermark mark) throws Exception {
+		output.emitWatermark(mark);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java
index 159b6f8..fa7696a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowFlattener.java
@@ -19,7 +19,9 @@ package org.apache.flink.streaming.api.operators.windowing;
 
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 /**
  * This operator flattens the results of the window transformations by
@@ -36,9 +38,14 @@ public class WindowFlattener<T> extends AbstractStreamOperator<T>
 	}
 
 	@Override
-	public void processElement(StreamWindow<T> window) throws Exception {
-		for (T element : window) {
-			output.collect(element);
+	public void processElement(StreamRecord<StreamWindow<T>> window) throws Exception {
+		for (T element : window.getValue()) {
+			output.collect(new StreamRecord<T>(element));
 		}
 	}
+
+	@Override
+	public void processWatermark(Watermark mark) throws Exception {
+		output.emitWatermark(mark);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
index 93a92f4..9ed5e82 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowMerger.java
@@ -22,7 +22,9 @@ import java.util.Map;
 
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 /**
  * This operator merges together the different partitions of the
@@ -44,7 +46,8 @@ public class WindowMerger<T> extends AbstractStreamOperator<StreamWindow<T>>
 
 	@Override
 	@SuppressWarnings("unchecked")
-	public void processElement(StreamWindow<T> nextWindow) throws Exception {
+	public void processElement(StreamRecord<StreamWindow<T>> nextWindowRecord) throws Exception {
+		StreamWindow<T> nextWindow = nextWindowRecord.getValue();
 
 		StreamWindow<T> current = windows.get(nextWindow.windowID);
 
@@ -55,10 +58,16 @@ public class WindowMerger<T> extends AbstractStreamOperator<StreamWindow<T>>
 		}
 
 		if (current.numberOfParts == 1) {
-			output.collect(current);
+			nextWindowRecord.replace(current);
+			output.collect(nextWindowRecord);
 			windows.remove(nextWindow.windowID);
 		} else {
 			windows.put(nextWindow.windowID, current);
 		}
 	}
+
+	@Override
+	public void processWatermark(Watermark mark) throws Exception {
+		output.emitWatermark(mark);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java
index 6b10c16..9f31fa0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/WindowPartitioner.java
@@ -20,7 +20,9 @@ package org.apache.flink.streaming.api.operators.windowing;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 /**
  * This operator applies either split or key partitioning depending on the
@@ -48,23 +50,30 @@ public class WindowPartitioner<T> extends AbstractStreamOperator<StreamWindow<T>
 	}
 
 	@Override
-	public void processElement(StreamWindow<T> currentWindow) throws Exception {
+	public void processElement(StreamRecord<StreamWindow<T>> currentWindow) throws Exception {
 
 		if (keySelector == null) {
 			if (numberOfSplits <= 1) {
 				output.collect(currentWindow);
 			} else {
-				for (StreamWindow<T> window : StreamWindow.split(currentWindow, numberOfSplits)) {
-					output.collect(window);
+				StreamWindow<T> unpackedWindow = currentWindow.getValue();
+				for (StreamWindow<T> window : StreamWindow.split(unpackedWindow, numberOfSplits)) {
+					currentWindow.replace(window);
+					output.collect(currentWindow);
 				}
 			}
 		} else {
 
 			for (StreamWindow<T> window : StreamWindow
-					.partitionBy(currentWindow, keySelector, true)) {
-				output.collect(window);
+					.partitionBy(currentWindow.getValue(), keySelector, true)) {
+				output.collect(new StreamRecord<StreamWindow<T>>(window));
 			}
 
 		}
 	}
+
+	@Override
+	public void processWatermark(Watermark mark) throws Exception {
+		output.emitWatermark(mark);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
index bfc160f..372cb10 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/state/PartitionedStreamOperatorState.java
@@ -81,7 +81,7 @@ public class PartitionedStreamOperatorState<IN, S, C extends Serializable> exten
 					return defaultState;
 				}
 			} catch (Exception e) {
-				throw new RuntimeException("User-defined key selector threw an exception.");
+				throw new RuntimeException("User-defined key selector threw an exception.", e);
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java
new file mode 100644
index 0000000..1d88fe2
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.watermark;
+
+/**
+ * A Watermark tells operators that receive it that no elements with a timestamp older or equal
+ * to the watermark timestamp should arrive at the operator. Watermarks are emitted at the
+ * sources and propagate through the operators of the topology. Operators must themselves emit
+ * watermarks to downstream operators using
+ * {@link org.apache.flink.streaming.api.operators.Output#emitWatermark(Watermark)}. Operators that
+ * do not internally buffer elements can always forward the watermark that they receive. Operators
+ * that buffer elements, such as window operators, must forward a watermark after emission of
+ * elements that is triggered by the arriving watermark.
+ *
+ * <p>
+ * In some cases a watermark is only a heuristic and operators should be able to deal with
+ * late elements. They can either discard those or update the result and emit updates/retractions
+ * to downstream operations.
+ *
+ */
+public class Watermark {
+
+	private long timestamp;
+
+	/**
+	 * Creates a new watermark with the given timestamp.
+	 */
+	public Watermark(long timestamp) {
+		this.timestamp = timestamp;
+	}
+
+	/**
+	 * Returns the timestamp associated with this {@link Watermark} in milliseconds.
+	 */
+	public long getTimestamp() {
+		return timestamp;
+	}
+
+	@Override
+	public boolean equals(Object o) {
+		if (this == o) {
+			return true;
+		}
+		if (o == null || getClass() != o.getClass()) {
+			return false;
+		}
+
+		Watermark watermark = (Watermark) o;
+
+		return timestamp == watermark.timestamp;
+	}
+
+	@Override
+	public int hashCode() {
+		return (int) (timestamp ^ (timestamp >>> 32));
+	}
+
+	@Override
+	public String toString() {
+		return "Watermark{" +
+				"timestamp=" + timestamp +
+				'}';
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java
index 371e20d..33fb29d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/BasicWindowBuffer.java
@@ -21,6 +21,7 @@ import java.util.LinkedList;
 import java.util.NoSuchElementException;
 
 import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Collector;
 
 /**
@@ -36,11 +37,11 @@ public class BasicWindowBuffer<T> extends WindowBuffer<T> {
 		this.buffer = new LinkedList<T>();
 	}
 
-	public void emitWindow(Collector<StreamWindow<T>> collector) {
+	public void emitWindow(Collector<StreamRecord<StreamWindow<T>>> collector) {
 		if (emitEmpty || !buffer.isEmpty()) {
 			StreamWindow<T> currentWindow = createEmptyWindow();
 			currentWindow.addAll(buffer);
-			collector.collect(currentWindow);
+			collector.collect(new StreamRecord<StreamWindow<T>>(currentWindow));
 		} 
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducer.java
index 1f7c83e..195a966 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountGroupedPreReducer.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Collector;
 
 public class JumpingCountGroupedPreReducer<T> extends TumblingGroupedPreReducer<T> {
@@ -37,7 +38,7 @@ public class JumpingCountGroupedPreReducer<T> extends TumblingGroupedPreReducer<
 	}
 
 	@Override
-	public void emitWindow(Collector<StreamWindow<T>> collector) {
+	public void emitWindow(Collector<StreamRecord<StreamWindow<T>>> collector) {
 		super.emitWindow(collector);
 		skipped = 0;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducer.java
index 355d0ce..17fe408 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingCountPreReducer.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.windowing.windowbuffer;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Collector;
 
 /**
@@ -39,7 +40,7 @@ public class JumpingCountPreReducer<T> extends TumblingPreReducer<T> {
 	}
 
 	@Override
-	public void emitWindow(Collector<StreamWindow<T>> collector) {
+	public void emitWindow(Collector<StreamRecord<StreamWindow<T>>> collector) {
 		super.emitWindow(collector);
 		skipped = 0;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimeGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimeGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimeGroupedPreReducer.java
index f2386a8..a92fc98 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimeGroupedPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimeGroupedPreReducer.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Collector;
 
 public class JumpingTimeGroupedPreReducer<T> extends TumblingGroupedPreReducer<T> {
@@ -42,7 +43,7 @@ public class JumpingTimeGroupedPreReducer<T> extends TumblingGroupedPreReducer<T
 	}
 
 	@Override
-	public void emitWindow(Collector<StreamWindow<T>> collector) {
+	public void emitWindow(Collector<StreamRecord<StreamWindow<T>>> collector) {
 		super.emitWindow(collector);
 		windowStartTime += slideSize;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducer.java
index 98c264d..1a47bc8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/JumpingTimePreReducer.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Collector;
 
 /**
@@ -44,7 +45,7 @@ public class JumpingTimePreReducer<T> extends TumblingPreReducer<T> {
 	}
 
 	@Override
-	public void emitWindow(Collector<StreamWindow<T>> collector) {
+	public void emitWindow(Collector<StreamRecord<StreamWindow<T>>> collector) {
 		super.emitWindow(collector);
 		windowStartTime += slideSize;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
index 3a2decf..e2c46a3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/SlidingPreReducer.java
@@ -22,6 +22,7 @@ import java.util.LinkedList;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Collector;
 
 /**
@@ -49,12 +50,12 @@ public abstract class SlidingPreReducer<T> extends WindowBuffer<T> implements Pr
 		this.serializer = serializer;
 	}
 
-	public void emitWindow(Collector<StreamWindow<T>> collector) {
+	public void emitWindow(Collector<StreamRecord<StreamWindow<T>>> collector) {
 		StreamWindow<T> currentWindow = createEmptyWindow();
 
 		try {
 			if (addFinalAggregate(currentWindow) || emitEmpty) {
-				collector.collect(currentWindow);
+				collector.collect(new StreamRecord<StreamWindow<T>>(currentWindow));
 			} 
 			afterEmit();
 		} catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
index d2f6234..37d3aae 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingGroupedPreReducer.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Collector;
 
 /**
@@ -56,14 +57,14 @@ public class TumblingGroupedPreReducer<T> extends WindowBuffer<T> implements Pre
 		this.evict = evict;
 	}
 
-	public void emitWindow(Collector<StreamWindow<T>> collector) {
+	public void emitWindow(Collector<StreamRecord<StreamWindow<T>>> collector) {
 
 		if (!reducedValues.isEmpty()) {
 			StreamWindow<T> currentWindow = createEmptyWindow();
 			currentWindow.addAll(reducedValues.values());
-			collector.collect(currentWindow);
+			collector.collect(new StreamRecord<StreamWindow<T>>(currentWindow));
 		} else if (emitEmpty) {
-			collector.collect(createEmptyWindow());
+			collector.collect(new StreamRecord<StreamWindow<T>>(createEmptyWindow()));
 		}
 		if (evict) {
 			reducedValues.clear();

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
index f396e41..3a10be7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/TumblingPreReducer.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.windowing.windowbuffer;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Collector;
 
 /**
@@ -48,13 +49,13 @@ public class TumblingPreReducer<T> extends WindowBuffer<T> implements PreAggrega
 		this.evict = evict;
 	}
 
-	public void emitWindow(Collector<StreamWindow<T>> collector) {
+	public void emitWindow(Collector<StreamRecord<StreamWindow<T>>> collector) {
 		if (reduced != null) {
 			StreamWindow<T> currentWindow = createEmptyWindow();
 			currentWindow.add(reduced);
-			collector.collect(currentWindow);
+			collector.collect(new StreamRecord<StreamWindow<T>>(currentWindow));
 		} else if (emitEmpty) {
-			collector.collect(createEmptyWindow());
+			collector.collect(new StreamRecord<StreamWindow<T>>(createEmptyWindow()));
 		}
 
 		if (evict) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
index 5c5ea52..6e87d0b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/windowbuffer/WindowBuffer.java
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.windowing.windowbuffer;
 import java.io.Serializable;
 
 import org.apache.flink.streaming.api.windowing.StreamWindow;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.Collector;
 
 /**
@@ -39,7 +40,7 @@ public abstract class WindowBuffer<T> implements Serializable, Cloneable {
 
 	public abstract void evict(int n);
 
-	public abstract void emitWindow(Collector<StreamWindow<T>> collector);
+	public abstract void emitWindow(Collector<StreamRecord<StreamWindow<T>>> collector);
 
 	public abstract WindowBuffer<T> clone();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
index 8f8325f..40e84fc 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/BarrierBuffer.java
@@ -28,31 +28,30 @@ import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.streaming.runtime.tasks.StreamingSuperstep;
+import org.apache.flink.streaming.runtime.tasks.CheckpointBarrier;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Class encapsulating the functionality that is necessary to sync inputs on
- * superstep barriers. Once a barrier is received from an input channel, whe
- * should not process further buffers from that channel until we received the
- * barrier from all other channels as well. To avoid back-pressuring the
+ * The barrier buffer is responsible for implementing the blocking behaviour described
+ * here: {@link CheckpointBarrier}.
+ *
+ * <p>
+ * To avoid back-pressuring the
  * readers, we buffer up the new data received from the blocked channels until
  * the blocks are released.
- * 
  */
 public class BarrierBuffer {
 
 	private static final Logger LOG = LoggerFactory.getLogger(BarrierBuffer.class);
 
-	private Queue<SpillingBufferOrEvent> nonprocessed = new LinkedList<SpillingBufferOrEvent>();
-	private Queue<SpillingBufferOrEvent> blockedNonprocessed = new LinkedList<SpillingBufferOrEvent>();
+	private Queue<SpillingBufferOrEvent> nonProcessed = new LinkedList<SpillingBufferOrEvent>();
+	private Queue<SpillingBufferOrEvent> blockedNonProcessed = new LinkedList<SpillingBufferOrEvent>();
 
 	private Set<Integer> blockedChannels = new HashSet<Integer>();
 	private int totalNumberOfInputChannels;
 
-	private StreamingSuperstep currentSuperstep;
-	private boolean superstepStarted;
+	private CheckpointBarrier currentBarrier;
 
 	private AbstractReader reader;
 
@@ -65,6 +64,8 @@ public class BarrierBuffer {
 
 	private BufferOrEvent endOfStreamEvent = null;
 
+	private long lastCheckpointId = Long.MIN_VALUE;
+
 	public BarrierBuffer(InputGate inputGate, AbstractReader reader) {
 		this.inputGate = inputGate;
 		totalNumberOfInputChannels = inputGate.getNumberOfInputChannels();
@@ -79,32 +80,18 @@ public class BarrierBuffer {
 	}
 
 	/**
-	 * Starts the next superstep in the buffer
-	 * 
-	 * @param superstep
-	 *            The next superstep
-	 */
-	protected void startSuperstep(StreamingSuperstep superstep) {
-		this.currentSuperstep = superstep;
-		this.superstepStarted = true;
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Superstep started with id: " + superstep.getId());
-		}
-	}
-
-	/**
-	 * Get then next non-blocked non-processed BufferOrEvent. Returns null if
-	 * not available.
+	 * Get then next non-blocked non-processed {@link BufferOrEvent}. Returns null if
+	 * none available.
 	 * 
 	 * @throws IOException
 	 */
-	protected BufferOrEvent getNonProcessed() throws IOException {
-		SpillingBufferOrEvent nextNonprocessed;
+	private BufferOrEvent getNonProcessed() throws IOException {
+		SpillingBufferOrEvent nextNonProcessed;
 
-		while ((nextNonprocessed = nonprocessed.poll()) != null) {
-			BufferOrEvent boe = nextNonprocessed.getBufferOrEvent();
+		while ((nextNonProcessed = nonProcessed.poll()) != null) {
+			BufferOrEvent boe = nextNonProcessed.getBufferOrEvent();
 			if (isBlocked(boe.getChannelIndex())) {
-				blockedNonprocessed.add(new SpillingBufferOrEvent(boe, bufferSpiller, spillReader));
+				blockedNonProcessed.add(new SpillingBufferOrEvent(boe, bufferSpiller, spillReader));
 			} else {
 				return boe;
 			}
@@ -114,25 +101,24 @@ public class BarrierBuffer {
 	}
 
 	/**
-	 * Checks whether a given channel index is blocked for this inputgate
+	 * Checks whether the channel with the given index is blocked.
 	 * 
-	 * @param channelIndex
-	 *            The channel index to check
+	 * @param channelIndex The channel index to check
 	 */
-	protected boolean isBlocked(int channelIndex) {
+	private boolean isBlocked(int channelIndex) {
 		return blockedChannels.contains(channelIndex);
 	}
 
 	/**
-	 * Checks whether all channels are blocked meaning that barriers are
+	 * Checks whether all channels are blocked meaning that barriers have been
 	 * received from all channels
 	 */
-	protected boolean isAllBlocked() {
+	private boolean isAllBlocked() {
 		return blockedChannels.size() == totalNumberOfInputChannels;
 	}
 
 	/**
-	 * Returns the next non-blocked BufferOrEvent. This is a blocking operator.
+	 * Returns the next non-blocked {@link BufferOrEvent}. This is a blocking operator.
 	 */
 	public BufferOrEvent getNextNonBlocked() throws IOException, InterruptedException {
 		// If there are non-processed buffers from the previously blocked ones,
@@ -141,7 +127,7 @@ public class BarrierBuffer {
 
 		if (bufferOrEvent != null) {
 			return bufferOrEvent;
-		} else if (blockedNonprocessed.isEmpty() && inputFinished) {
+		} else if (blockedNonProcessed.isEmpty() && inputFinished) {
 			return endOfStreamEvent;
 		} else {
 			// If no non-processed, get new from input
@@ -162,7 +148,7 @@ public class BarrierBuffer {
 					} else {
 						if (isBlocked(bufferOrEvent.getChannelIndex())) {
 							// If channel blocked we just store it
-							blockedNonprocessed.add(new SpillingBufferOrEvent(bufferOrEvent,
+							blockedNonProcessed.add(new SpillingBufferOrEvent(bufferOrEvent,
 									bufferSpiller, spillReader));
 						} else {
 							return bufferOrEvent;
@@ -182,7 +168,7 @@ public class BarrierBuffer {
 	 * @param channelIndex
 	 *            The channel index to block.
 	 */
-	protected void blockChannel(int channelIndex) {
+	private void blockChannel(int channelIndex) {
 		if (!blockedChannels.contains(channelIndex)) {
 			blockedChannels.add(channelIndex);
 			if (LOG.isDebugEnabled()) {
@@ -199,16 +185,14 @@ public class BarrierBuffer {
 
 	/**
 	 * Releases the blocks on all channels.
-	 * 
-	 * @throws IOException
 	 */
-	protected void releaseBlocks() {
-		if (!nonprocessed.isEmpty()) {
+	private void releaseBlocks() {
+		if (!nonProcessed.isEmpty()) {
 			// sanity check
 			throw new RuntimeException("Error in barrier buffer logic");
 		}
-		nonprocessed = blockedNonprocessed;
-		blockedNonprocessed = new LinkedList<SpillingBufferOrEvent>();
+		nonProcessed = blockedNonProcessed;
+		blockedNonProcessed = new LinkedList<SpillingBufferOrEvent>();
 
 		try {
 			spillReader.setSpillFile(bufferSpiller.getSpillFile());
@@ -218,7 +202,7 @@ public class BarrierBuffer {
 		}
 
 		blockedChannels.clear();
-		superstepStarted = false;
+		currentBarrier = null;
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("All barriers received, blocks released");
 		}
@@ -228,28 +212,46 @@ public class BarrierBuffer {
 	 * Method that is executed once the barrier has been received from all
 	 * channels.
 	 */
-	protected void actOnAllBlocked() {
+	private void actOnAllBlocked() {
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("Publishing barrier to the vertex");
 		}
 
-		if (currentSuperstep != null && !inputFinished) {
-			reader.publish(currentSuperstep);
+		if (currentBarrier != null && !inputFinished) {
+			reader.publish(currentBarrier);
+			lastCheckpointId = currentBarrier.getId();
 		}
 
 		releaseBlocks();
 	}
 
 	/**
-	 * Processes a streaming superstep event
+	 * Processes one {@link org.apache.flink.streaming.runtime.tasks.CheckpointBarrier}
 	 * 
-	 * @param bufferOrEvent
-	 *            The BufferOrEvent containing the event
+	 * @param bufferOrEvent The {@link BufferOrEvent} containing the checkpoint barrier
 	 */
-	public void processSuperstep(BufferOrEvent bufferOrEvent) {
-		StreamingSuperstep superstep = (StreamingSuperstep) bufferOrEvent.getEvent();
-		if (!superstepStarted) {
-			startSuperstep(superstep);
+	public void processBarrier(BufferOrEvent bufferOrEvent) {
+		CheckpointBarrier receivedBarrier = (CheckpointBarrier) bufferOrEvent.getEvent();
+
+		if (receivedBarrier.getId() < lastCheckpointId) {
+			// a barrier from an old checkpoint, ignore these
+			return;
+		}
+
+		if (currentBarrier == null) {
+			this.currentBarrier = receivedBarrier;
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Checkpoint barrier received start waiting for checkpoint: {}", receivedBarrier);
+			}
+		} else if (receivedBarrier.getId() > currentBarrier.getId()) {
+			// we have a barrier from a more recent checkpoint, free all locks and start with
+			// this newer checkpoint
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Checkpoint barrier received while waiting on checkpoint {}. Restarting waiting with checkpoint {}: ", currentBarrier, receivedBarrier);
+			}
+			releaseBlocks();
+			currentBarrier = receivedBarrier;
+
 		}
 		blockChannel(bufferOrEvent.getChannelIndex());
 	}
@@ -269,11 +271,11 @@ public class BarrierBuffer {
 	}
 
 	public String toString() {
-		return nonprocessed.toString() + blockedNonprocessed.toString();
+		return nonProcessed.toString() + blockedNonProcessed.toString();
 	}
 
 	public boolean isEmpty() {
-		return nonprocessed.isEmpty() && blockedNonprocessed.isEmpty();
+		return nonProcessed.isEmpty() && blockedNonProcessed.isEmpty();
 	}
 
-}
\ No newline at end of file
+}


Mime
View raw message