flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [2/4] flink git commit: [FLINK-3435] [streaming] Properly separate IngestionTime and EventTime
Date Wed, 24 Feb 2016 21:01:59 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/ceb64248/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index ed5064b..223afa8 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -247,7 +247,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 	}
 
 	protected void emitWindow(Context context) throws Exception {
-		timestampedCollector.setTimestamp(context.window.maxTimestamp());
+		timestampedCollector.setAbsoluteTimestamp(context.window.maxTimestamp());
 
 		if (context.windowBuffer.size() > 0) {
 			userFunction.apply(

http://git-wip-us.apache.org/repos/asf/flink/blob/ceb64248/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 915950a..85cc93c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,6 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.operators.windowing;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -48,8 +49,6 @@ import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
@@ -94,8 +93,6 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 
 	private static final long serialVersionUID = 1L;
 
-	private static final Logger LOG = LoggerFactory.getLogger(WindowOperator.class);
-
 	// ------------------------------------------------------------------------
 	// Configuration values and user functions
 	// ------------------------------------------------------------------------
@@ -249,7 +246,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window>
 		}
 
 		if (triggerResult.isFire()) {
-			timestampedCollector.setTimestamp(window.maxTimestamp());
+			timestampedCollector.setAbsoluteTimestamp(window.maxTimestamp());
 
 			MergingState<IN, ACC> windowState = getPartitionedState(window, windowSerializer,
 				windowStateDescriptor);

http://git-wip-us.apache.org/repos/asf/flink/blob/ceb64248/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
index 896b8e5..af6349b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.runtime.streamrecord;
 
-import com.google.common.base.Preconditions;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.DataInputView;
@@ -27,22 +26,24 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 
 import java.io.IOException;
 
+import static java.util.Objects.requireNonNull;
+
 /**
- * Serializer for {@link StreamRecord} and {@link org.apache.flink.streaming.api.watermark.Watermark}. This does not behave like a normal
- * {@link TypeSerializer}, instead, this is only used at the
- * {@link org.apache.flink.streaming.runtime.tasks.StreamTask} level for transmitting
- * {@link StreamRecord StreamRecords} and {@link org.apache.flink.streaming.api.watermark.Watermark Watermarks}. This serializer
- * can handle both of them, therefore it returns {@link Object} the result has
- * to be cast to the correct type.
+ * Serializer for {@link StreamRecord} and {@link Watermark}. This does not behave like a normal
+ * {@link TypeSerializer}, instead, this is only used at the stream task/operator level for
+ * transmitting StreamRecords and Watermarks.
  *
- * @param <T> The type of value in the {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord}
+ * @param <T> The type of value in the StreamRecord
  */
 @Internal
 public final class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<StreamElement> {
 
 	private static final long serialVersionUID = 1L;
-
-	private static final long IS_WATERMARK = Long.MIN_VALUE;
+	
+	private static final int TAG_REC_WITH_TIMESTAMP = 0;
+	private static final int TAG_REC_WITHOUT_TIMESTAMP = 1;
+	private static final int TAG_WATERMARK = 2;
+	
 	
 	private final TypeSerializer<T> typeSerializer;
 
@@ -51,9 +52,16 @@ public final class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<
 		if (serializer instanceof MultiplexingStreamRecordSerializer || serializer instanceof StreamRecordSerializer) {
 			throw new RuntimeException("StreamRecordSerializer given to StreamRecordSerializer as value TypeSerializer: " + serializer);
 		}
-		this.typeSerializer = Preconditions.checkNotNull(serializer);
+		this.typeSerializer = requireNonNull(serializer);
 	}
-	
+
+	public TypeSerializer<T> getContainedTypeSerializer() {
+		return this.typeSerializer;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
 	
 	@Override
 	public boolean isImmutableType() {
@@ -61,14 +69,23 @@ public final class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<
 	}
 
 	@Override
-	public TypeSerializer<StreamElement> duplicate() {
+	public MultiplexingStreamRecordSerializer<T> duplicate() {
 		TypeSerializer<T> copy = typeSerializer.duplicate();
 		return (copy == typeSerializer) ? this : new MultiplexingStreamRecordSerializer<T>(copy);
 	}
 
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+	
 	@Override
 	public StreamRecord<T> createInstance() {
-		return new StreamRecord<T>(typeSerializer.createInstance(), 0L);
+		return new StreamRecord<T>(typeSerializer.createInstance());
+	}
+
+	@Override
+	public int getLength() {
+		return -1;
 	}
 
 	@Override
@@ -76,14 +93,14 @@ public final class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<
 		// we can reuse the timestamp since Instant is immutable
 		if (from.isRecord()) {
 			StreamRecord<T> fromRecord = from.asRecord();
-			return new StreamRecord<T>(typeSerializer.copy(fromRecord.getValue()), fromRecord.getTimestamp());
+			return fromRecord.copy(typeSerializer.copy(fromRecord.getValue()));
 		}
 		else if (from.isWatermark()) {
 			// is immutable
 			return from;
 		}
 		else {
-			throw new RuntimeException("Cannot copy " + from);
+			throw new RuntimeException();
 		}
 	}
 
@@ -94,7 +111,7 @@ public final class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<
 			StreamRecord<T> reuseRecord = reuse.asRecord();
 
 			T valueCopy = typeSerializer.copy(fromRecord.getValue(), reuseRecord.getValue());
-			reuseRecord.replace(valueCopy, fromRecord.getTimestamp());
+			fromRecord.copyTo(valueCopy, reuseRecord);
 			return reuse;
 		}
 		else if (from.isWatermark()) {
@@ -107,19 +124,41 @@ public final class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<
 	}
 
 	@Override
-	public int getLength() {
-		return -1;
+	public void copy(DataInputView source, DataOutputView target) throws IOException {
+		int tag = source.readByte();
+		target.write(tag);
+
+		if (tag == TAG_REC_WITH_TIMESTAMP) {
+			// move timestamp
+			target.writeLong(source.readLong());
+			typeSerializer.copy(source, target);
+		}
+		else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
+			typeSerializer.copy(source, target);
+		}
+		else if (tag == TAG_WATERMARK) {
+			target.writeLong(source.readLong());
+		}
+		else {
+			throw new IOException("Corrupt stream, found tag: " + tag);
+		}
 	}
 
 	@Override
 	public void serialize(StreamElement value, DataOutputView target) throws IOException {
 		if (value.isRecord()) {
 			StreamRecord<T> record = value.asRecord();
-			target.writeLong(record.getTimestamp());
+			
+			if (record.hasTimestamp()) {
+				target.write(TAG_REC_WITH_TIMESTAMP);
+				target.writeLong(record.getTimestamp());
+			} else {
+				target.write(TAG_REC_WITHOUT_TIMESTAMP);
+			}
 			typeSerializer.serialize(record.getValue(), target);
 		}
 		else if (value.isWatermark()) {
-			target.writeLong(IS_WATERMARK);
+			target.write(TAG_WATERMARK);
 			target.writeLong(value.asWatermark().getTimestamp());
 		}
 		else {
@@ -129,44 +168,50 @@ public final class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<
 	
 	@Override
 	public StreamElement deserialize(DataInputView source) throws IOException {
-		long millis = source.readLong();
-
-		if (millis == IS_WATERMARK) {
+		int tag = source.readByte();
+		if (tag == TAG_REC_WITH_TIMESTAMP) {
+			long timestamp = source.readLong();
+			return new StreamRecord<T>(typeSerializer.deserialize(source), timestamp);
+		}
+		else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
+			return new StreamRecord<T>(typeSerializer.deserialize(source));
+		}
+		else if (tag == TAG_WATERMARK) {
 			return new Watermark(source.readLong());
 		}
 		else {
-			T element = typeSerializer.deserialize(source);
-			return new StreamRecord<T>(element, millis);
+			throw new IOException("Corrupt stream, found tag: " + tag);
 		}
 	}
 
 	@Override
 	public StreamElement deserialize(StreamElement reuse, DataInputView source) throws IOException {
-		long millis = source.readLong();
-
-		if (millis == IS_WATERMARK) {
-			return new Watermark(source.readLong());
+		int tag = source.readByte();
+		if (tag == TAG_REC_WITH_TIMESTAMP) {
+			long timestamp = source.readLong();
+			T value = typeSerializer.deserialize(source);
+			StreamRecord<T> reuseRecord = reuse.asRecord();
+			reuseRecord.replace(value, timestamp);
+			return reuseRecord;
 		}
-		else {
+		else if (tag == TAG_REC_WITHOUT_TIMESTAMP) {
+			T value = typeSerializer.deserialize(source);
 			StreamRecord<T> reuseRecord = reuse.asRecord();
-			T element = typeSerializer.deserialize(reuseRecord.getValue(), source);
-			reuseRecord.replace(element, millis);
-			return reuse;
+			reuseRecord.replace(value);
+			return reuseRecord;
 		}
-	}
-
-	@Override
-	public void copy(DataInputView source, DataOutputView target) throws IOException {
-		long millis = source.readLong();
-		target.writeLong(millis);
-
-		if (millis == IS_WATERMARK) {
-			target.writeLong(source.readLong());
-		} else {
-			typeSerializer.copy(source, target);
+		else if (tag == TAG_WATERMARK) {
+			return new Watermark(source.readLong());
+		}
+		else {
+			throw new IOException("Corrupt stream, found tag: " + tag);
 		}
 	}
 
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+	
 	@Override
 	public boolean equals(Object obj) {
 		if (obj instanceof MultiplexingStreamRecordSerializer) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ceb64248/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
index bd99be8..9f75161 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
@@ -17,16 +17,15 @@
 
 package org.apache.flink.streaming.runtime.streamrecord;
 
-
 import org.apache.flink.annotation.Internal;
 
 /**
- * One value in a data stream. This stores the value and the associated timestamp.
+ * One value in a data stream. This stores the value and an optional associated timestamp.
  * 
  * @param <T> The type encapsulated with the stream record.
  */
 @Internal
-public class StreamRecord<T> extends StreamElement {
+public final class StreamRecord<T> extends StreamElement {
 	
 	/** The actual value held by this record */
 	private T value;
@@ -34,31 +33,33 @@ public class StreamRecord<T> extends StreamElement {
 	/** The timestamp of the record */
 	private long timestamp;
 
+	/** Flag whether the timestamp is actually set */
+	private boolean hasTimestamp;
+	
 	/**
-	 * Creates a new {@link StreamRecord} wrapping the given value. The timestamp is set to the
-	 * result of {@code new Instant(0)}.
+	 * Creates a new StreamRecord. The record does not have a timestamp.
 	 */
 	public StreamRecord(T value) {
-		this(value, Long.MIN_VALUE + 1);
-		// be careful to set it to MIN_VALUE + 1, because MIN_VALUE is reserved as the
-		// special tag to signify that a transmitted element is a Watermark in StreamRecordSerializer
+		this.value = value;
 	}
 
 	/**
-	 * Creates a new {@link StreamRecord} wrapping the given value. The timestamp is set to the
+	 * Creates a new StreamRecord wrapping the given value. The timestamp is set to the
 	 * given timestamp.
 	 *
 	 * @param value The value to wrap in this {@link StreamRecord}
 	 * @param timestamp The timestamp in milliseconds
 	 */
 	public StreamRecord(T value, long timestamp) {
-		if (timestamp == Long.MIN_VALUE) {
-			throw new IllegalArgumentException("Long.MIN_VALUE timestamp is reserved");
-		}
 		this.value = value;
 		this.timestamp = timestamp;
+		this.hasTimestamp = true;
 	}
 
+	// ------------------------------------------------------------------------
+	//  Accessors
+	// ------------------------------------------------------------------------
+	
 	/**
 	 * Returns the value wrapped in this stream value.
 	 */
@@ -70,9 +71,28 @@ public class StreamRecord<T> extends StreamElement {
 	 * Returns the timestamp associated with this stream value in milliseconds.
 	 */
 	public long getTimestamp() {
-		return timestamp;
+		if (hasTimestamp) {
+			return timestamp;
+		} else {
+			return Long.MIN_VALUE;
+//			throw new IllegalStateException(
+//					"Record has no timestamp. Is the time characteristic set to 'ProcessingTime', or " +
+//							"did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?");
+		}
 	}
 
+	/** Checks whether this record has a timestamp.
+	 * 
+ 	 * @return True if the record has a timestamp, false if not.
+	 */
+	public boolean hasTimestamp() {
+		return hasTimestamp;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Updating
+	// ------------------------------------------------------------------------
+	
 	/**
 	 * Replace the currently stored value by the given new value. This returns a StreamElement
 	 * with the generic type parameter that matches the new value while keeping the old
@@ -92,43 +112,81 @@ public class StreamRecord<T> extends StreamElement {
 	 * timestamp with the new timestamp. This returns a StreamElement with the generic type
 	 * parameter that matches the new value.
 	 *
-	 * @param value The new value to wrap in this {@link StreamRecord}
+	 * @param value The new value to wrap in this StreamRecord
 	 * @param timestamp The new timestamp in milliseconds
 	 * @return Returns the StreamElement with replaced value
 	 */
 	@SuppressWarnings("unchecked")
 	public <X> StreamRecord<X> replace(X value, long timestamp) {
-		if (timestamp == Long.MIN_VALUE) {
-			throw new IllegalArgumentException("Long.MIN_VALUE timestamp is reserved");
-		}
 		this.timestamp = timestamp;
 		this.value = (T) value;
+		this.hasTimestamp = true;
+		
 		return (StreamRecord<X>) this;
 	}
+	
+	public void setTimestamp(long timestamp) {
+		this.timestamp = timestamp;
+		this.hasTimestamp = true;
+	}
 
+	public void eraseTimestamp() {
+		this.hasTimestamp = false;
+	}
+	
+	// ------------------------------------------------------------------------
+	//  Copying
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Creates a copy of this stream record. Uses the copied value as the value for the new
+	 * record, i.e., only copies timestamp fields.
+	 */
+	public StreamRecord<T> copy(T valueCopy) {
+		StreamRecord<T> copy = new StreamRecord<>(valueCopy);
+		copy.timestamp = this.timestamp;
+		copy.hasTimestamp = this.hasTimestamp;
+		return copy;
+	}
+
+	/**
+	 * Copies this record into the given target stream record. Uses the copied value as the value
+	 * for the target record, i.e., only copies timestamp fields.
+	 */
+	public void copyTo(T valueCopy, StreamRecord<T> target) {
+		target.value = valueCopy;
+		target.timestamp = this.timestamp;
+		target.hasTimestamp = this.hasTimestamp;
+	}
+
+	// ------------------------------------------------------------------------
+	//  Utilities
+	// ------------------------------------------------------------------------
+	
 	@Override
 	public boolean equals(Object o) {
 		if (this == o) {
 			return true;
 		}
-		if (o == null || getClass() != o.getClass()) {
+		else if (o != null && getClass() == o.getClass()) {
+			StreamRecord<?> that = (StreamRecord<?>) o;
+			return this.hasTimestamp == that.hasTimestamp &&
+					this.timestamp == that.timestamp &&
+					(this.value == null ? that.value == null : this.value.equals(that.value));
+		}
+		else {
 			return false;
 		}
-
-		StreamRecord<?> that = (StreamRecord<?>) o;
-
-		return value.equals(that.value) && timestamp == that.timestamp;
 	}
 
 	@Override
 	public int hashCode() {
 		int result = value != null ? value.hashCode() : 0;
-		result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
-		return result;
+		return 31 * result + (hasTimestamp ? (int) (timestamp ^ (timestamp >>> 32)) : 0);
 	}
 
 	@Override
 	public String toString() {
-		return "Record{" + value + "; " + timestamp + '}';
+		return "Record @ " + (hasTimestamp ? timestamp : "(undef)") + " : " + value;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ceb64248/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
index 0fe9c35..d259e05 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializer.java
@@ -93,12 +93,12 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord
 	
 	@Override
 	public StreamRecord<T> copy(StreamRecord<T> from) {
-		return new StreamRecord<T>(typeSerializer.copy(from.getValue()), from.getTimestamp());
+		return from.copy(typeSerializer.copy(from.getValue()));
 	}
 
 	@Override
 	public StreamRecord<T> copy(StreamRecord<T> from, StreamRecord<T> reuse) {
-		reuse.replace(typeSerializer.copy(from.getValue(), reuse.getValue()), 0);
+		from.copyTo(typeSerializer.copy(from.getValue(), reuse.getValue()), reuse);
 		return reuse;
 	}
 
@@ -109,14 +109,13 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord
 	
 	@Override
 	public StreamRecord<T> deserialize(DataInputView source) throws IOException {
-		T element = typeSerializer.deserialize(source);
-		return new StreamRecord<T>(element, 0);
+		return new StreamRecord<T>(typeSerializer.deserialize(source));
 	}
 
 	@Override
 	public StreamRecord<T> deserialize(StreamRecord<T> reuse, DataInputView source) throws IOException {
 		T element = typeSerializer.deserialize(reuse.getValue(), source);
-		reuse.replace(element, 0);
+		reuse.replace(element);
 		return reuse;
 	}
 
@@ -125,6 +124,8 @@ public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord
 		typeSerializer.copy(source, target);
 	}
 
+	// ------------------------------------------------------------------------
+	
 	@Override
 	public boolean equals(Object obj) {
 		if (obj instanceof StreamRecordSerializer) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ceb64248/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index ceffc1c..a1ed828 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -46,7 +46,7 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
 					getCheckpointBarrierListener(), 
 					configuration.getCheckpointMode(),
 					getEnvironment().getIOManager(),
-					getExecutionConfig().areTimestampsEnabled());
+					isSerializingTimestamps());
 
 			// make sure that stream tasks report their I/O statistics
 			AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();

http://git-wip-us.apache.org/repos/asf/flink/blob/ceb64248/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 4ce2f64..f3d3482 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -72,7 +72,7 @@ public class OperatorChain<OUT> {
 		
 		final ClassLoader userCodeClassloader = containingTask.getUserCodeClassLoader();
 		final StreamConfig configuration = containingTask.getConfiguration();
-		final boolean enableTimestamps = containingTask.getExecutionConfig().areTimestampsEnabled();
+		final boolean enableTimestamps = containingTask.isSerializingTimestamps();
 
 		// we read the chained configs, and the order of record writer registrations by output name
 		Map<Integer, StreamConfig> chainedConfigs = configuration.getTransitiveChainedTaskConfigs(userCodeClassloader);
@@ -346,8 +346,7 @@ public class OperatorChain<OUT> {
 		@Override
 		public void collect(StreamRecord<T> record) {
 			try {
-				StreamRecord<T> copy = new StreamRecord<>(serializer.copy(record.getValue()), record.getTimestamp());
-
+				StreamRecord<T> copy = record.copy(serializer.copy(record.getValue()));
 				operator.setKeyContextElement1(copy);
 				operator.processElement(copy);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/ceb64248/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 44ff957..af9278f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -20,10 +20,7 @@ package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 /**
  * Task for executing streaming sources.
@@ -56,57 +53,11 @@ public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends S
 
 	@Override
 	protected void run() throws Exception {
-		final Object checkpointLock = getCheckpointLock();
-		final SourceOutput<StreamRecord<OUT>> output = new SourceOutput<>(getHeadOutput(), checkpointLock);
-		headOperator.run(checkpointLock, output);
+		headOperator.run(getCheckpointLock(), getHeadOutput());
 	}
 	
 	@Override
 	protected void cancelTask() throws Exception {
 		headOperator.cancel();
 	}
-
-	// ------------------------------------------------------------------------
-	
-	/**
-	 * Special output for sources that ensures that sources synchronize on  the lock object before
-	 * emitting elements.
-	 *
-	 * <p>
-	 * This is required to ensure that no concurrent method calls on operators later in the chain
-	 * can occur. When operators register a timer the timer callback is synchronized
-	 * on the same lock object.
-	 *
-	 * @param <T> The type of elements emitted by the source.
-	 */
-	private class SourceOutput<T> implements Output<T> {
-		
-		private final Output<T> output;
-		private final Object lockObject;
-
-		public SourceOutput(Output<T> output, Object lockObject) {
-			this.output = output;
-			this.lockObject = lockObject;
-		}
-
-		@Override
-		public void emitWatermark(Watermark mark) {
-			synchronized (lockObject) {
-				output.emitWatermark(mark);
-			}
-		}
-
-		@Override
-		public void collect(T record) {
-			synchronized (lockObject) {
-				checkTimerException();
-				output.collect(record);
-			}
-		}
-
-		@Override
-		public void close() {
-			output.close();
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ceb64248/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
index 6c57026..59edb8f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
@@ -65,7 +65,7 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
 			RecordWriterOutput<OUT>[] outputs = (RecordWriterOutput<OUT>[]) getStreamOutputs();
 
 			// If timestamps are enabled we make sure to remove cyclic watermark dependencies
-			if (getExecutionConfig().areTimestampsEnabled()) {
+			if (isSerializingTimestamps()) {
 				for (RecordWriterOutput<OUT> output : outputs) {
 					output.emitWatermark(new Watermark(Long.MAX_VALUE));
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/ceb64248/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 500a440..7cd4cf3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -43,6 +43,7 @@ import org.apache.flink.runtime.state.AsynchronousStateHandle;
 import org.apache.flink.runtime.state.KvStateSnapshot;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
 import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -373,6 +374,11 @@ public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
 		}
 	}
 
+	protected boolean isSerializingTimestamps() {
+		TimeCharacteristic tc = configuration.getTimeCharacteristic();
+		return tc == TimeCharacteristic.EventTime | tc == TimeCharacteristic.IngestionTime;
+	}
+	
 	// ------------------------------------------------------------------------
 	//  Access to properties and utilities
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/ceb64248/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 132f965..ebc7789 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -71,7 +71,7 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
 				getCheckpointBarrierListener(),
 				configuration.getCheckpointMode(),
 				getEnvironment().getIOManager(),
-				getExecutionConfig().areTimestampsEnabled());
+				isSerializingTimestamps());
 
 		// make sure that stream tasks report their I/O statistics
 		AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();

http://git-wip-us.apache.org/repos/asf/flink/blob/ceb64248/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/CoStreamITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/CoStreamITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/CoStreamITCase.java
new file mode 100644
index 0000000..4df09a3
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/CoStreamITCase.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
+import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+import org.apache.flink.streaming.util.TestListResultSink;
+import org.apache.flink.util.Collector;
+
+import org.junit.Test;
+
+@SuppressWarnings("serial")
+public class CoStreamITCase extends StreamingMultipleProgramsTestBase {
+
+	@Test
+	public void test() throws Exception {
+
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+
+		TestListResultSink<String> resultSink = new TestListResultSink<String>();
+
+		DataStream<Integer> src = env.fromElements(1, 3, 5);
+
+		DataStream<Integer> filter1 = src
+				.filter(new FilterFunction<Integer>() {
+					@Override
+					public boolean filter(Integer value) throws Exception {
+						return true;
+					}
+				})
+				
+				.keyBy(new KeySelector<Integer, Integer>() {
+					@Override
+					public Integer getKey(Integer value) throws Exception {
+						return value;
+					}
+				});
+
+		DataStream<Tuple2<Integer, Integer>> filter2 = src
+				.map(new MapFunction<Integer, Tuple2<Integer, Integer>>() {
+
+					@Override
+					public Tuple2<Integer, Integer> map(Integer value) throws Exception {
+						return new Tuple2<Integer, Integer>(value, value + 1);
+					}
+				})
+				.rebalance()
+				.filter(new FilterFunction<Tuple2<Integer, Integer>>() {
+
+					@Override
+					public boolean filter(Tuple2<Integer, Integer> value) throws Exception {
+						return true;
+					}
+				})
+				.disableChaining()
+				.keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
+
+					@Override
+					public Integer getKey(Tuple2<Integer, Integer> value) throws Exception {
+						return value.f0;
+					}
+				});
+
+		DataStream<String> connected = filter1.connect(filter2)
+				.flatMap(new CoFlatMapFunction<Integer, Tuple2<Integer, Integer>, String>() {
+
+					@Override
+					public void flatMap1(Integer value, Collector<String> out) throws Exception {
+						out.collect(value.toString());
+					}
+		
+					@Override
+					public void flatMap2(Tuple2<Integer, Integer> value, Collector<String> out) throws Exception {
+						out.collect(value.toString());
+					}
+				});
+
+		connected.addSink(resultSink);
+
+		
+		env.execute();
+
+		List<String> expected = Arrays.asList("(1,2)", "(3,4)", "(5,6)", "1", "3", "5");
+		List<String> result = resultSink.getResult();
+		Collections.sort(result);
+		assertEquals(expected, result);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ceb64248/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
deleted file mode 100644
index 0f9cbe9..0000000
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.apache.flink.streaming.util.TestListResultSink;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class CoStreamTest extends StreamingMultipleProgramsTestBase {
-
-	private static ArrayList<String> expected = new ArrayList<String>();
-
-	@Test
-	public void test() {
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(1);
-
-		TestListResultSink<String> resultSink = new TestListResultSink<String>();
-
-		DataStream<Integer> src = env.fromElements(1, 3, 5);
-
-		DataStream<Integer> filter1 = src.filter(new FilterFunction<Integer>() {
-	
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public boolean filter(Integer value) throws Exception {
-				return true;
-			}
-		}).keyBy(new KeySelector<Integer, Integer>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Integer getKey(Integer value) throws Exception {
-				return value;
-			}
-		});
-
-		DataStream<Tuple2<Integer, Integer>> filter2 = src
-				.map(new MapFunction<Integer, Tuple2<Integer, Integer>>() {
-
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public Tuple2<Integer, Integer> map(Integer value) throws Exception {
-						return new Tuple2<Integer, Integer>(value, value + 1);
-					}
-				})
-				.rebalance()
-				.filter(new FilterFunction<Tuple2<Integer, Integer>>() {
-
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public boolean filter(Tuple2<Integer, Integer> value) throws Exception {
-						return true;
-					}
-				}).disableChaining().keyBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
-
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public Integer getKey(Tuple2<Integer, Integer> value) throws Exception {
-						return value.f0;
-					}
-				});
-
-		DataStream<String> connected = filter1.connect(filter2).flatMap(new CoFlatMapFunction<Integer, Tuple2<Integer, Integer>, String>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public void flatMap1(Integer value, Collector<String> out) throws Exception {
-				out.collect(value.toString());
-			}
-
-			@Override
-			public void flatMap2(Tuple2<Integer, Integer> value, Collector<String> out) throws Exception {
-				out.collect(value.toString());
-			}
-		});
-
-		connected.addSink(resultSink);
-
-		try {
-			env.execute();
-		} catch (Exception e) {
-			e.printStackTrace();
-		}
-
-		expected = new ArrayList<String>();
-		expected.addAll(Arrays.asList("(1,2)", "(3,4)", "(5,6)", "1", "3", "5"));
-
-		List<String> result = resultSink.getResult();
-		Collections.sort(result);
-
-		assertEquals(expected, result);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ceb64248/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractorTest.java
index 4f1eeb9..e3d733e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/AscendingTimestampExtractorTest.java
@@ -63,6 +63,17 @@ public class AscendingTimestampExtractorTest {
 		runInvalidTest(extractor);
 	}
 	
+	@Test
+	public void testInitialAndFinalWatermark() {
+		AscendingTimestampExtractor<Long> extractor = new LongExtractor();
+		assertEquals(Long.MIN_VALUE, extractor.getCurrentWatermark().getTimestamp());
+
+		extractor.extractTimestamp(Long.MIN_VALUE, -1L);
+		
+		extractor.extractTimestamp(Long.MAX_VALUE, -1L);
+		assertEquals(Long.MAX_VALUE - 1, extractor.getCurrentWatermark().getTimestamp());
+	}
+	
 	// ------------------------------------------------------------------------
 	
 	private void runValidTests(AscendingTimestampExtractor<Long> extractor) {
@@ -93,7 +104,7 @@ public class AscendingTimestampExtractorTest {
 		private static final long serialVersionUID = 1L;
 		
 		@Override
-		public long extractAscendingTimestamp(Long element, long currentTimestamp) {
+		public long extractAscendingTimestamp(Long element) {
 			return element;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ceb64248/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/IngestionTimeExtractorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/IngestionTimeExtractorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/IngestionTimeExtractorTest.java
new file mode 100644
index 0000000..95366b7
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/IngestionTimeExtractorTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.functions;
+
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class IngestionTimeExtractorTest {
+	
+	@Test
+	public void testMonotonousTimestamps() {
+		AssignerWithPeriodicWatermarks<String> assigner = new IngestionTimeExtractor<>();
+		
+		long maxRecordSoFar = 0L;
+		long maxWatermarkSoFar = 0L;
+		
+		for (int i = 0; i < 1343; i++) {
+			if (i % 7 == 1) {
+				Watermark mark = assigner.getCurrentWatermark();
+				assertNotNull(mark);
+				
+				// watermarks must never decrease
+				assertTrue(mark.getTimestamp() >= maxWatermarkSoFar);
+				maxWatermarkSoFar = mark.getTimestamp();
+				
+				// tight watermarks: at most 1 behind the largest timestamp handed out so far
+				assertTrue(mark.getTimestamp() >= maxRecordSoFar - 1);
+			} else {
+				long next = assigner.extractTimestamp("a", Long.MIN_VALUE);
+				
+				// timestamps must never decrease
+				assertTrue(next >= maxRecordSoFar);
+				
+				// timestamps are never below or at the watermark
+				assertTrue(next > maxWatermarkSoFar);
+				
+				maxRecordSoFar = next;
+			}
+			
+			if (i % 9 == 0) {
+				try {
+					Thread.sleep(1);
+				} catch (InterruptedException e) { Thread.currentThread().interrupt(); } // keep the interrupt visible to the caller
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ceb64248/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java
index 2d9921a..6b36419 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/source/FileMonitoringFunctionTest.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *	 http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -19,10 +19,8 @@
 package org.apache.flink.streaming.api.functions.source;
 
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
 import org.junit.Test;
 
 /**
@@ -33,31 +31,37 @@ public class FileMonitoringFunctionTest {
 	@Test
 	public void testForEmptyLocation() throws Exception {
 		final FileMonitoringFunction fileMonitoringFunction
-			= new FileMonitoringFunction("?non-existing-path", 1L, FileMonitoringFunction.WatchType.ONLY_NEW_FILES);
-
-        new Thread() {
-            @Override
-            public void run() {
-                try {
-                    Thread.sleep(1000L);
-                } catch (InterruptedException e) {
-                    e.printStackTrace();
-                }
-                fileMonitoringFunction.cancel();
-            }
-        }.start();
+				= new FileMonitoringFunction("?non-existing-path", 1L, FileMonitoringFunction.WatchType.ONLY_NEW_FILES);
+
+		new Thread() {
+			@Override
+			public void run() {
+				try {
+					Thread.sleep(1000L);
+				} catch (InterruptedException e) {
+					e.printStackTrace();
+				}
+				fileMonitoringFunction.cancel();
+			}
+		}.start();
 
 		fileMonitoringFunction.run(
-            new StreamSource.NonWatermarkContext<Tuple3<String, Long, Long>>(
-                new Object(),
-                new Output<StreamRecord<Tuple3<String, Long, Long>>>() {
-                    @Override
-                    public void emitWatermark(Watermark mark) { }
-                    @Override
-                    public void collect(StreamRecord<Tuple3<String, Long, Long>> record) { }
-                    @Override
-                    public void close() { }
-                })
-        );
+				new SourceFunction.SourceContext<Tuple3<String, Long, Long>>() {
+
+					@Override
+					public void collect(Tuple3<String, Long, Long> element) {}
+
+					@Override
+					public void collectWithTimestamp(Tuple3<String, Long, Long> element, long timestamp) {}
+
+					@Override
+					public void emitWatermark(Watermark mark) {}
+
+					@Override
+					public Object getCheckpointLock() { return null; }
+
+					@Override
+					public void close() {}
+				});
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/ceb64248/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
index b5f1e20..8a814ff 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java
@@ -76,7 +76,7 @@ public class StreamingJobGraphGeneratorTest {
 		}
 		config.setParallelism(dop);
 		
-		JobGraph jobGraph = compiler.createJobGraph("test");
+		JobGraph jobGraph = compiler.createJobGraph();
 		
 		ExecutionConfig executionConfig = InstantiationUtil.readObjectFromConfig(
 				jobGraph.getJobConfiguration(),

http://git-wip-us.apache.org/repos/asf/flink/blob/ceb64248/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
index 4f424da..4d5c881 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPeriodicWatermarksOperatorTest.java
@@ -113,6 +113,35 @@ public class TimestampsAndPeriodicWatermarksOperatorTest {
 		assertEquals(Long.MAX_VALUE, ((Watermark) testHarness.getOutput().poll()).getTimestamp());
 	}
 
+	@Test
+	public void testNegativeTimestamps() throws Exception {
+
+		final AssignerWithPeriodicWatermarks<Long> assigner = new NeverWatermarkExtractor();
+
+		final TimestampsAndPeriodicWatermarksOperator<Long> operator =
+				new TimestampsAndPeriodicWatermarksOperator<Long>(assigner);
+
+		final ExecutionConfig config = new ExecutionConfig();
+		config.setAutoWatermarkInterval(50);
+
+		OneInputStreamOperatorTestHarness<Long, Long> testHarness =
+				new OneInputStreamOperatorTestHarness<Long, Long>(operator, config);
+
+		testHarness.open();
+
+		long[] values = { Long.MIN_VALUE, -1L, 0L, 1L, 2L, 3L, Long.MAX_VALUE };
+		
+		for (long value : values) {
+			testHarness.processElement(new StreamRecord<>(value));
+		}
+
+		ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
+		
+		for (long value: values) {
+			assertEquals(value, ((StreamRecord<?>) output.poll()).getTimestamp());
+		}
+	}
+
 	// ------------------------------------------------------------------------
 	
 	private Tuple2<Long, Long> validateElement(Object element, long nextElementValue, long currentWatermark) {
@@ -138,7 +167,7 @@ public class TimestampsAndPeriodicWatermarksOperatorTest {
 	private static class LongExtractor implements AssignerWithPeriodicWatermarks<Long> {
 		private static final long serialVersionUID = 1L;
 
-		private long currentTimestamp = -1L;
+		private long currentTimestamp = Long.MIN_VALUE;
 
 		@Override
 		public long extractTimestamp(Long element, long previousElementTimestamp) {
@@ -147,8 +176,22 @@ public class TimestampsAndPeriodicWatermarksOperatorTest {
 		}
 
 		@Override
-		public long getCurrentWatermark() {
-			return currentTimestamp - 1;
+		public Watermark getCurrentWatermark() {
+			return new Watermark(currentTimestamp == Long.MIN_VALUE ? Long.MIN_VALUE : currentTimestamp - 1);
+		}
+	}
+
+	private static class NeverWatermarkExtractor implements AssignerWithPeriodicWatermarks<Long> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public long extractTimestamp(Long element, long previousElementTimestamp) {
+			return element;
+		}
+
+		@Override
+		public Watermark getCurrentWatermark() {
+			return null;
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ceb64248/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperatorTest.java
index 5a5fce6..07199ac 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/TimestampsAndPunctuatedWatermarksOperatorTest.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -76,6 +76,32 @@ public class TimestampsAndPunctuatedWatermarksOperatorTest {
 		assertEquals(10L, ((StreamRecord<Tuple2<Long, Boolean>>) output.poll()).getTimestamp());
 		assertEquals(Long.MAX_VALUE, ((Watermark) output.poll()).getTimestamp());
 	}
+
+	@Test
+	public void testZeroOnNegativeTimestamps() throws Exception {
+
+		final AssignerWithPunctuatedWatermarks<Long> assigner = new NeverWatermarkExtractor();
+
+		final TimestampsAndPunctuatedWatermarksOperator<Long> operator =
+				new TimestampsAndPunctuatedWatermarksOperator<Long>(assigner);
+
+		OneInputStreamOperatorTestHarness<Long, Long> testHarness =
+				new OneInputStreamOperatorTestHarness<Long, Long>(operator);
+
+		testHarness.open();
+
+		long[] values = { Long.MIN_VALUE, -1L, 0L, 1L, 2L, 3L, Long.MAX_VALUE };
+
+		for (long value : values) {
+			testHarness.processElement(new StreamRecord<>(value));
+		}
+
+		ConcurrentLinkedQueue<Object> output = testHarness.getOutput();
+
+		for (long value: values) {
+			assertEquals(value, ((StreamRecord<?>) output.poll()).getTimestamp());
+		}
+	}
 	
 	// ------------------------------------------------------------------------
 
@@ -88,8 +114,22 @@ public class TimestampsAndPunctuatedWatermarksOperatorTest {
 		}
 
 		@Override
-		public long checkAndGetNextWatermark(Tuple2<Long, Boolean> lastElement, long extractedTimestamp) {
-			return lastElement.f1 ? extractedTimestamp : -1L;
+		public Watermark checkAndGetNextWatermark(Tuple2<Long, Boolean> lastElement, long extractedTimestamp) {
+			return lastElement.f1 ? new Watermark(extractedTimestamp) : null;
+		}
+	}
+
+	private static class NeverWatermarkExtractor implements AssignerWithPunctuatedWatermarks<Long> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public long extractTimestamp(Long element, long previousElementTimestamp) {
+			return element;
+		}
+
+		@Override
+		public Watermark checkAndGetNextWatermark(Long lastElement, long extractedTimestamp) {
+			return null;
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ceb64248/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java
index 86dbb94..198649d 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/CoGroupJoinITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
@@ -337,8 +338,8 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
 		}
 
 		@Override
-		public long checkAndGetNextWatermark(Tuple2<String, Integer> element, long extractedTimestamp) {
-			return extractedTimestamp - 1;
+		public Watermark checkAndGetNextWatermark(Tuple2<String, Integer> element, long extractedTimestamp) {
+			return new Watermark(extractedTimestamp - 1);
 		}
 	}
 
@@ -350,9 +351,8 @@ public class CoGroupJoinITCase extends StreamingMultipleProgramsTestBase {
 		}
 
 		@Override
-		public long checkAndGetNextWatermark(Tuple3<String, String, Integer> lastElement,
-				long extractedTimestamp) {
-			return lastElement.f2 - 1;
+		public Watermark checkAndGetNextWatermark(Tuple3<String, String, Integer> lastElement, long extractedTimestamp) {
+			return new Watermark(lastElement.f2 - 1);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ceb64248/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
index 592b98a..b05af54 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowFoldITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
@@ -185,9 +186,8 @@ public class WindowFoldITCase extends StreamingMultipleProgramsTestBase {
 		}
 
 		@Override
-		public long checkAndGetNextWatermark(Tuple2<String, Integer> lastElement,
-				long extractedTimestamp) {
-			return lastElement.f1 - 1;
+		public Watermark checkAndGetNextWatermark(Tuple2<String, Integer> lastElement, long extractedTimestamp) {
+			return new Watermark(lastElement.f1 - 1);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ceb64248/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializerTest.java
new file mode 100644
index 0000000..1f0bf5a
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializerTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.streamrecord;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.util.DataInputDeserializer;
+import org.apache.flink.runtime.util.DataOutputSerializer;
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class MultiplexingStreamRecordSerializerTest {
+	
+	@Test
+	public void testDeepDuplication() {
+		@SuppressWarnings("unchecked")
+		TypeSerializer<Long> serializer1 = (TypeSerializer<Long>) mock(TypeSerializer.class);
+		
+		@SuppressWarnings("unchecked")
+		TypeSerializer<Long> serializer2 = (TypeSerializer<Long>) mock(TypeSerializer.class);
+		
+		when(serializer1.duplicate()).thenReturn(serializer2);
+		
+		MultiplexingStreamRecordSerializer<Long> streamRecSer = 
+				new MultiplexingStreamRecordSerializer<Long>(serializer1);
+		
+		assertEquals(serializer1, streamRecSer.getContainedTypeSerializer());
+
+		MultiplexingStreamRecordSerializer<Long> copy = streamRecSer.duplicate();
+		assertNotEquals(copy, streamRecSer);
+		assertNotEquals(copy.getContainedTypeSerializer(), streamRecSer.getContainedTypeSerializer());
+	}
+
+	@Test
+	public void testBasicProperties() {
+		MultiplexingStreamRecordSerializer<Long> streamRecSer = 
+				new MultiplexingStreamRecordSerializer<Long>(LongSerializer.INSTANCE);
+		
+		assertFalse(streamRecSer.isImmutableType());
+		assertEquals(Long.class, streamRecSer.createInstance().getValue().getClass());
+		assertEquals(-1L, streamRecSer.getLength());
+	}
+	
+	@Test
+	public void testSerialization() throws Exception {
+		final MultiplexingStreamRecordSerializer<String> serializer = 
+				new MultiplexingStreamRecordSerializer<String>(StringSerializer.INSTANCE);
+
+		StreamRecord<String> withoutTimestamp = new StreamRecord<>("test 1 2 分享基督耶穌的愛給們,開拓雙贏!");
+		assertEquals(withoutTimestamp, serializeAndDeserialize(withoutTimestamp, serializer));
+
+		StreamRecord<String> withTimestamp = new StreamRecord<>("one more test 拓 們 分", 77L);
+		assertEquals(withTimestamp, serializeAndDeserialize(withTimestamp, serializer));
+
+		StreamRecord<String> negativeTimestamp = new StreamRecord<>("他", Long.MIN_VALUE);
+		assertEquals(negativeTimestamp, serializeAndDeserialize(negativeTimestamp, serializer));
+
+		Watermark positiveWatermark = new Watermark(13);
+		assertEquals(positiveWatermark, serializeAndDeserialize(positiveWatermark, serializer));
+
+		Watermark negativeWatermark = new Watermark(-4647654567676555876L);
+		assertEquals(negativeWatermark, serializeAndDeserialize(negativeWatermark, serializer));
+	}
+	
+	@SuppressWarnings("unchecked")
+	private static <T, X extends StreamElement> X serializeAndDeserialize(
+			X record,
+			MultiplexingStreamRecordSerializer<T> serializer) throws IOException {
+		
+		DataOutputSerializer output = new DataOutputSerializer(32);
+		serializer.serialize(record, output);
+		
+		// additional binary copy step
+		DataInputDeserializer copyInput = new DataInputDeserializer(output.getByteArray(), 0, output.length());
+		DataOutputSerializer copyOutput = new DataOutputSerializer(32);
+		serializer.copy(copyInput, copyOutput);
+		
+		DataInputDeserializer input = new DataInputDeserializer(copyOutput.getByteArray(), 0, copyOutput.length());
+		return (X) serializer.deserialize(input);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ceb64248/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializerTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializerTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializerTest.java
index d48f7f4..bdeb552 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializerTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordSerializerTest.java
@@ -20,6 +20,8 @@ package org.apache.flink.streaming.runtime.streamrecord;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.runtime.util.DataInputDeserializer;
+import org.apache.flink.runtime.util.DataOutputSerializer;
 import org.junit.Test;
 
 import static org.junit.Assert.*;
@@ -65,4 +67,20 @@ public class StreamRecordSerializerTest {
 			fail(e.getMessage());
 		}
 	}
+	
+	@Test
+	public void testDeserializedValuesHaveNoTimestamps() throws Exception {
+		final StreamRecord<Long> original = new StreamRecord<>(42L);
+		
+		StreamRecordSerializer<Long> streamRecSer = new StreamRecordSerializer<Long>(LongSerializer.INSTANCE);
+
+		DataOutputSerializer buffer = new DataOutputSerializer(16);
+		streamRecSer.serialize(original, buffer);
+		
+		DataInputDeserializer input = new DataInputDeserializer(buffer.getByteArray(), 0, buffer.length());
+		StreamRecord<Long> result = streamRecSer.deserialize(input);
+		
+		assertFalse(result.hasTimestamp());
+		assertEquals(original, result);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ceb64248/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordTest.java
new file mode 100644
index 0000000..5d34b74
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecordTest.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.streamrecord;
+
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class StreamRecordTest {
+	
+	@Test
+	public void testWithNoTimestamp() {
+		StreamRecord<String> record = new StreamRecord<>("test");
+		
+		assertTrue(record.isRecord());
+		assertFalse(record.isWatermark());
+		
+		assertFalse(record.hasTimestamp());
+		assertEquals("test", record.getValue());
+		
+//		try {
+//			record.getTimestamp();
+//			fail("should throw an exception");
+//		} catch (IllegalStateException e) {
+//			assertTrue(e.getMessage().contains("timestamp"));
+//		}
+		// for now, the "no timestamp case" returns Long.MIN_VALUE
+		assertEquals(Long.MIN_VALUE, record.getTimestamp());
+		
+		assertNotNull(record.toString());
+		assertTrue(record.hashCode() == new StreamRecord<>("test").hashCode());
+		assertTrue(record.equals(new StreamRecord<>("test")));
+		
+		assertEquals(record, record.asRecord());
+		
+		try {
+			record.asWatermark();
+			fail("should throw an exception");
+		} catch (Exception e) {
+			// expected
+		}
+	}
+
+	@Test
+	public void testWithTimestamp() {
+		StreamRecord<String> record = new StreamRecord<>("foo", 42);
+
+		assertTrue(record.isRecord());
+		assertFalse(record.isWatermark());
+
+		assertTrue(record.hasTimestamp());
+		assertEquals(42L, record.getTimestamp());
+		
+		assertEquals("foo", record.getValue());
+		
+		assertNotNull(record.toString());
+		
+		assertTrue(record.hashCode() == new StreamRecord<>("foo", 42).hashCode());
+		assertTrue(record.hashCode() != new StreamRecord<>("foo").hashCode());
+
+		assertTrue(record.equals(new StreamRecord<>("foo", 42)));
+		assertFalse(record.equals(new StreamRecord<>("foo")));
+
+		assertEquals(record, record.asRecord());
+
+		try {
+			record.asWatermark();
+			fail("should throw an exception");
+		} catch (Exception e) {
+			// expected
+		}
+	}
+
+	@Test
+	public void testAllowedTimestampRange() {
+		assertEquals(0L, new StreamRecord<>("test", 0).getTimestamp());
+		assertEquals(-1L, new StreamRecord<>("test", -1).getTimestamp());
+		assertEquals(1L, new StreamRecord<>("test", 1).getTimestamp());
+		assertEquals(Long.MIN_VALUE, new StreamRecord<>("test", Long.MIN_VALUE).getTimestamp());
+		assertEquals(Long.MAX_VALUE, new StreamRecord<>("test", Long.MAX_VALUE).getTimestamp());
+	}
+	
+	@Test
+	public void testReplacePreservesTimestamp() {
+		StreamRecord<String> recNoTimestamp = new StreamRecord<>("o sole mio");
+		StreamRecord<Integer> newRecNoTimestamp = recNoTimestamp.replace(17);
+		assertFalse(newRecNoTimestamp.hasTimestamp());
+
+		StreamRecord<String> recWithTimestamp = new StreamRecord<>("la dolce vita", 99);
+		StreamRecord<Integer> newRecWithTimestamp = recWithTimestamp.replace(17);
+		
+		assertTrue(newRecWithTimestamp.hasTimestamp());
+		assertEquals(99L, newRecWithTimestamp.getTimestamp());
+	}
+
+	@Test
+	public void testReplaceWithTimestampOverridesTimestamp() {
+		StreamRecord<String> record = new StreamRecord<>("la divina comedia");
+		assertFalse(record.hasTimestamp());
+		
+		StreamRecord<Double> newRecord = record.replace(3.14, 123);
+		assertTrue(newRecord.hasTimestamp());
+		assertEquals(123L, newRecord.getTimestamp());
+	}
+	
+	@Test
+	public void testCopy() {
+		StreamRecord<String> recNoTimestamp = new StreamRecord<String>("test");
+		StreamRecord<String> recNoTimestampCopy = recNoTimestamp.copy("test");
+		assertEquals(recNoTimestamp, recNoTimestampCopy);
+
+		StreamRecord<String> recWithTimestamp = new StreamRecord<String>("test", 99);
+		StreamRecord<String> recWithTimestampCopy = recWithTimestamp.copy("test");
+		assertEquals(recWithTimestamp, recWithTimestampCopy);
+	}
+
+	@Test
+	public void testCopyTo() {
+		StreamRecord<String> recNoTimestamp = new StreamRecord<String>("test");
+		StreamRecord<String> recNoTimestampCopy = new StreamRecord<>(null);
+		recNoTimestamp.copyTo("test", recNoTimestampCopy);
+		assertEquals(recNoTimestamp, recNoTimestampCopy);
+
+		StreamRecord<String> recWithTimestamp = new StreamRecord<String>("test", 99);
+		StreamRecord<String> recWithTimestampCopy = new StreamRecord<>(null);
+		recWithTimestamp.copyTo("test", recWithTimestampCopy);
+		assertEquals(recWithTimestamp, recWithTimestampCopy);
+	}
+	
+	@Test
+	public void testSetAndEraseTimestamps() {
+		StreamRecord<String> rec = new StreamRecord<String>("hello");
+		assertFalse(rec.hasTimestamp());
+		
+		rec.setTimestamp(13456L);
+		assertTrue(rec.hasTimestamp());
+		assertEquals(13456L, rec.getTimestamp());
+		
+		rec.eraseTimestamp();
+		assertFalse(rec.hasTimestamp());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ceb64248/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index d2a9069..8dc7edd 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleIn
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
@@ -103,7 +104,7 @@ public class StreamTaskTestHarness<OUT> {
 		this.jobConfig = new Configuration();
 		this.taskConfig = new Configuration();
 		this.executionConfig = new ExecutionConfig();
-		executionConfig.enableTimestamps();
+		
 		try {
 			InstantiationUtil.writeObjectToConfig(executionConfig, this.jobConfig, ExecutionConfig.CONFIG_KEY);
 		} catch (IOException e) {
@@ -113,6 +114,7 @@ public class StreamTaskTestHarness<OUT> {
 		streamConfig = new StreamConfig(taskConfig);
 		streamConfig.setChainStart();
 		streamConfig.setBufferTimeout(0);
+		streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime);
 
 		outputSerializer = outputType.createSerializer(executionConfig);
 		outputStreamRecordSerializer = new MultiplexingStreamRecordSerializer<OUT>(outputSerializer);

http://git-wip-us.apache.org/repos/asf/flink/blob/ceb64248/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTimerITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTimerITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTimerITCase.java
index cdc2c53..32d57ca 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTimerITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTimerITCase.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -15,25 +15,31 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.operators.Triggerable;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
+
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.concurrent.Semaphore;
 
 /**
@@ -43,19 +49,25 @@ import java.util.concurrent.Semaphore;
  * These tests ensure that exceptions are properly forwarded from the timer thread to
  * the task thread and that operator methods are not invoked concurrently.
  */
+@RunWith(Parameterized.class)
 public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
 
+	private final TimeCharacteristic timeCharacteristic;
+	
+	public StreamTaskTimerITCase(TimeCharacteristic characteristic) {
+		timeCharacteristic = characteristic;
+	}
+
+
 	/**
-	 * Note: this test fails if we don't have the synchronized block in
-	 * {@link org.apache.flink.streaming.runtime.tasks.SourceStreamTask.SourceOutput}
-	 *
-	 * <p>
-	 * This test never finishes if exceptions from the timer thread are not forwarded. Thus
-	 * a success here means that the exception forwarding works.
+	 * Note: this test fails if the source contexts do not check for exceptions or
+	 * do not synchronize.
 	 */
 	@Test
 	public void testOperatorChainedToSource() throws Exception {
+		
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(timeCharacteristic);
 		env.setParallelism(1);
 
 		DataStream<String> source = env.addSource(new InfiniteTestSource());
@@ -86,12 +98,13 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
 	}
 
 	/**
-	 * Note: this test fails if we don't have the synchronized block in
-	 * {@link org.apache.flink.streaming.runtime.tasks.SourceStreamTask.SourceOutput}
+	 * Note: this test fails if the source contexts do not check for exceptions or
+	 * do not synchronize.
 	 */
 	@Test
 	public void testOneInputOperatorWithoutChaining() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(timeCharacteristic);
 		env.setParallelism(1);
 
 		DataStream<String> source = env.addSource(new InfiniteTestSource());
@@ -120,14 +133,11 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
 		}
 		Assert.assertTrue(testSuccess);
 	}
-
-	/**
-	 * Note: this test fails if we don't have the synchronized block in
-	 * {@link org.apache.flink.streaming.runtime.tasks.SourceStreamTask.SourceOutput}
-	 */
+	
 	@Test
 	public void testTwoInputOperatorWithoutChaining() throws Exception {
 		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setStreamTimeCharacteristic(timeCharacteristic);
 		env.setParallelism(1);
 
 		DataStream<String> source = env.addSource(new InfiniteTestSource());
@@ -185,7 +195,7 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
 				first = false;
 			}
 			numElements++;
-
+			
 			semaphore.release();
 		}
 
@@ -212,7 +222,10 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
 
 		@Override
 		public void processWatermark(Watermark mark) throws Exception {
-			//ignore
+			if (!semaphore.tryAcquire()) {
+				Assert.fail("Concurrent invocation of operator functions.");
+			}
+			semaphore.release();
 		}
 	}
 
@@ -310,4 +323,16 @@ public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
 			running = false;
 		}
 	}
+	
+	// ------------------------------------------------------------------------
+	//  parametrization
+	// ------------------------------------------------------------------------
+
+	@Parameterized.Parameters(name = "Time Characteristic = {0}")
+	public static Collection<Object[]> executionModes() {
+		return Arrays.asList(
+				new Object[] { TimeCharacteristic.ProcessingTime },
+				new Object[] { TimeCharacteristic.IngestionTime },
+				new Object[] { TimeCharacteristic.EventTime });
+	}
 }


Mime
View raw message