flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [15/34] Offer buffer-oriented API for I/O (#25)
Date Tue, 10 Jun 2014 19:35:12 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/AbstractSingleGateRecordReader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/AbstractSingleGateRecordReader.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/AbstractSingleGateRecordReader.java
new file mode 100644
index 0000000..71d5628
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/AbstractSingleGateRecordReader.java
@@ -0,0 +1,69 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.api;
+
+import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
+import eu.stratosphere.runtime.io.gates.InputGate;
+import eu.stratosphere.nephele.template.AbstractInvokable;
+
+import java.io.IOException;
+
+/**
+ * This is an abstract base class for a record reader, either dealing with mutable or immutable records.
+ * 
+ * @param <T> The type of the record that can be read from this record reader.
+ */
+public abstract class AbstractSingleGateRecordReader<T extends IOReadableWritable> extends AbstractRecordReader {
+	
+	/**
+	 * The input gate associated with the record reader.
+	 */
+	protected final InputGate<T> inputGate;
+	
+	// --------------------------------------------------------------------------------------------
+
+	protected AbstractSingleGateRecordReader(AbstractInvokable invokable) {
+		this.inputGate = invokable.getEnvironment().createAndRegisterInputGate();
+	}
+
+	/**
+	 * Returns the number of input channels wired to this reader's input gate.
+	 * 
+	 * @return the number of input channels wired to this reader's input gate
+	 */
+	public int getNumberOfInputChannels() {
+		return this.inputGate.getNumberOfInputChannels();
+	}
+
+	/**
+	 * Publishes an event.
+	 * 
+	 * @param event
+	 *        the event to be published
+	 * @throws IOException
+	 *         thrown if an error occurs while transmitting the event
+	 * @throws InterruptedException
+	 *         thrown if the thread is interrupted while waiting for the event to be published
+	 */
+	@Override
+	public void publishEvent(AbstractTaskEvent event) throws IOException, InterruptedException {
+		// Delegate call to input gate to send events
+		this.inputGate.publishEvent(event);
+	}
+
+	InputGate<T> getInputGate() {
+		return this.inputGate;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/AbstractUnionRecordReader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/AbstractUnionRecordReader.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/AbstractUnionRecordReader.java
new file mode 100644
index 0000000..69ac327
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/AbstractUnionRecordReader.java
@@ -0,0 +1,155 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.api;
+
+import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
+import eu.stratosphere.runtime.io.gates.InputChannelResult;
+import eu.stratosphere.runtime.io.gates.RecordAvailabilityListener;
+import eu.stratosphere.runtime.io.gates.InputGate;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.HashSet;
+import java.util.Set;
+
+public abstract class AbstractUnionRecordReader<T extends IOReadableWritable> extends AbstractRecordReader implements RecordAvailabilityListener<T> {
+
+	/**
+	 * The set of all input gates.
+	 */
+	private final InputGate<T>[] allInputGates;
+	
+	/**
+	 * The set of unclosed input gates.
+	 */
+	private final Set<InputGate<T>> remainingInputGates;
+
+	/**
+	 * Queue of input gates that have reported at least one available record.
+	 */
+	private final ArrayDeque<InputGate<T>> availableInputGates = new ArrayDeque<InputGate<T>>();
+	
+	/**
+	 * The next input gate to read a record from.
+	 */
+	private InputGate<T> nextInputGateToReadFrom;
+
+	
+	@Override
+	public boolean isInputClosed() {
+		return this.remainingInputGates.isEmpty();
+	}
+	
+	/**
+	 * Constructs a new mutable union record reader.
+	 * 
+	 * @param recordReaders
+	 *        the individual mutable record readers whose input is used to construct the union
+	 */
+	@SuppressWarnings("unchecked")
+	protected AbstractUnionRecordReader(MutableRecordReader<T>[] recordReaders) {
+
+		if (recordReaders == null) {
+			throw new IllegalArgumentException("Provided argument recordReaders is null");
+		}
+
+		if (recordReaders.length < 2) {
+			throw new IllegalArgumentException(
+				"The mutable union record reader must at least be initialized with two individual mutable record readers");
+		}
+		
+		this.allInputGates = new InputGate[recordReaders.length];
+		this.remainingInputGates = new HashSet<InputGate<T>>((int) (recordReaders.length * 1.6f));
+		
+		for (int i = 0; i < recordReaders.length; i++) {
+			InputGate<T> inputGate = recordReaders[i].getInputGate();
+			inputGate.registerRecordAvailabilityListener(this);
+			this.allInputGates[i] = inputGate;
+			this.remainingInputGates.add(inputGate);
+		}
+	}
+	
+	
+	@Override
+	public void publishEvent(AbstractTaskEvent event) throws IOException, InterruptedException {
+		for (InputGate<T> gate : this.allInputGates) {
+			gate.publishEvent(event);
+		}
+	}
+	
+	@Override
+	public void reportRecordAvailability(InputGate<T> inputGate) {
+		synchronized (this.availableInputGates) {
+			this.availableInputGates.add(inputGate);
+			this.availableInputGates.notifyAll();
+		}
+	}
+	
+	protected boolean getNextRecord(T target) throws IOException, InterruptedException {
+
+		while (true) {
+			// has the current input gate more data?
+			if (this.nextInputGateToReadFrom == null) {
+				if (this.remainingInputGates.isEmpty()) {
+					return false;
+				}
+				
+				this.nextInputGateToReadFrom = getNextAvailableInputGate();
+			}
+
+			InputChannelResult result = this.nextInputGateToReadFrom.readRecord(target);
+			switch (result) {
+				case INTERMEDIATE_RECORD_FROM_BUFFER: // record is available and we can stay on the same channel
+					return true;
+					
+				case LAST_RECORD_FROM_BUFFER: // record is available, but we need to re-check the channels
+					this.nextInputGateToReadFrom = null;
+					return true;
+					
+				case END_OF_SUPERSTEP:
+					this.nextInputGateToReadFrom = null;
+					if (incrementEndOfSuperstepEventAndCheck()) {
+						return false; // end of the superstep
+					}
+					else {
+						break; // not yet the end of the superstep: leave the switch and loop for the next record/event
+					}
+					
+				case TASK_EVENT:	// event for the subscribers is available
+					handleEvent(this.nextInputGateToReadFrom.getCurrentEvent());
+					this.nextInputGateToReadFrom = null;
+					break;
+					
+				case END_OF_STREAM: // one gate is empty
+					this.remainingInputGates.remove(this.nextInputGateToReadFrom);
+					this.nextInputGateToReadFrom = null;
+					break;
+					
+				case NONE: // gate processed an internal event and could not return a record on this call
+					this.nextInputGateToReadFrom = null;
+					break;
+			}
+		}
+	}
+	
+	private InputGate<T> getNextAvailableInputGate() throws InterruptedException {
+		synchronized (this.availableInputGates) {
+			while (this.availableInputGates.isEmpty()) {
+				this.availableInputGates.wait();
+			}
+			return this.availableInputGates.pop();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/BufferWriter.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/BufferWriter.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/BufferWriter.java
new file mode 100644
index 0000000..a7b62e0
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/BufferWriter.java
@@ -0,0 +1,67 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.api;
+
+import eu.stratosphere.nephele.event.task.AbstractEvent;
+import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
+import eu.stratosphere.nephele.event.task.EventListener;
+import eu.stratosphere.runtime.io.channels.EndOfSuperstepEvent;
+import eu.stratosphere.nephele.template.AbstractInvokable;
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.gates.OutputGate;
+
+import java.io.IOException;
+
+public class BufferWriter {
+
+	protected final OutputGate outputGate;
+
+	public BufferWriter(AbstractInvokable invokable) {
+		this.outputGate = invokable.getEnvironment().createAndRegisterOutputGate();
+	}
+
+	public void sendBuffer(Buffer buffer, int targetChannel) throws IOException, InterruptedException {
+		this.outputGate.sendBuffer(buffer, targetChannel);
+	}
+
+	public void sendEvent(AbstractEvent event, int targetChannel) throws IOException, InterruptedException {
+		this.outputGate.sendEvent(event, targetChannel);
+	}
+
+	public void sendBufferAndEvent(Buffer buffer, AbstractEvent event, int targetChannel) throws IOException, InterruptedException {
+		this.outputGate.sendBufferAndEvent(buffer, event, targetChannel);
+	}
+
+	public void broadcastBuffer(Buffer buffer) throws IOException, InterruptedException {
+		this.outputGate.broadcastBuffer(buffer);
+	}
+
+	public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
+		this.outputGate.broadcastEvent(event);
+	}
+
+	// -----------------------------------------------------------------------------------------------------------------
+
+	public void subscribeToEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType) {
+		this.outputGate.subscribeToEvent(eventListener, eventType);
+	}
+
+	public void unsubscribeFromEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType) {
+		this.outputGate.unsubscribeFromEvent(eventListener, eventType);
+	}
+
+	public void sendEndOfSuperstep() throws IOException, InterruptedException {
+		this.outputGate.broadcastEvent(EndOfSuperstepEvent.INSTANCE);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/ChannelSelector.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/ChannelSelector.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/ChannelSelector.java
new file mode 100644
index 0000000..b9638ea
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/ChannelSelector.java
@@ -0,0 +1,39 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.api;
+
+import eu.stratosphere.core.io.IOReadableWritable;
+
+/**
+ * Objects implementing this interface are passed to an {@link eu.stratosphere.runtime.io.gates.OutputGate}. When a record is sent through the output
+ * gate, the channel selector object is called to determine to which {@link eu.stratosphere.runtime.io.channels.OutputChannel} objects the record
+ * shall be passed on.
+ * 
+ * @param <T>
+ *        the type of record which is sent through the attached output gate
+ */
+public interface ChannelSelector<T extends IOReadableWritable> {
+
+	/**
+	 * Called to determine to which attached {@link eu.stratosphere.runtime.io.channels.OutputChannel} objects the given record shall be forwarded.
+	 * 
+	 * @param record
+	 *        the record to determine the output channels for
+	 * @param numberOfOutputChannels
+	 *        the total number of output channels which are attached to the respective output gate
+	 * @return a (possibly empty) array of integer numbers which indicate the indices of the output channels through
+	 *         which the record shall be forwarded
+	 */
+	int[] selectChannels(T record, int numberOfOutputChannels);
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/MutableReader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/MutableReader.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/MutableReader.java
new file mode 100644
index 0000000..ddc080f
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/MutableReader.java
@@ -0,0 +1,32 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.api;
+
+import java.io.IOException;
+
+import eu.stratosphere.core.io.IOReadableWritable;
+
+/**
+ * A reader that reads the next record into a target object supplied by the caller.
+ */
+public interface MutableReader<T extends IOReadableWritable> extends ReaderBase {
+	
+	/**
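+	 * @param target the record object into which the next record is read
+	 * @return {@code true} if a record was read into {@code target}, {@code false} if no record is available (e.g. end of stream or end of superstep)
+	 * @throws IOException thrown if an error occurs while reading the next record
+	 * @throws InterruptedException thrown if the thread is interrupted while waiting for the next record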
+	 */
+	boolean next(T target) throws IOException, InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/MutableRecordReader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/MutableRecordReader.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/MutableRecordReader.java
new file mode 100644
index 0000000..9d03c7f
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/MutableRecordReader.java
@@ -0,0 +1,120 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.api;
+
+import java.io.IOException;
+
+import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.runtime.io.gates.InputChannelResult;
+import eu.stratosphere.nephele.template.AbstractOutputTask;
+import eu.stratosphere.nephele.template.AbstractTask;
+
+public class MutableRecordReader<T extends IOReadableWritable> extends AbstractSingleGateRecordReader<T> implements MutableReader<T> {
+	
+	private boolean endOfStream;
+	
+	
+	/**
+	 * Constructs a new mutable record reader and registers a new input gate with the application's environment.
+	 * 
+	 * @param taskBase The application that instantiated the record reader.
+	 */
+	public MutableRecordReader(final AbstractTask taskBase) {
+		super(taskBase);
+	}
+
+	/**
+	 * Constructs a new record reader and registers a new input gate with the application's environment.
+	 * 
+	 * @param outputBase The application that instantiated the record reader.
+	 */
+	public MutableRecordReader(final AbstractOutputTask outputBase) {
+		super(outputBase);
+	}
+
+	/**
+	 * Constructs a new record reader and registers a new input gate with the application's environment.
+	 * 
+	 * @param taskBase
+	 *        the application that instantiated the record reader
+	 * @param inputGateID
+	 *        The ID of the input gate that the reader reads from (currently ignored by this constructor).
+	 */
+	public MutableRecordReader(final AbstractTask taskBase, final int inputGateID) {
+		super(taskBase);
+	}
+
+	/**
+	 * Constructs a new record reader and registers a new input gate with the application's environment.
+	 * 
+	 * @param outputBase
+	 *        the application that instantiated the record reader
+	 * @param inputGateID
+	 *        The ID of the input gate that the reader reads from (currently ignored by this constructor).
+	 */
+	public MutableRecordReader(final AbstractOutputTask outputBase, final int inputGateID) {
+		super(outputBase);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	@Override
+	public boolean next(final T target) throws IOException, InterruptedException {
+		if (this.endOfStream) {
+			return false;
+			
+		}
+		while (true) {
+			InputChannelResult result = this.inputGate.readRecord(target);
+			switch (result) {
+				case INTERMEDIATE_RECORD_FROM_BUFFER:
+				case LAST_RECORD_FROM_BUFFER:
+					return true;
+					
+				case END_OF_SUPERSTEP:
+					if (incrementEndOfSuperstepEventAndCheck()) {
+						return false; // end of the superstep
+					}
+					else {
+						break; // not yet the end of the superstep: leave the switch and loop for the next record/event
+					}
+					
+				case TASK_EVENT:
+					handleEvent(this.inputGate.getCurrentEvent());
+					break;	// leave the switch and loop to get the next record
+				
+				case END_OF_STREAM:
+					this.endOfStream = true;
+					return false;
+					
+				default:
+					; // nothing to do here; loop to get the next record
+			}
+		}
+	}
+	
+	@Override
+	public boolean isInputClosed() {
+		return this.endOfStream;
+	}
+
+	@Override
+	public void setIterative(int numEventsUntilEndOfSuperstep) {
+		// sanity check for debug purposes
+		if (numEventsUntilEndOfSuperstep != getNumberOfInputChannels()) {
+			throw new IllegalArgumentException("Number of events till end of superstep is different from the number of input channels.");
+		}
+		super.setIterative(numEventsUntilEndOfSuperstep);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/MutableUnionRecordReader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/MutableUnionRecordReader.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/MutableUnionRecordReader.java
new file mode 100644
index 0000000..e79c7bb
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/MutableUnionRecordReader.java
@@ -0,0 +1,37 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.api;
+
+import java.io.IOException;
+
+import eu.stratosphere.core.io.IOReadableWritable;
+
+public class MutableUnionRecordReader<T extends IOReadableWritable> extends AbstractUnionRecordReader<T> implements MutableReader<T> {
+
+	
+	/**
+	 * Constructs a new mutable union record reader.
+	 * 
+	 * @param recordReaders
+	 *        the individual mutable record readers whose input is used to construct the union
+	 */
+	public MutableUnionRecordReader(MutableRecordReader<T>[] recordReaders) {
+		super(recordReaders);
+	}
+
+	@Override
+	public boolean next(T target) throws IOException, InterruptedException {
+		return getNextRecord(target);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/Reader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/Reader.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/Reader.java
new file mode 100644
index 0000000..ba0d3b6
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/Reader.java
@@ -0,0 +1,30 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.api;
+
+import java.io.IOException;
+
+import eu.stratosphere.core.io.IOReadableWritable;
+
+/**
+ * A reader interface to read records from an input.
+ * 
+ * @param <T> The type of the record that can be read with this reader
+ */
+public interface Reader<T extends IOReadableWritable> extends ReaderBase {
+
+	boolean hasNext() throws IOException, InterruptedException;
+
+	T next() throws IOException, InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/ReaderBase.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/ReaderBase.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/ReaderBase.java
new file mode 100644
index 0000000..17f6b7d
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/ReaderBase.java
@@ -0,0 +1,67 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+package eu.stratosphere.runtime.io.api;
+
+import java.io.IOException;
+
+import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
+import eu.stratosphere.nephele.event.task.EventListener;
+
+
+/**
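+ * Common base interface of the readers: input state, task-event subscription and publishing, and superstep handling.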
+ */
+public interface ReaderBase {
+
+	boolean isInputClosed();
+	
+	/**
+	 * Subscribes the listener object to receive events of the given type.
+	 * 
+	 * @param eventListener
+	 *        the listener object to register
+	 * @param eventType
+	 *        the type of event to register the listener for
+	 */
+	void subscribeToEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType);
+	
+	/**
+	 * Removes the subscription for events of the given type for the listener object.
+	 * 
+	 * @param eventListener
+	 *        the listener object to cancel the subscription for
+	 * @param eventType
+	 *        the type of the event to cancel the subscription for
+	 */
+	void unsubscribeFromEvent(final EventListener eventListener, final Class<? extends AbstractTaskEvent> eventType);
+
+	/**
+	 * Publishes an event.
+	 * 
+	 * @param event
+	 *        the event to be published
+	 * @throws IOException
+	 *         thrown if an error occurs while transmitting the event
+	 * @throws InterruptedException
+	 *         thrown if the thread is interrupted while waiting for the event to be published
+	 */
+	void publishEvent(AbstractTaskEvent event) throws IOException, InterruptedException;
+	
+	
+	void setIterative(int numEventsUntilEndOfSuperstep);
+
+	
+	void startNextSuperstep();
+	
+	boolean hasReachedEndOfSuperstep();
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/RecordReader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/RecordReader.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/RecordReader.java
new file mode 100644
index 0000000..bb6a580
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/RecordReader.java
@@ -0,0 +1,154 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.api;
+
+import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.nephele.template.AbstractOutputTask;
+import eu.stratosphere.nephele.template.AbstractTask;
+import eu.stratosphere.runtime.io.gates.InputChannelResult;
+
+import java.io.IOException;
+
+/**
+ * A record reader connects an input gate to an application. It allows the application
+ * to query for incoming records and read them from the input gate.
+ * 
+ * @param <T> The type of the record that can be read from this record reader.
+ */
+public class RecordReader<T extends IOReadableWritable> extends AbstractSingleGateRecordReader<T> implements Reader<T> {
+	
+	private final Class<T> recordType;
+	
+	/**
+	 * Stores the last read record.
+	 */
+	private T lookahead;
+
+	/**
+	 * Stores whether no more records will be received from the assigned input gate.
+	 */
+	private boolean noMoreRecordsWillFollow;
+
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Constructs a new record reader and registers a new input gate with the application's environment.
+	 * 
+	 * @param taskBase
+	 *        The application that instantiated the record reader.
+	 * @param recordType
+	 *        The class of records that can be read from the record reader.
+	 */
+	public RecordReader(AbstractTask taskBase, Class<T> recordType) {
+		super(taskBase);
+		this.recordType = recordType;
+	}
+
+	/**
+	 * Constructs a new record reader and registers a new input gate with the application's environment.
+	 * 
+	 * @param outputBase
+	 *        The application that instantiated the record reader.
+	 * @param recordType
+	 *        The class of records that can be read from the record reader.
+	 */
+	public RecordReader(AbstractOutputTask outputBase, Class<T> recordType) {
+		super(outputBase);
+		this.recordType = recordType;
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Checks if at least one more record can be read from the associated input gate. This method may block
+	 * until the associated input gate is able to read the record from one of its input channels.
+	 * 
+	 * @return <code>true</code> if at least one more record can be read from the associated input gate, otherwise
+	 *         <code>false</code>
+	 */
+	@Override
+	public boolean hasNext() throws IOException, InterruptedException{
+		if (this.lookahead != null) {
+			return true;
+		} else {
+			if (this.noMoreRecordsWillFollow) {
+				return false;
+			}
+			
+			T record = instantiateRecordType();
+			
+			while (true) {
+				InputChannelResult result = this.inputGate.readRecord(record);
+				switch (result) {
+					case INTERMEDIATE_RECORD_FROM_BUFFER:
+					case LAST_RECORD_FROM_BUFFER:
+						this.lookahead = record;
+						return true;
+						
+					case END_OF_SUPERSTEP:
+						if (incrementEndOfSuperstepEventAndCheck()) {
+							return false;
+						}
+						else {
+							break; // fall through and wait for next record/event
+						}
+						
+					case TASK_EVENT:
+						handleEvent(this.inputGate.getCurrentEvent());
+						break;
+						
+					case END_OF_STREAM:
+						this.noMoreRecordsWillFollow = true;
+						return false;
+				
+					default:
+						; // fall through the loop
+				}
+			}
+		}
+	}
+
+	/**
+	 * Reads the current record from the associated input gate.
+	 * 
+	 * @return the current record from the associated input gate.
+	 * @throws IOException
+	 *         thrown if any error occurs while reading the record from the input gate
+	 */
+	@Override
+	public T next() throws IOException, InterruptedException {
+		if (hasNext()) {
+			T tmp = this.lookahead;
+			this.lookahead = null;
+			return tmp;
+		} else {
+			return null;
+		}
+	}
+	
+	@Override
+	public boolean isInputClosed() {
+		return this.noMoreRecordsWillFollow;
+	}
+	
+	private T instantiateRecordType() {
+		try {
+			return this.recordType.newInstance();
+		} catch (InstantiationException e) {
+			throw new RuntimeException("Cannot instantiate class '" + this.recordType.getName() + "'.", e);
+		} catch (IllegalAccessException e) {
+			throw new RuntimeException("Cannot instantiate class '" + this.recordType.getName() + "'.", e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/RecordWriter.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/RecordWriter.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/RecordWriter.java
new file mode 100644
index 0000000..132dc14
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/RecordWriter.java
@@ -0,0 +1,151 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.api;
+
+import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.nephele.event.task.AbstractEvent;
+import eu.stratosphere.nephele.template.AbstractInputTask;
+import eu.stratosphere.nephele.template.AbstractInvokable;
+import eu.stratosphere.nephele.template.AbstractTask;
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.channels.EndOfSuperstepEvent;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
+import eu.stratosphere.runtime.io.serialization.RecordSerializer;
+import eu.stratosphere.runtime.io.serialization.SpanningRecordSerializer;
+
+import java.io.IOException;
+
+/**
+ * A record writer connects the application to an output gate. It allows the application
+ * to emit (send out) records to the output gate. The output gate will then take care of distributing
+ * the emitted records among the output channels.
+ * 
+ * @param <T>
+ *        the type of the record that can be emitted with this record writer
+ */
+public class RecordWriter<T extends IOReadableWritable> extends BufferWriter {
+
+	private final BufferProvider bufferPool;
+
+	private final ChannelSelector<T> channelSelector;
+
+	private int numChannels;
+
+	/** RecordSerializer per outgoing channel */
+	private RecordSerializer<T>[] serializers;
+
+	// -----------------------------------------------------------------------------------------------------------------
+
+	public RecordWriter(AbstractTask task) {
+		this((AbstractInvokable) task, new RoundRobinChannelSelector<T>());
+	}
+
+	public RecordWriter(AbstractTask task, ChannelSelector<T> channelSelector) {
+		this((AbstractInvokable) task, channelSelector);
+	}
+
+	public RecordWriter(AbstractInputTask<?> task) {
+		this((AbstractInvokable) task, new RoundRobinChannelSelector<T>());
+	}
+
+	public RecordWriter(AbstractInputTask<?> task, ChannelSelector<T> channelSelector) {
+		this((AbstractInvokable) task, channelSelector);
+	}
+
+	// -----------------------------------------------------------------------------------------------------------------
+
+	private RecordWriter(AbstractInvokable invokable, ChannelSelector<T> channelSelector) {
+		// initialize the gate
+		super(invokable);
+
+		this.bufferPool = invokable.getEnvironment().getOutputBufferProvider();
+		this.channelSelector = channelSelector;
+	}
+
+	// -----------------------------------------------------------------------------------------------------------------
+
+	@SuppressWarnings("unchecked")
+	public void initializeSerializers() {
+		this.numChannels = this.outputGate.getNumChannels();
+		this.serializers = new RecordSerializer[numChannels];
+		for (int i = 0; i < this.numChannels; i++) {
+			this.serializers[i] = new SpanningRecordSerializer<T>();
+		}
+	}
+
+	public void emit(final T record) throws IOException, InterruptedException {
+		for (int targetChannel : this.channelSelector.selectChannels(record, this.numChannels)) {
+			// serialize with corresponding serializer and send full buffer
+			RecordSerializer<T> serializer = this.serializers[targetChannel];
+
+			RecordSerializer.SerializationResult result = serializer.addRecord(record);
+			while (result.isFullBuffer()) {
+				Buffer buffer = serializer.getCurrentBuffer();
+				if (buffer != null) {
+					sendBuffer(buffer, targetChannel);
+				}
+
+				buffer = this.bufferPool.requestBufferBlocking(this.bufferPool.getBufferSize());
+				result = serializer.setNextBuffer(buffer);
+			}
+		}
+	}
+
+	public void flush() throws IOException, InterruptedException {
+		for (int targetChannel = 0; targetChannel < this.numChannels; targetChannel++) {
+			RecordSerializer<T> serializer = this.serializers[targetChannel];
+
+			Buffer buffer = serializer.getCurrentBuffer();
+			if (buffer != null) {
+				sendBuffer(buffer, targetChannel);
+			}
+
+			serializer.clear();
+		}
+	}
+
+	@Override
+	public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
+		for (int targetChannel = 0; targetChannel < this.numChannels; targetChannel++) {
+			RecordSerializer<T> serializer = this.serializers[targetChannel];
+
+			Buffer buffer = serializer.getCurrentBuffer();
+			if (buffer == null) {
+				super.sendEvent(event, targetChannel);
+			} else {
+				super.sendBufferAndEvent(buffer, event, targetChannel);
+
+				buffer = this.bufferPool.requestBufferBlocking(this.bufferPool.getBufferSize());
+				serializer.setNextBuffer(buffer);
+			}
+		}
+	}
+
+	@Override
+	public void sendEndOfSuperstep() throws IOException, InterruptedException {
+		for (int targetChannel = 0; targetChannel < this.numChannels; targetChannel++) {
+			RecordSerializer<T> serializer = this.serializers[targetChannel];
+
+			Buffer buffer = serializer.getCurrentBuffer();
+			if (buffer == null) {
+				super.sendEvent(EndOfSuperstepEvent.INSTANCE, targetChannel);
+			} else {
+				super.sendBufferAndEvent(buffer, EndOfSuperstepEvent.INSTANCE, targetChannel);
+
+				buffer = this.bufferPool.requestBufferBlocking(this.bufferPool.getBufferSize());
+				serializer.setNextBuffer(buffer);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/RoundRobinChannelSelector.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/RoundRobinChannelSelector.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/RoundRobinChannelSelector.java
new file mode 100644
index 0000000..aaa1506
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/RoundRobinChannelSelector.java
@@ -0,0 +1,47 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.api;
+
+import eu.stratosphere.core.io.IOReadableWritable;
+
+/**
+ * This is the default implementation of the {@link ChannelSelector} interface. It represents a simple round-robin
+ * strategy, i.e. regardless of the record, exactly one of the attached output channels is selected at a time.
+ *
+ * @param <T>
+ *        the type of record which is sent through the attached output gate
+ */
+public class RoundRobinChannelSelector<T extends IOReadableWritable> implements ChannelSelector<T> {
+
+	/**
+	 * Stores the index of the channel to send the next record to.
+	 */
+	private final int[] nextChannelToSendTo = new int[1];
+
+	/**
+	 * Constructs a new default channel selector.
+	 */
+	public RoundRobinChannelSelector() {
+		this.nextChannelToSendTo[0] = 0;
+	}
+
+
+	@Override
+	public int[] selectChannels(final T record, final int numberOfOutputChannels) {
+
+		this.nextChannelToSendTo[0] = (this.nextChannelToSendTo[0] + 1) % numberOfOutputChannels;
+
+		return this.nextChannelToSendTo;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/UnionRecordReader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/UnionRecordReader.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/UnionRecordReader.java
new file mode 100644
index 0000000..85ce389
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/UnionRecordReader.java
@@ -0,0 +1,67 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.api;
+
+import java.io.IOException;
+
+import eu.stratosphere.core.io.IOReadableWritable;
+
+public final class UnionRecordReader<T extends IOReadableWritable> extends AbstractUnionRecordReader<T> implements Reader<T> {
+	
+	private final Class<T> recordType;
+	
+	private T lookahead;
+	
+
+	public UnionRecordReader(MutableRecordReader<T>[] recordReaders, Class<T> recordType) {
+		super(recordReaders);
+		this.recordType = recordType;
+	}
+
+	@Override
+	public boolean hasNext() throws IOException, InterruptedException {
+		if (this.lookahead != null) {
+			return true;
+		} else {
+			T record = instantiateRecordType();
+			if (getNextRecord(record)) {
+				this.lookahead = record;
+				return true;
+			} else {
+				return false;
+			}
+		}
+	}
+
+	@Override
+	public T next() throws IOException, InterruptedException {
+		if (hasNext()) {
+			T tmp = this.lookahead;
+			this.lookahead = null;
+			return tmp;
+		} else {
+			return null;
+		}
+	}
+	
+	private T instantiateRecordType() {
+		try {
+			return this.recordType.newInstance();
+		} catch (InstantiationException e) {
+			throw new RuntimeException("Cannot instantiate class '" + this.recordType.getName() + "'.", e);
+		} catch (IllegalAccessException e) {
+			throw new RuntimeException("Cannot instantiate class '" + this.recordType.getName() + "'.", e);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/BufferOrEvent.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/BufferOrEvent.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/BufferOrEvent.java
new file mode 100644
index 0000000..2d51d13
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/BufferOrEvent.java
@@ -0,0 +1,52 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+package eu.stratosphere.runtime.io.channels;
+
+import eu.stratosphere.nephele.event.task.AbstractEvent;
+import eu.stratosphere.runtime.io.Buffer;
+
+/**
+ * Either type for {@link eu.stratosphere.runtime.io.Buffer} and {@link AbstractEvent}.
+ */
+public class BufferOrEvent {
+	
+	private final Buffer buffer;
+	
+	private final AbstractEvent event;
+	
+	public BufferOrEvent(Buffer buffer) {
+		this.buffer = buffer;
+		this.event = null;
+	}
+	
+	public BufferOrEvent(AbstractEvent event) {
+		this.buffer = null;
+		this.event = event;
+	}
+	
+	public boolean isBuffer() {
+		return this.buffer != null;
+	}
+	
+	public boolean isEvent() {
+		return this.event != null;
+	}
+	
+	public Buffer getBuffer() {
+		return this.buffer;
+	}
+	
+	public AbstractEvent getEvent() {
+		return this.event;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/Channel.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/Channel.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/Channel.java
new file mode 100644
index 0000000..dfa5d5e
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/Channel.java
@@ -0,0 +1,97 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.channels;
+
+import java.io.IOException;
+
+import eu.stratosphere.nephele.event.task.AbstractEvent;
+import eu.stratosphere.nephele.jobgraph.JobID;
+import eu.stratosphere.runtime.io.network.envelope.Envelope;
+import eu.stratosphere.runtime.io.network.envelope.EnvelopeDispatcher;
+
+/**
+ * The base class for channel objects.
+ * <p>
+ * Every channel has an index (at the corresponding gate), ID, and type. The connected channel is given by the ID of
+ * the destination channel.
+ */
+public abstract class Channel {
+
+	private final ChannelID id;
+
+	private final ChannelID connectedId;
+
+	private final int index;
+
+	private final ChannelType type;
+
+	protected EnvelopeDispatcher envelopeDispatcher;
+
+	/**
+	 * Auxiliary constructor for channels
+	 * 
+	 * @param index the index of the channel in either the output or input gate
+	 * @param id the ID of the channel
+	 * @param connectedId the ID of the channel this channel is connected to
+	 */
+	protected Channel(int index, ChannelID id, ChannelID connectedId, ChannelType type) {
+		this.index = index;
+		this.id = id;
+		this.connectedId = connectedId;
+		this.type = type;
+	}
+
+	public int getIndex() {
+		return this.index;
+	}
+
+	public ChannelID getID() {
+		return this.id;
+	}
+
+	public ChannelID getConnectedId() {
+		return this.connectedId;
+	}
+
+	public ChannelType getChannelType() {
+		return this.type;
+	}
+
+	/**
+	 * Registers an EnvelopeDispatcher with this channel at runtime.
+	 *
+	 * @param envelopeDispatcher the envelope dispatcher to use for data transfers
+	 */
+	public void registerEnvelopeDispatcher(EnvelopeDispatcher envelopeDispatcher) {
+		this.envelopeDispatcher = envelopeDispatcher;
+	}
+
+	// -----------------------------------------------------------------------------------------------------------------
+
+	public abstract JobID getJobID();
+
+	public abstract boolean isInputChannel();
+
+	public abstract boolean isClosed() throws IOException, InterruptedException;
+
+	public abstract void transferEvent(AbstractEvent event) throws IOException, InterruptedException;
+
+	public abstract void queueEnvelope(Envelope envelope);
+
+	// nothing to do for buffer oriented runtime => TODO remove with pending changes for input side
+	public abstract void releaseAllResources();
+
+	// nothing to do for buffer oriented runtime => TODO remove with pending changes for input side
+	public abstract void destroy();
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/ChannelCloseEvent.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/ChannelCloseEvent.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/ChannelCloseEvent.java
new file mode 100644
index 0000000..dcdcbb2
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/ChannelCloseEvent.java
@@ -0,0 +1,33 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.channels;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import eu.stratosphere.nephele.event.task.AbstractEvent;
+
+public final class ChannelCloseEvent extends AbstractEvent {
+
+	@Override
+	public void read(DataInput in) throws IOException {
+		// Nothing to do here
+	}
+
+	@Override
+	public void write(DataOutput out) throws IOException {
+		// Nothing to do here
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/ChannelID.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/ChannelID.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/ChannelID.java
new file mode 100644
index 0000000..66be7de
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/ChannelID.java
@@ -0,0 +1,39 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.channels;
+
+import java.nio.ByteBuffer;
+
+import eu.stratosphere.nephele.AbstractID;
+
+public class ChannelID extends AbstractID {
+
+	public ChannelID() {
+		super();
+	}
+
+	public ChannelID(long lowerPart, long upperPart) {
+		super(lowerPart, upperPart);
+	}
+
+	public ChannelID(byte[] bytes) {
+		super(bytes);
+	}
+
+	public static ChannelID fromByteBuffer(ByteBuffer buf, int offset) {
+		long lower = buf.getLong(offset);
+		long upper = buf.getLong(offset + 8);
+		return new ChannelID(lower, upper);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/ChannelType.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/ChannelType.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/ChannelType.java
new file mode 100644
index 0000000..5d5b53d
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/ChannelType.java
@@ -0,0 +1,26 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.channels;
+
+/**
+ * An enumeration for the channel types.
+ */
+public enum ChannelType {
+	
+	/** Network channels */
+	NETWORK,
+
+	/** In-memory channels */
+	IN_MEMORY
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/EndOfSuperstepEvent.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/EndOfSuperstepEvent.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/EndOfSuperstepEvent.java
new file mode 100644
index 0000000..e5f9589
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/EndOfSuperstepEvent.java
@@ -0,0 +1,34 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.channels;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import eu.stratosphere.nephele.event.task.AbstractEvent;
+
+/**
+ * Marks the end of a superstep of one particular iteration head
+ */
+public class EndOfSuperstepEvent extends AbstractEvent {
+	
+	public static final EndOfSuperstepEvent INSTANCE = new EndOfSuperstepEvent();
+
+	@Override
+	public void write(DataOutput out) throws IOException {}
+
+	@Override
+	public void read(DataInput in) throws IOException {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/InputChannel.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/InputChannel.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/InputChannel.java
new file mode 100644
index 0000000..860141d
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/InputChannel.java
@@ -0,0 +1,493 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.channels;
+
+import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.nephele.event.task.AbstractEvent;
+import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
+import eu.stratosphere.nephele.jobgraph.JobID;
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.gates.InputChannelResult;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferAvailabilityListener;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
+import eu.stratosphere.runtime.io.network.envelope.Envelope;
+import eu.stratosphere.runtime.io.gates.InputGate;
+import eu.stratosphere.runtime.io.serialization.AdaptiveSpanningRecordDeserializer;
+import eu.stratosphere.runtime.io.serialization.RecordDeserializer;
+import eu.stratosphere.runtime.io.serialization.RecordDeserializer.DeserializationResult;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * InputChannel is the receiving end of a channel. It queues the envelopes delivered to it and
+ * deserializes records from the buffers they carry. Input channels are always parameterized to
+ * a specific type that can be transported through the channel.
+ *
+ * @param <T> The Type of the record that can be transported through the channel.
+ */
+public class InputChannel<T extends IOReadableWritable> extends Channel implements BufferProvider {
+
+	private final InputGate<T> inputGate;
+
+	/**
+	 * The log object used to report warnings and errors.
+	 */
+	private static final Log LOG = LogFactory.getLog(InputChannel.class);
+
+	/**
+	 * The deserializer used to deserialize records.
+	 */
+	private final RecordDeserializer<T> deserializer;
+
+	/**
+	 * Buffer for the uncompressed (raw) data.
+	 */
+	private Buffer dataBuffer;
+
+	private AbstractTaskEvent currentEvent;
+
+	/**
+	 * The exception observed in this channel while processing the buffers. Checked and thrown
+	 * per-buffer.
+	 */
+	private volatile IOException ioException;
+
+	/**
+	 * Stores the number of bytes read through this input channel since its instantiation.
+	 */
+	private long amountOfDataTransmitted;
+
+	private volatile boolean brokerAggreedToCloseChannel;
+
+	// -------------------------------------------------------------------------------------------
+
+	private int lastReceivedEnvelope = -1;
+
+	private boolean destroyCalled = false;
+
+	// ----------------------
+
+	private Queue<Envelope> queuedEnvelopes = new ArrayDeque<Envelope>();
+
+	private Iterator<AbstractEvent> pendingEvents;
+
+	/**
+	 * Constructs an input channel with a given input gate associated. Records are
+	 * deserialized with an {@link AdaptiveSpanningRecordDeserializer}.
+	 * 
+	 * @param inputGate
+	 *        the input gate this channel is connected to
+	 * @param channelIndex
+	 *        the index of the channel in the input gate
+	 * @param channelID
+	 *        the ID of the channel
+	 * @param connectedChannelID
+	 *        the ID of the channel this channel is connected to
+	 * @param type
+	 *        the type of the channel, handed unchanged to the {@link Channel} base class
+	 */
+	public InputChannel(final InputGate<T> inputGate, final int channelIndex, final ChannelID channelID,
+						   final ChannelID connectedChannelID, ChannelType type) {
+		super(channelIndex, channelID, connectedChannelID, type);
+		this.inputGate = inputGate;
+		this.deserializer = new AdaptiveSpanningRecordDeserializer<T>();
+	}
+
+	/**
+	 * Gives access to the input gate this channel was created for.
+	 *
+	 * @return the input gate passed to the constructor
+	 */
+	public InputGate<T> getInputGate() {
+		final InputGate<T> gate = this.inputGate;
+		return gate;
+	}
+
+	/**
+	 * Reads a record from the input channel. If currently no record is available the method
+	 * returns <code>null</code>. If the channel is closed (i.e. no more records will be received), the method
+	 * throws an {@link EOFException}.
+	 * 
+	 * @return a record that has been transported through the channel or <code>null</code> if currently no record is
+	 *         available
+	 * @throws IOException
+	 *         thrown if the input channel is already closed {@link EOFException} or a transmission error has occurred
+	 */
+//	public abstract InputChannelResult readRecord(T target) throws IOException;
+
+	/**
+	 * Immediately closes the input channel. The corresponding output channels are
+	 * notified if necessary. Any remaining records in any buffers or queue is considered
+	 * irrelevant and is discarded.
+	 * 
+	 * @throws InterruptedException
+	 *         thrown if the thread is interrupted while waiting for the channel to close
+	 * @throws IOException
+	 *         thrown if an I/O error occurs while closing the channel
+	 */
+//	public abstract void close() throws IOException, InterruptedException;
+
+
+
+	/**
+	 * Identifies this channel as an input channel.
+	 *
+	 * @return always <code>true</code>
+	 */
+	@Override
+	public boolean isInputChannel() {
+		final boolean inputSide = true;
+		return inputSide;
+	}
+
+
+	/**
+	 * Returns the ID of the job this channel belongs to, as reported by the input gate.
+	 *
+	 * @return the job ID obtained from the input gate
+	 */
+	@Override
+	public JobID getJobID() {
+		final InputGate<T> gate = this.inputGate;
+		return gate.getJobID();
+	}
+
+//	public abstract AbstractTaskEvent getCurrentEvent();
+
+	public InputChannelResult readRecord(T target) throws IOException {
+		if (this.dataBuffer == null) {
+			if (isClosed()) {
+				return InputChannelResult.END_OF_STREAM;
+			}
+
+			// get the next element we need to handle (buffer or event)
+			BufferOrEvent boe = getNextBufferOrEvent();
+
+			if (boe == null) {
+				throw new IllegalStateException("Input channel was queries for data even though none was announced available.");
+			}
+
+			// handle events
+			if (boe.isEvent())
+			{
+				// sanity check: an event may only come after a complete record.
+				if (this.deserializer.hasUnfinishedData()) {
+					throw new IOException("Channel received an event before completing the current partial record.");
+				}
+
+				AbstractEvent evt = boe.getEvent();
+				if (evt.getClass() == ChannelCloseEvent.class) {
+					this.brokerAggreedToCloseChannel = true;
+					return InputChannelResult.END_OF_STREAM;
+				}
+				else if (evt.getClass() == EndOfSuperstepEvent.class) {
+					return InputChannelResult.END_OF_SUPERSTEP;
+				}
+				else if (evt instanceof AbstractTaskEvent) {
+					this.currentEvent = (AbstractTaskEvent) evt;
+					return InputChannelResult.TASK_EVENT;
+				}
+				else {
+					LOG.error("Received unknown event: " + evt);
+					return InputChannelResult.NONE;
+				}
+			} else {
+				// buffer case
+				this.dataBuffer = boe.getBuffer();
+				this.deserializer.setNextMemorySegment(this.dataBuffer.getMemorySegment(), this.dataBuffer.size());
+			}
+		}
+
+
+		DeserializationResult deserializationResult = this.deserializer.getNextRecord(target);
+
+		if (deserializationResult.isBufferConsumed()) {
+			releasedConsumedReadBuffer(this.dataBuffer);
+			this.dataBuffer = null;
+		}
+
+		if (deserializationResult == DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER) {
+			return InputChannelResult.INTERMEDIATE_RECORD_FROM_BUFFER;
+		} else if (deserializationResult == DeserializationResult.LAST_RECORD_FROM_BUFFER) {
+			return InputChannelResult.LAST_RECORD_FROM_BUFFER;
+		} else if (deserializationResult == DeserializationResult.PARTIAL_RECORD) {
+			return InputChannelResult.NONE;
+		} else {
+			throw new IllegalStateException();
+		}
+	}
+
+	/**
+	 * Returns the type of this channel.
+	 * <p>
+	 * The constructor hands the channel type to the {@link Channel} base class, so the lookup is
+	 * delegated to the base class rather than answering <code>null</code>, which discarded that type.
+	 *
+	 * @return the channel type as reported by the base class
+	 */
+	@Override
+	public ChannelType getChannelType() {
+		return super.getChannelType();
+	}
+
+	/**
+	 * Tells whether the channel has been closed. An error reported for the channel takes precedence
+	 * and is raised instead of returning a value.
+	 *
+	 * @return <code>true</code> once the close of the channel has been agreed on
+	 * @throws IOException
+	 *         thrown if an I/O error was reported for this channel; the reported error is attached as cause
+	 */
+	@Override
+	public boolean isClosed() throws IOException{
+		if (this.ioException == null) {
+			return this.brokerAggreedToCloseChannel;
+		}
+
+		throw new IOException("An error occurred in the channel: " + this.ioException.getMessage(), this.ioException);
+	}
+
+	/**
+	 * Closes the channel. Partially deserialized data is dropped, the buffer currently in use is
+	 * recycled, and everything still queued is drained until the close event is seen. Finally a
+	 * close event is sent via {@link #transferEventToOutputChannel(AbstractEvent)}.
+	 *
+	 * @throws IOException
+	 *         thrown if fetching queued data or sending the close event fails
+	 * @throws InterruptedException
+	 *         thrown if the thread is interrupted while waiting for the close event
+	 */
+	public void close() throws IOException, InterruptedException {
+
+		this.deserializer.clear();
+
+		final Buffer current = this.dataBuffer;
+		if (current != null) {
+			releasedConsumedReadBuffer(current);
+			this.dataBuffer = null;
+		}
+
+		// Drain the queue so that isClosed() works even if the channel input has not been fully consumed
+		while (!this.brokerAggreedToCloseChannel) {
+			final BufferOrEvent next = getNextBufferOrEvent();
+
+			if (next == null) {
+				// nothing queued right now, look again a little later
+				Thread.sleep(200);
+				continue;
+			}
+
+			if (!next.isEvent()) {
+				releasedConsumedReadBuffer(next.getBuffer());
+			} else if (next.getEvent() instanceof ChannelCloseEvent) {
+				this.brokerAggreedToCloseChannel = true;
+			}
+		}
+
+		// Signal that the input channel has successfully processed all data it is interested in.
+		transferEventToOutputChannel(new ChannelCloseEvent());
+	}
+
+
+	/**
+	 * Adds the size of the given buffer to the transmitted-bytes counter and recycles the buffer.
+	 *
+	 * @param buffer
+	 *        the buffer that is no longer needed
+	 */
+	private void releasedConsumedReadBuffer(Buffer buffer) {
+		this.amountOfDataTransmitted = this.amountOfDataTransmitted + buffer.size();
+		buffer.recycleBuffer();
+	}
+
+
+	/**
+	 * Tells the input gate that a record is available on this channel, identified by its index.
+	 */
+	public void notifyGateThatInputIsAvailable() {
+		final InputGate<T> gate = getInputGate();
+		gate.notifyRecordIsAvailable(getIndex());
+	}
+
+
+	/**
+	 * Forwards the given event via {@link #transferEventToOutputChannel(AbstractEvent)}.
+	 *
+	 * @param event
+	 *        the event to forward
+	 * @throws IOException
+	 *         propagated from the forwarding call
+	 * @throws InterruptedException
+	 *         propagated from the forwarding call
+	 */
+	@Override
+	public void transferEvent(AbstractEvent event) throws IOException, InterruptedException {
+		this.transferEventToOutputChannel(event);
+	}
+
+
+	/**
+	 * Records an I/O error for this channel. The stored error is raised by the next call to
+	 * {@link #isClosed()}.
+	 *
+	 * @param ioe
+	 *        the error to record; replaces any error recorded before
+	 */
+	public void reportIOException(IOException ioe) {
+		ioException = ioe;
+	}
+
+
+	/**
+	 * Marks the channel as closed and drops whatever the deserializer still holds.
+	 * Queued buffers are not touched here.
+	 */
+	@Override
+	public void releaseAllResources() {
+		brokerAggreedToCloseChannel = true;
+		deserializer.clear();
+
+		// The buffers are recycled by the input channel wrapper
+	}
+
+	/**
+	 * Tells the input gate that a data unit of this channel, identified by its index, has been consumed.
+	 */
+	public void notifyDataUnitConsumed() {
+		final InputGate<T> gate = getInputGate();
+		gate.notifyDataUnitConsumed(getIndex());
+	}
+
+	/**
+	 * Hands out the task event stored by the last {@link #readRecord(IOReadableWritable)} call that
+	 * reported a task event, and forgets it, so a second call returns <code>null</code>.
+	 *
+	 * @return the stored task event, or <code>null</code> if none is stored
+	 */
+	public AbstractTaskEvent getCurrentEvent() {
+		try {
+			return this.currentEvent;
+		} finally {
+			// runs after the return value has been captured
+			this.currentEvent = null;
+		}
+	}
+
+	// InputChannelContext
+
+	/**
+	 * Accepts an envelope delivered for this channel.
+	 * <p>
+	 * An envelope carrying the expected sequence number is queued, and the input gate is notified once
+	 * for its buffer (if it has one) and once per contained event. An envelope with any other sequence
+	 * number is not queued: an {@link IOException} is recorded for the channel, the gate is notified
+	 * once so that the error gets picked up, and the buffer of the envelope is recycled. After
+	 * {@link #destroy()} has been called, envelopes are dropped and their buffers recycled.
+	 *
+	 * @param envelope
+	 *        the envelope to queue
+	 */
+	@Override
+	public void queueEnvelope(Envelope envelope) {
+		final int receivedSeqNum = envelope.getSequenceNumber();
+
+		synchronized (this.queuedEnvelopes) {
+
+			if (this.destroyCalled) {
+				// the channel is gone: drop the envelope, but hand its buffer back
+				final Buffer dropped = envelope.getBuffer();
+				if (dropped != null) {
+					dropped.recycleBuffer();
+				}
+				return;
+			}
+
+			final int expectedSeqNum = this.lastReceivedEnvelope + 1;
+
+			if (receivedSeqNum == expectedSeqNum) {
+				this.queuedEnvelopes.add(envelope);
+				this.lastReceivedEnvelope = receivedSeqNum;
+
+				// Notify as much as there is: once for the buffer plus once per event
+				if (envelope.getBuffer() != null) {
+					notifyGateThatInputIsAvailable();
+				}
+
+				final List<? extends AbstractEvent> events = envelope.deserializeEvents();
+				final int numEvents = (events == null) ? 0 : events.size();
+				for (int i = 0; i < numEvents; i++) {
+					notifyGateThatInputIsAvailable();
+				}
+				return;
+			}
+
+			// Sequence gap: we are actually missing some data
+			reportIOException(new IOException("Expected data packet " + expectedSeqNum + " but received " + receivedSeqNum));
+
+			// notify that something (an exception) is available
+			notifyGateThatInputIsAvailable();
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Input channel " + this.toString() + " expected envelope " + expectedSeqNum
+						+ " but received " + receivedSeqNum);
+			}
+
+			// rescue the buffer
+			final Buffer rescued = envelope.getBuffer();
+			if (rescued != null) {
+				rescued.recycleBuffer();
+			}
+		}
+	}
+
+	@Override
+	public void destroy() {
+		final Queue<Buffer> buffersToRecycle = new ArrayDeque<Buffer>();
+
+		synchronized (this.queuedEnvelopes) {
+			this.destroyCalled = true;
+
+			while (!this.queuedEnvelopes.isEmpty()) {
+				final Envelope envelope = this.queuedEnvelopes.poll();
+				if (envelope.getBuffer() != null) {
+					buffersToRecycle.add(envelope.getBuffer());
+				}
+			}
+		}
+
+		while (!buffersToRecycle.isEmpty()) {
+			buffersToRecycle.poll().recycleBuffer();
+		}
+	}
+
+	/**
+	 * Prints a one-line summary of the envelope queue to standard out: the number of queued
+	 * envelopes, followed in parentheses by the number of those carrying a buffer and a
+	 * file-buffer count.
+	 */
+	public void logQueuedEnvelopes() {
+		int total = 0;
+		int withMemoryBuffers = 0;
+		// never incremented in this method, so it is always printed as 0
+		final int withFileBuffers = 0;
+
+		synchronized (this.queuedEnvelopes) {
+			for (Envelope envelope : this.queuedEnvelopes) {
+				total++;
+				if (envelope.getBuffer() != null) {
+					withMemoryBuffers++;
+				}
+			}
+		}
+
+		System.out.println("\t\t" + this.toString() + ": " + total + " ("
+				+ withMemoryBuffers + ", " + withFileBuffers + ")");
+
+	}
+
+	/**
+	 * Requests a buffer from the input gate.
+	 *
+	 * @param minBufferSize
+	 *        passed through to the input gate
+	 * @return whatever the input gate returns for the request
+	 * @throws IOException
+	 *         propagated from the input gate
+	 */
+	@Override
+	public Buffer requestBuffer(int minBufferSize) throws IOException {
+		final InputGate<T> gate = this.inputGate;
+		return gate.requestBuffer(minBufferSize);
+	}
+
+	/**
+	 * Requests a buffer from the input gate through its blocking variant.
+	 *
+	 * @param minBufferSize
+	 *        passed through to the input gate
+	 * @return whatever the input gate returns for the request
+	 * @throws IOException
+	 *         propagated from the input gate
+	 * @throws InterruptedException
+	 *         propagated from the input gate
+	 */
+	@Override
+	public Buffer requestBufferBlocking(int minBufferSize) throws IOException, InterruptedException {
+		final InputGate<T> gate = this.inputGate;
+		return gate.requestBufferBlocking(minBufferSize);
+	}
+
+	/**
+	 * Returns the buffer size reported by the input gate.
+	 *
+	 * @return the buffer size obtained from the input gate
+	 */
+	@Override
+	public int getBufferSize() {
+		final int gateBufferSize = this.inputGate.getBufferSize();
+		return gateBufferSize;
+	}
+
+	/**
+	 * Passes the report of an asynchronous event on to the input gate.
+	 */
+	@Override
+	public void reportAsynchronousEvent() {
+		final InputGate<T> gate = this.inputGate;
+		gate.reportAsynchronousEvent();
+	}
+
+	/**
+	 * Registers the given listener with the input gate.
+	 *
+	 * @param listener
+	 *        the listener to register
+	 * @return the result reported by the input gate
+	 */
+	@Override
+	public boolean registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
+		final boolean registered = this.inputGate.registerBufferAvailabilityListener(listener);
+		return registered;
+	}
+
+	// ChannelBroker
+
+	public BufferOrEvent getNextBufferOrEvent() throws IOException {
+		// return pending events first
+		if (this.pendingEvents != null) {
+			// if the field is not null, it must always have a next value!
+			BufferOrEvent next = new BufferOrEvent(this.pendingEvents.next());
+			if (!this.pendingEvents.hasNext()) {
+				this.pendingEvents = null;
+			}
+			return next;
+		}
+
+		// if no events are pending, get the next buffer
+		Envelope nextEnvelope;
+		synchronized (this.queuedEnvelopes) {
+			if (this.queuedEnvelopes.isEmpty()) {
+				return null;
+			}
+			nextEnvelope = this.queuedEnvelopes.poll();
+		}
+
+		// schedule events as pending, because events come always after the buffer!
+		List<AbstractEvent> events = (List<AbstractEvent>) nextEnvelope.deserializeEvents();
+		Iterator<AbstractEvent> eventsIt = events.iterator();
+		if (eventsIt.hasNext()) {
+			this.pendingEvents = eventsIt;
+		}
+
+		// get the buffer, if there is one
+		if (nextEnvelope.getBuffer() != null) {
+			return new BufferOrEvent(nextEnvelope.getBuffer());
+		}
+		else if (this.pendingEvents != null) {
+			// if the field is not null, it must always have a next value!
+			BufferOrEvent next = new BufferOrEvent(this.pendingEvents.next());
+			if (!this.pendingEvents.hasNext()) {
+				this.pendingEvents = null;
+			}
+
+			return next;
+		}
+		else {
+			// no buffer and no events, this should be an error
+			throw new IOException("Received an envelope with neither data nor events.");
+		}
+	}
+
+	/**
+	 * Wraps the given event into a fresh envelope and hands that envelope to the envelope dispatcher.
+	 * The envelope is always created with the number 0 as its first constructor argument.
+	 *
+	 * @param event
+	 *        the event to send
+	 * @throws IOException
+	 *         propagated from serializing or dispatching the envelope
+	 * @throws InterruptedException
+	 *         propagated from dispatching the envelope
+	 */
+	public void transferEventToOutputChannel(AbstractEvent event) throws IOException, InterruptedException {
+		final Envelope eventCarrier = new Envelope(0, getJobID(), getID());
+		eventCarrier.serializeEventList(Arrays.asList(event));
+
+		this.envelopeDispatcher.dispatchFromInputChannel(eventCarrier);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/OutputChannel.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/OutputChannel.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/OutputChannel.java
new file mode 100644
index 0000000..7ca916c
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/OutputChannel.java
@@ -0,0 +1,193 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.channels;
+
+import eu.stratosphere.nephele.event.task.AbstractEvent;
+import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
+import eu.stratosphere.nephele.jobgraph.JobID;
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.network.envelope.Envelope;
+import eu.stratosphere.runtime.io.gates.OutputGate;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+
+public class OutputChannel extends Channel {
+
+	private static final Log LOG = LogFactory.getLog(OutputChannel.class);
+
+	private final Object closeLock = new Object();
+	
+	private final OutputGate outputGate;
+
+	private boolean senderCloseRequested;
+
+	private boolean receiverCloseRequested;
+
+	private int currentSeqNum;
+
+	// -----------------------------------------------------------------------------------------------------------------
+
+	/**
+	 * Creates a new output channel object. Index, IDs and type are handed to the
+	 * {@link Channel} base class; the output gate is kept by this class.
+	 *
+	 * @param outputGate the output gate this channel is connected to
+	 * @param index the index of the channel in the output gate
+	 * @param id the ID of the channel
+	 * @param connectedId the ID of the channel this channel is connected to
+	 * @param type the type of this channel
+	 */
+	public OutputChannel(OutputGate outputGate, int index, ChannelID id, ChannelID connectedId, ChannelType type) {
+		super(index, id, connectedId, type);
+
+		this.outputGate = outputGate;
+	}
+
+	// -----------------------------------------------------------------------------------------------------------------
+	//                                           Data processing
+	// -----------------------------------------------------------------------------------------------------------------
+
+	/**
+	 * Sends the given buffer in a new envelope. A buffer of size 0 is discarded and no envelope is
+	 * created for it.
+	 *
+	 * @param buffer the buffer to send
+	 * @throws IOException thrown if the receiver has already closed the channel, or propagated from dispatching
+	 * @throws InterruptedException propagated from dispatching the envelope
+	 * @throws IllegalStateException thrown if closing has already been requested on this side
+	 */
+	public void sendBuffer(Buffer buffer) throws IOException, InterruptedException {
+		checkStatus();
+
+		if (buffer.size() != 0) {
+			final Envelope bufferEnvelope = createNextEnvelope();
+			bufferEnvelope.setBuffer(buffer);
+			this.envelopeDispatcher.dispatchFromOutputChannel(bufferEnvelope);
+		}
+		// NOTE(review): an empty buffer is dropped here without being recycled - confirm the caller recycles it
+	}
+
+	/**
+	 * Sends the given event in a new envelope that carries no buffer.
+	 *
+	 * @param event the event to send
+	 * @throws IOException thrown if the receiver has already closed the channel, or propagated from serializing or dispatching
+	 * @throws InterruptedException propagated from dispatching the envelope
+	 * @throws IllegalStateException thrown if closing has already been requested on this side
+	 */
+	public void sendEvent(AbstractEvent event) throws IOException, InterruptedException {
+		checkStatus();
+
+		final Envelope eventEnvelope = createNextEnvelope();
+		eventEnvelope.serializeEventList(Arrays.asList(event));
+
+		this.envelopeDispatcher.dispatchFromOutputChannel(eventEnvelope);
+	}
+
+	/**
+	 * Sends the given buffer together with the given event in one new envelope.
+	 *
+	 * @param buffer the buffer to attach to the envelope
+	 * @param event the event to attach to the envelope
+	 * @throws IOException thrown if the receiver has already closed the channel, or propagated from serializing or dispatching
+	 * @throws InterruptedException propagated from dispatching the envelope
+	 * @throws IllegalStateException thrown if closing has already been requested on this side
+	 */
+	public void sendBufferAndEvent(Buffer buffer, AbstractEvent event) throws IOException, InterruptedException {
+		checkStatus();
+
+		final Envelope combinedEnvelope = createNextEnvelope();
+		combinedEnvelope.setBuffer(buffer);
+		combinedEnvelope.serializeEventList(Arrays.asList(event));
+
+		this.envelopeDispatcher.dispatchFromOutputChannel(combinedEnvelope);
+	}
+
+	// -----------------------------------------------------------------------------------------------------------------
+	//                                          Event processing
+	// -----------------------------------------------------------------------------------------------------------------
+
+	/**
+	 * Processes an envelope delivered to this output channel. Such an envelope may only carry events:
+	 * a {@link ChannelCloseEvent} records that the receiver requested the close and wakes up threads
+	 * blocked in {@link #waitForChannelToBeClosed()}; a task event is delivered to the output gate.
+	 *
+	 * @param envelope the envelope to process
+	 * @throws IllegalStateException thrown if the envelope has a buffer attached
+	 * @throws RuntimeException thrown if the envelope contains an event that is neither a close event nor a task event
+	 */
+	@Override
+	public void queueEnvelope(Envelope envelope) {
+		if (envelope.hasBuffer()) {
+			throw new IllegalStateException("Envelope for OutputChannel has Buffer attached.");
+		}
+
+		// InputChannel treats a null result of deserializeEvents() as "no events"; do the same here
+		// instead of failing with a NullPointerException in the loop header.
+		final Iterable<? extends AbstractEvent> events = envelope.deserializeEvents();
+		if (events == null) {
+			return;
+		}
+
+		for (AbstractEvent event : events) {
+			if (event.getClass() == ChannelCloseEvent.class) {
+				synchronized (this.closeLock) {
+					this.receiverCloseRequested = true;
+					this.closeLock.notifyAll();
+				}
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("OutputChannel received close event from target.");
+				}
+			}
+			else if (event instanceof AbstractTaskEvent) {
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("OutputChannel received task event: " + event);
+				}
+
+				this.outputGate.deliverEvent((AbstractTaskEvent) event);
+			}
+			else {
+				throw new RuntimeException("OutputChannel received an event that is neither close nor task event.");
+			}
+		}
+	}
+
+	// -----------------------------------------------------------------------------------------------------------------
+	//                                              Shutdown
+	// -----------------------------------------------------------------------------------------------------------------
+
+	/**
+	 * Requests the close of the channel from the sender side by sending a {@link ChannelCloseEvent}
+	 * in a new envelope. Calls after the first one do nothing.
+	 *
+	 * @throws IOException propagated from serializing or dispatching the envelope
+	 * @throws InterruptedException propagated from dispatching the envelope
+	 */
+	public void requestClose() throws IOException, InterruptedException {
+		if (!this.senderCloseRequested) {
+			this.senderCloseRequested = true;
+
+			final Envelope closeEnvelope = createNextEnvelope();
+			closeEnvelope.serializeEventList(Arrays.asList(new ChannelCloseEvent()));
+
+			this.envelopeDispatcher.dispatchFromOutputChannel(closeEnvelope);
+		}
+	}
+
+	/**
+	 * Tells whether the channel is closed, which requires that both the sender and the receiver
+	 * have requested the close.
+	 *
+	 * @return <code>true</code> if both sides have requested the close
+	 */
+	@Override
+	public boolean isClosed() {
+		// NOTE(review): the receiver flag is written under closeLock but read here without it - confirm
+		// that this read does not need the lock
+		if (!this.senderCloseRequested) {
+			return false;
+		}
+		return this.receiverCloseRequested;
+	}
+	
+	/**
+	 * Blocks until the receiver has requested the close of the channel. The flag is re-checked
+	 * after every wake-up and at the latest after each one-second wait.
+	 *
+	 * @throws InterruptedException thrown if the thread is interrupted while waiting
+	 */
+	public void waitForChannelToBeClosed() throws InterruptedException {
+		synchronized (this.closeLock) {
+			while (true) {
+				if (this.receiverCloseRequested) {
+					return;
+				}
+				this.closeLock.wait(1000);
+			}
+		}
+	}
+
+	// -----------------------------------------------------------------------------------------------------------------
+
+	/**
+	 * Identifies this channel as not being an input channel.
+	 *
+	 * @return always <code>false</code>
+	 */
+	@Override
+	public boolean isInputChannel() {
+		final boolean inputSide = false;
+		return inputSide;
+	}
+
+	/**
+	 * Returns the ID of the job this channel belongs to, as reported by the output gate.
+	 *
+	 * @return the job ID obtained from the output gate
+	 */
+	@Override
+	public JobID getJobID() {
+		final OutputGate gate = this.outputGate;
+		return gate.getJobID();
+	}
+	
+	private void checkStatus() throws IOException {
+		if (this.senderCloseRequested) {
+			throw new IllegalStateException(String.format("Channel %s already requested to be closed.", getID()));
+		}
+		if (this.receiverCloseRequested) {
+			throw new ReceiverAlreadyClosedException();
+		}
+	}
+
+	private Envelope createNextEnvelope() {
+		return new Envelope(this.currentSeqNum++, getJobID(), getID());
+	}
+
+	/**
+	 * Does nothing in this class; the given event is ignored.
+	 *
+	 * @param event ignored
+	 */
+	@Override
+	public void transferEvent(AbstractEvent event) throws IOException, InterruptedException {
+		// intentionally empty
+		// TODO remove with pending changes for input side
+	}
+
+	/**
+	 * Does nothing in this class.
+	 */
+	@Override
+	public void releaseAllResources() {
+		// intentionally empty: nothing to do for buffer oriented runtime
+		// TODO remove with pending changes for input side
+	}
+
+	/**
+	 * Does nothing in this class.
+	 */
+	@Override
+	public void destroy() {
+		// intentionally empty: nothing to do for buffer oriented runtime
+		// TODO remove with pending changes for input side
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/ReceiverAlreadyClosedException.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/ReceiverAlreadyClosedException.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/ReceiverAlreadyClosedException.java
new file mode 100644
index 0000000..d03179f
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/channels/ReceiverAlreadyClosedException.java
@@ -0,0 +1,22 @@
+/***********************************************************************************************************************
+ *
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ *
+ **********************************************************************************************************************/
+package eu.stratosphere.runtime.io.channels;
+
+import java.io.IOException;
+
+
+/**
+ * An {@link IOException} without message or cause that signals that the receiver has already
+ * closed the channel.
+ */
+public class ReceiverAlreadyClosedException extends IOException {
+
+	/** Serialization version of this exception class. */
+	private static final long serialVersionUID = 1L;
+}


Mime
View raw message