flink-commits mailing list archives

From: u..@apache.org
Subject: [1/3] flink git commit: [FLINK-1505] [distributed runtime] Separate reader API from result consumption
Date: Mon, 23 Feb 2015 09:23:10 GMT
Repository: flink
Updated Branches:
  refs/heads/master a911559a1 -> 5232c56b1


http://git-wip-us.apache.org/repos/asf/flink/blob/5232c56b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockProducer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockProducer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockProducer.java
deleted file mode 100644
index 1cfe75d..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/MockProducer.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.partition;
-
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferPool;
-import org.apache.flink.runtime.io.network.partition.queue.IntermediateResultPartitionQueue;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
-public class MockProducer implements Callable<Boolean> {
-
-	private static final int SLEEP_TIME_MS = 20;
-
-	private final IntermediateResultPartitionQueue queue;
-
-	private final BufferPool bufferPool;
-
-	private final int numBuffersToProduce;
-
-	private final boolean slowProducer;
-
-	private final AtomicInteger discardAfter = new AtomicInteger(Integer.MAX_VALUE);
-
-	private final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
-
-	public MockProducer(IntermediateResultPartitionQueue queue, BufferPool bufferPool, int numBuffersToProduce, boolean slowProducer) {
-		this.queue = queue;
-		this.bufferPool = bufferPool;
-		this.numBuffersToProduce = numBuffersToProduce;
-		this.slowProducer = slowProducer;
-	}
-
-	@Override
-	public Boolean call() throws Exception {
-		try {
-			int currentNumber = 0;
-
-			for (int i = 0; i < numBuffersToProduce; i++) {
-				if (i >= discardAfter.get()) {
-					queue.discard();
-					return true;
-				}
-
-				Buffer buffer = bufferPool.requestBufferBlocking();
-
-				currentNumber = fillBufferWithAscendingNumbers(buffer, currentNumber);
-
-				queue.add(buffer);
-
-				if (slowProducer) {
-					Thread.sleep(SLEEP_TIME_MS);
-				}
-			}
-
-			queue.finish();
-		}
-		catch (Throwable t) {
-			error.compareAndSet(null, t);
-			return false;
-		}
-
-		return true;
-	}
-
-	void discard() {
-		discardAfter.set(0);
-	}
-
-	public void discardAfter(int numBuffers) {
-		discardAfter.set(numBuffers);
-	}
-
-	public Throwable getError() {
-		return error.get();
-	}
-
-	public static int fillBufferWithAscendingNumbers(Buffer buffer, int currentNumber) {
-		MemorySegment segment = buffer.getMemorySegment();
-
-		for (int i = 4; i < segment.size(); i += 4) {
-			segment.putInt(i, currentNumber++);
-		}
-
-		return currentNumber;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/5232c56b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
new file mode 100644
index 0000000..e5c87e9
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import com.google.common.base.Optional;
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.deployment.PartitionInfo;
+import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.partition.IntermediateResultPartitionManager;
+import org.apache.flink.runtime.io.network.partition.queue.IntermediateResultPartitionQueueIterator;
+import org.apache.flink.runtime.io.network.util.TestTaskEvent;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.junit.Test;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class SingleInputGateTest {
+
+	@Test
+	public void testBackwardsEventWithUninitializedChannel() throws Exception {
+		// Setup environment
+		final TaskEventDispatcher taskEventDispatcher = mock(TaskEventDispatcher.class);
+		when(taskEventDispatcher.publish(any(ExecutionAttemptID.class), any(IntermediateResultPartitionID.class), any(TaskEvent.class))).thenReturn(true);
+
+		final IntermediateResultPartitionQueueIterator iterator = mock(IntermediateResultPartitionQueueIterator.class);
+		when(iterator.getNextBuffer()).thenReturn(new Buffer(new MemorySegment(new byte[1024]), mock(BufferRecycler.class)));
+
+		final IntermediateResultPartitionManager partitionManager = mock(IntermediateResultPartitionManager.class);
+		when(partitionManager.getIntermediateResultPartitionIterator(any(ExecutionAttemptID.class), any(IntermediateResultPartitionID.class), anyInt(), any(Optional.class))).thenReturn(iterator);
+
+		// Setup reader with one local and one unknown input channel
+		final IntermediateDataSetID resultId = new IntermediateDataSetID();
+
+		final SingleInputGate inputGate = new SingleInputGate(resultId, 0, 2);
+		final BufferPool bufferPool = mock(BufferPool.class);
+		when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2);
+
+		inputGate.setBufferPool(bufferPool);
+
+		// Local
+		ExecutionAttemptID localProducer = new ExecutionAttemptID();
+		IntermediateResultPartitionID localPartitionId = new IntermediateResultPartitionID();
+
+		InputChannel local = new LocalInputChannel(inputGate, 0, localProducer, localPartitionId, partitionManager, taskEventDispatcher);
+
+		// Unknown
+		ExecutionAttemptID unknownProducer = new ExecutionAttemptID();
+		IntermediateResultPartitionID unknownPartitionId = new IntermediateResultPartitionID();
+
+		InputChannel unknown = new UnknownInputChannel(inputGate, 1, unknownProducer, unknownPartitionId, partitionManager, taskEventDispatcher, mock(ConnectionManager.class));
+
+		// Set channels
+		inputGate.setInputChannel(localPartitionId, local);
+		inputGate.setInputChannel(unknownPartitionId, unknown);
+
+		// Request partitions
+		inputGate.requestPartitions();
+
+		// Only the local channel can request
+		verify(partitionManager, times(1)).getIntermediateResultPartitionIterator(any(ExecutionAttemptID.class), any(IntermediateResultPartitionID.class), anyInt(), any(Optional.class));
+
+		// Send event backwards and initialize unknown channel afterwards
+		final TaskEvent event = new TestTaskEvent();
+		inputGate.sendTaskEvent(event);
+
+		// Only the local channel can send out the event
+		verify(taskEventDispatcher, times(1)).publish(any(ExecutionAttemptID.class), any(IntermediateResultPartitionID.class), any(TaskEvent.class));
+
+		// After the update, the pending event should be sent to the newly updated channel
+		inputGate.updateInputChannel(new PartitionInfo(unknownPartitionId, unknownProducer, PartitionInfo.PartitionLocation.LOCAL, null));
+
+		verify(partitionManager, times(2)).getIntermediateResultPartitionIterator(any(ExecutionAttemptID.class), any(IntermediateResultPartitionID.class), anyInt(), any(Optional.class));
+		verify(taskEventDispatcher, times(2)).publish(any(ExecutionAttemptID.class), any(IntermediateResultPartitionID.class), any(TaskEvent.class));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5232c56b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
new file mode 100644
index 0000000..c1db44d
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.consumer;
+
+import org.apache.flink.runtime.io.network.util.MockInputChannel;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class UnionInputGateTest {
+
+	@Test
+	public void testChannelMapping() throws Exception {
+
+		final SingleInputGate ig1 = new SingleInputGate(new IntermediateDataSetID(), 0, 3);
+		final SingleInputGate ig2 = new SingleInputGate(new IntermediateDataSetID(), 0, 5);
+
+		final UnionInputGate union = new UnionInputGate(new SingleInputGate[]{ig1, ig2});
+
+		assertEquals(ig1.getNumberOfInputChannels() + ig2.getNumberOfInputChannels(), union.getNumberOfInputChannels());
+
+		final MockInputChannel[][] inputChannels = new MockInputChannel[][]{
+				MockInputChannel.createInputChannels(ig1, 3),
+				MockInputChannel.createInputChannels(ig2, 5)
+		};
+
+		inputChannels[0][0].readBuffer(); // 0 => 0
+		inputChannels[1][2].readBuffer(); // 2 => 5
+		inputChannels[1][0].readBuffer(); // 0 => 3
+		inputChannels[1][1].readBuffer(); // 1 => 4
+		inputChannels[0][1].readBuffer(); // 1 => 1
+		inputChannels[1][3].readBuffer(); // 3 => 6
+		inputChannels[0][2].readBuffer(); // 2 => 2
+		inputChannels[1][4].readBuffer(); // 4 => 7
+
+		assertEquals(0, union.getNextBufferOrEvent().getChannelIndex());
+		assertEquals(5, union.getNextBufferOrEvent().getChannelIndex());
+		assertEquals(3, union.getNextBufferOrEvent().getChannelIndex());
+		assertEquals(4, union.getNextBufferOrEvent().getChannelIndex());
+		assertEquals(1, union.getNextBufferOrEvent().getChannelIndex());
+		assertEquals(6, union.getNextBufferOrEvent().getChannelIndex());
+		assertEquals(2, union.getNextBufferOrEvent().getChannelIndex());
+		assertEquals(7, union.getNextBufferOrEvent().getChannelIndex());
+	}
+}

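The "x => y" comments in the test above encode how the union gate maps each gate's local channel index into a single combined index space. A minimal sketch of that mapping, inferred from the expected values in testChannelMapping rather than taken from the UnionInputGate implementation:

    // Channels of the first gate keep their indices; channels of each later
    // gate are shifted by the total channel count of the gates before it.
    int[] gateSizes = { 3, 5 };   // ig1 and ig2 from the test above
    int gateIndex = 1;            // inputChannels[1][2]
    int channelIndex = 2;

    int offset = 0;
    for (int g = 0; g < gateIndex; g++) {
        offset += gateSizes[g];
    }

    int unionIndex = offset + channelIndex;   // 3 + 2 = 5, matching "// 2 => 5"
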
http://git-wip-us.apache.org/repos/asf/flink/blob/5232c56b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/queue/PipelinedPartitionQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/queue/PipelinedPartitionQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/queue/PipelinedPartitionQueueTest.java
index e85a7f7..118b00b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/queue/PipelinedPartitionQueueTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/queue/PipelinedPartitionQueueTest.java
@@ -23,9 +23,9 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.partition.MockConsumer;
-import org.apache.flink.runtime.io.network.partition.MockNotificationListener;
-import org.apache.flink.runtime.io.network.partition.MockProducer;
+import org.apache.flink.runtime.io.network.util.MockConsumer;
+import org.apache.flink.runtime.io.network.util.MockNotificationListener;
+import org.apache.flink.runtime.io.network.util.MockProducer;
 import org.apache.flink.runtime.io.network.partition.queue.IntermediateResultPartitionQueueIterator.AlreadySubscribedException;
 import org.apache.flink.runtime.util.event.NotificationListener;
 import org.junit.Before;

http://git-wip-us.apache.org/repos/asf/flink/blob/5232c56b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockConsumer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockConsumer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockConsumer.java
new file mode 100644
index 0000000..62375a6
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockConsumer.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.util;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.queue.IntermediateResultPartitionQueueIterator;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class MockConsumer implements Callable<Boolean> {
+
+	private static final int SLEEP_TIME_MS = 20;
+
+	private final IntermediateResultPartitionQueueIterator iterator;
+
+	private final boolean slowConsumer;
+
+	private final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+
+	public MockConsumer(IntermediateResultPartitionQueueIterator iterator, boolean slowConsumer) {
+		this.iterator = iterator;
+		this.slowConsumer = slowConsumer;
+	}
+
+	@Override
+	public Boolean call() throws Exception {
+		MockNotificationListener listener = new MockNotificationListener();
+
+		int currentNumber = 0;
+
+		try {
+			while (true) {
+				Buffer buffer = iterator.getNextBuffer();
+
+				if (slowConsumer) {
+					Thread.sleep(SLEEP_TIME_MS);
+				}
+
+				if (buffer == null) {
+					if (iterator.subscribe(listener)) {
+						listener.waitForNotification();
+					}
+					else if (iterator.isConsumed()) {
+						break;
+					}
+				}
+				else {
+					try {
+						if (buffer.isBuffer()) {
+							currentNumber = verifyBufferFilledWithAscendingNumbers(buffer, currentNumber);
+						}
+					}
+					finally {
+						buffer.recycle();
+					}
+				}
+			}
+		}
+		catch (Throwable t) {
+			error.compareAndSet(null, t);
+			return false;
+		}
+
+		return true;
+	}
+
+	public Throwable getError() {
+		return error.get();
+	}
+
+	private int verifyBufferFilledWithAscendingNumbers(Buffer buffer, int currentNumber) {
+		MemorySegment segment = buffer.getMemorySegment();
+
+		for (int i = 4; i < segment.size(); i += 4) {
+			if (segment.getInt(i) != currentNumber++) {
+				throw new IllegalStateException("Read unexpected number from buffer.");
+			}
+		}
+
+		return currentNumber;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5232c56b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockInputChannel.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockInputChannel.java
new file mode 100644
index 0000000..301169a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockInputChannel.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.util;
+
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mockito.stubbing.OngoingStubbing;
+
+import java.io.IOException;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * A mocked input channel.
+ */
+public class MockInputChannel {
+
+	private final InputChannel mock = Mockito.mock(InputChannel.class);
+
+	private final SingleInputGate inputGate;
+
+	// Abusing Mockito here... ;)
+	protected OngoingStubbing<Buffer> stubbing;
+
+	public MockInputChannel(SingleInputGate inputGate, int channelIndex) {
+		checkArgument(channelIndex >= 0);
+		this.inputGate = checkNotNull(inputGate);
+
+		when(mock.getChannelIndex()).thenReturn(channelIndex);
+	}
+
+	public MockInputChannel read(Buffer buffer) throws IOException {
+		if (stubbing == null) {
+			stubbing = when(mock.getNextBuffer()).thenReturn(buffer);
+		}
+		else {
+			stubbing = stubbing.thenReturn(buffer);
+		}
+
+		inputGate.onAvailableBuffer(mock);
+
+		return this;
+	}
+
+	public MockInputChannel readBuffer() throws IOException {
+		final Buffer buffer = mock(Buffer.class);
+		when(buffer.isBuffer()).thenReturn(true);
+
+		return read(buffer);
+	}
+
+	public MockInputChannel readEvent() throws IOException {
+		return read(EventSerializer.toBuffer(new TestTaskEvent()));
+	}
+
+	public MockInputChannel readEndOfSuperstepEvent() throws IOException {
+		return read(EventSerializer.toBuffer(EndOfSuperstepEvent.INSTANCE));
+	}
+
+	public MockInputChannel readEndOfPartitionEvent() throws IOException {
+		final Answer<Buffer> answer = new Answer<Buffer>() {
+			@Override
+			public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable {
+				// Return true after finishing
+				when(mock.isReleased()).thenReturn(true);
+
+				return EventSerializer.toBuffer(EndOfPartitionEvent.INSTANCE);
+			}
+		};
+
+		if (stubbing == null) {
+			stubbing = when(mock.getNextBuffer()).thenAnswer(answer);
+		}
+		else {
+			stubbing = stubbing.thenAnswer(answer);
+		}
+
+		inputGate.onAvailableBuffer(mock);
+
+		return this;
+	}
+
+	public InputChannel getInputChannel() {
+		return mock;
+	}
+
+	// ------------------------------------------------------------------------
+
+	public static MockInputChannel[] createInputChannels(SingleInputGate inputGate, int numberOfInputChannels) {
+		checkNotNull(inputGate);
+		checkArgument(numberOfInputChannels > 0);
+
+		MockInputChannel[] mocks = new MockInputChannel[numberOfInputChannels];
+
+		for (int i = 0; i < numberOfInputChannels; i++) {
+			mocks[i] = new MockInputChannel(inputGate, i);
+
+			inputGate.setInputChannel(new IntermediateResultPartitionID(), mocks[i].getInputChannel());
+		}
+
+		return mocks;
+	}
+}

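A hypothetical usage fragment for the mock channel helper above, assumed to run inside a test method that declares throws Exception; only calls introduced in this commit are used:

    // Wire two mocked channels into a gate and queue test data on them.
    SingleInputGate gate = new SingleInputGate(new IntermediateDataSetID(), 0, 2);
    MockInputChannel[] channels = MockInputChannel.createInputChannels(gate, 2);

    channels[0].readBuffer();              // queue a mocked data buffer on channel 0
    channels[1].readEndOfPartitionEvent(); // mark channel 1 as finished
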
http://git-wip-us.apache.org/repos/asf/flink/blob/5232c56b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockNotificationListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockNotificationListener.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockNotificationListener.java
new file mode 100644
index 0000000..56e0025
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockNotificationListener.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.util;
+
+import org.apache.flink.runtime.util.event.NotificationListener;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class MockNotificationListener implements NotificationListener {
+
+	final AtomicInteger numNotifications = new AtomicInteger();
+
+	@Override
+	public void onNotification() {
+		synchronized (numNotifications) {
+			numNotifications.incrementAndGet();
+
+			numNotifications.notifyAll();
+		}
+	}
+
+	public void waitForNotification() throws InterruptedException {
+
+		int current = numNotifications.get();
+
+		synchronized (numNotifications) {
+			while (current == numNotifications.get()) {
+				numNotifications.wait();
+			}
+		}
+	}
+
+	public int getNumberOfNotifications() {
+		return numNotifications.get();
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/5232c56b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockProducer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockProducer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockProducer.java
new file mode 100644
index 0000000..44d8ffe
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockProducer.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.util;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.partition.queue.IntermediateResultPartitionQueue;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class MockProducer implements Callable<Boolean> {
+
+	private static final int SLEEP_TIME_MS = 20;
+
+	private final IntermediateResultPartitionQueue queue;
+
+	private final BufferPool bufferPool;
+
+	private final int numBuffersToProduce;
+
+	private final boolean slowProducer;
+
+	private final AtomicInteger discardAfter = new AtomicInteger(Integer.MAX_VALUE);
+
+	private final AtomicReference<Throwable> error = new AtomicReference<Throwable>();
+
+	public MockProducer(IntermediateResultPartitionQueue queue, BufferPool bufferPool, int numBuffersToProduce, boolean slowProducer) {
+		this.queue = queue;
+		this.bufferPool = bufferPool;
+		this.numBuffersToProduce = numBuffersToProduce;
+		this.slowProducer = slowProducer;
+	}
+
+	@Override
+	public Boolean call() throws Exception {
+		try {
+			int currentNumber = 0;
+
+			for (int i = 0; i < numBuffersToProduce; i++) {
+				if (i >= discardAfter.get()) {
+					queue.discard();
+					return true;
+				}
+
+				Buffer buffer = bufferPool.requestBufferBlocking();
+
+				currentNumber = fillBufferWithAscendingNumbers(buffer, currentNumber);
+
+				queue.add(buffer);
+
+				if (slowProducer) {
+					Thread.sleep(SLEEP_TIME_MS);
+				}
+			}
+
+			queue.finish();
+		}
+		catch (Throwable t) {
+			error.compareAndSet(null, t);
+			return false;
+		}
+
+		return true;
+	}
+
+	void discard() {
+		discardAfter.set(0);
+	}
+
+	public void discardAfter(int numBuffers) {
+		discardAfter.set(numBuffers);
+	}
+
+	public Throwable getError() {
+		return error.get();
+	}
+
+	public static int fillBufferWithAscendingNumbers(Buffer buffer, int currentNumber) {
+		MemorySegment segment = buffer.getMemorySegment();
+
+		for (int i = 4; i < segment.size(); i += 4) {
+			segment.putInt(i, currentNumber++);
+		}
+
+		return currentNumber;
+	}
+}

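A sketch of how the producer and consumer helpers might be driven from a test. The queue, bufferPool and iterator variables are assumed to be set up by the surrounding test (as PipelinedPartitionQueueTest does); the executor wiring and assertions are illustrative, not taken from this commit:

    ExecutorService executor = Executors.newFixedThreadPool(2);

    MockProducer producer = new MockProducer(queue, bufferPool, 1024, false);
    MockConsumer consumer = new MockConsumer(iterator, false);

    // Both helpers implement Callable<Boolean>: true on success, false if they
    // recorded an error (retrievable via getError()).
    Future<Boolean> produced = executor.submit(producer);
    Future<Boolean> consumed = executor.submit(consumer);

    assertTrue(produced.get());
    assertTrue(consumed.get());
    assertNull(producer.getError());
    assertNull(consumer.getError());
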
http://git-wip-us.apache.org/repos/asf/flink/blob/5232c56b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockSingleInputGate.java
new file mode 100644
index 0000000..3c708ac
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/MockSingleInputGate.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.util;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkElementIndex;
+import static org.mockito.Mockito.spy;
+
+public class MockSingleInputGate {
+
+	protected final SingleInputGate inputGate;
+
+	protected final MockInputChannel[] inputChannels;
+
+	public MockSingleInputGate(int numberOfInputChannels) {
+		this(numberOfInputChannels, true);
+	}
+
+	public MockSingleInputGate(int numberOfInputChannels, boolean initialize) {
+		checkArgument(numberOfInputChannels >= 1);
+
+		this.inputGate = spy(new SingleInputGate(new IntermediateDataSetID(), 0, numberOfInputChannels));
+
+		this.inputChannels = new MockInputChannel[numberOfInputChannels];
+
+		if (initialize) {
+			for (int i = 0; i < numberOfInputChannels; i++) {
+				inputChannels[i] = new MockInputChannel(inputGate, i);
+				inputGate.setInputChannel(new IntermediateResultPartitionID(), inputChannels[i].getInputChannel());
+			}
+		}
+	}
+
+	public MockSingleInputGate read(Buffer buffer, int channelIndex) throws IOException {
+		checkElementIndex(channelIndex, inputGate.getNumberOfInputChannels());
+
+		inputChannels[channelIndex].read(buffer);
+
+		return this;
+	}
+
+	public MockSingleInputGate readBuffer() throws IOException {
+		return readBuffer(0);
+	}
+
+	public MockSingleInputGate readBuffer(int channelIndex) throws IOException {
+		inputChannels[channelIndex].readBuffer();
+
+		return this;
+	}
+
+	public MockSingleInputGate readEvent() throws IOException {
+		return readEvent(0);
+	}
+
+	public MockSingleInputGate readEvent(int channelIndex) throws IOException {
+		inputChannels[channelIndex].readEvent();
+
+		return this;
+	}
+
+	public MockSingleInputGate readEndOfSuperstepEvent() throws IOException {
+		for (MockInputChannel inputChannel : inputChannels) {
+			inputChannel.readEndOfSuperstepEvent();
+		}
+
+		return this;
+	}
+
+	public MockSingleInputGate readEndOfSuperstepEvent(int channelIndex) throws IOException {
+		inputChannels[channelIndex].readEndOfSuperstepEvent();
+
+		return this;
+	}
+
+	public MockSingleInputGate readEndOfPartitionEvent() throws IOException {
+		for (MockInputChannel inputChannel : inputChannels) {
+			inputChannel.readEndOfPartitionEvent();
+		}
+
+		return this;
+	}
+
+	public MockSingleInputGate readEndOfPartitionEvent(int channelIndex) throws IOException {
+		inputChannels[channelIndex].readEndOfPartitionEvent();
+
+		return this;
+	}
+
+	public SingleInputGate getInputGate() {
+		return inputGate;
+	}
+
+	// ------------------------------------------------------------------------
+
+	public List<Integer> readAllChannels() throws IOException {
+		final List<Integer> readOrder = new ArrayList<Integer>(inputChannels.length);
+
+		for (int i = 0; i < inputChannels.length; i++) {
+			readOrder.add(i);
+		}
+
+		Collections.shuffle(readOrder);
+
+		for (int channelIndex : readOrder) {
+			inputChannels[channelIndex].readBuffer();
+		}
+
+		return readOrder;
+	}
+}

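A hypothetical fragment showing the fluent style the wrapper above enables (illustrative only; every call appears in the diff):

    // Two channels: queue a buffer on channel 0 and a task event on channel 1,
    // then finish all channels with end-of-partition events.
    MockSingleInputGate mockGate = new MockSingleInputGate(2);

    mockGate.readBuffer(0)
            .readEvent(1)
            .readEndOfPartitionEvent();

    SingleInputGate gate = mockGate.getInputGate();
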
http://git-wip-us.apache.org/repos/asf/flink/blob/5232c56b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestTaskEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestTaskEvent.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestTaskEvent.java
new file mode 100644
index 0000000..4f547aa
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/util/TestTaskEvent.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.util;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.task.TaskEvent;
+
+import java.io.IOException;
+
+public class TestTaskEvent extends TaskEvent {
+
+	private double val0;
+
+	private long val1;
+
+	public TestTaskEvent() {
+		this(0, 0);
+	}
+
+	public TestTaskEvent(double val0, long val1) {
+		this.val0 = val0;
+		this.val1 = val1;
+	}
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		out.writeDouble(val0);
+		out.writeLong(val1);
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		val0 = in.readDouble();
+		val1 = in.readLong();
+	}
+
+	@Override
+	public boolean equals(Object obj) {
+		if (obj instanceof TestTaskEvent) {
+			TestTaskEvent other = (TestTaskEvent) obj;
+
+			return val0 == other.val0 && val1 == other.val1;
+		}
+
+		return false;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5232c56b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
index 9ce0d50..1a4b7f1 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
@@ -21,7 +21,7 @@ package org.apache.flink.runtime.operators;
 import org.apache.flink.api.common.typeutils.record.RecordComparatorFactory;
 import org.apache.flink.api.java.record.io.DelimitedOutputFormat;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.io.network.api.reader.MockIteratorBufferReader;
+import org.apache.flink.runtime.io.network.api.reader.IteratorWrappingMockSingleInputGate;
 import org.apache.flink.runtime.io.network.api.writer.BufferWriter;
 import org.apache.flink.runtime.operators.testutils.InfiniteInputIterator;
 import org.apache.flink.runtime.operators.testutils.TaskCancelThread;
@@ -144,7 +144,7 @@ public class DataSinkTaskTest extends TaskTestBase
 
 		super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
 
-		MockIteratorBufferReader<?>[] readers = new MockIteratorBufferReader[4];
+		IteratorWrappingMockSingleInputGate<?>[] readers = new IteratorWrappingMockSingleInputGate[4];
 		readers[0] = super.addInput(new UniformRecordGenerator(keyCnt, valCnt, 0, 0, false), 0, false);
 		readers[1] = super.addInput(new UniformRecordGenerator(keyCnt, valCnt, keyCnt, 0, false), 0, false);
 		readers[2] = super.addInput(new UniformRecordGenerator(keyCnt, valCnt, keyCnt * 2, 0, false), 0, false);
@@ -157,7 +157,7 @@ public class DataSinkTaskTest extends TaskTestBase
 		try {
 			// For the union reader to work, we need to start notifications *after* the union reader
 			// has been initialized.
-			for (MockIteratorBufferReader<?> reader : readers) {
+			for (IteratorWrappingMockSingleInputGate<?> reader : readers) {
 				reader.read();
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5232c56b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
index 7aab050..7fb13e3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/MockEnvironment.java
@@ -27,14 +27,14 @@ import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.api.reader.BufferReader;
-import org.apache.flink.runtime.io.network.api.reader.MockIteratorBufferReader;
+import org.apache.flink.runtime.io.network.api.reader.IteratorWrappingMockSingleInputGate;
 import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
 import org.apache.flink.runtime.io.network.api.writer.BufferWriter;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -69,7 +69,7 @@ public class MockEnvironment implements Environment {
 
 	private final Configuration taskConfiguration;
 
-	private final List<BufferReader> inputs;
+	private final List<InputGate> inputs;
 
 	private final List<BufferWriter> outputs;
 
@@ -82,7 +82,7 @@ public class MockEnvironment implements Environment {
 	public MockEnvironment(long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
 		this.jobConfiguration = new Configuration();
 		this.taskConfiguration = new Configuration();
-		this.inputs = new LinkedList<BufferReader>();
+		this.inputs = new LinkedList<InputGate>();
 		this.outputs = new LinkedList<BufferWriter>();
 
 		this.memManager = new DefaultMemoryManager(memorySize, 1);
@@ -91,11 +91,11 @@ public class MockEnvironment implements Environment {
 		this.bufferSize = bufferSize;
 	}
 
-	public MockIteratorBufferReader<Record> addInput(MutableObjectIterator<Record> inputIterator) {
+	public IteratorWrappingMockSingleInputGate<Record> addInput(MutableObjectIterator<Record> inputIterator) {
 		try {
-			final MockIteratorBufferReader<Record> reader = new MockIteratorBufferReader<Record>(bufferSize, Record.class, inputIterator);
+			final IteratorWrappingMockSingleInputGate<Record> reader = new IteratorWrappingMockSingleInputGate<Record>(bufferSize, Record.class, inputIterator);
 
-			inputs.add(reader.getMock());
+			inputs.add(reader.getInputGate());
 
 			return reader;
 		}
@@ -235,13 +235,15 @@ public class MockEnvironment implements Environment {
 	}
 
 	@Override
-	public BufferReader getReader(int index) {
+	public InputGate getInputGate(int index) {
 		return inputs.get(index);
 	}
 
 	@Override
-	public BufferReader[] getAllReaders() {
-		return inputs.toArray(new BufferReader[inputs.size()]);
+	public InputGate[] getAllInputGates() {
+		InputGate[] gates = new InputGate[inputs.size()];
+		inputs.toArray(gates);
+		return gates;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/5232c56b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
index 23cb23b..e0776d9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/TaskTestBase.java
@@ -27,7 +27,7 @@ import org.apache.flink.api.java.record.io.FileOutputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileSystem.WriteMode;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.io.network.api.reader.MockIteratorBufferReader;
+import org.apache.flink.runtime.io.network.api.reader.IteratorWrappingMockSingleInputGate;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 import org.apache.flink.runtime.operators.PactDriver;
@@ -55,14 +55,14 @@ public abstract class TaskTestBase {
 		this.mockEnv = new MockEnvironment(this.memorySize, this.inputSplitProvider, bufferSize);
 	}
 
-	public MockIteratorBufferReader<Record> addInput(MutableObjectIterator<Record> input, int groupId) {
-		final MockIteratorBufferReader<Record> reader = addInput(input, groupId, true);
+	public IteratorWrappingMockSingleInputGate<Record> addInput(MutableObjectIterator<Record> input, int groupId) {
+		final IteratorWrappingMockSingleInputGate<Record> reader = addInput(input, groupId, true);
 
 		return reader;
 	}
 
-	public MockIteratorBufferReader<Record> addInput(MutableObjectIterator<Record> input, int groupId, boolean read) {
-		final MockIteratorBufferReader<Record> reader = this.mockEnv.addInput(input);
+	public IteratorWrappingMockSingleInputGate<Record> addInput(MutableObjectIterator<Record> input, int groupId, boolean read) {
+		final IteratorWrappingMockSingleInputGate<Record> reader = this.mockEnv.addInput(input);
 		TaskConfig conf = new TaskConfig(this.mockEnv.getTaskConfiguration());
 		conf.addInputToGroup(groupId);
 		conf.setInputSerializer(RecordSerializerFactory.get(), groupId);

http://git-wip-us.apache.org/repos/asf/flink/blob/5232c56b/flink-runtime/src/test/java/org/apache/flink/runtime/util/event/EventNotificationHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/event/EventNotificationHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/event/EventNotificationHandlerTest.java
deleted file mode 100644
index 625b93f..0000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/event/EventNotificationHandlerTest.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.util.event;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-import org.apache.flink.runtime.event.task.TaskEvent;
-import org.apache.flink.runtime.event.task.IntegerTaskEvent;
-import org.apache.flink.runtime.event.task.StringTaskEvent;
-import org.junit.Test;
-
-/**
- * This class contains unit tests for the {@link EventNotificationHandler}.
- * 
- */
-public class EventNotificationHandlerTest {
-	/**
-	 * A test implementation of an {@link EventListener}.
-	 * 
-	 */
-	private static class TestEventListener implements EventListener<TaskEvent> {
-
-		/**
-		 * The event that was last received by this event listener.
-		 */
-		private TaskEvent receivedEvent = null;
-
-		/**
-		 * {@inheritDoc}
-		 * @param event
-		 */
-		@Override
-		public void onEvent(TaskEvent event) {
-
-			this.receivedEvent = event;
-		}
-
-		/**
-		 * Returns the event which was last received by this event listener. If no event
-		 * has been received so far the return value is <code>null</code>.
-		 * 
-		 * @return the event which was last received, possibly <code>null</code>
-		 */
-		public TaskEvent getLastReceivedEvent() {
-
-			return this.receivedEvent;
-		}
-	}
-
-	/**
-	 * Tests the publish/subscribe mechanisms implemented in the {@link EventNotificationHandler}.
-	 */
-	@Test
-	public void testEventNotificationManager() {
-
-		final EventNotificationHandler evm = new EventNotificationHandler();
-		final TestEventListener listener = new TestEventListener();
-
-		evm.subscribe(listener, StringTaskEvent.class);
-
-		final StringTaskEvent stringTaskEvent1 = new StringTaskEvent("Test 1");
-		final StringTaskEvent stringTaskEvent2 = new StringTaskEvent("Test 2");
-
-		evm.publish(stringTaskEvent1);
-		evm.publish(new IntegerTaskEvent(5));
-
-		assertNotNull(listener.getLastReceivedEvent());
-		StringTaskEvent receivedStringEvent = (StringTaskEvent) listener.getLastReceivedEvent();
-		assertEquals(stringTaskEvent1, receivedStringEvent);
-
-		evm.unsubscribe(listener, StringTaskEvent.class);
-
-		evm.publish(stringTaskEvent2);
-
-		assertNotNull(listener.getLastReceivedEvent());
-		receivedStringEvent = (StringTaskEvent) listener.getLastReceivedEvent();
-		assertEquals(stringTaskEvent1, receivedStringEvent);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/5232c56b/flink-runtime/src/test/java/org/apache/flink/runtime/util/event/TaskEventHandlerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/event/TaskEventHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/event/TaskEventHandlerTest.java
new file mode 100644
index 0000000..5c6aeb7
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/event/TaskEventHandlerTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.util.event;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.event.task.IntegerTaskEvent;
+import org.apache.flink.runtime.event.task.StringTaskEvent;
+import org.apache.flink.runtime.io.network.api.TaskEventHandler;
+import org.junit.Test;
+
+/**
+ * This class contains unit tests for the {@link TaskEventHandler}.
+ * 
+ */
+public class TaskEventHandlerTest {
+	/**
+	 * A test implementation of an {@link EventListener}.
+	 * 
+	 */
+	private static class TestEventListener implements EventListener<TaskEvent> {
+
+		/**
+		 * The event that was last received by this event listener.
+		 */
+		private TaskEvent receivedEvent = null;
+
+		/**
+		 * {@inheritDoc}
+		 * @param event
+		 */
+		@Override
+		public void onEvent(TaskEvent event) {
+
+			this.receivedEvent = event;
+		}
+
+		/**
+		 * Returns the event which was last received by this event listener. If no event
+		 * has been received so far the return value is <code>null</code>.
+		 * 
+		 * @return the event which was last received, possibly <code>null</code>
+		 */
+		public TaskEvent getLastReceivedEvent() {
+
+			return this.receivedEvent;
+		}
+	}
+
+	/**
+	 * Tests the publish/subscribe mechanisms implemented in the {@link TaskEventHandler}.
+	 */
+	@Test
+	public void testEventNotificationManager() {
+
+		final TaskEventHandler evm = new TaskEventHandler();
+		final TestEventListener listener = new TestEventListener();
+
+		evm.subscribe(listener, StringTaskEvent.class);
+
+		final StringTaskEvent stringTaskEvent1 = new StringTaskEvent("Test 1");
+		final StringTaskEvent stringTaskEvent2 = new StringTaskEvent("Test 2");
+
+		evm.publish(stringTaskEvent1);
+		evm.publish(new IntegerTaskEvent(5));
+
+		assertNotNull(listener.getLastReceivedEvent());
+		StringTaskEvent receivedStringEvent = (StringTaskEvent) listener.getLastReceivedEvent();
+		assertEquals(stringTaskEvent1, receivedStringEvent);
+
+		evm.unsubscribe(listener, StringTaskEvent.class);
+
+		evm.publish(stringTaskEvent2);
+
+		assertNotNull(listener.getLastReceivedEvent());
+		receivedStringEvent = (StringTaskEvent) listener.getLastReceivedEvent();
+		assertEquals(stringTaskEvent1, receivedStringEvent);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5232c56b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
index 57a61da..9bb1a3b 100644
--- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
+++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/Tasks.scala
@@ -72,7 +72,8 @@ object Tasks {
     var reader: RecordReader[IntegerRecord] = _
     var writer: RecordWriter[IntegerRecord] = _
     override def registerInputOutput(): Unit = {
-      reader = new RecordReader[IntegerRecord](getEnvironment.getReader(0), classOf[IntegerRecord])
+      reader = new RecordReader[IntegerRecord](getEnvironment.getInputGate(0),
+        classOf[IntegerRecord])
       writer = new RecordWriter[IntegerRecord](getEnvironment.getWriter(0))
     }
 
@@ -101,7 +102,7 @@ object Tasks {
     override def registerInputOutput(): Unit = {
       val env = getEnvironment
 
-      reader = new RecordReader[IntegerRecord](env.getReader(0), classOf[IntegerRecord])
+      reader = new RecordReader[IntegerRecord](env.getInputGate(0), classOf[IntegerRecord])
     }
 
     override def invoke(): Unit = {
@@ -158,7 +159,7 @@ object Tasks {
     override def registerInputOutput(): Unit = {
       val env = getEnvironment
 
-      reader = new RecordReader[IntegerRecord](env.getReader(0), classOf[IntegerRecord])
+      reader = new RecordReader[IntegerRecord](env.getInputGate(0), classOf[IntegerRecord])
     }
 
     override def invoke(): Unit = {
@@ -173,8 +174,8 @@ object Tasks {
     override def registerInputOutput(): Unit = {
       val env = getEnvironment
 
-      reader1 = new RecordReader[IntegerRecord](env.getReader(0), classOf[IntegerRecord])
-      reader2 = new RecordReader[IntegerRecord](env.getReader(1), classOf[IntegerRecord])
+      reader1 = new RecordReader[IntegerRecord](env.getInputGate(0), classOf[IntegerRecord])
+      reader2 = new RecordReader[IntegerRecord](env.getInputGate(1), classOf[IntegerRecord])
     }
 
     override def invoke(): Unit = {
@@ -191,9 +192,9 @@ object Tasks {
     override def registerInputOutput(): Unit = {
       val env = getEnvironment
 
-      reader1 = new RecordReader[IntegerRecord](env.getReader(0), classOf[IntegerRecord])
-      reader2 = new RecordReader[IntegerRecord](env.getReader(1), classOf[IntegerRecord])
-      reader3 = new RecordReader[IntegerRecord](env.getReader(2), classOf[IntegerRecord])
+      reader1 = new RecordReader[IntegerRecord](env.getInputGate(0), classOf[IntegerRecord])
+      reader2 = new RecordReader[IntegerRecord](env.getInputGate(1), classOf[IntegerRecord])
+      reader3 = new RecordReader[IntegerRecord](env.getInputGate(2), classOf[IntegerRecord])
     }
 
     override def invoke(): Unit = {
@@ -239,7 +240,7 @@ object Tasks {
 
   class ExceptionReceiver extends AbstractInvokable {
     override def registerInputOutput(): Unit = {
-      new RecordReader[IntegerRecord](getEnvironment.getReader(0), classOf[IntegerRecord])
+      new RecordReader[IntegerRecord](getEnvironment.getInputGate(0), classOf[IntegerRecord])
     }
 
     override def invoke(): Unit = {
@@ -280,7 +281,7 @@ object Tasks {
 
   class BlockingReceiver extends AbstractInvokable {
     override def registerInputOutput(): Unit = {
-      new RecordReader[IntegerRecord](getEnvironment.getReader(0), classOf[IntegerRecord])
+      new RecordReader[IntegerRecord](getEnvironment.getInputGate(0), classOf[IntegerRecord])
     }
 
     override def invoke(): Unit = {

http://git-wip-us.apache.org/repos/asf/flink/blob/5232c56b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
index de4660a..df7bcad 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/CoStreamVertex.java
@@ -17,11 +17,8 @@
 
 package org.apache.flink.streaming.api.streamvertex;
 
-import java.util.ArrayList;
-
-import org.apache.flink.runtime.io.network.api.reader.BufferReader;
-import org.apache.flink.runtime.io.network.api.reader.BufferReaderBase;
-import org.apache.flink.runtime.io.network.api.reader.UnionBufferReader;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
@@ -31,6 +28,8 @@ import org.apache.flink.streaming.io.CoRecordReader;
 import org.apache.flink.streaming.io.IndexedReaderIterator;
 import org.apache.flink.util.MutableObjectIterator;
 
+import java.util.ArrayList;
+
 public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> {
 
 	protected StreamRecordSerializer<IN1> inputDeserializer1 = null;
@@ -77,12 +76,12 @@ public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> {
 
 		int numberOfInputs = configuration.getNumberOfInputs();
 
-		ArrayList<BufferReader> inputList1 = new ArrayList<BufferReader>();
-		ArrayList<BufferReader> inputList2 = new ArrayList<BufferReader>();
+		ArrayList<InputGate> inputList1 = new ArrayList<InputGate>();
+		ArrayList<InputGate> inputList2 = new ArrayList<InputGate>();
 
 		for (int i = 0; i < numberOfInputs; i++) {
 			int inputType = configuration.getInputIndex(i);
-			BufferReader reader = getEnvironment().getReader(i);
+			InputGate reader = getEnvironment().getInputGate(i);
 			switch (inputType) {
 			case 1:
 				inputList1.add(reader);
@@ -95,11 +94,11 @@ public class CoStreamVertex<IN1, IN2, OUT> extends StreamVertex<IN1, OUT> {
 			}
 		}
 
-		final BufferReaderBase reader1 = inputList1.size() == 1 ? inputList1.get(0)
-				: new UnionBufferReader(inputList1.toArray(new BufferReader[inputList1.size()]));
+		final InputGate reader1 = inputList1.size() == 1 ? inputList1.get(0)
+				: new UnionInputGate(inputList1.toArray(new InputGate[inputList1.size()]));
 
-		final BufferReaderBase reader2 = inputList2.size() == 1 ? inputList2.get(0)
-				: new UnionBufferReader(inputList2.toArray(new BufferReader[inputList2.size()]));
+		final InputGate reader2 = inputList2.size() == 1 ? inputList2.get(0)
+				: new UnionInputGate(inputList2.toArray(new InputGate[inputList2.size()]));
 
 		coReader = new CoRecordReader<DeserializationDelegate<StreamRecord<IN1>>, DeserializationDelegate<StreamRecord<IN2>>>(
 				reader1, reader2);
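
The selection logic in the hunk above, where a single input gate is consumed directly and several gates feeding the same logical input are merged through a UnionInputGate, can be distilled into a small helper. A sketch assuming only the InputGate and UnionInputGate types imported above; the GateSelection class and selectOrUnion method are hypothetical names, not part of the commit.

import java.util.List;

import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;

// Hypothetical helper mirroring the gate selection in CoStreamVertex above.
public final class GateSelection {

	// A single gate is used as-is; multiple gates are presented as one union view.
	public static InputGate selectOrUnion(List<InputGate> gates) {
		return gates.size() == 1
				? gates.get(0)
				: new UnionInputGate(gates.toArray(new InputGate[gates.size()]));
	}

	private GateSelection() {
	}
}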

http://git-wip-us.apache.org/repos/asf/flink/blob/5232c56b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
index de3fd2b..73dbfce 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/InputHandler.java
@@ -19,7 +19,7 @@ package org.apache.flink.streaming.api.streamvertex;
 
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.io.network.api.reader.MutableReader;
-import org.apache.flink.runtime.io.network.api.reader.UnionBufferReader;
+import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.streaming.api.StreamConfig;
 import org.apache.flink.streaming.api.streamrecord.StreamRecord;
@@ -54,14 +54,12 @@ public class InputHandler<IN> {
 		if (numberOfInputs > 0) {
 
 			if (numberOfInputs < 2) {
-
 				inputs = new IndexedMutableReader<DeserializationDelegate<StreamRecord<IN>>>(
-						streamVertex.getEnvironment().getReader(0));
+						streamVertex.getEnvironment().getInputGate(0));
 
 			} else {
-				UnionBufferReader reader = new UnionBufferReader(streamVertex.getEnvironment()
-						.getAllReaders());
-				inputs = new IndexedMutableReader<DeserializationDelegate<StreamRecord<IN>>>(reader);
+				inputs = new IndexedMutableReader<DeserializationDelegate<StreamRecord<IN>>>(
+						new UnionInputGate(streamVertex.getEnvironment().getAllInputGates()));
 			}
 
 			inputIter = createInputIterator();

http://git-wip-us.apache.org/repos/asf/flink/blob/5232c56b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
index 0b1b373..bb3a659 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoRecordReader.java
@@ -18,13 +18,13 @@
 package org.apache.flink.streaming.io;
 
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.event.task.TaskEvent;
-import org.apache.flink.runtime.io.network.api.reader.BufferReaderBase;
+import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
 import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
-import org.apache.flink.runtime.io.network.api.reader.ReaderBase;
 import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
 import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate;
 import org.apache.flink.runtime.util.event.EventListener;
 
 import java.io.IOException;
@@ -36,11 +36,11 @@ import java.util.concurrent.LinkedBlockingQueue;
  * types to read records effectively.
  */
 @SuppressWarnings("rawtypes")
-public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadableWritable> implements ReaderBase, EventListener<BufferReaderBase> {
+public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadableWritable> extends AbstractReader implements EventListener<InputGate> {
 
-	private final BufferReaderBase bufferReader1;
+	private final InputGate bufferReader1;
 
-	private final BufferReaderBase bufferReader2;
+	private final InputGate bufferReader2;
 
 	private final BlockingQueue<Integer> availableRecordReaders = new LinkedBlockingQueue<Integer>();
 
@@ -57,7 +57,9 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
 
 	private boolean hasRequestedPartitions;
 
-	public CoRecordReader(BufferReaderBase bufferReader1, BufferReaderBase bufferReader2) {
+	public CoRecordReader(InputGate bufferReader1, InputGate bufferReader2) {
+		super(new UnionInputGate(bufferReader1, bufferReader2));
+
 		this.bufferReader1 = bufferReader1;
 		this.bufferReader2 = bufferReader2;
 
@@ -72,14 +74,14 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
 			reader2RecordDeserializers[i] = new AdaptiveSpanningRecordDeserializer<T2>();
 		}
 
-		bufferReader1.subscribeToReader(this);
-		bufferReader2.subscribeToReader(this);
+		bufferReader1.registerListener(this);
+		bufferReader2.registerListener(this);
 	}
 
 	public void requestPartitionsOnce() throws IOException {
 		if (!hasRequestedPartitions) {
-			bufferReader1.requestPartitionsOnce();
-			bufferReader2.requestPartitionsOnce();
+			bufferReader1.requestPartitions();
+			bufferReader2.requestPartitions();
 
 			hasRequestedPartitions = true;
 		}
@@ -115,18 +117,20 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
 							return 1;
 						}
 					}
+					else {
 
-					final Buffer nextBuffer = bufferReader1.getNextBufferBlocking();
-					final int channelIndex = bufferReader1.getChannelIndexOfLastBuffer();
+						final BufferOrEvent boe = bufferReader1.getNextBufferOrEvent();
 
-					if (nextBuffer == null) {
-						currentReaderIndex = 0;
+						if (boe.isBuffer()) {
+							reader1currentRecordDeserializer = reader1RecordDeserializers[boe.getChannelIndex()];
+							reader1currentRecordDeserializer.setNextBuffer(boe.getBuffer());
+						}
+						else if (handleEvent(boe.getEvent())) {
+							currentReaderIndex = 0;
 
-						break;
+							break;
+						}
 					}
-
-					reader1currentRecordDeserializer = reader1RecordDeserializers[channelIndex];
-					reader1currentRecordDeserializer.setNextBuffer(nextBuffer);
 				}
 			}
 			else if (currentReaderIndex == 2) {
@@ -145,18 +149,19 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
 							return 2;
 						}
 					}
+					else {
+						final BufferOrEvent boe = bufferReader2.getNextBufferOrEvent();
 
-					final Buffer nextBuffer = bufferReader2.getNextBufferBlocking();
-					final int channelIndex = bufferReader2.getChannelIndexOfLastBuffer();
-
-					if (nextBuffer == null) {
-						currentReaderIndex = 0;
+						if (boe.isBuffer()) {
+							reader2currentRecordDeserializer = reader2RecordDeserializers[boe.getChannelIndex()];
+							reader2currentRecordDeserializer.setNextBuffer(boe.getBuffer());
+						}
+						else if (handleEvent(boe.getEvent())) {
+							currentReaderIndex = 0;
 
-						break;
+							break;
+						}
 					}
-
-					reader2currentRecordDeserializer = reader2RecordDeserializers[channelIndex];
-					reader2currentRecordDeserializer.setNextBuffer(nextBuffer);
 				}
 			}
 			else {
@@ -174,7 +179,7 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
 	// ------------------------------------------------------------------------
 
 	@Override
-	public void onEvent(BufferReaderBase bufferReader) {
+	public void onEvent(InputGate bufferReader) {
 		if (bufferReader == bufferReader1) {
 			availableRecordReaders.add(1);
 		}
@@ -182,40 +187,4 @@ public class CoRecordReader<T1 extends IOReadableWritable, T2 extends IOReadable
 			availableRecordReaders.add(2);
 		}
 	}
-
-	// ------------------------------------------------------------------------
-
-	@Override
-	public boolean isFinished() {
-		return bufferReader1.isFinished() && bufferReader2.isFinished();
-	}
-
-	@Override
-	public void subscribeToTaskEvent(EventListener<TaskEvent> eventListener, Class<? extends TaskEvent> eventType) {
-		bufferReader1.subscribeToTaskEvent(eventListener, eventType);
-		bufferReader2.subscribeToTaskEvent(eventListener, eventType);
-	}
-
-	@Override
-	public void sendTaskEvent(TaskEvent event) throws IOException, InterruptedException {
-		bufferReader1.sendTaskEvent(event);
-		bufferReader2.sendTaskEvent(event);
-	}
-
-	@Override
-	public void setIterativeReader() {
-		bufferReader1.setIterativeReader();
-		bufferReader2.setIterativeReader();
-	}
-
-	@Override
-	public void startNextSuperstep() {
-		bufferReader1.startNextSuperstep();
-		bufferReader2.startNextSuperstep();
-	}
-
-	@Override
-	public boolean hasReachedEndOfSuperstep() {
-		return bufferReader1.hasReachedEndOfSuperstep() && bufferReader2.hasReachedEndOfSuperstep();
-	}
 }
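
Both reader branches above now consume their gate through the same BufferOrEvent dispatch: a buffer is routed to the deserializer of the channel it arrived on, while an event is delegated to handleEvent() from AbstractReader. A minimal sketch of that dispatch in isolation, assuming AbstractReader can be subclassed with just a gate and that getNextBufferOrEvent() declares the checked exceptions shown; the SingleGateReaderSketch class and its advance() method are illustrative, whereas the individual calls match the diff.

import java.io.IOException;

import org.apache.flink.core.io.IOReadableWritable;
import org.apache.flink.runtime.io.network.api.reader.AbstractReader;
import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;

// Hypothetical single-gate reader showing the dispatch used twice in CoRecordReader.
public class SingleGateReaderSketch<T extends IOReadableWritable> extends AbstractReader {

	private final InputGate gate;

	private final RecordDeserializer<T>[] deserializers;

	@SuppressWarnings("unchecked")
	public SingleGateReaderSketch(InputGate gate) {
		super(gate);
		this.gate = gate;

		// One deserializer per input channel, as in the constructor above.
		this.deserializers = new RecordDeserializer[gate.getNumberOfInputChannels()];
		for (int i = 0; i < deserializers.length; i++) {
			deserializers[i] = new AdaptiveSpanningRecordDeserializer<T>();
		}
	}

	// Pulls one BufferOrEvent and dispatches it; returns the deserializer that
	// received a buffer, or null if an event closed the current round.
	public RecordDeserializer<T> advance() throws IOException, InterruptedException {
		BufferOrEvent boe = gate.getNextBufferOrEvent();

		if (boe.isBuffer()) {
			RecordDeserializer<T> deserializer = deserializers[boe.getChannelIndex()];
			deserializer.setNextBuffer(boe.getBuffer());
			return deserializer;
		}
		else if (handleEvent(boe.getEvent())) {
			// handleEvent(...) is inherited from AbstractReader; true means the
			// event ended the current consumption round.
			return null;
		}
		return null;
	}
}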

http://git-wip-us.apache.org/repos/asf/flink/blob/5232c56b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java
index 4178130..175dba2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedMutableReader.java
@@ -19,22 +19,18 @@
 package org.apache.flink.streaming.io;
 
 import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.api.reader.BufferReaderBase;
 import org.apache.flink.runtime.io.network.api.reader.MutableRecordReader;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 
 public class IndexedMutableReader<T extends IOReadableWritable> extends MutableRecordReader<T> {
 
-	BufferReaderBase reader;
+	InputGate reader;
 
-	public IndexedMutableReader(BufferReaderBase reader) {
+	public IndexedMutableReader(InputGate reader) {
 		super(reader);
 		this.reader = reader;
 	}
 
-	public int getLastChannelIndex() {
-		return reader.getChannelIndexOfLastBuffer();
-	}
-
 	public int getNumberOfInputChannels() {
 		return reader.getNumberOfInputChannels();
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/5232c56b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedReaderIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedReaderIterator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedReaderIterator.java
index 6c8187b..18cdd4e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedReaderIterator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/IndexedReaderIterator.java
@@ -24,19 +24,10 @@ import org.apache.flink.runtime.plugable.DeserializationDelegate;
 
 public class IndexedReaderIterator<T> extends ReaderIterator<T> {
 
-	private IndexedMutableReader<DeserializationDelegate<T>> reader;
-
-	public IndexedReaderIterator(IndexedMutableReader<DeserializationDelegate<T>> reader,
+	public IndexedReaderIterator(
+			IndexedMutableReader<DeserializationDelegate<T>> reader,
 			TypeSerializer<T> serializer) {
-		super(reader, serializer);
-		this.reader = reader;
-	}
 
-	public int getLastChannelIndex() {
-		return reader.getLastChannelIndex();
-	}
-
-	public int getNumberOfInputChannels() {
-		return reader.getNumberOfInputChannels();
+		super(reader, serializer);
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/5232c56b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
index 03038b3..4b13165 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
@@ -86,11 +86,6 @@ public class MockContext<IN, OUT> implements StreamTaskContext<OUT> {
 				return null;
 			}
 		}
-
-		@Override
-		public int getLastChannelIndex() {
-			return 0;
-		}
 	}
 
 	public List<OUT> getOutputs() {

http://git-wip-us.apache.org/repos/asf/flink/blob/5232c56b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
index f52de22..36c7cba 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java
@@ -200,7 +200,7 @@ public class NetworkStackThroughputITCase {
 
 		@Override
 		public void registerInputOutput() {
-			this.reader = new RecordReader<SpeedTestRecord>(getEnvironment().getReader(0), SpeedTestRecord.class);
+			this.reader = new RecordReader<SpeedTestRecord>(getEnvironment().getInputGate(0), SpeedTestRecord.class);
 			this.writer = new RecordWriter<SpeedTestRecord>(getEnvironment().getWriter(0));
 		}
 
@@ -222,7 +222,7 @@ public class NetworkStackThroughputITCase {
 
 		@Override
 		public void registerInputOutput() {
-			this.reader = new RecordReader<SpeedTestRecord>(getEnvironment().getReader(0), SpeedTestRecord.class);
+			this.reader = new RecordReader<SpeedTestRecord>(getEnvironment().getInputGate(0), SpeedTestRecord.class);
 		}
 
 		@Override

