flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [5/8] flink git commit: [FLINK-1967] Introduce (Event)time in Streaming
Date Tue, 21 Jul 2015 10:45:15 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
index a9ebf5b..9d6e88e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTask.java
@@ -18,17 +18,11 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
-import java.io.IOException;
-
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.runtime.io.IndexedMutableReader;
-import org.apache.flink.streaming.runtime.io.IndexedReaderIterator;
-import org.apache.flink.streaming.runtime.io.InputGateFactory;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,56 +30,26 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
 
 	private static final Logger LOG = LoggerFactory.getLogger(OneInputStreamTask.class);
 
-	protected StreamRecordSerializer<IN> inSerializer;
-	private IndexedMutableReader<DeserializationDelegate<StreamRecord<IN>>> inputs;
-	protected IndexedReaderIterator<StreamRecord<IN>> recordIterator;
-
+	private StreamInputProcessor<IN> inputProcessor;
 
 	@Override
 	public void registerInputOutput() {
 		super.registerInputOutput();
 
-		inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
+		TypeSerializer<IN> inSerializer = configuration.getTypeSerializerIn1(getUserCodeClassLoader());
 
 		int numberOfInputs = configuration.getNumberOfInputs();
 
 		if (numberOfInputs > 0) {
-			InputGate inputGate = InputGateFactory.createInputGate(getEnvironment().getAllInputGates());
-			inputs = new IndexedMutableReader<DeserializationDelegate<StreamRecord<IN>>>(inputGate);
+			InputGate[] inputGates = getEnvironment().getAllInputGates();
+			inputProcessor = new StreamInputProcessor<IN>(inputGates, inSerializer, getExecutionConfig().areTimestampsEnabled());
+
+			inputProcessor.registerTaskEventListener(getCheckpointBarrierListener(), CheckpointBarrier.class);
 
 			AccumulatorRegistry registry = getEnvironment().getAccumulatorRegistry();
 			AccumulatorRegistry.Reporter reporter = registry.getReadWriteReporter();
 
-			inputs.setReporter(reporter);
-
-			inputs.registerTaskEventListener(getSuperstepListener(), StreamingSuperstep.class);
-
-			recordIterator = new IndexedReaderIterator<StreamRecord<IN>>(inputs, inSerializer);
-		}
-	}
-
-	/*
-	 * Reads the next record from the reader iterator and stores it in the
-	 * nextRecord variable
-	 */
-	protected StreamRecord<IN> readNext() throws IOException {
-		StreamRecord<IN> nextRecord = inSerializer.createInstance();
-		try {
-			return recordIterator.next(nextRecord);
-		} catch (IOException e) {
-			if (isRunning) {
-				throw new RuntimeException("Could not read next record.", e);
-			} else {
-				// Task already cancelled do nothing
-				return null;
-			}
-		} catch (IllegalStateException e) {
-			if (isRunning) {
-				throw new RuntimeException("Could not read next record.", e);
-			} else {
-				// Task already cancelled do nothing
-				return null;
-			}
+			inputProcessor.setReporter(reporter);
 		}
 	}
 
@@ -103,10 +67,8 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
 			openOperator();
 			operatorOpen = true;
 
-			StreamRecord<IN> nextRecord;
-			while (isRunning && (nextRecord = readNext()) != null) {
-				headContext.setNextInput(nextRecord.getObject());
-				streamOperator.processElement(nextRecord.getObject());
+			while (inputProcessor.processInput(streamOperator)) {
+				// nothing to do, just keep processing
 			}
 
 			closeOperator();
@@ -123,8 +85,7 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
 			if (operatorOpen) {
 				try {
 					closeOperator();
-				}
-				catch (Throwable t) {
+				} catch (Throwable t) {
 					LOG.warn("Exception while closing operator.", t);
 				}
 			}
@@ -134,8 +95,8 @@ public class OneInputStreamTask<IN, OUT> extends StreamTask<OUT, OneInputStreamO
 		finally {
 			this.isRunning = false;
 			// Cleanup
-			inputs.clearBuffers();
-			inputs.cleanup();
+			inputProcessor.clearBuffers();
+			inputProcessor.cleanup();
 			outputHandler.flushOutputs();
 			clearBuffers();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
index 41ee388..cf17b3e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/OutputHandler.java
@@ -24,15 +24,15 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import com.google.common.base.Preconditions;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.collector.CollectorWrapper;
-import org.apache.flink.streaming.api.collector.StreamOutput;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.io.CollectorWrapper;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.api.collector.selector.OutputSelectorWrapper;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamEdge;
@@ -41,6 +41,7 @@ import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.runtime.io.RecordWriterFactory;
 import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
 import org.apache.flink.util.Collector;
@@ -53,11 +54,11 @@ public class OutputHandler<OUT> {
 	private StreamTask<OUT, ?> vertex;
 	private StreamConfig configuration;
 	private ClassLoader cl;
-	private Output<OUT> outerOutput;
+	private Output<StreamRecord<OUT>> outerOutput;
 
 	public List<StreamOperator<?>> chainedOperators;
 
-	private Map<StreamEdge, StreamOutput<?>> outputMap;
+	private Map<StreamEdge, RecordWriterOutput<?>> outputMap;
 
 	private Map<Integer, StreamConfig> chainedConfigs;
 	private List<StreamEdge> outEdgesInOrder;
@@ -75,7 +76,7 @@ public class OutputHandler<OUT> {
 		this.vertex = vertex;
 		this.configuration = new StreamConfig(vertex.getTaskConfiguration());
 		this.chainedOperators = new ArrayList<StreamOperator<?>>();
-		this.outputMap = new HashMap<StreamEdge, StreamOutput<?>>();
+		this.outputMap = new HashMap<StreamEdge, RecordWriterOutput<?>>();
 		this.cl = vertex.getUserCodeClassLoader();
 
 		// We read the chained configs, and the order of record writer
@@ -90,7 +91,7 @@ public class OutputHandler<OUT> {
 		// We iterate through all the out edges from this job vertex and create
 		// a stream output
 		for (StreamEdge outEdge : outEdgesInOrder) {
-			StreamOutput<?> streamOutput = createStreamOutput(
+			RecordWriterOutput<?> streamOutput = createStreamOutput(
 					outEdge,
 					outEdge.getTargetId(),
 					chainedConfigs.get(outEdge.getSourceId()),
@@ -108,13 +109,13 @@ public class OutputHandler<OUT> {
 	}
 
 	public void broadcastBarrier(long id, long timestamp) throws IOException, InterruptedException {
-		StreamingSuperstep barrier = new StreamingSuperstep(id, timestamp);
-		for (StreamOutput<?> streamOutput : outputMap.values()) {
+		CheckpointBarrier barrier = new CheckpointBarrier(id, timestamp);
+		for (RecordWriterOutput<?> streamOutput : outputMap.values()) {
 			streamOutput.broadcastEvent(barrier);
 		}
 	}
 
-	public Collection<StreamOutput<?>> getOutputs() {
+	public Collection<RecordWriterOutput<?>> getOutputs() {
 		return outputMap.values();
 	}
 	
@@ -134,8 +135,7 @@ public class OutputHandler<OUT> {
 	 * config
 	 */
 	@SuppressWarnings({"unchecked", "rawtypes"})
-	private <X> Output<X> createChainedCollector(StreamConfig chainedTaskConfig, Map<String, Accumulator<?,?>> accumulatorMap) {
-		Preconditions.checkNotNull(accumulatorMap);
+	private <X> Output<StreamRecord<X>> createChainedCollector(StreamConfig chainedTaskConfig, Map<String, Accumulator<?,?>> accumulatorMap) {
 
 
 		// We create a wrapper that will encapsulate the chained operators and
@@ -163,7 +163,7 @@ public class OutputHandler<OUT> {
 		if (chainedTaskConfig.isChainStart()) {
 			// The current task is the first chained task at this vertex so we
 			// return the wrapper
-			return (Output<X>) wrapper;
+			return (Output<StreamRecord<X>>) wrapper;
 		} else {
 			// The current task is a part of the chain so we get the chainable
 			// operator which will be returned and set it up using the wrapper
@@ -177,17 +177,21 @@ public class OutputHandler<OUT> {
 
 			chainedOperators.add(chainableOperator);
 			if (vertex.getExecutionConfig().isObjectReuseEnabled() || chainableOperator.isInputCopyingDisabled()) {
-				return new OperatorCollector<X>(chainableOperator);
+				return new ChainingOutput<X>(chainableOperator);
 			} else {
-				return new CopyingOperatorCollector<X>(
-						chainableOperator,
-						(TypeSerializer<X>) chainedTaskConfig.getTypeSerializerIn1(vertex.getUserCodeClassLoader()).getObjectSerializer());
+				StreamRecordSerializer serializerIn1;
+				if (vertex.getExecutionConfig().areTimestampsEnabled()) {
+					serializerIn1 = new MultiplexingStreamRecordSerializer(chainedTaskConfig.getTypeSerializerIn1(vertex.getUserCodeClassLoader()));
+				} else {
+					serializerIn1 = new StreamRecordSerializer(chainedTaskConfig.getTypeSerializerIn1(vertex.getUserCodeClassLoader()));
+				}
+				return new CopyingChainingOutput<X>(chainableOperator, (TypeSerializer<StreamRecord<X>>) serializerIn1);
 			}
 		}
 
 	}
 
-	public Output<OUT> getOutput() {
+	public Output<StreamRecord<OUT>> getOutput() {
 		return outerOutput;
 	}
 
@@ -201,17 +205,11 @@ public class OutputHandler<OUT> {
 	 * 		The config of upStream task
 	 * @return The created StreamOutput
 	 */
-	private <T> StreamOutput<T> createStreamOutput(StreamEdge edge, Integer outputVertex,
+	private <T> RecordWriterOutput<T> createStreamOutput(StreamEdge edge, Integer outputVertex,
 			StreamConfig upStreamConfig, int outputIndex, AccumulatorRegistry.Reporter reporter) {
 
-		StreamRecordSerializer<T> outSerializer = upStreamConfig
-				.getTypeSerializerOut1(vertex.userClassLoader);
-		SerializationDelegate<StreamRecord<T>> outSerializationDelegate = null;
+		TypeSerializer<T> outSerializer = upStreamConfig.getTypeSerializerOut1(vertex.userClassLoader);
 
-		if (outSerializer != null) {
-			outSerializationDelegate = new SerializationDelegate<StreamRecord<T>>(outSerializer);
-			outSerializationDelegate.setInstance(outSerializer.createInstance());
-		}
 
 		@SuppressWarnings("unchecked")
 		StreamPartitioner<T> outputPartitioner = (StreamPartitioner<T>) edge.getPartitioner();
@@ -223,7 +221,8 @@ public class OutputHandler<OUT> {
 
 		output.setReporter(reporter);
 
-		StreamOutput<T> streamOutput = new StreamOutput<T>(output, outSerializationDelegate);
+		@SuppressWarnings("unchecked")
+		RecordWriterOutput<T> streamOutput = new RecordWriterOutput<T>((RecordWriter) output, outSerializer, vertex.getExecutionConfig().areTimestampsEnabled());
 
 		if (LOG.isTraceEnabled()) {
 			LOG.trace("Partitioner set: {} with {} outputs for {}", outputPartitioner.getClass()
@@ -234,27 +233,27 @@ public class OutputHandler<OUT> {
 	}
 
 	public void flushOutputs() throws IOException, InterruptedException {
-		for (StreamOutput<?> streamOutput : getOutputs()) {
+		for (RecordWriterOutput<?> streamOutput : getOutputs()) {
 			streamOutput.close();
 		}
 	}
 
 	public void clearWriters() {
-		for (StreamOutput<?> output : outputMap.values()) {
+		for (RecordWriterOutput<?> output : outputMap.values()) {
 			output.clearBuffers();
 		}
 	}
 
-	private static class OperatorCollector<T> implements Output<T> {
-
-		protected OneInputStreamOperator<Object, T> operator;
+	private static class ChainingOutput<T> implements Output<StreamRecord<T>> {
+		protected OneInputStreamOperator operator;
 
-		public OperatorCollector(OneInputStreamOperator<Object, T> operator) {
+		public ChainingOutput(OneInputStreamOperator<?, T> operator) {
 			this.operator = operator;
 		}
 
 		@Override
-		public void collect(T record) {
+		@SuppressWarnings("unchecked")
+		public void collect(StreamRecord<T> record) {
 			try {
 				operator.getRuntimeContext().setNextInput(record);
 				operator.processElement(record);
@@ -267,7 +266,19 @@ public class OutputHandler<OUT> {
 		}
 
 		@Override
-		public final void close() {
+		public void emitWatermark(Watermark mark) {
+			try {
+				operator.processWatermark(mark);
+			} catch (Exception e) {
+				if (LOG.isErrorEnabled()) {
+					LOG.error("Could not forward element to operator: {}", e);
+				}
+				throw new RuntimeException(e);
+			}
+		}
+
+		@Override
+		public void close() {
 			try {
 				operator.close();
 			} catch (Exception e) {
@@ -278,17 +289,18 @@ public class OutputHandler<OUT> {
 		}
 	}
 
-	private static class CopyingOperatorCollector<T> extends OperatorCollector<T> {
-		private final TypeSerializer<T> serializer;
+	private static class CopyingChainingOutput<T> extends ChainingOutput<T> {
+		private final TypeSerializer<StreamRecord<T>> serializer;
 
-		@SuppressWarnings({ "rawtypes", "unchecked" })
-		public CopyingOperatorCollector(OneInputStreamOperator operator, TypeSerializer<T> serializer) {
+		public CopyingChainingOutput(OneInputStreamOperator<?, T> operator,
+				TypeSerializer<StreamRecord<T>> serializer) {
 			super(operator);
 			this.serializer = serializer;
 		}
 
 		@Override
-		public void collect(T record) {
+		@SuppressWarnings("unchecked")
+		public void collect(StreamRecord<T> record) {
 			try {
 				operator.getRuntimeContext().setNextInput(record);
 				operator.processElement(serializer.copy(record));

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 35b5341..1940c11 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -18,7 +18,10 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamSource;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -40,6 +43,8 @@ public class SourceStreamTask<OUT> extends StreamTask<OUT, StreamSource<OUT>> {
 
 	@Override
 	public void invoke() throws Exception {
+		final SourceOutput<StreamRecord<OUT>> output = new SourceOutput<StreamRecord<OUT>>(outputHandler.getOutput(), checkpointLock);
+
 		this.isRunning = true;
 
 		boolean operatorOpen = false;
@@ -52,7 +57,7 @@ public class SourceStreamTask<OUT> extends StreamTask<OUT, StreamSource<OUT>> {
 			openOperator();
 			operatorOpen = true;
 
-			streamOperator.run(checkpointLock, outputHandler.getOutput());
+			streamOperator.run(checkpointLock, output);
 
 			closeOperator();
 			operatorOpen = false;
@@ -89,4 +94,34 @@ public class SourceStreamTask<OUT> extends StreamTask<OUT, StreamSource<OUT>> {
 		super.cancel();
 		streamOperator.cancel();
 	}
+
+	private static class SourceOutput<T> implements Output<T> {
+		private final Output<T> output;
+		private final Object lockObject;
+
+		public SourceOutput(Output<T> output, Object lockObject) {
+			this.output = output;
+			this.lockObject = lockObject;
+		}
+
+		@Override
+		public void emitWatermark(Watermark mark) {
+			synchronized (lockObject) {
+				output.emitWatermark(mark);
+			}
+		}
+
+		@Override
+		public void collect(T record) {
+			synchronized (lockObject) {
+				output.collect(record);
+			}
+
+		}
+
+		@Override
+		public void close() {
+			output.close();
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
index e5d58d3..1736e52 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
@@ -25,9 +25,10 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.streaming.api.collector.StreamOutput;
+import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
 import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -76,7 +77,7 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
 			LOG.debug("Iteration source {} invoked", getName());
 		}
 
-		Collection<StreamOutput<?>> outputs = outputHandler.getOutputs();
+		Collection<RecordWriterOutput<?>> outputs = outputHandler.getOutputs();
 
 		try {
 			StreamRecord<OUT> nextRecord;
@@ -90,8 +91,8 @@ public class StreamIterationHead<OUT> extends OneInputStreamTask<OUT, OUT> {
 				if (nextRecord == null) {
 					break;
 				}
-				for (StreamOutput<?> output : outputs) {
-					((StreamOutput<OUT>) output).collect(nextRecord.getObject());
+				for (RecordWriterOutput<?> output : outputs) {
+					((RecordWriterOutput<OUT>) output).collect(nextRecord);
 				}
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
index b6e3889..9fbc3a7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationTail.java
@@ -20,6 +20,9 @@ package org.apache.flink.streaming.runtime.tasks;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.io.BlockingQueueBroker;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.util.StringUtils;
@@ -43,6 +46,7 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
 	@Override
 	public void registerInputOutput() {
 		super.registerInputOutput();
+
 		try {
 			iterationId = configuration.getIterationId();
 			iterationWaitTime = configuration.getIterationWaitTime();
@@ -53,59 +57,33 @@ public class StreamIterationTail<IN> extends OneInputStreamTask<IN, IN> {
 			throw new StreamTaskException(String.format(
 					"Cannot register inputs of StreamIterationSink %s", iterationId), e);
 		}
+		this.streamOperator = new RecordPusher();
 	}
 
-	@Override
-	public void invoke() throws Exception {
-		isRunning = true;
-		
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Iteration sink {} invoked", getName());
-		}
-
-		try {
-			forwardRecords();
+	class RecordPusher extends AbstractStreamOperator<IN> implements OneInputStreamOperator<IN, IN> {
+		private static final long serialVersionUID = 1L;
 
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Iteration sink {} invoke finished", getName());
+		@Override
+		public void processElement(StreamRecord<IN> record) throws Exception {
+			try {
+				if (shouldWait) {
+					dataChannel.offer(record, iterationWaitTime, TimeUnit.MILLISECONDS);
+				} else {
+					dataChannel.put(record);
+				}
+			} catch (InterruptedException e) {
+				if (LOG.isErrorEnabled()) {
+					LOG.error("Pushing back record at iteration %s failed due to: {}", iterationId,
+							StringUtils.stringifyException(e));
+				}
+				throw e;
 			}
 		}
-		catch (Exception e) {
-			LOG.error("Iteration tail " + getEnvironment().getTaskNameWithSubtasks() + " failed", e);
-			throw e;
-		}
-		finally {
-			// Cleanup
-			isRunning = false;
-			clearBuffers();
-		}
-	}
 
-	protected void forwardRecords() throws Exception {
-		StreamRecord<IN> reuse = inSerializer.createInstance();
-		while ((reuse = recordIterator.next(reuse)) != null) {
-			if (!pushToQueue(reuse)) {
-				break;
-			}
-			reuse = inSerializer.createInstance();
+		@Override
+		public void processWatermark(Watermark mark) throws Exception {
+			// ignore
 		}
 	}
 
-	private boolean pushToQueue(StreamRecord<IN> record) throws InterruptedException {
-		try {
-			if (shouldWait) {
-				return dataChannel.offer(record, iterationWaitTime, TimeUnit.MILLISECONDS);
-			} else {
-				dataChannel.put(record);
-				return true;
-			}
-		} catch (InterruptedException e) {
-			if (LOG.isErrorEnabled()) {
-				LOG.error("Pushing back record at iteration %s failed due to: {}", iterationId,
-						StringUtils.stringifyException(e));
-				throw e;
-			}
-			return false;
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 4ffc8f5..286202f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -73,12 +73,12 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 	protected StreamingRuntimeContext headContext;
 
 	protected ClassLoader userClassLoader;
-
-	private EventListener<TaskEvent> superstepListener;
+	
+	private EventListener<TaskEvent> checkpointBarrierListener;
 
 	public StreamTask() {
 		streamOperator = null;
-		superstepListener = new SuperstepEventListener();
+		checkpointBarrierListener = new CheckpointBarrierListener();
 		contexts = new ArrayList<StreamingRuntimeContext>();
 	}
 
@@ -171,7 +171,9 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 
 	protected void openOperator() throws Exception {
 		for (StreamOperator<?> operator : outputHandler.getChainedOperators()) {
-			operator.open(getTaskConfiguration());
+			if (operator != null) {
+				operator.open(getTaskConfiguration());
+			}
 		}
 	}
 
@@ -179,7 +181,10 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 		// We need to close them first to last, since upstream operators in the chain might emit
 		// elements in their close methods.
 		for (int i = outputHandler.getChainedOperators().size()-1; i >= 0; i--) {
-			outputHandler.getChainedOperators().get(i).close();
+			StreamOperator<?> operator = outputHandler.getChainedOperators().get(i);
+			if (operator != null) {
+				operator.close();
+			}
 		}
 	}
 
@@ -194,8 +199,8 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 		this.isRunning = false;
 	}
 
-	public EventListener<TaskEvent> getSuperstepListener() {
-		return this.superstepListener;
+	public EventListener<TaskEvent> getCheckpointBarrierListener() {
+		return this.checkpointBarrierListener;
 	}
 
 	// ------------------------------------------------------------------------
@@ -305,12 +310,12 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs
 
 	// ------------------------------------------------------------------------
 
-	private class SuperstepEventListener implements EventListener<TaskEvent> {
+	private class CheckpointBarrierListener implements EventListener<TaskEvent> {
 
 		@Override
 		public void onEvent(TaskEvent event) {
 			try {
-				StreamingSuperstep sStep = (StreamingSuperstep) event;
+				CheckpointBarrier sStep = (CheckpointBarrier) event;
 				triggerCheckpoint(sStep.getId(), sStep.getTimestamp());
 			}
 			catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
index 7eff16e..7518124 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingRuntimeContext.java
@@ -39,6 +39,7 @@ import org.apache.flink.runtime.operators.util.TaskConfig;
 import org.apache.flink.runtime.state.StateHandleProvider;
 import org.apache.flink.streaming.api.state.PartitionedStreamOperatorState;
 import org.apache.flink.streaming.api.state.StreamOperatorState;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 
 /**
  * Implementation of the {@link RuntimeContext}, created by runtime stream UDF
@@ -167,10 +168,10 @@ public class StreamingRuntimeContext extends RuntimeUDFContext {
 	 *            Next input of the operator.
 	 */
 	@SuppressWarnings("unchecked")
-	public void setNextInput(Object nextRecord) {
+	public void setNextInput(StreamRecord<?> nextRecord) {
 		if (statePartitioner != null) {
 			for (PartitionedStreamOperatorState state : partitionedStates) {
-				state.setCurrentInput(nextRecord);
+				state.setCurrentInput(nextRecord.getValue());
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingSuperstep.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingSuperstep.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingSuperstep.java
deleted file mode 100644
index f749773..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamingSuperstep.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import java.io.IOException;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.event.task.TaskEvent;
-
-public class StreamingSuperstep extends TaskEvent {
-
-	protected long id;
-	protected long timestamp;
-
-	public StreamingSuperstep() {}
-
-	public StreamingSuperstep(long id, long timestamp) {
-		this.id = id;
-		this.timestamp = timestamp;
-	}
-
-	public long getId() {
-		return id;
-	}
-
-	public long getTimestamp() {
-		return id;
-	}
-
-	// ------------------------------------------------------------------------
-	
-	@Override
-	public void write(DataOutputView out) throws IOException {
-		out.writeLong(id);
-		out.writeLong(timestamp);
-	}
-
-	@Override
-	public void read(DataInputView in) throws IOException {
-		id = in.readLong();
-		timestamp = in.readLong();
-	}
-	
-	// ------------------------------------------------------------------------
-
-	@Override
-	public int hashCode() {
-		return (int) (id ^ (id >>> 32) ^ timestamp ^(timestamp >>> 32));
-	}
-
-	@Override
-	public boolean equals(Object other) {
-		if (other == null || !(other instanceof StreamingSuperstep)) {
-			return false;
-		}
-		else {
-			StreamingSuperstep that = (StreamingSuperstep) other;
-			return that.id == this.id && that.timestamp == this.timestamp;
-		}
-	}
-
-	@Override
-	public String toString() {
-		return String.format("StreamingSuperstep %d @ %d", id, timestamp);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
index 2052877..507b813 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTask.java
@@ -21,15 +21,11 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.runtime.io.CoReaderIterator;
-import org.apache.flink.streaming.runtime.io.CoRecordReader;
-import org.apache.flink.streaming.runtime.io.InputGateFactory;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -37,11 +33,41 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
 
 	private static final Logger LOG = LoggerFactory.getLogger(TwoInputStreamTask.class);
 
-	protected StreamRecordSerializer<IN1> inputDeserializer1 = null;
-	protected StreamRecordSerializer<IN2> inputDeserializer2 = null;
+	StreamTwoInputProcessor<IN1, IN2> inputProcessor;
 
-	CoRecordReader<DeserializationDelegate<StreamRecord<IN1>>, DeserializationDelegate<StreamRecord<IN2>>> coReader;
-	CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> coIter;
+	@Override
+	public void registerInputOutput() {
+		super.registerInputOutput();
+
+		TypeSerializer<IN1> inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
+		TypeSerializer<IN2> inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
+
+		int numberOfInputs = configuration.getNumberOfInputs();
+
+		ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
+		ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
+
+		List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
+
+		for (int i = 0; i < numberOfInputs; i++) {
+			int inputType = inEdges.get(i).getTypeNumber();
+			InputGate reader = getEnvironment().getInputGate(i);
+			switch (inputType) {
+				case 1:
+					inputList1.add(reader);
+					break;
+				case 2:
+					inputList2.add(reader);
+					break;
+				default:
+					throw new RuntimeException("Invalid input type number: " + inputType);
+			}
+		}
+
+		inputProcessor = new StreamTwoInputProcessor<IN1, IN2>(inputList1, inputList2, inputDeserializer1, inputDeserializer2, getExecutionConfig().areTimestampsEnabled());
+
+		inputProcessor.registerTaskEventListener(getCheckpointBarrierListener(), CheckpointBarrier.class);
+	}
 
 	@Override
 	public void invoke() throws Exception {
@@ -58,38 +84,8 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
 			openOperator();
 			operatorOpen = true;
 
-			int next;
-			StreamRecord<IN1> reuse1 = inputDeserializer1.createInstance();
-			StreamRecord<IN2> reuse2 = inputDeserializer2.createInstance();
-
-			while (isRunning) {
-				try {
-					next = coIter.next(reuse1, reuse2);
-				} catch (IOException e) {
-					if (isRunning) {
-						throw new RuntimeException("Could not read next record.", e);
-					} else {
-						// Task already cancelled do nothing
-						next = 0;
-					}
-				} catch (IllegalStateException e) {
-					if (isRunning) {
-						throw new RuntimeException("Could not read next record.", e);
-					} else {
-						// Task already cancelled do nothing
-						next = 0;
-					}
-				}
-
-				if (next == 0) {
-					break;
-				} else if (next == 1) {
-					streamOperator.processElement1(reuse1.getObject());
-					reuse1 = inputDeserializer1.createInstance();
-				} else {
-					streamOperator.processElement2(reuse2.getObject());
-					reuse2 = inputDeserializer2.createInstance();
-				}
+			while (inputProcessor.processInput(streamOperator)) {
+				// do nothing, just keep processing
 			}
 
 			closeOperator();
@@ -124,47 +120,9 @@ public class TwoInputStreamTask<IN1, IN2, OUT> extends StreamTask<OUT, TwoInputS
 	}
 
 	@Override
-	public void registerInputOutput() {
-		super.registerInputOutput();
-
-		inputDeserializer1 = configuration.getTypeSerializerIn1(userClassLoader);
-		inputDeserializer2 = configuration.getTypeSerializerIn2(userClassLoader);
-
-		int numberOfInputs = configuration.getNumberOfInputs();
-
-		ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
-		ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
-
-		List<StreamEdge> inEdges = configuration.getInPhysicalEdges(userClassLoader);
-
-		for (int i = 0; i < numberOfInputs; i++) {
-			int inputType = inEdges.get(i).getTypeNumber();
-			InputGate reader = getEnvironment().getInputGate(i);
-			switch (inputType) {
-				case 1:
-					inputList1.add(reader);
-					break;
-				case 2:
-					inputList2.add(reader);
-					break;
-				default:
-					throw new RuntimeException("Invalid input type number: " + inputType);
-			}
-		}
-
-		final InputGate reader1 = InputGateFactory.createInputGate(inputList1);
-		final InputGate reader2 = InputGateFactory.createInputGate(inputList2);
-
-		coReader = new CoRecordReader<DeserializationDelegate<StreamRecord<IN1>>, DeserializationDelegate<StreamRecord<IN2>>>(
-				reader1, reader2);
-		coIter = new CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>>(coReader,
-				inputDeserializer1, inputDeserializer2);
-	}
-
-	@Override
 	public void clearBuffers() throws IOException {
 		super.clearBuffers();
-		coReader.clearBuffers();
-		coReader.cleanup();
+		inputProcessor.clearBuffers();
+		inputProcessor.cleanup();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
new file mode 100644
index 0000000..a20436a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/StreamTestSingleInputGate.java
@@ -0,0 +1,261 @@
+/*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+// We have it in this package because we could not mock the methods otherwise
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.api.serialization.RecordSerializer;
+import org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.plugable.SerializationDelegate;
+import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test {@link InputGate} that allows setting multiple channels. Use
+ * {@link #sendElement(Object, int)} to offer an element on a specific channel. Use
+ * {@link #sendEvent(AbstractEvent, int)} to offer an event on the specified channel. Use
+ * {@link #endInput()} to notify all channels of input end.
+ */
+public class StreamTestSingleInputGate<T> extends TestSingleInputGate {
+
+	private final int numInputChannels;
+
+	private final TestInputChannel[] inputChannels;
+
+	private final int bufferSize;
+
+	private TypeSerializer<T> serializer;
+
+	private ConcurrentLinkedQueue<InputValue<Object>>[] inputQueues;
+
+	/**
+	 * Creates the gate and stubs all of its channels.
+	 *
+	 * @param numInputChannels number of input channels to create
+	 * @param bufferSize size in bytes of the buffer each element is serialized into
+	 * @param serializer serializer for the element type
+	 */
+	@SuppressWarnings("unchecked")
+	public StreamTestSingleInputGate(
+			int numInputChannels,
+			int bufferSize,
+			TypeSerializer<T> serializer) throws IOException, InterruptedException {
+		super(numInputChannels, false);
+
+		this.bufferSize = bufferSize;
+		this.serializer = serializer;
+
+		this.numInputChannels = numInputChannels;
+		inputChannels = new TestInputChannel[numInputChannels];
+
+		inputQueues = new ConcurrentLinkedQueue[numInputChannels];
+
+		setupInputChannels();
+	}
+
+	/**
+	 * Creates one test channel per index and stubs its {@code getNextBuffer()} to block
+	 * until a value has been queued for that channel, then return it as a {@link Buffer}.
+	 */
+	@SuppressWarnings("unchecked")
+	private void setupInputChannels() throws IOException, InterruptedException {
+
+		for (int i = 0; i < numInputChannels; i++) {
+			final int channelIndex = i;
+			final RecordSerializer<SerializationDelegate<StreamRecord<T>>> recordSerializer = new SpanningRecordSerializer<SerializationDelegate<StreamRecord<T>>>();
+			final SerializationDelegate delegate = new SerializationDelegate(new MultiplexingStreamRecordSerializer<T>(serializer));
+
+			inputQueues[channelIndex] = new ConcurrentLinkedQueue<InputValue<Object>>();
+			inputChannels[channelIndex] = new TestInputChannel(inputGate, i);
+
+			final Answer<Buffer> answer = new Answer<Buffer>() {
+				@Override
+				public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable {
+					final ConcurrentLinkedQueue<InputValue<Object>> queue = inputQueues[channelIndex];
+
+					// Poll under the same monitor the senders hold for add + notifyAll. Polling
+					// first and locking afterwards would let a sender enqueue and notify in
+					// between, and wait() would then block even though the queue is non-empty.
+					// The loop also re-checks the queue after a spurious wakeup.
+					InputValue<Object> input;
+					synchronized (queue) {
+						while ((input = queue.poll()) == null) {
+							queue.wait();
+						}
+					}
+
+					if (input.isStreamEnd()) {
+						when(inputChannels[channelIndex].getInputChannel().isReleased()).thenReturn(
+								true);
+						return EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
+					} else if (input.isStreamRecord()) {
+						Object inputElement = input.getStreamRecord();
+						final Buffer buffer = new Buffer(new MemorySegment(new byte[bufferSize]),
+								mock(BufferRecycler.class));
+						recordSerializer.setNextBuffer(buffer);
+						delegate.setInstance(inputElement);
+						recordSerializer.addRecord(delegate);
+
+						// Call getCurrentBuffer to ensure size is set
+						return recordSerializer.getCurrentBuffer();
+					} else if (input.isEvent()) {
+						AbstractEvent event = input.getEvent();
+						return EventSerializer.toBuffer(event);
+					} else {
+						// The InputValue factories always set exactly one kind flag.
+						throw new IllegalStateException("Queued input value has no kind set.");
+					}
+				}
+			};
+
+			when(inputChannels[channelIndex].getInputChannel().getNextBuffer()).thenAnswer(answer);
+
+			inputGate.setInputChannel(new IntermediateResultPartitionID(),
+					inputChannels[channelIndex].getInputChannel());
+		}
+	}
+
+	/**
+	 * Queues an element on the given channel and signals the gate that a buffer is available.
+	 */
+	public void sendElement(Object element, int channel) {
+		synchronized (inputQueues[channel]) {
+			inputQueues[channel].add(InputValue.element(element));
+			inputQueues[channel].notifyAll();
+		}
+		inputGate.onAvailableBuffer(inputChannels[channel].getInputChannel());
+	}
+
+	/**
+	 * Queues an event on the given channel and signals the gate that a buffer is available.
+	 */
+	public void sendEvent(AbstractEvent event, int channel) {
+		synchronized (inputQueues[channel]) {
+			inputQueues[channel].add(InputValue.event(event));
+			inputQueues[channel].notifyAll();
+		}
+		inputGate.onAvailableBuffer(inputChannels[channel].getInputChannel());
+	}
+
+	/**
+	 * Queues an end-of-stream marker on every channel.
+	 */
+	public void endInput() {
+		for (int i = 0; i < numInputChannels; i++) {
+			synchronized (inputQueues[i]) {
+				inputQueues[i].add(InputValue.streamEnd());
+				inputQueues[i].notifyAll();
+			}
+			inputGate.onAvailableBuffer(inputChannels[i].getInputChannel());
+		}
+	}
+
+	/**
+	 * Returns true iff all input queues are empty.
+	 */
+	public boolean allQueuesEmpty() {
+		for (int i = 0; i < numInputChannels; i++) {
+			// isEmpty() rather than size(): ConcurrentLinkedQueue.size() traverses the queue
+			if (!inputQueues[i].isEmpty()) {
+				return false;
+			}
+		}
+		return true;
+	}
+
+	/**
+	 * A value queued for a channel: a stream element, an event, or the end-of-stream marker.
+	 */
+	public static class InputValue<T> {
+		private final Object elementOrEvent;
+		private final boolean isStreamEnd;
+		private final boolean isStreamRecord;
+		private final boolean isEvent;
+
+		private InputValue(Object elementOrEvent, boolean isStreamEnd, boolean isEvent, boolean isStreamRecord) {
+			this.elementOrEvent = elementOrEvent;
+			this.isStreamEnd = isStreamEnd;
+			this.isStreamRecord = isStreamRecord;
+			this.isEvent = isEvent;
+		}
+
+		public static <X> InputValue<X> element(Object element) {
+			return new InputValue<X>(element, false, false, true);
+		}
+
+		public static <X> InputValue<X> streamEnd() {
+			return new InputValue<X>(null, true, false, false);
+		}
+
+		public static <X> InputValue<X> event(AbstractEvent event) {
+			return new InputValue<X>(event, false, true, false);
+		}
+
+		public Object getStreamRecord() {
+			return elementOrEvent;
+		}
+
+		public AbstractEvent getEvent() {
+			return (AbstractEvent) elementOrEvent;
+		}
+
+		public boolean isStreamEnd() {
+			return isStreamEnd;
+		}
+
+		public boolean isStreamRecord() {
+			return isStreamRecord;
+		}
+
+		public boolean isEvent() {
+			return isEvent;
+		}
+	}
+
+	/**
+	 * A {@link TaskEvent} whose {@code write} and {@code read} transfer no data.
+	 */
+	public static class DummyEvent extends TaskEvent {
+		@Override
+		public void write(DataOutputView out) throws IOException {
+		}
+
+		@Override
+		public void read(DataInputView in) throws IOException {
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
deleted file mode 100644
index 118b23d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.collector;
-
-import static org.junit.Assert.assertArrayEquals;
-
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamtask.MockRecordWriter;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.MockRecordWriterFactory;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class StreamCollectorTest {
-
-	@Test
-	public void testCollect() {
-		MockRecordWriter recWriter = MockRecordWriterFactory.create();
-		SerializationDelegate<StreamRecord<Tuple1<Integer>>> sd = new SerializationDelegate<StreamRecord<Tuple1<Integer>>>(
-				null);
-		sd.setInstance(new StreamRecord<Tuple1<Integer>>().setObject(new Tuple1<Integer>()));
-
-		Collector<Tuple1<Integer>> collector = new StreamOutput<Tuple1<Integer>>(recWriter, sd);
-		collector.collect(new Tuple1<Integer>(3));
-		collector.collect(new Tuple1<Integer>(4));
-		collector.collect(new Tuple1<Integer>(5));
-		collector.collect(new Tuple1<Integer>(6));
-
-		assertArrayEquals(new Integer[] { 3, 4, 5, 6 }, recWriter.emittedRecords.toArray());
-	}
-
-	@Test
-	public void testClose() {
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
index f241955..e4dadf0 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/functions/ListSourceContext.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.functions;
 
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
 
 import java.util.List;
 
@@ -60,7 +61,22 @@ public class ListSourceContext<T> implements SourceFunction.SourceContext<T> {
 	}
 
 	@Override
+	public void collectWithTimestamp(T element, long timestamp) {
+		target.add(element);
+	}
+
+	@Override
+	public void emitWatermark(Watermark mark) {
+		// don't do anything
+	}
+
+	@Override
 	public Object getCheckpointLock() {
 		return lock;
 	}
+
+	@Override
+	public void close() {
+
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/CounterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/CounterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/CounterTest.java
deleted file mode 100644
index dbbde29..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/CounterTest.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.streaming.api.operators.StreamCounter;
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class CounterTest {
-
-	@Test
-	public void counterTest() {
-		StreamCounter<String> operator = new StreamCounter<String>();
-
-		List<Long> expected = Arrays.asList(1L, 2L, 3L);
-		List<Long> actual = MockContext.createAndExecute(operator, Arrays.asList("one", "two", "three"));
-		
-		assertEquals(expected, actual);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/FilterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/FilterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/FilterTest.java
deleted file mode 100644
index ebde006..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/FilterTest.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.streaming.api.operators.StreamFilter;
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class FilterTest implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	static class MyFilter implements FilterFunction<Integer> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public boolean filter(Integer value) throws Exception {
-			return value % 2 == 0;
-		}
-	}
-
-	@Test 
-	public void test() {
-		StreamFilter<Integer> operator = new StreamFilter<Integer>(new MyFilter());
-
-		List<Integer> expected = Arrays.asList(2, 4, 6);
-		List<Integer> actual = MockContext.createAndExecute(operator, Arrays.asList(1, 2, 3, 4, 5, 6, 7));
-		
-		assertEquals(expected, actual);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/FlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/FlatMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/FlatMapTest.java
deleted file mode 100644
index 7f914dd..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/FlatMapTest.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.streaming.api.operators.StreamFlatMap;
-import org.apache.flink.streaming.util.MockContext;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class FlatMapTest {
-
-	public static final class MyFlatMap implements FlatMapFunction<Integer, Integer> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void flatMap(Integer value, Collector<Integer> out) throws Exception {
-			if (value % 2 == 0) {
-				out.collect(value);
-				out.collect(value * value);
-			}
-		}
-	}
-
-	@Test
-	public void flatMapTest() {
-		StreamFlatMap<Integer, Integer> operator = new StreamFlatMap<Integer, Integer>(new MyFlatMap());
-		
-		List<Integer> expected = Arrays.asList(2, 4, 4, 16, 6, 36, 8, 64);
-		List<Integer> actual = MockContext.createAndExecute(operator, Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
-		
-		assertEquals(expected, actual);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/GroupedFoldTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/GroupedFoldTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/GroupedFoldTest.java
deleted file mode 100644
index 7a45035..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/GroupedFoldTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.operators.StreamGroupedFold;
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class GroupedFoldTest {
-
-	private static class MyFolder implements FoldFunction<Integer, String> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String fold(String accumulator, Integer value) throws Exception {
-			return accumulator + value.toString();
-		}
-
-	}
-
-	@Test
-	public void test() {
-		TypeInformation<String> outType = TypeExtractor.getForObject("A string");
-
-		StreamGroupedFold<Integer, String> operator1 = new StreamGroupedFold<Integer, String>(
-				new MyFolder(), new KeySelector<Integer, String>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public String getKey(Integer value) throws Exception {
-				return value.toString();
-			}
-		}, "100", outType);
-
-		List<String> expected = Arrays.asList("1001","10011", "1002", "10022", "1003");
-		List<String> actual = MockContext.createAndExecute(operator1,
-				Arrays.asList(1, 1, 2, 2, 3));
-
-		assertEquals(expected, actual);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/GroupedReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/GroupedReduceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/GroupedReduceTest.java
deleted file mode 100644
index b9e9717..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/GroupedReduceTest.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class GroupedReduceTest {
-
-	private static class MyReducer implements ReduceFunction<Integer> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Integer reduce(Integer value1, Integer value2) throws Exception {
-			return value1 + value2;
-		}
-
-	}
-
-	@Test
-	public void test() {
-		StreamGroupedReduce<Integer> operator1 = new StreamGroupedReduce<Integer>(
-				new MyReducer(), new KeySelector<Integer, Integer>() {
-
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public Integer getKey(Integer value) throws Exception {
-						return value;
-					}
-				});
-
-		List<Integer> expected = Arrays.asList(1, 2, 2, 4, 3);
-		List<Integer> actual = MockContext.createAndExecute(operator1,
-				Arrays.asList(1, 1, 2, 2, 3));
-
-		assertEquals(expected, actual);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/MapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/MapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/MapTest.java
deleted file mode 100644
index 394b5a4..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/MapTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.api.operators.StreamMap;
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class MapTest {
-
-	private static class Map implements MapFunction<Integer, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String map(Integer value) throws Exception {
-			return "+" + (value + 1);
-		}
-	}
-	
-	@Test
-	public void mapTest() {
-		StreamMap<Integer, String> operator = new StreamMap<Integer, String>(new Map());
-		
-		List<String> expectedList = Arrays.asList("+2", "+3", "+4");
-		List<String> actualList = MockContext.createAndExecute(operator, Arrays.asList(1, 2, 3));
-		
-		assertEquals(expectedList, actualList);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
deleted file mode 100644
index d9cc607..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/ProjectTest.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.operators;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.api.java.typeutils.runtime.TupleSerializer;
-import org.apache.flink.streaming.api.datastream.StreamProjection;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
-import org.apache.flink.streaming.util.MockContext;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.junit.Test;
-
-public class ProjectTest implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	@Test
-	public void operatorTest() {
-
-		TypeInformation<Tuple5<Integer, String, Integer, String, Integer>> inType = TypeExtractor
-				.getForObject(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "b", 4));
-
-		int[] fields = new int[]{4, 4, 3};
-
-		TupleSerializer<Tuple3<Integer, Integer, String>> serializer =
-				new TupleTypeInfo<Tuple3<Integer, Integer, String>>(StreamProjection.extractFieldTypes(fields, inType))
-						.createSerializer(new ExecutionConfig());
-		@SuppressWarnings("unchecked")
-		StreamProject<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>> operator =
-				new StreamProject<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>>(
-						fields, serializer);
-
-		List<Tuple5<Integer, String, Integer, String, Integer>> input = new ArrayList<Tuple5<Integer, String, Integer,
-				String, Integer>>();
-		input.add(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "b", 4));
-		input.add(new Tuple5<Integer, String, Integer, String, Integer>(2, "s", 3, "c", 2));
-		input.add(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "c", 2));
-		input.add(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "a", 7));
-
-		List<Tuple3<Integer, Integer, String>> expected = new ArrayList<Tuple3<Integer, Integer, String>>();
-		expected.add(new Tuple3<Integer, Integer, String>(4, 4, "b"));
-		expected.add(new Tuple3<Integer, Integer, String>(2, 2, "c"));
-		expected.add(new Tuple3<Integer, Integer, String>(2, 2, "c"));
-		expected.add(new Tuple3<Integer, Integer, String>(7, 7, "a"));
-
-		assertEquals(expected, MockContext.createAndExecute(operator, input));
-	}
-
-
-	// tests using projection from the API without explicitly specifying the types
-	private static final long MEMORY_SIZE = 32;
-	private static HashSet<Tuple2<Long, Double>> expected = new HashSet<Tuple2<Long, Double>>();
-	private static HashSet<Tuple2<Long, Double>> actual = new HashSet<Tuple2<Long, Double>>();
-
-	@Test
-	public void APIWithoutTypesTest() {
-
-		for (Long i = 1L; i < 11L; i++) {
-			expected.add(new Tuple2<Long, Double>(i, i.doubleValue()));
-		}
-
-		StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORY_SIZE);
-
-		env.generateSequence(1, 10).map(new MapFunction<Long, Tuple3<Long, Character, Double>>() {
-				@Override
-				public Tuple3<Long, Character, Double> map(Long value) throws Exception {
-					return new Tuple3<Long, Character, Double>(value, 'c', value.doubleValue());
-				}
-			})
-			.project(0, 2)
-			.addSink(new SinkFunction<Tuple>() {
-				@Override
-				@SuppressWarnings("unchecked")
-				public void invoke(Tuple value) throws Exception {
-					actual.add( (Tuple2<Long,Double>) value);
-				}
-			});
-
-		try {
-			env.execute();
-		} catch (Exception e) {
-			fail(e.getMessage());
-		}
-
-		assertEquals(expected, actual);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamCounterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamCounterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamCounterTest.java
new file mode 100644
index 0000000..3e662ba
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamCounterTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.joda.time.Instant;
+import org.junit.Test;
+
+/**
+ * Tests for {@link StreamCounter}. These test that:
+ *
+ * <ul>
+ *     <li>Timestamps of processed elements match the input timestamp</li>
+ *     <li>Watermarks are correctly forwarded</li>
+ * </ul>
+ */
+public class StreamCounterTest {
+
+	@Test
+	public void testCount() throws Exception {
+		StreamCounter<String> operator = new StreamCounter<String>();
+
+		OneInputStreamOperatorTestHarness<String, Long> testHarness = new OneInputStreamOperatorTestHarness<String, Long>(operator);
+
+		long initialTime = 0L;
+		ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<String>("eins", initialTime + 1));
+		testHarness.processElement(new StreamRecord<String>("zwei", initialTime + 2));
+		testHarness.processWatermark(new Watermark(initialTime + 2));
+		testHarness.processElement(new StreamRecord<String>("drei", initialTime + 3));
+
+		expectedOutput.add(new StreamRecord<Long>(1L, initialTime + 1));
+		expectedOutput.add(new StreamRecord<Long>(2L, initialTime + 2));
+		expectedOutput.add(new Watermark(initialTime + 2));
+		expectedOutput.add(new StreamRecord<Long>(3L, initialTime + 3));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFilterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFilterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFilterTest.java
new file mode 100644
index 0000000..f672a89
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFilterTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.RichFilterFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.joda.time.Instant;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@link StreamFilter}. These test that:
+ *
+ * <ul>
+ *     <li>RichFunction methods are called correctly</li>
+ *     <li>Timestamps of processed elements match the input timestamp</li>
+ *     <li>Watermarks are correctly forwarded</li>
+ * </ul>
+ */
+public class StreamFilterTest {
+
+	static class MyFilter implements FilterFunction<Integer> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public boolean filter(Integer value) throws Exception {
+			return value % 2 == 0;
+		}
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testFilter() throws Exception {
+		StreamFilter<Integer> operator = new StreamFilter<Integer>(new MyFilter());
+
+		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<Integer, Integer>(operator);
+
+		long initialTime = 0L;
+		ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 1));
+		testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 2));
+		testHarness.processWatermark(new Watermark(initialTime + 2));
+		testHarness.processElement(new StreamRecord<Integer>(3, initialTime + 3));
+		testHarness.processElement(new StreamRecord<Integer>(4, initialTime + 4));
+		testHarness.processElement(new StreamRecord<Integer>(5, initialTime + 5));
+		testHarness.processElement(new StreamRecord<Integer>(6, initialTime + 6));
+		testHarness.processElement(new StreamRecord<Integer>(7, initialTime + 7));
+
+		expectedOutput.add(new StreamRecord<Integer>(2, initialTime + 2));
+		expectedOutput.add(new Watermark(initialTime + 2));
+		expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 4));
+		expectedOutput.add(new StreamRecord<Integer>(6, initialTime + 6));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+	}
+
+	@Test
+	public void testOpenClose() throws Exception {
+		StreamFilter<String> operator = new StreamFilter<String>(new TestOpenCloseFilterFunction());
+
+		OneInputStreamOperatorTestHarness<String, String> testHarness = new OneInputStreamOperatorTestHarness<String, String>(operator);
+
+		long initialTime = 0L;
+
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<String>("fooHello", initialTime));
+		testHarness.processElement(new StreamRecord<String>("bar", initialTime));
+
+		testHarness.close();
+
+		Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseFilterFunction.closeCalled);
+		Assert.assertTrue("Output contains no elements.", testHarness.getOutput().size() > 0);
+	}
+
+	// This must only be used in one test, otherwise the static fields will be changed
+	// by several tests concurrently
+	private static class TestOpenCloseFilterFunction extends RichFilterFunction<String> {
+		private static final long serialVersionUID = 1L;
+
+		public static boolean openCalled = false;
+		public static boolean closeCalled = false;
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+			if (closeCalled) {
+				Assert.fail("Close called before open.");
+			}
+			openCalled = true;
+		}
+
+		@Override
+		public void close() throws Exception {
+			super.close();
+			if (!openCalled) {
+				Assert.fail("Open was not called before close.");
+			}
+			closeCalled = true;
+		}
+
+		@Override
+		public boolean filter(String value) throws Exception {
+			if (!openCalled) {
+				Assert.fail("Open was not called before run.");
+			}
+			return value.startsWith("foo");
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFlatMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFlatMapTest.java
new file mode 100644
index 0000000..ac7caa7
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/StreamFlatMapTest.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.Collector;
+import org.joda.time.Instant;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for {@link StreamFlatMap}. These test that:
+ *
+ * <ul>
+ *     <li>RichFunction methods are called correctly</li>
+ *     <li>Timestamps of processed elements match the input timestamp</li>
+ *     <li>Watermarks are correctly forwarded</li>
+ * </ul>
+ */
+public class StreamFlatMapTest {
+
+	public static final class MyFlatMap implements FlatMapFunction<Integer, Integer> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap(Integer value, Collector<Integer> out) throws Exception {
+			if (value % 2 == 0) {
+				out.collect(value);
+				out.collect(value * value);
+			}
+		}
+	}
+
+	@Test
+	public void testFlatMap() throws Exception {
+		StreamFlatMap<Integer, Integer> operator = new StreamFlatMap<Integer, Integer>(new MyFlatMap());
+
+		OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<Integer, Integer>(operator);
+
+		long initialTime = 0L;
+		ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<Integer>(1, initialTime + 1));
+		testHarness.processElement(new StreamRecord<Integer>(2, initialTime + 2));
+		testHarness.processWatermark(new Watermark(initialTime + 2));
+		testHarness.processElement(new StreamRecord<Integer>(3, initialTime + 3));
+		testHarness.processElement(new StreamRecord<Integer>(4, initialTime + 4));
+		testHarness.processElement(new StreamRecord<Integer>(5, initialTime + 5));
+		testHarness.processElement(new StreamRecord<Integer>(6, initialTime + 6));
+		testHarness.processElement(new StreamRecord<Integer>(7, initialTime + 7));
+		testHarness.processElement(new StreamRecord<Integer>(8, initialTime + 8));
+
+		expectedOutput.add(new StreamRecord<Integer>(2, initialTime + 2));
+		expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 2));
+		expectedOutput.add(new Watermark(initialTime + 2));
+		expectedOutput.add(new StreamRecord<Integer>(4, initialTime + 4));
+		expectedOutput.add(new StreamRecord<Integer>(16, initialTime + 4));
+		expectedOutput.add(new StreamRecord<Integer>(6, initialTime + 6));
+		expectedOutput.add(new StreamRecord<Integer>(36, initialTime + 6));
+		expectedOutput.add(new StreamRecord<Integer>(8, initialTime + 8));
+		expectedOutput.add(new StreamRecord<Integer>(64, initialTime + 8));
+
+		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+	}
+
+	@Test
+	public void testOpenClose() throws Exception {
+		StreamFlatMap<String, String> operator = new StreamFlatMap<String, String>(new TestOpenCloseFlatMapFunction());
+
+		OneInputStreamOperatorTestHarness<String, String> testHarness = new OneInputStreamOperatorTestHarness<String, String>(operator);
+
+		long initialTime = 0L;
+
+		testHarness.open();
+
+		testHarness.processElement(new StreamRecord<String>("Hello", initialTime));
+
+		testHarness.close();
+
+		Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseFlatMapFunction.closeCalled);
+		Assert.assertTrue("Output contains no elements.", testHarness.getOutput().size() > 0);
+	}
+
+	// This must only be used in one test, otherwise the static fields will be changed
+	// by several tests concurrently
+	private static class TestOpenCloseFlatMapFunction extends RichFlatMapFunction<String, String> {
+		private static final long serialVersionUID = 1L;
+
+		public static boolean openCalled = false;
+		public static boolean closeCalled = false;
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+			if (closeCalled) {
+				Assert.fail("Close called before open.");
+			}
+			openCalled = true;
+		}
+
+		@Override
+		public void close() throws Exception {
+			super.close();
+			if (!openCalled) {
+				Assert.fail("Open was not called before close.");
+			}
+			closeCalled = true;
+		}
+
+		@Override
+		public void flatMap(String value, Collector<String> out) throws Exception {
+			if (!openCalled) {
+				Assert.fail("Open was not called before run.");
+			}
+			out.collect(value);
+		}
+	}
+}


Mime
View raw message