flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [02/15] flink git commit: [FLINK-2636] [streaming] Create common type StreamElement for StreamRecord and Watermark
Date Tue, 08 Sep 2015 18:58:52 GMT
[FLINK-2636] [streaming] Create common type StreamElement for StreamRecord and Watermark


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c09d14a9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c09d14a9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c09d14a9

Branch: refs/heads/master
Commit: c09d14a9d836d099c32639180dd58216ae6149a4
Parents: 655a891
Author: Stephan Ewen <sewen@apache.org>
Authored: Tue Sep 8 15:21:23 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Tue Sep 8 20:58:05 2015 +0200

----------------------------------------------------------------------
 .../api/common/typeutils/TypeSerializer.java    |  1 -
 .../streaming/api/watermark/Watermark.java      | 24 ++----
 .../runtime/io/RecordWriterOutput.java          | 13 ++--
 .../runtime/io/StreamInputProcessor.java        | 29 ++++---
 .../runtime/io/StreamTwoInputProcessor.java     | 63 ++++++++--------
 .../MultiplexingStreamRecordSerializer.java     | 79 +++++++++++---------
 .../runtime/streamrecord/StreamElement.java     | 62 +++++++++++++++
 .../runtime/streamrecord/StreamRecord.java      |  4 +-
 .../consumer/StreamTestSingleInputGate.java     |  5 +-
 .../api/operators/StreamFilterTest.java         |  1 -
 .../api/operators/StreamGroupedFoldTest.java    |  2 -
 .../api/operators/StreamGroupedReduceTest.java  |  1 -
 .../streaming/api/operators/StreamMapTest.java  |  1 -
 .../api/operators/StreamProjectTest.java        |  3 +-
 .../api/operators/co/CoStreamFlatMapTest.java   |  5 --
 .../runtime/tasks/StreamTaskTestHarness.java    |  6 +-
 16 files changed, 173 insertions(+), 126 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c09d14a9/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
index 542b059..45b0669 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.api.common.typeutils;
 
 import java.io.IOException;

http://git-wip-us.apache.org/repos/asf/flink/blob/c09d14a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java
index 1d88fe2..163791e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/watermark/Watermark.java
@@ -17,6 +17,8 @@
  */
 package org.apache.flink.streaming.api.watermark;
 
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+
 /**
  * A Watermark tells operators that receive it that no elements with a timestamp older or equal
  * to the watermark timestamp should arrive at the operator. Watermarks are emitted at the
@@ -31,11 +33,11 @@ package org.apache.flink.streaming.api.watermark;
  * In some cases a watermark is only a heuristic and operators should be able to deal with
  * late elements. They can either discard those or update the result and emit updates/retractions
  * to downstream operations.
- *
  */
-public class Watermark {
+public class Watermark extends StreamElement {
 
-	private long timestamp;
+	/** The timestamp of the watermark */
+	private final long timestamp;
 
 	/**
 	 * Creates a new watermark with the given timestamp.
@@ -53,16 +55,8 @@ public class Watermark {
 
 	@Override
 	public boolean equals(Object o) {
-		if (this == o) {
-			return true;
-		}
-		if (o == null || getClass() != o.getClass()) {
-			return false;
-		}
-
-		Watermark watermark = (Watermark) o;
-
-		return timestamp == watermark.timestamp;
+		return this == o ||
+				o != null && o.getClass() == Watermark.class && ((Watermark) o).timestamp == this.timestamp;
 	}
 
 	@Override
@@ -72,8 +66,6 @@ public class Watermark {
 
 	@Override
 	public String toString() {
-		return "Watermark{" +
-				"timestamp=" + timestamp +
-				'}';
+		return "Watermark @ " + timestamp;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/c09d14a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
index 7048464..34e5800 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/RecordWriterOutput.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -36,9 +37,9 @@ import static com.google.common.base.Preconditions.checkNotNull;
  */
 public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>> {
 
-	private StreamRecordWriter<SerializationDelegate<Object>> recordWriter;
+	private StreamRecordWriter<SerializationDelegate<StreamElement>> recordWriter;
 	
-	private SerializationDelegate<Object> serializationDelegate;
+	private SerializationDelegate<StreamElement> serializationDelegate;
 
 	
 	@SuppressWarnings("unchecked")
@@ -51,19 +52,19 @@ public class RecordWriterOutput<OUT> implements Output<StreamRecord<OUT>>
{
 		
 		// generic hack: cast the writer to generic Object type so we can use it 
 		// with multiplexed records and watermarks
-		this.recordWriter = (StreamRecordWriter<SerializationDelegate<Object>>) 
+		this.recordWriter = (StreamRecordWriter<SerializationDelegate<StreamElement>>)
 				(StreamRecordWriter<?>) recordWriter;
 
-		TypeSerializer<Object> outRecordSerializer;
+		TypeSerializer<StreamElement> outRecordSerializer;
 		if (enableWatermarkMultiplexing) {
 			outRecordSerializer = new MultiplexingStreamRecordSerializer<OUT>(outSerializer);
 		} else {
-			outRecordSerializer = (TypeSerializer<Object>)
+			outRecordSerializer = (TypeSerializer<StreamElement>)
 					(TypeSerializer<?>) new StreamRecordSerializer<OUT>(outSerializer);
 		}
 
 		if (outSerializer != null) {
-			serializationDelegate = new SerializationDelegate<Object>(outRecordSerializer);
+			serializationDelegate = new SerializationDelegate<StreamElement>(outRecordSerializer);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c09d14a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
index de021ff..cc91d63 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java
@@ -36,6 +36,7 @@ import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
 import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -53,9 +54,9 @@ import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext;
  */
 public class StreamInputProcessor<IN> extends AbstractReader implements StreamingReader {
 	
-	private final RecordDeserializer<DeserializationDelegate<Object>>[] recordDeserializers;
+	private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;
 
-	private RecordDeserializer<DeserializationDelegate<Object>> currentRecordDeserializer;
+	private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer;
 
 	// We need to keep track of the channel from which a buffer came, so that we can
 	// appropriately map the watermarks to input channels
@@ -68,9 +69,9 @@ public class StreamInputProcessor<IN> extends AbstractReader implements
Streamin
 	private final long[] watermarks;
 	private long lastEmittedWatermark;
 
-	private final DeserializationDelegate<Object> deserializationDelegate;
+	private final DeserializationDelegate<StreamElement> deserializationDelegate;
 
-	@SuppressWarnings("unchecked")
+	@SuppressWarnings({"unchecked", "rawtypes"})
 	public StreamInputProcessor(InputGate[] inputGates, TypeSerializer<IN> inputSerializer,
 								EventListener<CheckpointBarrier> checkpointListener,
 								CheckpointingMode checkpointMode,
@@ -95,10 +96,10 @@ public class StreamInputProcessor<IN> extends AbstractReader implements
Streamin
 		
 		if (enableWatermarkMultiplexing) {
 			MultiplexingStreamRecordSerializer<IN> ser = new MultiplexingStreamRecordSerializer<IN>(inputSerializer);
-			this.deserializationDelegate = new NonReusingDeserializationDelegate<Object>(ser);
+			this.deserializationDelegate = new NonReusingDeserializationDelegate<StreamElement>(ser);
 		} else {
 			StreamRecordSerializer<IN> ser = new StreamRecordSerializer<IN>(inputSerializer);
-			this.deserializationDelegate = (NonReusingDeserializationDelegate<Object>)
+			this.deserializationDelegate = (NonReusingDeserializationDelegate<StreamElement>)
 					(NonReusingDeserializationDelegate<?>) new NonReusingDeserializationDelegate<StreamRecord<IN>>(ser);
 		}
 		
@@ -106,7 +107,7 @@ public class StreamInputProcessor<IN> extends AbstractReader implements
Streamin
 		this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
 		
 		for (int i = 0; i < recordDeserializers.length; i++) {
-			recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<Object>>();
+			recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<StreamElement>>();
 		}
 
 		watermarks = new long[inputGate.getNumberOfInputChannels()];
@@ -132,18 +133,15 @@ public class StreamInputProcessor<IN> extends AbstractReader implements
Streamin
 				}
 
 				if (result.isFullRecord()) {
-					Object recordOrWatermark = deserializationDelegate.getInstance();
+					StreamElement recordOrWatermark = deserializationDelegate.getInstance();
 
-					if (recordOrWatermark instanceof Watermark) {
-						Watermark mark = (Watermark) recordOrWatermark;
-						long watermarkMillis = mark.getTimestamp();
+					if (recordOrWatermark.isWatermark()) {
+						long watermarkMillis = recordOrWatermark.asWatermark().getTimestamp();
 						if (watermarkMillis > watermarks[currentChannel]) {
 							watermarks[currentChannel] = watermarkMillis;
 							long newMinWatermark = Long.MAX_VALUE;
 							for (long watermark : watermarks) {
-								if (watermark < newMinWatermark) {
-									newMinWatermark = watermark;
-								}
+								newMinWatermark = Math.min(watermark, newMinWatermark);
 							}
 							if (newMinWatermark > lastEmittedWatermark) {
 								lastEmittedWatermark = newMinWatermark;
@@ -154,8 +152,7 @@ public class StreamInputProcessor<IN> extends AbstractReader implements
Streamin
 					}
 					else {
 						// now we can do the actual processing
-						@SuppressWarnings("unchecked")
-						StreamRecord<IN> record = (StreamRecord<IN>) deserializationDelegate.getInstance();
+						StreamRecord<IN> record = recordOrWatermark.asRecord();
 						StreamingRuntimeContext ctx = streamOperator.getRuntimeContext();
 						if (ctx != null) {
 							ctx.setNextInput(record);

http://git-wip-us.apache.org/repos/asf/flink/blob/c09d14a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index e0af729..7dffa71 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -35,6 +35,7 @@ import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
 import org.apache.flink.runtime.util.event.EventListener;
 import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -44,6 +45,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collection;
 
 /**
@@ -61,9 +63,9 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader
implements
 	@SuppressWarnings("unused")
 	private static final Logger LOG = LoggerFactory.getLogger(StreamTwoInputProcessor.class);
 
-	private final RecordDeserializer<DeserializationDelegate<Object>>[] recordDeserializers;
+	private final RecordDeserializer<DeserializationDelegate<StreamElement>>[] recordDeserializers;
 
-	private RecordDeserializer<DeserializationDelegate<Object>> currentRecordDeserializer;
+	private RecordDeserializer<DeserializationDelegate<StreamElement>> currentRecordDeserializer;
 
 	// We need to keep track of the channel from which a buffer came, so that we can
 	// appropriately map the watermarks to input channels
@@ -81,8 +83,8 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader
implements
 
 	private final int numInputChannels1;
 
-	private final DeserializationDelegate<Object> deserializationDelegate1;
-	private final DeserializationDelegate<Object> deserializationDelegate2;
+	private final DeserializationDelegate<StreamElement> deserializationDelegate1;
+	private final DeserializationDelegate<StreamElement> deserializationDelegate2;
 
 	@SuppressWarnings({"unchecked", "rawtypes"})
 	public StreamTwoInputProcessor(
@@ -113,21 +115,21 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader
implements
 		
 		if (enableWatermarkMultiplexing) {
 			MultiplexingStreamRecordSerializer<IN1> ser = new MultiplexingStreamRecordSerializer<IN1>(inputSerializer1);
-			this.deserializationDelegate1 = new NonReusingDeserializationDelegate<Object>(ser);
+			this.deserializationDelegate1 = new NonReusingDeserializationDelegate<StreamElement>(ser);
 		}
 		else {
 			StreamRecordSerializer<IN1> ser = new StreamRecordSerializer<IN1>(inputSerializer1);
-			this.deserializationDelegate1 = (DeserializationDelegate<Object>)
+			this.deserializationDelegate1 = (DeserializationDelegate<StreamElement>)
 					(DeserializationDelegate<?>) new NonReusingDeserializationDelegate<StreamRecord<IN1>>(ser);
 		}
 		
 		if (enableWatermarkMultiplexing) {
 			MultiplexingStreamRecordSerializer<IN2> ser = new MultiplexingStreamRecordSerializer<IN2>(inputSerializer2);
-			this.deserializationDelegate2 = new NonReusingDeserializationDelegate<Object>(ser);
+			this.deserializationDelegate2 = new NonReusingDeserializationDelegate<StreamElement>(ser);
 		}
 		else {
 			StreamRecordSerializer<IN2> ser = new StreamRecordSerializer<IN2>(inputSerializer2);
-			this.deserializationDelegate2 = (DeserializationDelegate<Object>)
+			this.deserializationDelegate2 = (DeserializationDelegate<StreamElement>)
 					(DeserializationDelegate<?>) new NonReusingDeserializationDelegate<StreamRecord<IN2>>(ser);
 		}
 
@@ -135,7 +137,7 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader
implements
 		this.recordDeserializers = new SpillingAdaptiveSpanningRecordDeserializer[inputGate.getNumberOfInputChannels()];
 		
 		for (int i = 0; i < recordDeserializers.length; i++) {
-			recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<Object>>();
+			recordDeserializers[i] = new SpillingAdaptiveSpanningRecordDeserializer<DeserializationDelegate<StreamElement>>();
 		}
 
 		// determine which unioned channels belong to input 1 and which belong to input 2
@@ -148,15 +150,11 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader
implements
 		int numInputChannels2 = inputGate.getNumberOfInputChannels() - numInputChannels1;
 
 		watermarks1 = new long[numInputChannels1];
-		for (int i = 0; i < numInputChannels1; i++) {
-			watermarks1[i] = Long.MIN_VALUE;
-		}
+		Arrays.fill(watermarks1, Long.MIN_VALUE);
 		lastEmittedWatermark1 = Long.MIN_VALUE;
 
 		watermarks2 = new long[numInputChannels2];
-		for (int i = 0; i < numInputChannels2; i++) {
-			watermarks2[i] = Long.MIN_VALUE;
-		}
+		Arrays.fill(watermarks2, Long.MIN_VALUE);
 		lastEmittedWatermark2 = Long.MIN_VALUE;
 	}
 
@@ -182,22 +180,25 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader
implements
 
 				if (result.isFullRecord()) {
 					if (currentChannel < numInputChannels1) {
-						Object recordOrWatermark = deserializationDelegate1.getInstance();
-						if (recordOrWatermark instanceof Watermark) {
+						StreamElement recordOrWatermark = deserializationDelegate1.getInstance();
+						if (recordOrWatermark.isWatermark()) {
 							handleWatermark(streamOperator, (Watermark) recordOrWatermark, currentChannel);
 							continue;
-						} else {
-							streamOperator.processElement1((StreamRecord<IN1>) deserializationDelegate1.getInstance());
+						}
+						else {
+							streamOperator.processElement1(recordOrWatermark.<IN1>asRecord());
 							return true;
 
 						}
-					} else {
-						Object recordOrWatermark = deserializationDelegate2.getInstance();
-						if (recordOrWatermark instanceof Watermark) {
-							handleWatermark(streamOperator, (Watermark) recordOrWatermark, currentChannel);
+					}
+					else {
+						StreamElement recordOrWatermark = deserializationDelegate2.getInstance();
+						if (recordOrWatermark.isWatermark()) {
+							handleWatermark(streamOperator, recordOrWatermark.asWatermark(), currentChannel);
 							continue;
-						} else {
-							streamOperator.processElement2((StreamRecord<IN2>) deserializationDelegate2.getInstance());
+						}
+						else {
+							streamOperator.processElement2(recordOrWatermark.<IN2>asRecord());
 							return true;
 						}
 					}
@@ -234,10 +235,8 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader
implements
 			if (watermarkMillis > watermarks1[channelIndex]) {
 				watermarks1[channelIndex] = watermarkMillis;
 				long newMinWatermark = Long.MAX_VALUE;
-				for (long aWatermarks1 : watermarks1) {
-					if (aWatermarks1 < newMinWatermark) {
-						newMinWatermark = aWatermarks1;
-					}
+				for (long wm : watermarks1) {
+					newMinWatermark = Math.min(wm, newMinWatermark);
 				}
 				if (newMinWatermark > lastEmittedWatermark1) {
 					lastEmittedWatermark1 = newMinWatermark;
@@ -250,10 +249,8 @@ public class StreamTwoInputProcessor<IN1, IN2> extends AbstractReader
implements
 			if (watermarkMillis > watermarks2[channelIndex]) {
 				watermarks2[channelIndex] = watermarkMillis;
 				long newMinWatermark = Long.MAX_VALUE;
-				for (long aWatermarks2 : watermarks2) {
-					if (aWatermarks2 < newMinWatermark) {
-						newMinWatermark = aWatermarks2;
-					}
+				for (long wm : watermarks2) {
+					newMinWatermark = Math.min(wm, newMinWatermark);
 				}
 				if (newMinWatermark > lastEmittedWatermark2) {
 					lastEmittedWatermark2 = newMinWatermark;

http://git-wip-us.apache.org/repos/asf/flink/blob/c09d14a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
index 075c4fc..156e5d6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java
@@ -27,22 +27,22 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import java.io.IOException;
 
 /**
- * Serializer for {@link StreamRecord} and {@link Watermark}. This does not behave like a normal
+ * Serializer for {@link StreamRecord} and {@link org.apache.flink.streaming.api.watermark.Watermark}. This does not behave like a normal
  * {@link TypeSerializer}, instead, this is only used at the
  * {@link org.apache.flink.streaming.runtime.tasks.StreamTask} level for transmitting
- * {@link StreamRecord StreamRecords} and {@link Watermark Watermarks}. This serializer
+ * {@link StreamRecord StreamRecords} and {@link org.apache.flink.streaming.api.watermark.Watermark Watermarks}. This serializer
  * can handle both of them, therefore it returns {@link Object} the result has
  * to be cast to the correct type.
  *
  * @param <T> The type of value in the {@link org.apache.flink.streaming.runtime.streamrecord.StreamRecord}
  */
-public final class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<Object> {
+public final class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<StreamElement> {
 
 	private static final long serialVersionUID = 1L;
 
 	private static final long IS_WATERMARK = Long.MIN_VALUE;
 	
-	protected final TypeSerializer<T> typeSerializer;
+	private final TypeSerializer<T> typeSerializer;
 
 	
 	public MultiplexingStreamRecordSerializer(TypeSerializer<T> serializer) {
@@ -59,87 +59,94 @@ public final class MultiplexingStreamRecordSerializer<T> extends
TypeSerializer<
 	}
 
 	@Override
-	public TypeSerializer<Object> duplicate() {
-		return this;
+	public TypeSerializer<StreamElement> duplicate() {
+		TypeSerializer<T> copy = typeSerializer.duplicate();
+		return (copy == typeSerializer) ? this : new MultiplexingStreamRecordSerializer<T>(copy);
 	}
 
 	@Override
-	public Object createInstance() {
+	public StreamRecord<T> createInstance() {
 		return new StreamRecord<T>(typeSerializer.createInstance(), 0L);
 	}
 
 	@Override
-	@SuppressWarnings("unchecked")
-	public Object copy(Object from) {
+	public StreamElement copy(StreamElement from) {
 		// we can reuse the timestamp since Instant is immutable
-		if (from instanceof StreamRecord) {
-			StreamRecord<T> fromRecord = (StreamRecord<T>) from;
+		if (from.isRecord()) {
+			StreamRecord<T> fromRecord = from.asRecord();
 			return new StreamRecord<T>(typeSerializer.copy(fromRecord.getValue()), fromRecord.getTimestamp());
-		} else if (from instanceof Watermark) {
+		}
+		else if (from.isWatermark()) {
 			// is immutable
 			return from;
-		} else {
+		}
+		else {
 			throw new RuntimeException("Cannot copy " + from);
 		}
 	}
 
 	@Override
-	@SuppressWarnings("unchecked")
-	public Object copy(Object from, Object reuse) {
-		if (from instanceof StreamRecord && reuse instanceof StreamRecord) {
-			StreamRecord<T> fromRecord = (StreamRecord<T>) from;
-			StreamRecord<T> reuseRecord = (StreamRecord<T>) reuse;
+	public StreamElement copy(StreamElement from, StreamElement reuse) {
+		if (from.isRecord() && reuse.isRecord()) {
+			StreamRecord<T> fromRecord = from.asRecord();
+			StreamRecord<T> reuseRecord = reuse.asRecord();
 
-			reuseRecord.replace(typeSerializer.copy(fromRecord.getValue(), reuseRecord.getValue()), fromRecord.getTimestamp());
+			T valueCopy = typeSerializer.copy(fromRecord.getValue(), reuseRecord.getValue());
+			reuseRecord.replace(valueCopy, fromRecord.getTimestamp());
 			return reuse;
-		} else if (from instanceof Watermark) {
+		}
+		else if (from.isWatermark()) {
 			// is immutable
 			return from;
-		} else {
-			throw new RuntimeException("Cannot copy " + from);
+		}
+		else {
+			throw new RuntimeException("Cannot copy " + from + " -> " + reuse);
 		}
 	}
 
 	@Override
 	public int getLength() {
-		return 0;
+		return -1;
 	}
 
 	@Override
-	@SuppressWarnings("unchecked")
-	public void serialize(Object value, DataOutputView target) throws IOException {
-		if (value instanceof StreamRecord) {
-			StreamRecord<T> record = (StreamRecord<T>) value;
+	public void serialize(StreamElement value, DataOutputView target) throws IOException {
+		if (value.isRecord()) {
+			StreamRecord<T> record = value.asRecord();
 			target.writeLong(record.getTimestamp());
 			typeSerializer.serialize(record.getValue(), target);
-		} else if (value instanceof Watermark) {
+		}
+		else if (value.isWatermark()) {
 			target.writeLong(IS_WATERMARK);
-			target.writeLong(((Watermark) value).getTimestamp());
+			target.writeLong(value.asWatermark().getTimestamp());
+		}
+		else {
+			throw new RuntimeException();
 		}
 	}
 	
 	@Override
-	public Object deserialize(DataInputView source) throws IOException {
+	public StreamElement deserialize(DataInputView source) throws IOException {
 		long millis = source.readLong();
 
 		if (millis == IS_WATERMARK) {
 			return new Watermark(source.readLong());
-		} else {
+		}
+		else {
 			T element = typeSerializer.deserialize(source);
 			return new StreamRecord<T>(element, millis);
 		}
 	}
 
 	@Override
-	@SuppressWarnings("unchecked")
-	public Object deserialize(Object reuse, DataInputView source) throws IOException {
+	public StreamElement deserialize(StreamElement reuse, DataInputView source) throws IOException {
 		long millis = source.readLong();
 
 		if (millis == IS_WATERMARK) {
 			return new Watermark(source.readLong());
-
-		} else {
-			StreamRecord<T> reuseRecord = (StreamRecord<T>) reuse;
+		}
+		else {
+			StreamRecord<T> reuseRecord = reuse.asRecord();
 			T element = typeSerializer.deserialize(reuseRecord.getValue(), source);
 			reuseRecord.replace(element, millis);
 			return reuse;

http://git-wip-us.apache.org/repos/asf/flink/blob/c09d14a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java
new file mode 100644
index 0000000..80df72e
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElement.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.streamrecord;
+
+import org.apache.flink.streaming.api.watermark.Watermark;
+
+/**
+ * An element in a data stream. Can be a record or a Watermark.
+ */
+public abstract class StreamElement {
+	
+	/**
+	 * Checks whether this element is a watermark.
+	 * @return True, if this element is a watermark, false otherwise.
+	 */
+	public final boolean isWatermark() {
+		return getClass() == Watermark.class;
+	}
+
+	/**
+	 * Checks whether this element is a record.
+	 * @return True, if this element is a record, false otherwise.
+	 */
+	public final boolean isRecord() {
+		return getClass() == StreamRecord.class;
+	}
+
+	/**
+	 * Casts this element into a StreamRecord.
+	 * @return This element as a stream record.
+	 * @throws java.lang.ClassCastException Thrown, if this element is actually not a stream record.
+	 */
+	@SuppressWarnings("unchecked")
+	public final <E> StreamRecord<E> asRecord() {
+		return (StreamRecord<E>) this;
+	}
+
+	/**
+	 * Casts this element into a Watermark.
+	 * @return This element as a Watermark.
+	 * @throws java.lang.ClassCastException Thrown, if this element is actually not a Watermark.
+	 */
+	public final Watermark asWatermark() {
+		return (Watermark) this;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c09d14a9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
index 92ce66f..348b974 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamRecord.java
@@ -22,10 +22,12 @@ package org.apache.flink.streaming.runtime.streamrecord;
  * 
  * @param <T> The type encapsulated with the stream record.
  */
-public class StreamRecord<T> {
+public class StreamRecord<T> extends StreamElement {
 	
+	/** The actual value held by this record */
 	private T value;
 	
+	/** The timestamp of the record */
 	private long timestamp;
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/c09d14a9/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
index b78ec44..1187fe6 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -20,7 +20,6 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.HeapMemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
@@ -32,6 +31,7 @@ import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -85,7 +85,8 @@ public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
 		for (int i = 0; i < numInputChannels; i++) {
 			final int channelIndex = i;
 			final RecordSerializer<SerializationDelegate<Object>> recordSerializer = new SpanningRecordSerializer<SerializationDelegate<Object>>();
-			final SerializationDelegate<Object> delegate = new SerializationDelegate<Object>(new MultiplexingStreamRecordSerializer<T>(serializer));
+			final SerializationDelegate<Object> delegate = (SerializationDelegate<Object>) (SerializationDelegate<?>)
+					new SerializationDelegate<StreamElement>(new MultiplexingStreamRecordSerializer<T>(serializer));
 
 			inputQueues[channelIndex] = new ConcurrentLinkedQueue<InputValue<Object>>();
 			inputChannels[channelIndex] = new TestInputChannel(inputGate, i);

http://git-wip-us.apache.org/repos/asf/flink/blob/c09d14a9/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFilterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFilterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFilterTest.java
index bf4fe40..047aad8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFilterTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFilterTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.joda.time.Instant;
 import org.junit.Assert;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c09d14a9/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
index cb08e65..dcfe3de 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedFoldTest.java
@@ -21,7 +21,6 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.RichFoldFunction;
-import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -31,7 +30,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.joda.time.Instant;
 import org.junit.Assert;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c09d14a9/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
index 9e35fa2..a2cd1fd 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamGroupedReduceTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.joda.time.Instant;
 import org.junit.Assert;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c09d14a9/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java
index 4d12492..f0113d1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamMapTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.joda.time.Instant;
 import org.junit.Assert;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/c09d14a9/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java
index bb9dad7..14abd18 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamProjectTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.streaming.api.operators;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
-import java.io.Serializable;
 import java.util.HashSet;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
@@ -42,7 +41,7 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
+
 import org.junit.Test;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/c09d14a9/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java
index 4c644a9..4dbf7b8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/co/CoStreamFlatMapTest.java
@@ -23,19 +23,14 @@ import java.io.Serializable;
 import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.functions.co.RichCoFlatMapFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.NoOpSink;
 import org.apache.flink.streaming.util.TestHarnessUtil;
 import org.apache.flink.streaming.util.TwoInputStreamOperatorTestHarness;
 import org.apache.flink.util.Collector;
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/c09d14a9/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index 96dbeab..06fca6b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -35,6 +35,7 @@ import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
 import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.util.InstantiationUtil;
 import org.junit.Assert;
 
@@ -79,7 +80,7 @@ public class StreamTaskTestHarness<OUT> {
 	private AbstractInvokable task;
 
 	private TypeSerializer<OUT> outputSerializer;
-	private TypeSerializer<Object> outputStreamRecordSerializer;
+	private TypeSerializer<StreamElement> outputStreamRecordSerializer;
 
 	private ConcurrentLinkedQueue<Object> outputList;
 
@@ -119,8 +120,7 @@ public class StreamTaskTestHarness<OUT> {
 	/**
 	 * This must be overwritten for OneInputStreamTask or TwoInputStreamTask test harnesses.
 	 */
-	protected void initializeInputs() throws IOException, InterruptedException {
-	}
+	protected void initializeInputs() throws IOException, InterruptedException {}
 
 	@SuppressWarnings("unchecked")
 	private void initializeOutput() {


Mime
View raw message