flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [14/18] flink git commit: [FLINK-986] [FLINK-25] [Distributed runtime] Add initial support for intermediate results
Date Mon, 12 Jan 2015 08:16:22 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
new file mode 100644
index 0000000..7a529b9
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/TaskEventDispatcher.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import com.google.common.collect.HashBasedTable;
+import com.google.common.collect.Table;
+import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.api.writer.BufferWriter;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.util.event.EventListener;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * The task event dispatcher dispatches events flowing backwards from a consumer
+ * to a producer. It only supports programs in which the producer and consumer
+ * are running at the same time.
+ * <p>
+ * The publish method is either called from the local input channel or the
+ * network I/O thread.
+ */
+public class TaskEventDispatcher {
+
+	/** Writers keyed by execution attempt and partition; every access is guarded by this table's monitor. */
+	final Table<ExecutionAttemptID, IntermediateResultPartitionID, BufferWriter> registeredWriters = HashBasedTable.create();
+
+	/** Registers the writer that receives task events for the given partition; fails if one is already registered. */
+	public void registerWriterForIncomingTaskEvents(ExecutionAttemptID executionId, IntermediateResultPartitionID partitionId, BufferWriter listener) {
+		synchronized (registeredWriters) {
+			if (registeredWriters.put(executionId, partitionId, listener) != null) {
+				throw new IllegalStateException("Event dispatcher already contains buffer writer.");
+			}
+		}
+	}
+
+	/** Removes all writers registered for the given execution attempt. */
+	public void unregisterWriters(ExecutionAttemptID executionId) {
+		synchronized (registeredWriters) {
+			// The row is a live view of the table, so clearing it drops every partition of the attempt.
+			registeredWriters.row(executionId).clear();
+		}
+	}
+
+	/**
+	 * Publishes the event to the {@link EventListener} registered for the given execution attempt and partition.
+	 * <p>
+	 * This method is either called from a local input channel or the network
+	 * I/O thread on behalf of a remote input channel.
+	 *
+	 * @return true if a writer was registered and received the event, false otherwise
+	 */
+	public boolean publish(ExecutionAttemptID executionId, IntermediateResultPartitionID partitionId, TaskEvent event) {
+		final EventListener<TaskEvent> listener;
+		// HashBasedTable is not thread-safe, so look up under the lock; notify outside it so the listener never runs while it is held.
+		synchronized (registeredWriters) {
+			listener = registeredWriters.get(executionId, partitionId);
+		}
+		if (listener == null) {
+			return false;
+		}
+		listener.onEvent(event);
+		return true;
+	}
+
+	int getNumberOfRegisteredWriters() {
+		synchronized (registeredWriters) {
+			return registeredWriters.size();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractRecordReader.java
deleted file mode 100644
index 3af9aef..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractRecordReader.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.api;
-
-import org.apache.flink.runtime.event.task.AbstractTaskEvent;
-import org.apache.flink.runtime.event.task.EventListener;
-import org.apache.flink.runtime.event.task.EventNotificationManager;
-
-/**
- * This is an abstract base class for a record reader, either dealing with mutable or immutable records,
- * and dealing with reads from single gates (single end points) or multiple gates (union).
- */
-public abstract class AbstractRecordReader implements ReaderBase {
-	
-	
-	private final EventNotificationManager eventHandler = new EventNotificationManager();
-	
-	private int numEventsUntilEndOfSuperstep = -1;
-	
-	private int endOfSuperstepEventsCount;
-	
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Subscribes the listener object to receive events of the given type.
-	 * 
-	 * @param eventListener
-	 *        the listener object to register
-	 * @param eventType
-	 *        the type of event to register the listener for
-	 */
-	@Override
-	public void subscribeToEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType) {
-		this.eventHandler.subscribeToEvent(eventListener, eventType);
-	}
-
-	/**
-	 * Removes the subscription for events of the given type for the listener object.
-	 * 
-	 * @param eventListener The listener object to cancel the subscription for.
-	 * @param eventType The type of the event to cancel the subscription for.
-	 */
-	@Override
-	public void unsubscribeFromEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType) {
-		this.eventHandler.unsubscribeFromEvent(eventListener, eventType);
-	}
-	
-	
-	protected void handleEvent(AbstractTaskEvent evt) {
-		this.eventHandler.deliverEvent(evt);
-	}
-	
-	@Override
-	public void setIterative(int numEventsUntilEndOfSuperstep) {
-		this.numEventsUntilEndOfSuperstep = numEventsUntilEndOfSuperstep;
-	}
-
-	@Override
-	public void startNextSuperstep() {
-		if (this.numEventsUntilEndOfSuperstep == -1) {
-			throw new IllegalStateException("Called 'startNextSuperstep()' in a non-iterative reader.");
-		}
-		else if (endOfSuperstepEventsCount < numEventsUntilEndOfSuperstep) {
-			throw new IllegalStateException("Premature 'startNextSuperstep()'. Not yet reached the end-of-superstep.");
-		}
-		this.endOfSuperstepEventsCount = 0;
-	}
-	
-	@Override
-	public boolean hasReachedEndOfSuperstep() {
-		return endOfSuperstepEventsCount== numEventsUntilEndOfSuperstep;
-	}
-	
-	protected boolean incrementEndOfSuperstepEventAndCheck() {
-		if (numEventsUntilEndOfSuperstep == -1) {
-			throw new IllegalStateException("Received EndOfSuperstep event in a non-iterative reader.");
-		}
-		
-		endOfSuperstepEventsCount++;
-		
-		if (endOfSuperstepEventsCount > numEventsUntilEndOfSuperstep) {
-			throw new IllegalStateException("Received EndOfSuperstep events beyond the number to indicate the end of the superstep");
-		}
-		
-		return endOfSuperstepEventsCount== numEventsUntilEndOfSuperstep;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractSingleGateRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractSingleGateRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractSingleGateRecordReader.java
deleted file mode 100644
index e308da8..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractSingleGateRecordReader.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.api;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.event.task.AbstractTaskEvent;
-import org.apache.flink.runtime.io.network.gates.InputGate;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-/**
- * This is an abstract base class for a record reader, either dealing with mutable or immutable records.
- * 
- * @param <T> The type of the record that can be read from this record reader.
- */
-public abstract class AbstractSingleGateRecordReader<T extends IOReadableWritable> extends AbstractRecordReader {
-	
-	/**
-	 * The input gate associated with the record reader.
-	 */
-	protected final InputGate<T> inputGate;
-	
-	// --------------------------------------------------------------------------------------------
-
-	protected AbstractSingleGateRecordReader(AbstractInvokable invokable) {
-		this.inputGate = invokable.getEnvironment().createAndRegisterInputGate();
-	}
-
-	/**
-	 * Returns the number of input channels wired to this reader's input gate.
-	 * 
-	 * @return the number of input channels wired to this reader's input gate
-	 */
-	public int getNumberOfInputChannels() {
-		return this.inputGate.getNumberOfInputChannels();
-	}
-
-	/**
-	 * Publishes an event.
-	 * 
-	 * @param event
-	 *        the event to be published
-	 * @throws IOException
-	 *         thrown if an error occurs while transmitting the event
-	 * @throws InterruptedException
-	 *         thrown if the thread is interrupted while waiting for the event to be published
-	 */
-	@Override
-	public void publishEvent(AbstractTaskEvent event) throws IOException, InterruptedException {
-		// Delegate call to input gate to send events
-		this.inputGate.publishEvent(event);
-	}
-	
-	@Override
-	public void publishEvent(AbstractTaskEvent event, int inputNumber) throws IOException, InterruptedException {
-		if(inputNumber==0) {
-			publishEvent(event);
-		}else {
-			throw new IOException("RecordReader has only 1 input");
-		}
-	}
-
-	public InputGate<T> getInputGate() {
-		return this.inputGate;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractUnionRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractUnionRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractUnionRecordReader.java
deleted file mode 100644
index 00ccfee..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/AbstractUnionRecordReader.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.api;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.event.task.AbstractTaskEvent;
-import org.apache.flink.runtime.io.network.gates.InputChannelResult;
-import org.apache.flink.runtime.io.network.gates.InputGate;
-import org.apache.flink.runtime.io.network.gates.RecordAvailabilityListener;
-
-public abstract class AbstractUnionRecordReader<T extends IOReadableWritable> extends AbstractRecordReader implements RecordAvailabilityListener<T> {
-
-	/**
-	 * The set of all input gates.
-	 */
-	private final InputGate<T>[] allInputGates;
-	
-	/**
-	 * The set of unclosed input gates.
-	 */
-	private final Set<InputGate<T>> remainingInputGates;
-
-	/**
-	 * Queue with indices of channels that store at least one available record.
-	 */
-	private final ArrayDeque<InputGate<T>> availableInputGates = new ArrayDeque<InputGate<T>>();
-	
-	/**
-	 * The next input gate to read a record from.
-	 */
-	private InputGate<T> nextInputGateToReadFrom;
-
-	
-	@Override
-	public boolean isInputClosed() {
-		return this.remainingInputGates.isEmpty();
-	}
-	
-	/**
-	 * Constructs a new mutable union record reader.
-	 * 
-	 * @param recordReaders
-	 *        the individual mutable record readers whose input is used to construct the union
-	 */
-	@SuppressWarnings("unchecked")
-	protected AbstractUnionRecordReader(MutableRecordReader<T>[] recordReaders) {
-
-		if (recordReaders == null) {
-			throw new IllegalArgumentException("Provided argument recordReaders is null");
-		}
-
-		if (recordReaders.length < 2) {
-			throw new IllegalArgumentException(
-				"The mutable union record reader must at least be initialized with two individual mutable record readers");
-		}
-		
-		this.allInputGates = new InputGate[recordReaders.length];
-		this.remainingInputGates = new HashSet<InputGate<T>>((int) (recordReaders.length * 1.6f));
-		
-		for (int i = 0; i < recordReaders.length; i++) {
-			InputGate<T> inputGate = recordReaders[i].getInputGate();
-			inputGate.registerRecordAvailabilityListener(this);
-			this.allInputGates[i] = inputGate;
-			this.remainingInputGates.add(inputGate);
-		}
-	}
-	
-	
-	@Override
-	public void publishEvent(AbstractTaskEvent event) throws IOException, InterruptedException {
-		for (InputGate<T> gate : this.allInputGates) {
-			gate.publishEvent(event);
-		}
-	}
-	
-	@Override
-	public void publishEvent(AbstractTaskEvent event, int inputNumber) throws IOException,
-			InterruptedException {
-		allInputGates[inputNumber].publishEvent(event);
-	}
-	
-	@Override
-	public void reportRecordAvailability(InputGate<T> inputGate) {
-		synchronized (this.availableInputGates) {
-			this.availableInputGates.add(inputGate);
-			this.availableInputGates.notifyAll();
-		}
-	}
-	
-	protected boolean getNextRecord(T target) throws IOException, InterruptedException {
-
-		while (true) {
-			// has the current input gate more data?
-			if (this.nextInputGateToReadFrom == null) {
-				if (this.remainingInputGates.isEmpty()) {
-					return false;
-				}
-				
-				this.nextInputGateToReadFrom = getNextAvailableInputGate();
-			}
-
-			InputChannelResult result = this.nextInputGateToReadFrom.readRecord(target);
-			switch (result) {
-				case INTERMEDIATE_RECORD_FROM_BUFFER: // record is available and we can stay on the same channel
-					return true;
-					
-				case LAST_RECORD_FROM_BUFFER: // record is available, but we need to re-check the channels
-					this.nextInputGateToReadFrom = null;
-					return true;
-					
-				case END_OF_SUPERSTEP:
-					this.nextInputGateToReadFrom = null;
-					if (incrementEndOfSuperstepEventAndCheck()) {
-						return false; // end of the superstep
-					}
-					else {
-						break; // fall through and wait for next record/event
-					}
-					
-				case TASK_EVENT:	// event for the subscribers is available
-					handleEvent(this.nextInputGateToReadFrom.getCurrentEvent());
-					this.nextInputGateToReadFrom = null;
-					break;
-					
-				case END_OF_STREAM: // one gate is empty
-					this.remainingInputGates.remove(this.nextInputGateToReadFrom);
-					this.nextInputGateToReadFrom = null;
-					break;
-					
-				case NONE: // gate processed an internal event and could not return a record on this call
-					this.nextInputGateToReadFrom = null;
-					break;
-			}
-		}
-	}
-	
-	private InputGate<T> getNextAvailableInputGate() throws InterruptedException {
-		synchronized (this.availableInputGates) {
-			while (this.availableInputGates.isEmpty()) {
-				this.availableInputGates.wait();
-			}
-			return this.availableInputGates.pop();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/BufferWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/BufferWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/BufferWriter.java
deleted file mode 100644
index 8eb117d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/BufferWriter.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.api;
-
-import java.io.IOException;
-
-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.event.task.AbstractTaskEvent;
-import org.apache.flink.runtime.event.task.EventListener;
-import org.apache.flink.runtime.io.network.Buffer;
-import org.apache.flink.runtime.io.network.channels.EndOfSuperstepEvent;
-import org.apache.flink.runtime.io.network.gates.OutputGate;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-public class BufferWriter {
-
-	protected final OutputGate outputGate;
-
-	public BufferWriter(AbstractInvokable invokable) {
-		this.outputGate = invokable.getEnvironment().createAndRegisterOutputGate();
-	}
-
-	public void sendBuffer(Buffer buffer, int targetChannel) throws IOException, InterruptedException {
-		this.outputGate.sendBuffer(buffer, targetChannel);
-	}
-
-	public void sendEvent(AbstractEvent event, int targetChannel) throws IOException, InterruptedException {
-		this.outputGate.sendEvent(event, targetChannel);
-	}
-
-	public void sendBufferAndEvent(Buffer buffer, AbstractEvent event, int targetChannel) throws IOException, InterruptedException {
-		this.outputGate.sendBufferAndEvent(buffer, event, targetChannel);
-	}
-
-	public void broadcastBuffer(Buffer buffer) throws IOException, InterruptedException {
-		this.outputGate.broadcastBuffer(buffer);
-	}
-
-	public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
-		this.outputGate.broadcastEvent(event);
-	}
-
-	// -----------------------------------------------------------------------------------------------------------------
-
-	public void subscribeToEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType) {
-		this.outputGate.subscribeToEvent(eventListener, eventType);
-	}
-
-	public void unsubscribeFromEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType) {
-		this.outputGate.unsubscribeFromEvent(eventListener, eventType);
-	}
-
-	public void sendEndOfSuperstep() throws IOException, InterruptedException {
-		this.outputGate.broadcastEvent(EndOfSuperstepEvent.INSTANCE);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/ChannelSelector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/ChannelSelector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/ChannelSelector.java
deleted file mode 100644
index c780f87..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/ChannelSelector.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.api;
-
-import org.apache.flink.core.io.IOReadableWritable;
-
-/**
- * Objects implementing this interface are passed to an {@link org.apache.flink.runtime.io.network.gates.OutputGate}. When a record is sent through the output
- * gate, the channel selector object is called to determine to which {@link org.apache.flink.runtime.io.network.channels.OutputChannel} objects the record
- * shall be passed on.
- * 
- * @param <T>
- *        the type of record which is sent through the attached output gate
- */
-public interface ChannelSelector<T extends IOReadableWritable> {
-
-	/**
-	 * Called to determine to which attached {@link org.apache.flink.runtime.io.network.channels.OutputChannel} objects the given record shall be forwarded.
-	 * 
-	 * @param record
-	 *        the record to the determine the output channels for
-	 * @param numberOfOutputChannels
-	 *        the total number of output channels which are attached to respective output gate
-	 * @return a (possibly empty) array of integer numbers which indicate the indices of the output channels through
-	 *         which the record shall be forwarded
-	 */
-	int[] selectChannels(T record, int numberOfOutputChannels);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
new file mode 100644
index 0000000..49d7958
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfPartitionEvent.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.task.RuntimeEvent;
+
+import java.io.IOException;
+
+public class EndOfPartitionEvent extends RuntimeEvent {
+
+	public static final EndOfPartitionEvent INSTANCE = new EndOfPartitionEvent();
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		// This class declares no state of its own, so nothing is written.
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		// Counterpart of write(): nothing was written, so nothing is read back.
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java
new file mode 100644
index 0000000..5d0199c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/EndOfSuperstepEvent.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.event.task.RuntimeEvent;
+
+import java.io.IOException;
+
+/** Marks the end of a superstep of one particular iteration head. */
+public class EndOfSuperstepEvent extends RuntimeEvent {
+
+	public static final EndOfSuperstepEvent INSTANCE = new EndOfSuperstepEvent();
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		// Nothing to read: write() emits no data for this event.
+	}
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		// Nothing to write: this class declares no state of its own.
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableReader.java
deleted file mode 100644
index 04027f6..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableReader.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.api;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.IOReadableWritable;
-
-public interface MutableReader<T extends IOReadableWritable> extends ReaderBase {
-	
-	boolean next(T target) throws IOException, InterruptedException;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableRecordReader.java
deleted file mode 100644
index e3a9522..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableRecordReader.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.api;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.gates.InputChannelResult;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-public class MutableRecordReader<T extends IOReadableWritable> extends AbstractSingleGateRecordReader<T> implements MutableReader<T> {
-	
-	private boolean endOfStream;
-	
-	
-	/**
-	 * Constructs a new mutable record reader and registers a new input gate with the application's environment.
-	 * 
-	 * @param taskBase The application that instantiated the record reader.
-	 */
-	public MutableRecordReader(AbstractInvokable taskBase) {
-		super(taskBase);
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	@Override
-	public boolean next(final T target) throws IOException, InterruptedException {
-		if (this.endOfStream) {
-			return false;
-			
-		}
-		while (true) {
-			InputChannelResult result = this.inputGate.readRecord(target);
-			switch (result) {
-				case INTERMEDIATE_RECORD_FROM_BUFFER:
-				case LAST_RECORD_FROM_BUFFER:
-					return true;
-					
-				case END_OF_SUPERSTEP:
-					if (incrementEndOfSuperstepEventAndCheck()) {
-						return false; // end of the superstep
-					}
-					else {
-						break; // fall through and wait for next record/event
-					}
-					
-				case TASK_EVENT:
-					handleEvent(this.inputGate.getCurrentEvent());
-					break;	// fall through to get next record
-				
-				case END_OF_STREAM:
-					this.endOfStream = true;
-					return false;
-					
-				default:
-					; // fall through to get next record
-			}
-		}
-	}
-	
-	@Override
-	public boolean isInputClosed() {
-		return this.endOfStream;
-	}
-
-	@Override
-	public void setIterative(int numEventsUntilEndOfSuperstep) {
-		// sanity check for debug purposes
-		if (numEventsUntilEndOfSuperstep != getNumberOfInputChannels()) {
-			throw new IllegalArgumentException("Number of events till end of superstep is different from the number of input channels.");
-		}
-		super.setIterative(numEventsUntilEndOfSuperstep);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableUnionRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableUnionRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableUnionRecordReader.java
deleted file mode 100644
index 2f6e2d2..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/MutableUnionRecordReader.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.api;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.IOReadableWritable;
-
-public class MutableUnionRecordReader<T extends IOReadableWritable> extends AbstractUnionRecordReader<T> implements MutableReader<T> {
-
-	
-	/**
-	 * Constructs a new mutable union record reader.
-	 * 
-	 * @param recordReaders
-	 *        the individual mutable record readers whose input is used to construct the union
-	 */
-	public MutableUnionRecordReader(MutableRecordReader<T>[] recordReaders) {
-		super(recordReaders);
-	}
-
-	@Override
-	public boolean next(T target) throws IOException, InterruptedException {
-		return getNextRecord(target);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/Reader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/Reader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/Reader.java
deleted file mode 100644
index 9be0978..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/Reader.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.api;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.IOReadableWritable;
-
-/**
- * A reader interface to read records from an input.
- * 
- * @param <T> The type of the record that can be read with this reader
- */
-public interface Reader<T extends IOReadableWritable> extends ReaderBase {
-
-	boolean hasNext() throws IOException, InterruptedException;
-
-	T next() throws IOException, InterruptedException;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/ReaderBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/ReaderBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/ReaderBase.java
deleted file mode 100644
index 0b8069a..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/ReaderBase.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.api;
-
-import java.io.IOException;
-
-import org.apache.flink.runtime.event.task.AbstractTaskEvent;
-import org.apache.flink.runtime.event.task.EventListener;
-
-
-/**
- *
- */
-public interface ReaderBase {
-
-	boolean isInputClosed();
-	
-	/**
-	 * Subscribes the listener object to receive events of the given type.
-	 * 
-	 * @param eventListener
-	 *        the listener object to register
-	 * @param eventType
-	 *        the type of event to register the listener for
-	 */
-	void subscribeToEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType);
-	
-	/**
-	 * Removes the subscription for events of the given type for the listener object.
-	 * 
-	 * @param eventListener
-	 *        the listener object to cancel the subscription for
-	 * @param eventType
-	 *        the type of the event to cancel the subscription for
-	 */
-	void unsubscribeFromEvent(final EventListener eventListener, final Class<? extends AbstractTaskEvent> eventType);
-
-	/**
-	 * Publishes an event.
-	 * 
-	 * @param event
-	 *        the event to be published
-	 * @throws IOException
-	 *         thrown if an error occurs while transmitting the event
-	 * @throws InterruptedException
-	 *         thrown if the thread is interrupted while waiting for the event to be published
-	 */
-	void publishEvent(AbstractTaskEvent event) throws IOException, InterruptedException;
-	
-	/**
-	 * Publishes an event to a specific input.
-	 * 
-	 * @param event
-	 *            the event to be published
-	 * @param inputNumber
-	 *            the number of the input that we want to publish the event to
-	 * 
-	 * @throws IOException
-	 *             thrown if an error occurs while transmitting the event
-	 * @throws InterruptedException
-	 *             thrown if the thread is interrupted while waiting for the
-	 *             event to be published
-	 */
-	void publishEvent(AbstractTaskEvent event, int inputNumber) throws IOException, InterruptedException;
-	
-	
-	void setIterative(int numEventsUntilEndOfSuperstep);
-
-	
-	void startNextSuperstep();
-	
-	boolean hasReachedEndOfSuperstep();
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/RecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/RecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/RecordReader.java
deleted file mode 100644
index fc97736..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/RecordReader.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.api;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.io.network.gates.InputChannelResult;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-/**
- * A record reader connects an input gate to an application. It allows the application
- * to query for incoming records and read them from the input gate.
- * 
- * @param <T> The type of the record that can be read from this record reader.
- */
-public class RecordReader<T extends IOReadableWritable> extends AbstractSingleGateRecordReader<T> implements Reader<T> {
-	
-	private final Class<T> recordType;
-	
-	/**
-	 * Stores the last read record.
-	 */
-	private T lookahead;
-
-	/**
-	 * Stores whether no more records will be received from the assigned input gate.
-	 */
-	private boolean noMoreRecordsWillFollow;
-
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Constructs a new record reader and registers a new input gate with the application's environment.
-	 * 
-	 * @param taskBase
-	 *        The application that instantiated the record reader.
-	 * @param recordType
-	 *        The class of records that can be read from the record reader.
-	 */
-	public RecordReader(AbstractInvokable taskBase, Class<T> recordType) {
-		super(taskBase);
-		this.recordType = recordType;
-	}
-	
-	// --------------------------------------------------------------------------------------------
-	
-	/**
-	 * Checks if at least one more record can be read from the associated input gate. This method may block
-	 * until the associated input gate is able to read the record from one of its input channels.
-	 * 
-	 * @return <code>true</code> if at least one more record can be read from the associated input gate, otherwise
-	 *         <code>false</code>
-	 */
-	@Override
-	public boolean hasNext() throws IOException, InterruptedException{
-		if (this.lookahead != null) {
-			return true;
-		} else {
-			if (this.noMoreRecordsWillFollow) {
-				return false;
-			}
-			
-			T record = instantiateRecordType();
-			
-			while (true) {
-				InputChannelResult result = this.inputGate.readRecord(record);
-				switch (result) {
-					case INTERMEDIATE_RECORD_FROM_BUFFER:
-					case LAST_RECORD_FROM_BUFFER:
-						this.lookahead = record;
-						return true;
-						
-					case END_OF_SUPERSTEP:
-						if (incrementEndOfSuperstepEventAndCheck()) {
-							return false;
-						}
-						else {
-							break; // fall through and wait for next record/event
-						}
-						
-					case TASK_EVENT:
-						handleEvent(this.inputGate.getCurrentEvent());
-						break;
-						
-					case END_OF_STREAM:
-						this.noMoreRecordsWillFollow = true;
-						return false;
-				
-					default:
-						; // fall through the loop
-				}
-			}
-		}
-	}
-
-	/**
-	 * Reads the current record from the associated input gate.
-	 * 
-	 * @return the current record from the associated input gate.
-	 * @throws IOException
-	 *         thrown if any error occurs while reading the record from the input gate
-	 */
-	@Override
-	public T next() throws IOException, InterruptedException {
-		if (hasNext()) {
-			T tmp = this.lookahead;
-			this.lookahead = null;
-			return tmp;
-		} else {
-			return null;
-		}
-	}
-	
-	@Override
-	public boolean isInputClosed() {
-		return this.noMoreRecordsWillFollow;
-	}
-	
-	private T instantiateRecordType() {
-		try {
-			return this.recordType.newInstance();
-		} catch (InstantiationException e) {
-			throw new RuntimeException("Cannot instantiate class '" + this.recordType.getName() + "'.", e);
-		} catch (IllegalAccessException e) {
-			throw new RuntimeException("Cannot instantiate class '" + this.recordType.getName() + "'.", e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/RecordWriter.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/RecordWriter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/RecordWriter.java
deleted file mode 100644
index 3ddd564..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/RecordWriter.java
+++ /dev/null
@@ -1,152 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.api;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.io.network.Buffer;
-import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;
-import org.apache.flink.runtime.io.network.channels.EndOfSuperstepEvent;
-import org.apache.flink.runtime.io.network.serialization.RecordSerializer;
-import org.apache.flink.runtime.io.network.serialization.SpanningRecordSerializer;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-
-/**
- * A record writer connects the application to an output gate. It allows the application
- * to emit (send out) records to the output gate. The output gate will then take care of distributing
- * the emitted records among the output channels.
- * 
- * @param <T>
- *        the type of the record that can be emitted with this record writer
- */
-public class RecordWriter<T extends IOReadableWritable> extends BufferWriter {
-
-	private final BufferProvider bufferPool;
-
-	private final ChannelSelector<T> channelSelector;
-
-	private int numChannels;
-
-	/** RecordSerializer per outgoing channel */
-	private RecordSerializer<T>[] serializers;
-
-	// -----------------------------------------------------------------------------------------------------------------
-
-	public RecordWriter(AbstractInvokable invokable) {
-		this(invokable, new RoundRobinChannelSelector<T>());
-	}
-
-	public RecordWriter(AbstractInvokable invokable, ChannelSelector<T> channelSelector) {
-		// initialize the gate
-		super(invokable);
-
-		this.bufferPool = invokable.getEnvironment().getOutputBufferProvider();
-		this.channelSelector = channelSelector;
-	}
-
-	// -----------------------------------------------------------------------------------------------------------------
-
-	@SuppressWarnings("unchecked")
-	public void initializeSerializers() {
-		this.numChannels = this.outputGate.getNumChannels();
-		this.serializers = new RecordSerializer[numChannels];
-		for (int i = 0; i < this.numChannels; i++) {
-			this.serializers[i] = new SpanningRecordSerializer<T>();
-		}
-	}
-
-	public void emit(final T record) throws IOException, InterruptedException {
-		for (int targetChannel : this.channelSelector.selectChannels(record, this.numChannels)) {
-			// serialize with corresponding serializer and send full buffer
-			RecordSerializer<T> serializer = this.serializers[targetChannel];
-
-			RecordSerializer.SerializationResult result = serializer.addRecord(record);
-			while (result.isFullBuffer()) {
-				Buffer buffer = serializer.getCurrentBuffer();
-				if (buffer != null) {
-					sendBuffer(buffer, targetChannel);
-				}
-
-				buffer = this.bufferPool.requestBufferBlocking(this.bufferPool.getBufferSize());
-				result = serializer.setNextBuffer(buffer);
-			}
-		}
-	}
-
-	public void flush() throws IOException, InterruptedException {
-		for (int targetChannel = 0; targetChannel < this.numChannels; targetChannel++) {
-			RecordSerializer<T> serializer = this.serializers[targetChannel];
-
-			Buffer buffer = serializer.getCurrentBuffer();
-			if (buffer != null) {
-				sendBuffer(buffer, targetChannel);
-			}
-
-			serializer.clear();
-		}
-	}
-
-	@Override
-	public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
-		for (int targetChannel = 0; targetChannel < this.numChannels; targetChannel++) {
-			RecordSerializer<T> serializer = this.serializers[targetChannel];
-
-			Buffer buffer = serializer.getCurrentBuffer();
-			if (buffer == null) {
-				super.sendEvent(event, targetChannel);
-			} else {
-				super.sendBufferAndEvent(buffer, event, targetChannel);
-
-				buffer = this.bufferPool.requestBufferBlocking(this.bufferPool.getBufferSize());
-				serializer.setNextBuffer(buffer);
-			}
-		}
-	}
-
-	@Override
-	public void sendEndOfSuperstep() throws IOException, InterruptedException {
-		for (int targetChannel = 0; targetChannel < this.numChannels; targetChannel++) {
-			RecordSerializer<T> serializer = this.serializers[targetChannel];
-
-			Buffer buffer = serializer.getCurrentBuffer();
-			if (buffer == null) {
-				super.sendEvent(EndOfSuperstepEvent.INSTANCE, targetChannel);
-			} else {
-				super.sendBufferAndEvent(buffer, EndOfSuperstepEvent.INSTANCE, targetChannel);
-
-				buffer = this.bufferPool.requestBufferBlocking(this.bufferPool.getBufferSize());
-				serializer.setNextBuffer(buffer);
-			}
-		}
-	}
-	
-	public void clearBuffers() {
-		if (this.serializers != null) {
-			for (RecordSerializer<?> s: this.serializers) {
-				Buffer b = s.getCurrentBuffer();
-				if (b != null) {
-					b.recycleBuffer();
-				}
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/RoundRobinChannelSelector.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/RoundRobinChannelSelector.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/RoundRobinChannelSelector.java
deleted file mode 100644
index 668046f..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/RoundRobinChannelSelector.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.api;
-
-import org.apache.flink.core.io.IOReadableWritable;
-
-/**
- * This is the default implementation of the {@link ChannelSelector} interface. It represents a simple round-robin
- * strategy, i.e. regardless of the record, exactly one of the attached output channels is selected at a time.
-
- * @param <T>
- *        the type of record which is sent through the attached output gate
- */
-public class RoundRobinChannelSelector<T extends IOReadableWritable> implements ChannelSelector<T> {
-
-	/**
-	 * Stores the index of the channel to send the next record to.
-	 */
-	private final int[] nextChannelToSendTo = new int[1];
-
-	/**
-	 * Constructs a new default channel selector.
-	 */
-	public RoundRobinChannelSelector() {
-		this.nextChannelToSendTo[0] = 0;
-	}
-
-
-	@Override
-	public int[] selectChannels(final T record, final int numberOfOutputChannels) {
-
-		this.nextChannelToSendTo[0] = (this.nextChannelToSendTo[0] + 1) % numberOfOutputChannels;
-
-		return this.nextChannelToSendTo;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/UnionRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/UnionRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/UnionRecordReader.java
deleted file mode 100644
index 2f7ba1d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/UnionRecordReader.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network.api;
-
-import java.io.IOException;
-
-import org.apache.flink.core.io.IOReadableWritable;
-
-public final class UnionRecordReader<T extends IOReadableWritable> extends AbstractUnionRecordReader<T> implements Reader<T> {
-	
-	private final Class<T> recordType;
-	
-	private T lookahead;
-	
-
-	public UnionRecordReader(MutableRecordReader<T>[] recordReaders, Class<T> recordType) {
-		super(recordReaders);
-		this.recordType = recordType;
-	}
-
-	@Override
-	public boolean hasNext() throws IOException, InterruptedException {
-		if (this.lookahead != null) {
-			return true;
-		} else {
-			T record = instantiateRecordType();
-			if (getNextRecord(record)) {
-				this.lookahead = record;
-				return true;
-			} else {
-				return false;
-			}
-		}
-	}
-
-	@Override
-	public T next() throws IOException, InterruptedException {
-		if (hasNext()) {
-			T tmp = this.lookahead;
-			this.lookahead = null;
-			return tmp;
-		} else {
-			return null;
-		}
-	}
-	
-	private T instantiateRecordType() {
-		try {
-			return this.recordType.newInstance();
-		} catch (InstantiationException e) {
-			throw new RuntimeException("Cannot instantiate class '" + this.recordType.getName() + "'.", e);
-		} catch (IllegalAccessException e) {
-			throw new RuntimeException("Cannot instantiate class '" + this.recordType.getName() + "'.", e);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
new file mode 100644
index 0000000..cf4c302
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.reader;
+
+import java.io.IOException;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
+import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
+import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.util.event.EventListener;
+
+/**
+ * A record-oriented runtime result reader, which wraps a {@link BufferReaderBase}.
+ * <p>
+ * This abstract base class is used by both the mutable and immutable record
+ * reader.
+ *
+ * @param <T> The type of the record that can be read with this record reader.
+ */
+abstract class AbstractRecordReader<T extends IOReadableWritable> implements ReaderBase {
+
+	private final BufferReaderBase reader;
+
+	private final RecordDeserializer<T>[] recordDeserializers;
+
+	private RecordDeserializer<T> currentRecordDeserializer;
+
+	private boolean isFinished;
+
+	protected AbstractRecordReader(BufferReaderBase reader) {
+		this.reader = reader;
+
+		// Initialize one deserializer per input channel
+		this.recordDeserializers = new AdaptiveSpanningRecordDeserializer[reader.getNumberOfInputChannels()];
+		for (int i = 0; i < recordDeserializers.length; i++) {
+			recordDeserializers[i] = new AdaptiveSpanningRecordDeserializer<T>();
+		}
+	}
+
+	protected boolean getNextRecord(T target) throws IOException, InterruptedException {
+		if (isFinished) {
+			return false;
+		}
+
+		while (true) {
+			if (currentRecordDeserializer != null) {
+				DeserializationResult result = currentRecordDeserializer.getNextRecord(target);
+
+				if (result.isBufferConsumed()) {
+					currentRecordDeserializer.getCurrentBuffer().recycle();
+					currentRecordDeserializer = null;
+				}
+
+				if (result.isFullRecord()) {
+					return true;
+				}
+			}
+
+			final Buffer nextBuffer = reader.getNextBuffer();
+			final int channelIndex = reader.getChannelIndexOfLastBuffer();
+
+			if (nextBuffer == null) {
+				if (reader.isFinished()) {
+					isFinished = true;
+					return false;
+				}
+				else if (reader.hasReachedEndOfSuperstep()) {
+					return false;
+				}
+				else {
+					// More data is coming...
+					continue;
+				}
+			}
+
+			currentRecordDeserializer = recordDeserializers[channelIndex];
+			currentRecordDeserializer.setNextBuffer(nextBuffer);
+		}
+	}
+
+	public void clearBuffers() {
+		for (RecordDeserializer<?> deserializer : recordDeserializers) {
+			Buffer buffer = deserializer.getCurrentBuffer();
+			if (buffer != null && !buffer.isRecycled()) {
+				buffer.recycle();
+			}
+		}
+	}
+
+	@Override
+	public void sendTaskEvent(TaskEvent event) throws IOException, InterruptedException {
+		reader.sendTaskEvent(event);
+	}
+
+	@Override
+	public boolean isFinished() {
+		return reader.isFinished();
+	}
+
+	@Override
+	public void subscribeToTaskEvent(EventListener<TaskEvent> eventListener, Class<? extends TaskEvent> eventType) {
+		reader.subscribeToTaskEvent(eventListener, eventType);
+	}
+
+	@Override
+	public void setIterativeReader() {
+		reader.setIterativeReader();
+	}
+
+	@Override
+	public void startNextSuperstep() {
+		reader.startNextSuperstep();
+	}
+
+	@Override
+	public boolean hasReachedEndOfSuperstep() {
+		return reader.hasReachedEndOfSuperstep();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
new file mode 100644
index 0000000..1df7216
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReader.java
@@ -0,0 +1,485 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.reader;
+
+import com.google.common.collect.Maps;
+import org.apache.flink.runtime.deployment.PartitionConsumerDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.PartitionInfo;
+import org.apache.flink.runtime.deployment.PartitionInfo.PartitionLocation;
+import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.event.task.TaskEvent;
+import org.apache.flink.runtime.execution.RuntimeEnvironment;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.ConnectionManager;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.RemoteAddress;
+import org.apache.flink.runtime.io.network.TaskEventDispatcher;
+import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
+import org.apache.flink.runtime.io.network.api.EndOfSuperstepEvent;
+import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferProvider;
+import org.apache.flink.runtime.io.network.partition.IntermediateResultPartitionProvider;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.UnknownInputChannel;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.util.event.EventListener;
+import org.apache.flink.runtime.util.event.EventNotificationHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+public final class BufferReader implements BufferReaderBase {
+
+	private static final Logger LOG = LoggerFactory.getLogger(BufferReader.class);
+
+	private final Object requestLock = new Object();
+
+	private final RuntimeEnvironment environment;
+
+	private final NetworkEnvironment networkEnvironment;
+
+	private final EventNotificationHandler<TaskEvent> taskEventHandler = new EventNotificationHandler<TaskEvent>();
+
+	private final IntermediateDataSetID consumedResultId;
+
+	private final int totalNumberOfInputChannels;
+
+	private final int queueToRequest;
+
+	private final Map<IntermediateResultPartitionID, InputChannel> inputChannels;
+
+	private BufferPool bufferPool;
+
+	private boolean isReleased;
+
+	private boolean isTaskEvent;
+
+	// ------------------------------------------------------------------------
+
+	private final BlockingQueue<InputChannel> inputChannelsWithData = new LinkedBlockingQueue<InputChannel>();
+
+	private final AtomicReference<EventListener<BufferReader>> readerListener = new AtomicReference<EventListener<BufferReader>>(null);
+
+	// ------------------------------------------------------------------------
+
+	private boolean isIterativeReader;
+
+	private int currentNumEndOfSuperstepEvents;
+
+	private int channelIndexOfLastReadBuffer = -1;
+
+	private boolean hasRequestedPartitions = false;
+
+	public BufferReader(RuntimeEnvironment environment, NetworkEnvironment networkEnvironment, IntermediateDataSetID consumedResultId, int numberOfInputChannels, int queueToRequest) {
+
+		this.consumedResultId = checkNotNull(consumedResultId);
+		// Note: the environment is not fully initialized yet
+		this.environment = checkNotNull(environment);
+
+		this.networkEnvironment = networkEnvironment;
+
+		checkArgument(numberOfInputChannels >= 0);
+		this.totalNumberOfInputChannels = numberOfInputChannels;
+
+		checkArgument(queueToRequest >= 0);
+		this.queueToRequest = queueToRequest;
+
+		this.inputChannels = Maps.newHashMapWithExpectedSize(numberOfInputChannels);
+	}
+
+	// ------------------------------------------------------------------------
+	// Properties
+	// ------------------------------------------------------------------------
+
+	public void setBufferPool(BufferPool bufferPool) {
+		checkArgument(bufferPool.getNumberOfRequiredMemorySegments() == totalNumberOfInputChannels, "Buffer pool has not enough buffers for this reader.");
+		checkState(this.bufferPool == null, "Buffer pool has already been set for reader.");
+
+		this.bufferPool = checkNotNull(bufferPool);
+	}
+
+	public IntermediateDataSetID getConsumedResultId() {
+		return consumedResultId;
+	}
+
+	public String getTaskNameWithSubtasks() {
+		return environment.getTaskNameWithSubtasks();
+	}
+
+	public IntermediateResultPartitionProvider getIntermediateResultPartitionProvider() {
+		return networkEnvironment.getPartitionManager();
+	}
+
+	public TaskEventDispatcher getTaskEventDispatcher() {
+		return networkEnvironment.getTaskEventDispatcher();
+	}
+
+	public ConnectionManager getConnectionManager() {
+		return networkEnvironment.getConnectionManager();
+	}
+
+	// TODO This is a work-around for the union reader
+	boolean hasInputChannelWithData() {
+		return !inputChannelsWithData.isEmpty();
+	}
+
+	/**
+	 * Returns the total number of input channels for this reader.
+	 * <p>
+	 * Note: This number might be smaller than the current number of input channels
+	 * of the reader as channels are possibly updated during runtime.
+	 */
+	public int getNumberOfInputChannels() {
+		return totalNumberOfInputChannels;
+	}
+
+	public BufferProvider getBufferProvider() {
+		return bufferPool;
+	}
+
+	public void setInputChannel(IntermediateResultPartitionID partitionId, InputChannel inputChannel) {
+		synchronized (requestLock) {
+			inputChannels.put(checkNotNull(partitionId), checkNotNull(inputChannel));
+		}
+	}
+
+	public void updateInputChannel(PartitionInfo partitionInfo) throws IOException {
+		synchronized (requestLock) {
+			if (isReleased) {
+				// There was a race with a task failure/cancel
+				return;
+			}
+
+			final IntermediateResultPartitionID partitionId = partitionInfo.getPartitionId();
+
+			InputChannel current = inputChannels.get(partitionId);
+
+			if (current.getClass() == UnknownInputChannel.class) {
+				UnknownInputChannel unknownChannel = (UnknownInputChannel) current;
+
+				InputChannel newChannel;
+
+				if (partitionInfo.getProducerLocation() == PartitionLocation.REMOTE) {
+					newChannel = unknownChannel.toRemoteInputChannel(partitionInfo.getProducerAddress());
+				}
+				else if (partitionInfo.getProducerLocation() == PartitionLocation.LOCAL) {
+					newChannel = unknownChannel.toLocalInputChannel();
+				}
+				else {
+					throw new IllegalStateException("Tried to update unknown channel with unknown channel.");
+				}
+
+				inputChannels.put(partitionId, newChannel);
+
+				newChannel.requestIntermediateResultPartition(queueToRequest);
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	// Consume
+	// ------------------------------------------------------------------------
+
+	void requestPartitionsOnce() throws IOException {
+		if (!hasRequestedPartitions) {
+			// Sanity check
+			if (totalNumberOfInputChannels != inputChannels.size()) {
+				throw new IllegalStateException("Mismatch between number of total input channels and the currently number of set input channels.");
+			}
+
+			synchronized (requestLock) {
+				for (InputChannel inputChannel : inputChannels.values()) {
+					inputChannel.requestIntermediateResultPartition(queueToRequest);
+				}
+			}
+
+			hasRequestedPartitions = true;
+		}
+	}
+
+	@Override
+	public Buffer getNextBuffer() throws IOException, InterruptedException {
+		requestPartitionsOnce();
+
+		while (true) {
+			if (Thread.interrupted()) {
+				throw new InterruptedException();
+			}
+
+			// Possibly block until data is available at one of the input channels
+			InputChannel currentChannel = null;
+			while (currentChannel == null) {
+				currentChannel = inputChannelsWithData.poll(2000, TimeUnit.MILLISECONDS);
+			}
+
+			isTaskEvent = false;
+
+			final Buffer buffer = currentChannel.getNextBuffer();
+
+			if (buffer == null) {
+				throw new IllegalStateException("Bug in reader logic: queried for a buffer although none was available.");
+			}
+
+			if (buffer.isBuffer()) {
+				channelIndexOfLastReadBuffer = currentChannel.getChannelIndex();
+				return buffer;
+			}
+			else {
+				try {
+					final AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
+
+					// ------------------------------------------------------------
+					// Runtime events
+					// ------------------------------------------------------------
+					// Note: We cannot assume that every channel will be finished
+					// with a corresponding event. In failure cases or iterations the
+					// consumer task finishes earlier and has to release all
+					// resources.
+					// ------------------------------------------------------------
+					if (event.getClass() == EndOfPartitionEvent.class) {
+						currentChannel.releaseAllResources();
+
+						return null;
+					}
+					else if (event.getClass() == EndOfSuperstepEvent.class) {
+						incrementEndOfSuperstepEventAndCheck();
+
+						return null;
+					}
+					// ------------------------------------------------------------
+					// Task events (user)
+					// ------------------------------------------------------------
+					else if (event instanceof TaskEvent) {
+						taskEventHandler.publish((TaskEvent) event);
+
+						isTaskEvent = true;
+
+						return null;
+					}
+					else {
+						throw new IllegalStateException("Received unexpected event " + event + " from input channel " + currentChannel + ".");
+					}
+				}
+				catch (Throwable t) {
+					throw new IOException("Error while reading event: " + t.getMessage(), t);
+				}
+				finally {
+					buffer.recycle();
+				}
+			}
+		}
+	}
+
+	@Override
+	public Buffer getNextBuffer(Buffer exchangeBuffer) {
+		throw new UnsupportedOperationException("Buffer exchange when reading data is not yet supported.");
+	}
+
+	@Override
+	public int getChannelIndexOfLastBuffer() {
+		return channelIndexOfLastReadBuffer;
+	}
+
+	@Override
+	public boolean isTaskEvent() {
+		return isTaskEvent;
+	}
+
+	@Override
+	public boolean isFinished() {
+		synchronized (requestLock) {
+			for (InputChannel inputChannel : inputChannels.values()) {
+				if (!inputChannel.isReleased()) {
+					return false;
+				}
+			}
+		}
+
+		return true;
+	}
+
+	public void releaseAllResources() throws IOException {
+		synchronized (requestLock) {
+			if (!isReleased) {
+				try {
+					for (InputChannel inputChannel : inputChannels.values()) {
+						try {
+							inputChannel.releaseAllResources();
+						}
+						catch (IOException e) {
+							LOG.warn("Error during release of channel resources: " + e.getMessage(), e);
+						}
+					}
+
+					// The buffer pool can actually be destroyed immediately after the
+					// reader received all of the data from the input channels.
+					if (bufferPool != null) {
+						bufferPool.destroy();
+					}
+				}
+				finally {
+					isReleased = true;
+				}
+			}
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	// Channel notifications
+	// ------------------------------------------------------------------------
+
+	public void onAvailableInputChannel(InputChannel inputChannel) {
+		inputChannelsWithData.add(inputChannel);
+
+		if (readerListener.get() != null) {
+			readerListener.get().onEvent(this);
+		}
+	}
+
+	void subscribeToReader(EventListener<BufferReader> listener) {
+		if (!this.readerListener.compareAndSet(null, listener)) {
+			throw new IllegalStateException(listener + " is already registered as a record availability listener");
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	// Task events
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void sendTaskEvent(TaskEvent event) throws IOException, InterruptedException {
+		// This can be improved by just serializing the event once for all
+		// remote input channels.
+		synchronized (requestLock) {
+			for (InputChannel inputChannel : inputChannels.values()) {
+				inputChannel.sendTaskEvent(event);
+			}
+		}
+	}
+
+	@Override
+	public void subscribeToTaskEvent(EventListener<TaskEvent> listener, Class<? extends TaskEvent> eventType) {
+		taskEventHandler.subscribe(listener, eventType);
+	}
+
+	// ------------------------------------------------------------------------
+	// Iteration end of superstep events
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void setIterativeReader() {
+		isIterativeReader = true;
+	}
+
+	@Override
+	public void startNextSuperstep() {
+		checkState(isIterativeReader, "Tried to start next superstep in a non-iterative reader.");
+		checkState(currentNumEndOfSuperstepEvents == totalNumberOfInputChannels,
+				"Tried to start next superstep before reaching end of previous superstep.");
+
+		currentNumEndOfSuperstepEvents = 0;
+	}
+
+	@Override
+	public boolean hasReachedEndOfSuperstep() {
+		return currentNumEndOfSuperstepEvents == totalNumberOfInputChannels;
+	}
+
+	private boolean incrementEndOfSuperstepEventAndCheck() {
+		checkState(isIterativeReader, "Received end of superstep event in a non-iterative reader.");
+
+		currentNumEndOfSuperstepEvents++;
+
+		checkState(currentNumEndOfSuperstepEvents <= totalNumberOfInputChannels,
+				"Received too many (" + currentNumEndOfSuperstepEvents + ") end of superstep events.");
+
+		return currentNumEndOfSuperstepEvents == totalNumberOfInputChannels;
+	}
+
+	// ------------------------------------------------------------------------
+
+	@Override
+	public String toString() {
+		return String.format("BufferReader %s [task: %s, current/total number of input channels: %d/%d]",
+				consumedResultId, getTaskNameWithSubtasks(), inputChannels.size(), totalNumberOfInputChannels);
+	}
+
+	public static BufferReader create(RuntimeEnvironment runtimeEnvironment, NetworkEnvironment networkEnvironment, PartitionConsumerDeploymentDescriptor desc) {
+		// The consumed intermediate data set (all partitions are part of this data set)
+		final IntermediateDataSetID resultId = desc.getResultId();
+
+		// The queue to request from each consumed partition
+		final int queueIndex = desc.getQueueIndex();
+
+		// There is one input channel for each consumed partition
+		final PartitionInfo[] partitions = desc.getPartitions();
+		final int numberOfInputChannels = partitions.length;
+
+		final BufferReader reader = new BufferReader(runtimeEnvironment, networkEnvironment, resultId, numberOfInputChannels, queueIndex);
+
+		// Create input channels
+		final InputChannel[] inputChannels = new InputChannel[numberOfInputChannels];
+
+		int channelIndex = 0;
+
+		for (PartitionInfo partition : partitions) {
+			final ExecutionAttemptID producerExecutionId = partition.getProducerExecutionId();
+			final IntermediateResultPartitionID partitionId = partition.getPartitionId();
+
+			final PartitionLocation producerLocation = partition.getProducerLocation();
+
+			switch (producerLocation) {
+				case LOCAL:
+					inputChannels[channelIndex] = new LocalInputChannel(channelIndex, producerExecutionId, partitionId, reader);
+					break;
+
+				case REMOTE:
+					final RemoteAddress producerAddress = checkNotNull(partition.getProducerAddress(), "Missing producer address for remote intermediate result partition.");
+
+					inputChannels[channelIndex] = new RemoteInputChannel(channelIndex, producerExecutionId, partitionId, reader, producerAddress);
+					break;
+
+				case UNKNOWN:
+					inputChannels[channelIndex] = new UnknownInputChannel(channelIndex, producerExecutionId, partitionId, reader);
+					break;
+			}
+
+			reader.setInputChannel(partitionId, inputChannels[channelIndex]);
+
+			channelIndex++;
+		}
+
+		return reader;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderBase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderBase.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderBase.java
new file mode 100644
index 0000000..04fae71
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/BufferReaderBase.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.reader;
+
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
+import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
+
+import java.io.IOException;
+
+/**
+ * A buffer-oriented runtime result reader.
+ * <p>
+ * {@link BufferReaderBase} is the runtime API for consuming results. Events
+ * are handled by the reader and users can query for buffers with
+ * {@link #getNextBuffer()} or {@link #getNextBuffer(Buffer)}.
+ * <p>
+ * <strong>Important</strong>: If {@link #getNextBuffer()} is used, it is
+ * necessary to release the returned buffers with {@link Buffer#recycle()}
+ * after they are consumed.
+ */
+public interface BufferReaderBase extends ReaderBase {
+
+	/**
+	 * Returns the next queued {@link Buffer} from one of the {@link RemoteInputChannel}
+	 * instances attached to this reader. There are no ordering guarantees with
+	 * respect to which channel is queried for data.
+	 * <p>
+	 * <strong>Important</strong>: it is necessary to release buffers, which
+	 * are returned by the reader via {@link Buffer#recycle()}, because they
+	 * are a pooled resource. If not recycled, the network stack will run out
+	 * of buffers and deadlock.
+	 *
+	 * @see #getChannelIndexOfLastBuffer()
+	 */
+	Buffer getNextBuffer() throws IOException, InterruptedException;
+
+	/**
+	 * {@link #getNextBuffer()} requires the user to quickly recycle the
+	 * returned buffer. For a fully buffer-oriented runtime, we need to
+	 * support a variant of this method, which allows buffers to be exchanged
+	 * in order to save unnecessary memory copies between buffer pools.
+	 * <p>
+	 * Currently this is not a problem, because the only "users" of the buffer-
+	 * oriented API are the record-oriented readers, which immediately
+	 * deserialize the buffer and recycle it.
+	 */
+	Buffer getNextBuffer(Buffer exchangeBuffer) throws IOException, InterruptedException;
+
+	/**
+	 * Returns a channel index for the last {@link Buffer} instance returned by
+	 * {@link #getNextBuffer()} or {@link #getNextBuffer(Buffer)}.
+	 * <p>
+	 * The returned index is guaranteed to be the same for all buffers read by
+	 * the same {@link RemoteInputChannel} instance. This is useful when data spans
+	 * multiple buffers returned by this reader.
+	 * <p>
+	 * Initially returns <code>-1</code> and if multiple readers are unioned,
+	 * the local channel indexes are mapped to the sequence from 0 to n-1.
+	 */
+	int getChannelIndexOfLastBuffer();
+
+	/**
+	 * Returns the total number of {@link InputChannel} instances, from which this
+	 * reader gets its data.
+	 */
+	int getNumberOfInputChannels();
+
+	boolean isTaskEvent();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableReader.java
new file mode 100644
index 0000000..e47982e
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableReader.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.reader;
+
+import java.io.IOException;
+
+import org.apache.flink.core.io.IOReadableWritable;
+
+/**
+ * A record-oriented reader for mutable record types.
+ */
+public interface MutableReader<T extends IOReadableWritable> extends ReaderBase {
+
+	boolean next(T target) throws IOException, InterruptedException;
+
+	void clearBuffers();
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableRecordReader.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableRecordReader.java
new file mode 100644
index 0000000..75d4f21
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/MutableRecordReader.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.api.reader;
+
+import org.apache.flink.core.io.IOReadableWritable;
+
+import java.io.IOException;
+
+public class MutableRecordReader<T extends IOReadableWritable> extends AbstractRecordReader<T> implements MutableReader<T> {
+
+	public MutableRecordReader(BufferReaderBase reader) {
+		super(reader);
+	}
+
+	@Override
+	public boolean next(final T target) throws IOException, InterruptedException {
+		return getNextRecord(target);
+	}
+
+	@Override
+	public void clearBuffers() {
+		super.clearBuffers();
+	}
+}


Mime
View raw message