flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [13/34] Offer buffer-oriented API for I/O (#25)
Date Tue, 10 Jun 2014 19:35:10 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferProviderBroker.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferProviderBroker.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferProviderBroker.java
new file mode 100644
index 0000000..518fe47
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/BufferProviderBroker.java
@@ -0,0 +1,24 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.bufferprovider;
+
+import java.io.IOException;
+
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.nephele.jobgraph.JobID;
+
+public interface BufferProviderBroker {
+
+	BufferProvider getBufferProvider(JobID jobID, ChannelID sourceChannelID) throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/GlobalBufferPool.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/GlobalBufferPool.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/GlobalBufferPool.java
new file mode 100644
index 0000000..2141017
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/GlobalBufferPool.java
@@ -0,0 +1,123 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.bufferprovider;
+
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import eu.stratosphere.core.memory.MemorySegment;
+
+/**
+ * A global buffer pool for the network stack.
+ * <p>
+ * All buffers used by the network stack come from this pool. Requests to this pool are mediated by instances of
+ * {@link LocalBufferPool}.
+ * <p>
+ * The size and number of buffers can be configured via the global system config.
+ */
+public final class GlobalBufferPool {
+
+	private final static Log LOG = LogFactory.getLog(GlobalBufferPool.class);
+
+	// -----------------------------------------------------------------------------------------------------------------
+
+	/** Total number of buffers */
+	private final int numBuffers;
+
+	/** Size of each buffer (in bytes) */
+	private final int bufferSize;
+
+	/** The available buffers */
+	private final Queue<MemorySegment> buffers;
+
+	private boolean isDestroyed;
+
+	// -----------------------------------------------------------------------------------------------------------------
+
+	public GlobalBufferPool(int numBuffers, int bufferSize) {
+		this.numBuffers = numBuffers;
+		this.bufferSize = bufferSize;
+
+		this.buffers = new ArrayBlockingQueue<MemorySegment>(this.numBuffers);
+		for (int i = 0; i < this.numBuffers; i++) {
+			this.buffers.add(new MemorySegment(new byte[this.bufferSize]));
+		}
+
+		LOG.info(String.format("Initialized global buffer pool with %d buffers (%d bytes each).",
+				this.numBuffers, this.bufferSize));
+	}
+
+	// -----------------------------------------------------------------------------------------------------------------
+
+	/**
+	 * Requests a buffer <strong>from</strong> the pool.
+	 *
+	 * @return buffer from pool or <code>null</code>, if no buffer available
+	 */
+	public MemorySegment requestBuffer() {
+		return this.buffers.poll();
+	}
+
+	/**
+	 * Returns a buffer <em>to</em> the pool.
+	 *
+	 * @param buffer the buffer to be returned
+	 */
+	public void returnBuffer(MemorySegment buffer) {
+		this.buffers.add(buffer);
+	}
+
+	/**
+	 * Returns the size of buffers (in bytes).
+	 *
+	 * @return size of buffers (in bytes)
+	 */
+	public int getBufferSize() {
+		return this.bufferSize;
+	}
+
+	/**
+	 * Returns the total number of managed buffers.
+	 * 
+	 * @return total number of managed buffers
+	 */
+	public int numBuffers() {
+		return this.numBuffers;
+	}
+
+	/**
+	 * Returns the number of currently available buffers.
+	 * 
+	 * @return currently available number of buffers
+	 */
+	public int numAvailableBuffers() {
+		return this.buffers.size();
+	}
+
+	public synchronized void destroy() {
+		if (!this.isDestroyed) {
+			// mark as shutdown and release memory
+			this.isDestroyed = true;
+
+			for (MemorySegment buffer : this.buffers) {
+				buffer.free();
+			}
+
+			this.buffers.clear();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/LocalBufferPool.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/LocalBufferPool.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/LocalBufferPool.java
new file mode 100644
index 0000000..e8aeb11
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/LocalBufferPool.java
@@ -0,0 +1,306 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.bufferprovider;
+
+import eu.stratosphere.core.memory.MemorySegment;
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.BufferRecycler;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Queue;
+
+/**
+ * A buffer pool used to manage a designated number of buffers from a {@link GlobalBufferPool}.
+ * <p>
+ * A local buffer pool mediates buffer requests to the global buffer pool to ensure dead-lock free operation of the
+ * network stack by limiting the number of designated buffers per local buffer pool. It also implements the default
+ * mechanism for buffer recycling, which ensures that every buffer is ultimately returned to the global buffer pool.
+ */
+public final class LocalBufferPool implements BufferProvider {
+
+	private static final class LocalBufferPoolRecycler implements BufferRecycler {
+
+		private final LocalBufferPool bufferPool;
+
+		private LocalBufferPoolRecycler(LocalBufferPool bufferPool) {
+			this.bufferPool = bufferPool;
+		}
+
+		@Override
+		public void recycle(MemorySegment buffer) {
+			this.bufferPool.recycleBuffer(buffer);
+		}
+	}
+
+	// -----------------------------------------------------------------------------------------------------------------
+
+	/** Time (ms) to wait before retry for blocking buffer requests */
+	private static final int WAIT_TIME = 100;
+
+	/** Global buffer pool to request buffers from */
+	private final GlobalBufferPool globalBufferPool;
+
+	/** Buffers managed by this local buffer pool */
+	private final Queue<MemorySegment> buffers = new ArrayDeque<MemorySegment>();
+
+	/** The recycler via which to return buffers to this local buffer pool */
+	private final LocalBufferPoolRecycler recycler;
+
+	/** Queue of buffer availability listeners */
+	private final Queue<BufferAvailabilityListener> listeners = new ArrayDeque<BufferAvailabilityListener>();
+
+	/** Size of each buffer in this pool (in bytes) */
+	private final int bufferSize;
+
+	/** Number of buffers assigned to this local buffer pool */
+	private int numDesignatedBuffers;
+
+	/** Number of buffers requested from the global buffer pool */
+	private int numRequestedBuffers;
+
+	/** Flag to indicate whether an asynchronous event has been reported */
+	private boolean hasAsyncEventOccurred;
+
+	/** Flag to indicate whether this local buffer pool has been destroyed */
+	private boolean isDestroyed;
+
+	// -----------------------------------------------------------------------------------------------------------------
+
+	public LocalBufferPool(GlobalBufferPool globalBufferPool, int numDesignatedBuffers) {
+		this.globalBufferPool = globalBufferPool;
+		this.bufferSize = globalBufferPool.getBufferSize();
+		this.numDesignatedBuffers = numDesignatedBuffers;
+
+		this.recycler = new LocalBufferPoolRecycler(this);
+	}
+	// -----------------------------------------------------------------------------------------------------------------
+
+	@Override
+	public Buffer requestBuffer(int minBufferSize) throws IOException {
+		try {
+			return requestBuffer(minBufferSize, false);
+		} catch (InterruptedException e) {
+			throw new IOException("Unexpected InterruptedException while non-blocking buffer request.");
+		}
+	}
+
+	@Override
+	public Buffer requestBufferBlocking(int minBufferSize) throws IOException, InterruptedException {
+		return requestBuffer(minBufferSize, true);
+	}
+
+	/**
+	 * Requests a buffer from this local buffer pool.
+	 * <p>
+	 * A non-blocking call to this method will only return a buffer, if one is available in the local pool after
+	 * having returned excess buffers. Otherwise, it will return <code>null</code>.
+	 * <p>
+	 * A blocking call will request a new buffer from the global buffer and block until one is available or an
+	 * asynchronous event has been reported via {@link #reportAsynchronousEvent()}.
+	 *
+	 * @param minBufferSize minimum size of requested buffer (in bytes)
+	 * @param isBlocking flag to indicate whether to block until buffer is available
+	 * @return buffer from the global buffer pool or <code>null</code>, if no buffer available
+	 * @throws IOException
+	 * @throws InterruptedException
+	 */
+	private Buffer requestBuffer(int minBufferSize, boolean isBlocking) throws IOException, InterruptedException {
+		if (minBufferSize > this.bufferSize) {
+			throw new IllegalArgumentException(String.format("Too large buffer requested (requested %d, maximum %d).",
+					minBufferSize, this.bufferSize));
+		}
+
+		while (true) {
+			boolean isAsyncRequest = false;
+
+			synchronized (this.buffers) {
+				// Return excess buffers to global buffer pool
+				while (this.numRequestedBuffers > this.numDesignatedBuffers) {
+					final MemorySegment buffer = this.buffers.poll();
+					if (buffer == null) {
+						break;
+					}
+
+					this.globalBufferPool.returnBuffer(buffer);
+					this.numRequestedBuffers--;
+				}
+
+				// Request buffers from global buffer pool
+				while (this.buffers.isEmpty()) {
+					if (this.numRequestedBuffers < this.numDesignatedBuffers) {
+						final MemorySegment buffer = this.globalBufferPool.requestBuffer();
+
+						if (buffer != null) {
+							this.buffers.add(buffer);
+
+							this.numRequestedBuffers++;
+							continue;
+						}
+					}
+
+					if (this.hasAsyncEventOccurred && isBlocking) {
+						this.hasAsyncEventOccurred = false;
+						isAsyncRequest = true;
+						break;
+					}
+
+					if (isBlocking) {
+						this.buffers.wait(WAIT_TIME);
+					} else {
+						return null;
+					}
+				}
+
+				if (!isAsyncRequest) {
+					return new Buffer(this.buffers.poll(), minBufferSize, this.recycler);
+				}
+			}
+		}
+	}
+
+	@Override
+	public int getBufferSize() {
+		return this.bufferSize;
+	}
+
+	@Override
+	public void reportAsynchronousEvent() {
+		synchronized (this.buffers) {
+			this.hasAsyncEventOccurred = true;
+			this.buffers.notify();
+		}
+	}
+
+	@Override
+	public boolean registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
+		synchronized (this.buffers) {
+			if (!this.buffers.isEmpty()) {
+				return false;
+			}
+
+			if (this.isDestroyed) {
+				return false;
+			}
+
+			this.listeners.add(listener);
+		}
+
+		return true;
+	}
+
+	/**
+	 * Sets the designated number of buffers for this local buffer pool and returns excess buffers to the global buffer
+	 * pool.
+	 * <p>
+	 * The designated number of buffers determines how many buffers this buffer pool is allowed to manage. New buffers
+	 * can only be requested, if the requested number of buffers is less than the designated number. If possible, excess
+	 * buffers will be returned to the global buffer pool.
+	 *
+	 * @param numDesignatedBuffers number of buffers designated for this local buffer pool
+	 */
+	public void setNumDesignatedBuffers(int numDesignatedBuffers) {
+		synchronized (this.buffers) {
+			this.numDesignatedBuffers = numDesignatedBuffers;
+
+			// Return excess buffers to global buffer pool
+			while (this.numRequestedBuffers > this.numDesignatedBuffers) {
+				if (this.buffers.isEmpty()) {
+					break;
+				}
+
+				this.globalBufferPool.returnBuffer(this.buffers.poll());
+				this.numRequestedBuffers --;
+			}
+
+			this.buffers.notify();
+		}
+	}
+
+	/**
+	 * Returns the number of buffers available in the local buffer pool.
+	 *
+	 * @return number of available buffers
+	 */
+	public int numAvailableBuffers() {
+		synchronized (this.buffers) {
+			return this.buffers.size();
+		}
+	}
+
+	/**
+	 * Returns the number of buffers, which have been requested from the global buffer pool.
+	 *
+	 * @return number of buffers requested from the global buffer pool
+	 */
+	public int numRequestedBuffers() {
+		synchronized (this.buffers) {
+			return this.numRequestedBuffers;
+		}
+	}
+
+	/**
+	 * Returns the designated number of buffers for this local buffer pool.
+	 *
+	 * @return number of designated buffers for this buffer pool
+	 */
+	public int numDesignatedBuffers() {
+		synchronized (this.buffers) {
+			return this.numDesignatedBuffers;
+		}
+	}
+
+	/**
+	 * Destroys this local buffer pool and immediately returns all available buffers to the global buffer pool.
+	 * <p>
+	 * Buffers, which have been requested from this local buffer pool via <code>requestBuffer</code> cannot be returned
+	 * immediately and will be returned when the respective buffer is recycled (see {@link #recycleBuffer(MemorySegment)}).
+	 */
+	public void destroy() {
+		synchronized (this.buffers) {
+			if (this.isDestroyed) {
+				return;
+			}
+
+			this.isDestroyed = true;
+
+			// return all buffers
+			while (!this.buffers.isEmpty()) {
+				this.globalBufferPool.returnBuffer(this.buffers.poll());
+				this.numRequestedBuffers--;
+			}
+		}
+	}
+
+	/**
+	 * Returns a buffer to the buffer pool and notifies listeners about the availability of a new buffer.
+	 *
+	 * @param buffer buffer to return to the buffer pool
+	 */
+	private void recycleBuffer(MemorySegment buffer) {
+		synchronized (this.buffers) {
+			if (this.isDestroyed) {
+				this.globalBufferPool.returnBuffer(buffer);
+				this.numRequestedBuffers--;
+			} else {
+				this.buffers.add(buffer);
+				this.buffers.notify();
+
+				while (!this.listeners.isEmpty()) {
+					this.listeners.poll().bufferAvailable();
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/LocalBufferPoolOwner.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/LocalBufferPoolOwner.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/LocalBufferPoolOwner.java
new file mode 100644
index 0000000..ccab2ca
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/LocalBufferPoolOwner.java
@@ -0,0 +1,56 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.bufferprovider;
+
+/**
+ * A local buffer pool owner is an object which initially retrieves its buffers from the {@link GlobalBufferPool} and
+ * manages its fraction of the overall buffer pool locally by means of a {@link LocalBufferPool}.
+ * 
+ */
+public interface LocalBufferPoolOwner {
+
+	/**
+	 * Returns the number of byte-buffered channels that will retrieve their buffers from the local buffer pool.
+	 * 
+	 * @return the number of byte-buffered channels that will retrieve their buffers from the local buffer pool
+	 */
+	int getNumberOfChannels();
+
+	/**
+	 * Sets the designated number of buffers the local buffer pool owner is allowed to fetch from the global buffer pool
+	 * and manage locally by means of the {@link LocalBufferPool}.
+	 * 
+	 * @param numBuffers
+	 *        the numBuffers the local buffer pool owner is allowed to fetch from the global buffer pool
+	 */
+	void setDesignatedNumberOfBuffers(int numBuffers);
+
+	/**
+	 * Clears the local buffer pool and returns all buffers to the global buffer pool.
+	 */
+	void clearLocalBufferPool();
+
+	void registerGlobalBufferPool(GlobalBufferPool globalBufferPool);
+
+	/**
+	 * Logs the current status of the local buffer pool. This method is intended mainly for debugging purposes.
+	 */
+	void logBufferUtilization();
+
+	/**
+	 * Reports an asynchronous event. Calling this method interrupts each blocking method of the buffer pool owner and
+	 * allows the blocked thread to respond to the event.
+	 */
+	void reportAsynchronousEvent();
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/SerialSingleBufferPool.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/SerialSingleBufferPool.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/SerialSingleBufferPool.java
new file mode 100644
index 0000000..217a5ce
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/bufferprovider/SerialSingleBufferPool.java
@@ -0,0 +1,77 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.bufferprovider;
+
+import eu.stratosphere.core.memory.MemorySegment;
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.BufferRecycler;
+
+/**
+ * 
+ */
+public final class SerialSingleBufferPool implements BufferProvider, BufferRecycler {
+	
+	private final Buffer buffer;
+
+	/** Size of the buffer in this pool */
+	private final int bufferSize;
+
+
+	// -----------------------------------------------------------------------------------------------------------------
+
+	public SerialSingleBufferPool(int bufferSize) {
+		this.buffer = new Buffer(new MemorySegment(new byte[bufferSize]), bufferSize, this);
+		this.bufferSize = bufferSize;
+	}
+	
+	// -----------------------------------------------------------------------------------------------------------------
+
+	@Override
+	public Buffer requestBuffer(int minBufferSize) {
+		if (minBufferSize <= this.bufferSize) {
+			return this.buffer.duplicate();
+		}
+		else {
+			throw new IllegalArgumentException("Requesting buffer with size " + minBufferSize + ". Pool's buffer size is " + this.bufferSize);
+		}
+	}
+
+	@Override
+	public Buffer requestBufferBlocking(int minBufferSize) {
+		if (minBufferSize <= this.bufferSize) {
+			return this.buffer.duplicate();
+		}
+		else {
+			throw new IllegalArgumentException("Requesting buffer with size " + minBufferSize + ". Pool's buffer size is " + this.bufferSize);
+		}
+	}
+
+	@Override
+	public int getBufferSize() {
+		return this.bufferSize;
+	}
+
+	@Override
+	public void reportAsynchronousEvent() {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public boolean registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
+		throw new UnsupportedOperationException();
+	}
+
+	@Override
+	public void recycle(MemorySegment buffer) {}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/Envelope.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/Envelope.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/Envelope.java
new file mode 100644
index 0000000..a692aec
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/Envelope.java
@@ -0,0 +1,169 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.envelope;
+
+import eu.stratosphere.nephele.event.task.AbstractEvent;
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.nephele.jobgraph.JobID;
+import eu.stratosphere.runtime.io.serialization.DataInputDeserializer;
+import eu.stratosphere.runtime.io.serialization.DataOutputSerializer;
+import eu.stratosphere.util.InstantiationUtil;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public final class Envelope {
+
+	private final JobID jobID;
+
+	private final ChannelID source;
+
+	private final int sequenceNumber;
+
+	private ByteBuffer serializedEventList;
+
+	private Buffer buffer;
+
+	public Envelope(int sequenceNumber, JobID jobID, ChannelID source) {
+		this.sequenceNumber = sequenceNumber;
+		this.jobID = jobID;
+		this.source = source;
+	}
+
+	private Envelope(Envelope toDuplicate) {
+		this.jobID = toDuplicate.jobID;
+		this.source = toDuplicate.source;
+		this.sequenceNumber = toDuplicate.sequenceNumber;
+		this.serializedEventList = null;
+		this.buffer = null;
+	}
+
+	public Envelope duplicate() {
+		Envelope duplicate = new Envelope(this);
+		if (hasBuffer()) {
+			duplicate.setBuffer(this.buffer.duplicate());
+		}
+
+		return duplicate;
+	}
+
+	public Envelope duplicateWithoutBuffer() {
+		return new Envelope(this);
+	}
+
+	public JobID getJobID() {
+		return this.jobID;
+	}
+
+	public ChannelID getSource() {
+		return this.source;
+	}
+
+	public int getSequenceNumber() {
+		return this.sequenceNumber;
+	}
+
+	public void setEventsSerialized(ByteBuffer serializedEventList) {
+		if (this.serializedEventList != null)
+			throw new IllegalStateException("Event list has already been set.");
+
+		this.serializedEventList = serializedEventList;
+	}
+
+	public void serializeEventList(List<? extends AbstractEvent> eventList) {
+		if (this.serializedEventList != null)
+			throw new IllegalStateException("Event list has already been set.");
+
+		this.serializedEventList = serializeEvents(eventList);
+	}
+
+	public ByteBuffer getEventsSerialized() {
+		return this.serializedEventList;
+	}
+
+	public List<? extends AbstractEvent> deserializeEvents() {
+		return deserializeEvents(getClass().getClassLoader());
+	}
+
+	public List<? extends AbstractEvent> deserializeEvents(ClassLoader classloader) {
+		if (this.serializedEventList == null) {
+			return Collections.emptyList();
+		}
+
+		try {
+			DataInputDeserializer deserializer = new DataInputDeserializer(this.serializedEventList);
+
+			int numEvents = deserializer.readInt();
+			ArrayList<AbstractEvent> events = new ArrayList<AbstractEvent>(numEvents);
+
+			for (int i = 0; i < numEvents; i++) {
+				String className = deserializer.readUTF();
+				Class<? extends AbstractEvent> clazz;
+				try {
+					clazz = Class.forName(className).asSubclass(AbstractEvent.class);
+				} catch (ClassNotFoundException e) {
+					throw new RuntimeException("Could not load event class '" + className + "'.", e);
+				} catch (ClassCastException e) {
+					throw new RuntimeException("The class '" + className + "' is no valid subclass of '" + AbstractEvent.class.getName() + "'.", e);
+				}
+
+				AbstractEvent evt = InstantiationUtil.instantiate(clazz, AbstractEvent.class);
+				evt.read(deserializer);
+
+				events.add(evt);
+			}
+
+			return events;
+		}
+		catch (IOException e) {
+			throw new RuntimeException("Error while deserializing the events.", e);
+		}
+	}
+
+	public void setBuffer(Buffer buffer) {
+		this.buffer = buffer;
+	}
+
+	public Buffer getBuffer() {
+		return this.buffer;
+	}
+
+	private ByteBuffer serializeEvents(List<? extends AbstractEvent> events) {
+		try {
+			// create the serialized event list
+			DataOutputSerializer serializer = events.size() == 0
+				? new DataOutputSerializer(4)
+				: new DataOutputSerializer(events.size() * 32);
+			serializer.writeInt(events.size());
+
+			for (AbstractEvent evt : events) {
+				serializer.writeUTF(evt.getClass().getName());
+				evt.write(serializer);
+			}
+
+			return serializer.wrapAsByteBuffer();
+		}
+		catch (IOException e) {
+			throw new RuntimeException("Error while serializing the task events.", e);
+		}
+	}
+
+	public boolean hasBuffer() {
+		return this.buffer != null;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeDispatcher.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeDispatcher.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeDispatcher.java
new file mode 100644
index 0000000..2b69c0d
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeDispatcher.java
@@ -0,0 +1,46 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.envelope;
+
+import java.io.IOException;
+
+/**
+ * A envelope dispatcher receives {@link Envelope}s and sends them to all of its destinations.
+ */
+public interface EnvelopeDispatcher {
+
+	/**
+	 * Dispatches an envelope from an output channel to the receiving input channels (forward flow).
+	 *
+	 * @param envelope envelope to be sent
+	 */
+	void dispatchFromOutputChannel(Envelope envelope) throws IOException, InterruptedException;
+
+	/**
+	 * Dispatches an envelope from an input channel to the receiving output channels (backwards flow).
+	 *
+	 * @param envelope envelope to be sent
+	 */
+	void dispatchFromInputChannel(Envelope envelope) throws IOException, InterruptedException;
+
+	/**
+	 * Dispatches an envelope from an incoming TCP connection.
+	 * <p>
+	 * After an envelope has been constructed from a TCP socket, this method is called to send the envelope to the
+	 * receiving input channel.
+	 *
+	 * @param envelope envelope to be sent
+	 */
+	void dispatchFromNetwork(Envelope envelope) throws IOException, InterruptedException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReader.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReader.java
new file mode 100644
index 0000000..7b7e178
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReader.java
@@ -0,0 +1,212 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.envelope;
+
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.nephele.jobgraph.JobID;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferProviderBroker;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.ReadableByteChannel;
+
+public class EnvelopeReader {
+
+	public enum DeserializationState {
+		COMPLETE,
+		PENDING,
+		NO_BUFFER_AVAILABLE;
+	}
+
+	private final BufferProviderBroker bufferProviderBroker;
+
+	private final ByteBuffer headerBuffer;
+
+	private ByteBuffer currentHeaderBuffer;
+
+	private ByteBuffer currentEventsList;
+
+	private ByteBuffer currentDataBuffer;
+
+	private int bufferRequestPendingWithSize;
+
+
+	private Envelope pendingEnvelope;
+
+	private Envelope constructedEnvelope;
+
+
+	public BufferProvider bufferProvider;
+
+	private JobID lastDeserializedJobID;
+
+	private ChannelID lastDeserializedSourceID;
+
+
+	public EnvelopeReader(BufferProviderBroker bufferProviderBroker) {
+		this.bufferProviderBroker = bufferProviderBroker;
+
+		this.headerBuffer = ByteBuffer.allocateDirect(EnvelopeWriter.HEADER_SIZE);
+		this.headerBuffer.order(ByteOrder.LITTLE_ENDIAN);
+
+		this.currentHeaderBuffer = this.headerBuffer;
+	}
+
+	public DeserializationState readNextChunk(ReadableByteChannel channel) throws IOException {
+
+		// 1) check if the header is pending
+		if (this.currentHeaderBuffer != null) {
+			ByteBuffer header = this.currentHeaderBuffer;
+
+			channel.read(header);
+			if (header.hasRemaining()) {
+				// not finished with the header
+				return DeserializationState.PENDING;
+			} else {
+				// header done, construct the envelope
+				this.currentHeaderBuffer = null;
+
+				Envelope env = constructEnvelopeFromHeader(header);
+				this.pendingEnvelope = env;
+
+				// check for events and data
+				int eventsSize = getEventListSize(header);
+				int bufferSize = getBufferSize(header);
+
+				// make the events list the next buffer to be read
+				if (eventsSize > 0) {
+					this.currentEventsList = ByteBuffer.allocate(eventsSize);
+				}
+
+				// if we have a data buffer, we need memory segment for it
+				// we may not immediately get the memory segment, though, so we first record
+				// that we need it
+				if (bufferSize > 0) {
+					this.bufferRequestPendingWithSize = bufferSize;
+				}
+			}
+		}
+
+		// 2) read the eventList, if it should have one
+		if (this.currentEventsList != null) {
+			channel.read(this.currentEventsList);
+			if (this.currentEventsList.hasRemaining()) {
+				// events list still incomplete
+				return DeserializationState.PENDING;
+			} else {
+				this.currentEventsList.flip();
+				this.pendingEnvelope.setEventsSerialized(this.currentEventsList);
+				this.currentEventsList = null;
+			}
+		}
+
+		// 3) check if we need to get a buffer
+		if (this.bufferRequestPendingWithSize > 0) {
+			Buffer b = getBufferForTarget(this.pendingEnvelope.getJobID(), this.pendingEnvelope.getSource(), this.bufferRequestPendingWithSize);
+			if (b == null) {
+				// no buffer available at this time. come back later
+				return DeserializationState.NO_BUFFER_AVAILABLE;
+			} else {
+				// buffer is available. set the field so the buffer will be filled
+				this.pendingEnvelope.setBuffer(b);
+				this.currentDataBuffer = b.getMemorySegment().wrap(0, this.bufferRequestPendingWithSize);
+				this.bufferRequestPendingWithSize = 0;
+			}
+		}
+
+		// 4) fill the buffer
+		if (this.currentDataBuffer != null) {
+			channel.read(this.currentDataBuffer);
+			if (this.currentDataBuffer.hasRemaining()) {
+				// data buffer incomplete
+				return DeserializationState.PENDING;
+			} else {
+				this.currentDataBuffer = null;
+			}
+		}
+
+		// if we get here, we completed our job, or did nothing, if the deserializer was not
+		// reset after the previous envelope
+		if (this.pendingEnvelope != null) {
+			this.constructedEnvelope = this.pendingEnvelope;
+			this.pendingEnvelope = null;
+			return DeserializationState.COMPLETE;
+		} else {
+			throw new IllegalStateException("Error: read() was called before reserializer was reset after the last envelope.");
+		}
+	}
+
+	private Envelope constructEnvelopeFromHeader(ByteBuffer header) throws IOException {
+		int magicNumber = header.getInt(EnvelopeWriter.MAGIC_NUMBER_OFFSET);
+
+		if (magicNumber != EnvelopeWriter.MAGIC_NUMBER) {
+			throw new IOException("Network stream corrupted: invalid magic number in envelope header.");
+		}
+
+		int seqNum = header.getInt(EnvelopeWriter.SEQUENCE_NUMBER_OFFSET);
+		JobID jid = JobID.fromByteBuffer(header, EnvelopeWriter.JOB_ID_OFFSET);
+		ChannelID cid = ChannelID.fromByteBuffer(header, EnvelopeWriter.CHANNEL_ID_OFFSET);
+		return new Envelope(seqNum, jid, cid);
+	}
+
+	private int getBufferSize(ByteBuffer header) {
+		return header.getInt(EnvelopeWriter.BUFFER_SIZE_OFFSET);
+	}
+
+	private int getEventListSize(ByteBuffer header) {
+		return header.getInt(EnvelopeWriter.EVENTS_SIZE_OFFSET);
+	}
+
+	private Buffer getBufferForTarget(JobID jid, ChannelID cid, int size) throws IOException {
+		if (!(jid.equals(this.lastDeserializedJobID) && cid.equals(this.lastDeserializedSourceID))) {
+			this.bufferProvider = this.bufferProviderBroker.getBufferProvider(jid, cid);
+			this.lastDeserializedJobID = jid;
+			this.lastDeserializedSourceID = cid;
+		}
+
+		return this.bufferProvider.requestBuffer(size);
+	}
+
+
+	public Envelope getFullyDeserializedTransferEnvelope() {
+		Envelope t = this.constructedEnvelope;
+		if (t == null) {
+			throw new IllegalStateException("Envelope has not yet been fully constructed.");
+		}
+
+		this.constructedEnvelope = null;
+		return t;
+	}
+
+	public void reset() {
+		this.headerBuffer.clear();
+		this.currentHeaderBuffer = this.headerBuffer;
+		this.constructedEnvelope = null;
+	}
+
+	public boolean hasUnfinishedData() {
+		return this.pendingEnvelope != null || this.currentHeaderBuffer != null;
+	}
+
+	public BufferProvider getBufferProvider() {
+		return bufferProvider;
+	}
+
+	public Envelope getPendingEnvelope() {
+		return pendingEnvelope;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReceiverList.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReceiverList.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReceiverList.java
new file mode 100644
index 0000000..f99e1f2
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeReceiverList.java
@@ -0,0 +1,75 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.envelope;
+
+import java.net.InetAddress;
+
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.network.ConnectionInfoLookupResponse;
+import eu.stratosphere.runtime.io.network.RemoteReceiver;
+
+/**
+ * A transfer envelope receiver list contains all recipients of a transfer envelope. Their are three d ifferent types of
+ * receivers: Local receivers identified by {@link ChannelID} objects, remote receivers identified by
+ * {@link InetAddress} objects and finally checkpoints which are identified by
+ * <p>
+ * This class is thread-safe.
+ * 
+ */
+public class EnvelopeReceiverList {
+
+	private final ChannelID localReceiver;
+
+	private final RemoteReceiver remoteReceiver;
+
+	public EnvelopeReceiverList(ConnectionInfoLookupResponse cilr) {
+		this.localReceiver = cilr.getLocalTarget();
+		this.remoteReceiver = cilr.getRemoteTarget();
+	}
+
+	public EnvelopeReceiverList(ChannelID localReceiver) {
+		this.localReceiver = localReceiver;
+		this.remoteReceiver = null;
+	}
+
+	public EnvelopeReceiverList(RemoteReceiver remoteReceiver) {
+		this.localReceiver = null;
+		this.remoteReceiver = remoteReceiver;
+	}
+
+	public boolean hasLocalReceiver() {
+		return this.localReceiver != null;
+	}
+
+	public boolean hasRemoteReceiver() {
+		return this.remoteReceiver != null;
+	}
+
+	public int getTotalNumberOfReceivers() {
+		return (this.localReceiver == null ? 0 : 1) + (this.remoteReceiver == null ? 0 : 1);
+	}
+
+	public RemoteReceiver getRemoteReceiver() {
+		return this.remoteReceiver;
+	}
+
+	public ChannelID getLocalReceiver() {
+		return this.localReceiver;
+	}
+	
+	@Override
+	public String toString() {
+		return "local receiver: " + this.localReceiver + ", remote receiver: " + this.remoteReceiver;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeWriter.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeWriter.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeWriter.java
new file mode 100644
index 0000000..c00e61b
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/EnvelopeWriter.java
@@ -0,0 +1,134 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.envelope;
+
+import eu.stratosphere.nephele.AbstractID;
+import eu.stratosphere.runtime.io.Buffer;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.WritableByteChannel;
+
+public class EnvelopeWriter {
+
+	protected static final int MAGIC_NUMBER = 0xBADC0FFE;
+
+	/**
+	 * Size of the envelope header: 48 bytes = 4 bytes magic number, 4 bytes sequence number, 16 bytes job id,
+	 * 16 bytes sender id, 4 bytes bufferSize, 4 bytes event list length
+	 */
+	public static final int HEADER_SIZE = 4 + 4 + 2 * AbstractID.SIZE + 4 + 4;
+
+	public static final int MAGIC_NUMBER_OFFSET = 0;
+
+	public static final int SEQUENCE_NUMBER_OFFSET = 4;
+
+	public static final int JOB_ID_OFFSET = 8;
+
+	public static final int CHANNEL_ID_OFFSET = 24;
+
+	public static final int BUFFER_SIZE_OFFSET = 40;
+
+	public static final int EVENTS_SIZE_OFFSET = 44;
+
+	private ByteBuffer currentHeader;
+
+	private ByteBuffer currentEvents;
+
+	private ByteBuffer currentDataBuffer;
+
+	private final ByteBuffer headerBuffer;
+
+	public EnvelopeWriter() {
+		this.headerBuffer = ByteBuffer.allocateDirect(HEADER_SIZE);
+		this.headerBuffer.order(ByteOrder.LITTLE_ENDIAN);
+	}
+
+	/**
+	 * @param channel
+	 * @return True, if the writer has more pending data for the current envelope, false if not.
+	 *
+	 * @throws java.io.IOException
+	 */
+	public boolean writeNextChunk(WritableByteChannel channel) throws IOException {
+		// 1) check if the the header is still pending
+		if (this.currentHeader != null) {
+			channel.write(this.currentHeader);
+
+			if (this.currentHeader.hasRemaining()) {
+				// header was not fully written, so we can leave this method
+				return true;
+			} else {
+				this.currentHeader = null;
+			}
+		}
+
+		// 2) check if there are events pending
+		if (this.currentEvents != null) {
+			channel.write(this.currentEvents);
+			if (this.currentEvents.hasRemaining()) {
+				// events were not fully written, so leave this method
+				return true;
+			} else {
+				this.currentEvents = null;
+			}
+		}
+
+		// 3) write the data buffer
+		if (this.currentDataBuffer != null) {
+			channel.write(this.currentDataBuffer);
+			if (this.currentDataBuffer.hasRemaining()) {
+				return true;
+			} else {
+				this.currentDataBuffer = null;
+			}
+		}
+
+		return false;
+	}
+
+	public void setEnvelopeForWriting(Envelope env) {
+		// header
+		constructHeader(env);
+		this.currentHeader = this.headerBuffer;
+
+		// events (possibly null)
+		this.currentEvents = env.getEventsSerialized();
+
+		// data buffer (possibly null)
+		Buffer buf = env.getBuffer();
+		if (buf != null && buf.size() > 0) {
+			this.currentDataBuffer = buf.getMemorySegment().wrap(0, buf.size());
+		}
+	}
+
+	private void constructHeader(Envelope env) {
+		final ByteBuffer buf = this.headerBuffer;
+
+		buf.clear();							// reset
+		buf.putInt(MAGIC_NUMBER);
+		buf.putInt(env.getSequenceNumber());	// sequence number (4 bytes)
+		env.getJobID().write(buf);				// job Id (16 bytes)
+		env.getSource().write(buf);				// producerId (16 bytes)
+
+		// buffer size
+		buf.putInt(env.getBuffer() == null ? 0 : env.getBuffer().size());
+
+		// size of event list
+		buf.putInt(env.getEventsSerialized() == null ? 0 : env.getEventsSerialized().remaining());
+
+		buf.flip();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/NoBufferAvailableException.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/NoBufferAvailableException.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/NoBufferAvailableException.java
new file mode 100644
index 0000000..f7d49bf
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/envelope/NoBufferAvailableException.java
@@ -0,0 +1,53 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.envelope;
+
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferProvider;
+
+/**
+ * This exception is thrown to indicate that the deserialization process of a {@link Envelope} could not be
+ * continued because a {@link Buffer} to store the envelope's content is currently not available.
+ * 
+ */
+public final class NoBufferAvailableException extends Exception {
+
+	/**
+	 * Generated serial UID.
+	 */
+	private static final long serialVersionUID = -9164212953646457026L;
+
+	/**
+	 * The buffer provider which could not deliver a buffer.
+	 */
+	private final BufferProvider bufferProvider;
+
+	/**
+	 * Constructs a new exception.
+	 * 
+	 * @param bufferProvider
+	 *        the buffer provider which could not deliver a buffer
+	 */
+	public NoBufferAvailableException(final BufferProvider bufferProvider) {
+		this.bufferProvider = bufferProvider;
+	}
+
+	/**
+	 * Returns the buffer provider which could not deliver a buffer.
+	 * 
+	 * @return the buffer provider which could not deliver a buffer
+	 */
+	public BufferProvider getBufferProvider() {
+		return this.bufferProvider;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/IncomingConnection.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/IncomingConnection.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/IncomingConnection.java
new file mode 100644
index 0000000..f22e6f7
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/IncomingConnection.java
@@ -0,0 +1,115 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.tcp;
+
+import eu.stratosphere.runtime.io.Buffer;
+import eu.stratosphere.runtime.io.network.ChannelManager;
+import eu.stratosphere.runtime.io.network.envelope.Envelope;
+import eu.stratosphere.runtime.io.network.envelope.EnvelopeReader;
+import eu.stratosphere.runtime.io.network.envelope.EnvelopeReader.DeserializationState;
+import eu.stratosphere.runtime.io.network.envelope.NoBufferAvailableException;
+import eu.stratosphere.util.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SelectionKey;
+
+/**
+ * An incoming TCP connection through which data is read and transformed into {@link Envelope} objects.
+ */
+public class IncomingConnection {
+
+	private static final Log LOG = LogFactory.getLog(IncomingConnection.class);
+
+	/** Readable byte channel (TCP socket) to read data from */
+	private final ReadableByteChannel channel;
+
+	/** Channel manager to dispatch complete envelopes */
+	private final ChannelManager channelManager;
+
+	/** Envelope reader to turn the channel data into envelopes */
+	private final EnvelopeReader reader;
+
+	// -----------------------------------------------------------------------------------------------------------------
+
+	public IncomingConnection(ReadableByteChannel channel, ChannelManager channelManager) {
+		this.channel = channel;
+		this.channelManager = channelManager;
+		this.reader = new EnvelopeReader(channelManager);
+	}
+
+	// -----------------------------------------------------------------------------------------------------------------
+
+	public void read() throws IOException, InterruptedException, NoBufferAvailableException {
+		DeserializationState deserializationState = this.reader.readNextChunk(this.channel);
+
+		switch (deserializationState) {
+			case COMPLETE:
+				Envelope envelope = this.reader.getFullyDeserializedTransferEnvelope();
+				this.channelManager.dispatchFromNetwork(envelope);
+				this.reader.reset();
+				break;
+
+			case NO_BUFFER_AVAILABLE:
+				throw new NoBufferAvailableException(this.reader.getBufferProvider());
+
+			case PENDING:
+				break;
+		}
+	}
+
+	public void reportTransmissionProblem(SelectionKey key, IOException ioe) {
+		LOG.error(StringUtils.stringifyException(ioe));
+
+		try {
+			this.channel.close();
+		} catch (IOException e) {
+			LOG.debug("An error occurred while closing the byte channel");
+		}
+
+		if (key != null) {
+			key.cancel();
+		}
+
+		Envelope pendingEnvelope = this.reader.getPendingEnvelope();
+		if (pendingEnvelope != null) {
+			if (pendingEnvelope.hasBuffer()) {
+				Buffer buffer = pendingEnvelope.getBuffer();
+				if (buffer != null) {
+					buffer.recycleBuffer();
+				}
+			}
+		}
+
+		this.reader.reset();
+	}
+
+	public boolean isCloseUnexpected() {
+		return this.reader.hasUnfinishedData();
+	}
+
+	public void closeConnection(SelectionKey key) {
+		try {
+			this.channel.close();
+		} catch (IOException ioe) {
+			LOG.error("An IOException occurred while closing the socket: + " + StringUtils.stringifyException(ioe));
+		}
+
+		if (key != null) {
+			key.cancel();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/IncomingConnectionThread.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/IncomingConnectionThread.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/IncomingConnectionThread.java
new file mode 100644
index 0000000..774ad4e
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/IncomingConnectionThread.java
@@ -0,0 +1,226 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.tcp;
+
+import eu.stratosphere.runtime.io.network.ChannelManager;
+import eu.stratosphere.runtime.io.network.bufferprovider.BufferAvailabilityListener;
+import eu.stratosphere.runtime.io.network.envelope.NoBufferAvailableException;
+import eu.stratosphere.util.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayDeque;
+import java.util.Iterator;
+import java.util.Queue;
+
+public class IncomingConnectionThread extends Thread {
+
+	private static final Log LOG = LogFactory.getLog(IncomingConnectionThread.class);
+
+	private final ChannelManager channelManager;
+
+	private final Selector selector;
+
+	private final Queue<SelectionKey> pendingReadEventSubscribeRequests = new ArrayDeque<SelectionKey>();
+
+	private final ServerSocketChannel listeningSocket;
+
+	private static final class IncomingConnectionBufferAvailListener implements BufferAvailabilityListener {
+
+		private final Queue<SelectionKey> pendingReadEventSubscribeRequests;
+
+		private final SelectionKey key;
+
+		private IncomingConnectionBufferAvailListener(final Queue<SelectionKey> pendingReadEventSubscribeRequests,
+				final SelectionKey key) {
+
+			this.pendingReadEventSubscribeRequests = pendingReadEventSubscribeRequests;
+			this.key = key;
+		}
+
+		@Override
+		public void bufferAvailable() {
+
+			synchronized (this.pendingReadEventSubscribeRequests) {
+				this.pendingReadEventSubscribeRequests.add(this.key);
+			}
+		}
+	}
+
+	public IncomingConnectionThread(ChannelManager channelManager,
+			boolean isListeningThread, InetSocketAddress listeningAddress) throws IOException {
+		super("Incoming Connection Thread");
+
+		this.selector = Selector.open();
+		this.channelManager = channelManager;
+
+		if (isListeningThread) {
+			this.listeningSocket = ServerSocketChannel.open();
+			this.listeningSocket.configureBlocking(false);
+			listeningSocket.register(this.selector, SelectionKey.OP_ACCEPT);
+			this.listeningSocket.socket().bind(listeningAddress);
+			LOG.debug("Listening on " + this.listeningSocket.socket().getLocalSocketAddress());
+		} else {
+			this.listeningSocket = null;
+		}
+	}
+
+	@Override
+	public void run() {
+		try {
+			while (!this.isInterrupted()) {
+	
+				synchronized (this.pendingReadEventSubscribeRequests) {
+					while (!this.pendingReadEventSubscribeRequests.isEmpty()) {
+						final SelectionKey key = this.pendingReadEventSubscribeRequests.poll();
+						final IncomingConnection incomingConnection = (IncomingConnection) key.attachment();
+						final SocketChannel socketChannel = (SocketChannel) key.channel();
+	
+						try {
+							final SelectionKey newKey = socketChannel.register(this.selector, SelectionKey.OP_READ);
+							newKey.attach(incomingConnection);
+						} catch (ClosedChannelException e) {
+							incomingConnection.reportTransmissionProblem(key, e);
+						}
+					}
+				}
+	
+				try {
+					this.selector.select(500);
+				} catch (IOException e) {
+					LOG.error(e);
+				}
+	
+				final Iterator<SelectionKey> iter = this.selector.selectedKeys().iterator();
+	
+				while (iter.hasNext()) {
+					final SelectionKey key = iter.next();
+	
+					iter.remove();
+					if (key.isValid()) {
+						if (key.isReadable()) {
+							doRead(key);
+						} else if (key.isAcceptable()) {
+							doAccept(key);
+						} else {
+							LOG.error("Unknown key: " + key);
+						}
+					} else {
+						LOG.error("Received invalid key: " + key);
+					}
+				}
+			}
+	
+			// Do cleanup, if necessary
+			if (this.listeningSocket != null) {
+				try {
+					this.listeningSocket.close();
+				} catch (IOException ioe) {
+					// Actually, we can ignore this exception
+					LOG.debug(ioe);
+				}
+			}
+	
+			// Finally, close the selector
+			try {
+				this.selector.close();
+			} catch (IOException ioe) {
+				LOG.debug(StringUtils.stringifyException(ioe));
+			}
+		}
+		catch (Throwable t) {
+			// this is a disaster, this task manager cannot go on!
+			LOG.fatal("Incoming connection thread died with an exception: " + t.getMessage(), t);
+			System.exit(1);
+		}
+	}
+
+	private void doAccept(SelectionKey key) {
+
+		SocketChannel clientSocket = null;
+
+		try {
+			clientSocket = this.listeningSocket.accept();
+			if (clientSocket == null) {
+				LOG.error("Client socket is null");
+				return;
+			}
+		} catch (IOException ioe) {
+			LOG.error(ioe);
+			return;
+		}
+
+		final IncomingConnection incomingConnection = new IncomingConnection(
+			clientSocket, this.channelManager);
+		SelectionKey clientKey = null;
+		try {
+			clientSocket.configureBlocking(false);
+			clientKey = clientSocket.register(this.selector, SelectionKey.OP_READ);
+			clientKey.attach(incomingConnection);
+		} catch (IOException ioe) {
+			incomingConnection.reportTransmissionProblem(clientKey, ioe);
+		}
+	}
+
+	private void doRead(SelectionKey key) {
+
+		final IncomingConnection incomingConnection = (IncomingConnection) key.attachment();
+		try {
+			incomingConnection.read();
+		} catch (EOFException eof) {
+			if (incomingConnection.isCloseUnexpected()) {
+				final SocketChannel socketChannel = (SocketChannel) key.channel();
+				LOG.error("Connection from " + socketChannel.socket().getRemoteSocketAddress()
+					+ " was closed unexpectedly");
+				incomingConnection.reportTransmissionProblem(key, eof);
+			} else {
+				incomingConnection.closeConnection(key);
+			}
+		} catch (IOException ioe) {
+			incomingConnection.reportTransmissionProblem(key, ioe);
+		} catch (InterruptedException e) {
+			// Nothing to do here
+		} catch (NoBufferAvailableException e) {
+			// There are no buffers available, unsubscribe from read event
+			final SocketChannel socketChannel = (SocketChannel) key.channel();
+			try {
+				final SelectionKey newKey = socketChannel.register(this.selector, 0);
+				newKey.attach(incomingConnection);
+			} catch (ClosedChannelException e1) {
+				incomingConnection.reportTransmissionProblem(key, e1);
+			}
+
+			final BufferAvailabilityListener bal = new IncomingConnectionBufferAvailListener(
+				this.pendingReadEventSubscribeRequests, key);
+			if (!e.getBufferProvider().registerBufferAvailabilityListener(bal)) {
+				// In the meantime, a buffer has become available again, subscribe to read event again
+
+				try {
+					final SelectionKey newKey = socketChannel.register(this.selector, SelectionKey.OP_READ);
+					newKey.attach(incomingConnection);
+				} catch (ClosedChannelException e1) {
+					incomingConnection.reportTransmissionProblem(key, e1);
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/OutgoingConnection.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/OutgoingConnection.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/OutgoingConnection.java
new file mode 100644
index 0000000..7df1901
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/network/tcp/OutgoingConnection.java
@@ -0,0 +1,529 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.network.tcp;
+
+import eu.stratosphere.runtime.io.channels.ChannelID;
+import eu.stratosphere.runtime.io.network.RemoteReceiver;
+import eu.stratosphere.runtime.io.network.envelope.Envelope;
+import eu.stratosphere.runtime.io.network.envelope.EnvelopeWriter;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.SocketChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayDeque;
+import java.util.Iterator;
+import java.util.Queue;
+
+/**
+ * This class represents an outgoing TCP connection through which {@link eu.stratosphere.runtime.io.network.envelope.Envelope} objects can be sent.
+ * {@link eu.stratosphere.runtime.io.network.envelope.Envelope} objects are received from the {@link eu.stratosphere.nephele.taskmanager.io.bytebuffered.ChannelManager} and added to a queue. An
+ * additional network thread then takes the envelopes from the queue and transmits them to the respective destination
+ * host.
+ * 
+ */
+public class OutgoingConnection {
+
+	/**
+	 * The log object used to report debug information and possible errors.
+	 */
+	private static final Log LOG = LogFactory.getLog(OutgoingConnection.class);
+
+	/**
+	 * The address this outgoing connection is connected to.
+	 */
+	private final RemoteReceiver remoteReceiver;
+
+	/**
+	 * The outgoing connection thread which actually transmits the queued transfer envelopes.
+	 */
+	private final OutgoingConnectionThread connectionThread;
+
+	/**
+	 * The queue of transfer envelopes to be transmitted.
+	 */
+	private final Queue<Envelope> queuedEnvelopes = new ArrayDeque<Envelope>();
+
+	/**
+	 * The {@link eu.stratosphere.runtime.io.network.envelope.Envelope} that is currently processed.
+	 */
+	private Envelope currentEnvelope = null;
+
+	/**
+	 * Stores whether the underlying TCP connection is established. As this variable is accessed by the byte buffered
+	 * channel manager and the outgoing connection thread, it must be protected by a monitor.
+	 */
+	private boolean isConnected = false;
+
+	/**
+	 * Stores whether is underlying TCP connection is subscribed to the NIO write event. As this variable is accessed by
+	 * the byte buffered channel and the outgoing connection thread, it must be protected by a monitor.
+	 */
+	private boolean isSubscribedToWriteEvent = false;
+
+	/**
+	 * The overall number of connection retries which shall be performed before a connection error is reported.
+	 */
+	private final int numberOfConnectionRetries;
+
+	/**
+	 * The number of connection retries left before an I/O error is reported.
+	 */
+	private int retriesLeft = 0;
+
+	/**
+	 * The timestamp of the last connection retry.
+	 */
+	private long timstampOfLastRetry = 0;
+
+	/**
+	 * The current selection key representing the interest set of the underlying TCP NIO connection. This variable may
+	 * only be accessed the the outgoing connection thread.
+	 */
+	private SelectionKey selectionKey = null;
+
+	/**
+	 * The period of time in milliseconds that shall be waited before a connection attempt is considered to be failed.
+	 */
+	private static long RETRYINTERVAL = 1000L; // 1 second
+
+	private EnvelopeWriter writer;
+
+	/**
+	 * Constructs a new outgoing connection object.
+	 * 
+	 * @param remoteReceiver
+	 *        the address of the destination host this outgoing connection object is supposed to connect to
+	 * @param connectionThread
+	 *        the connection thread which actually handles the network transfer
+	 * @param numberOfConnectionRetries
+	 *        the number of connection retries allowed before an I/O error is reported
+	 */
+	public OutgoingConnection(RemoteReceiver remoteReceiver, OutgoingConnectionThread connectionThread,
+			int numberOfConnectionRetries) {
+
+		this.remoteReceiver = remoteReceiver;
+		this.connectionThread = connectionThread;
+		this.numberOfConnectionRetries = numberOfConnectionRetries;
+		this.writer = new EnvelopeWriter();
+	}
+
+	/**
+	 * Adds a new {@link eu.stratosphere.runtime.io.network.envelope.Envelope} to the queue of envelopes to be transmitted to the destination host of this
+	 * connection.
+	 * <p>
+	 * This method should only be called by the {@link eu.stratosphere.nephele.taskmanager.io.bytebuffered.ChannelManager} object.
+	 * 
+	 * @param envelope
+	 *        the envelope to be added to the transfer queue
+	 */
+	public void queueEnvelope(Envelope envelope) {
+
+		synchronized (this.queuedEnvelopes) {
+
+			checkConnection();
+			this.queuedEnvelopes.add(envelope);
+		}
+	}
+
+	private void checkConnection() {
+
+		synchronized (this.queuedEnvelopes) {
+
+			if (!this.isConnected) {
+
+				this.retriesLeft = this.numberOfConnectionRetries;
+				this.timstampOfLastRetry = System.currentTimeMillis();
+				this.connectionThread.triggerConnect(this);
+				this.isConnected = true;
+				this.isSubscribedToWriteEvent = true;
+			} else {
+
+				if (!this.isSubscribedToWriteEvent) {
+					this.connectionThread.subscribeToWriteEvent(this.selectionKey);
+					this.isSubscribedToWriteEvent = true;
+				}
+			}
+
+		}
+	}
+
+	/**
+	 * Returns the {@link InetSocketAddress} to the destination host this outgoing connection is supposed to be
+	 * connected to.
+	 * <p>
+	 * This method should be called by the {@link OutgoingConnectionThread} object only.
+	 * 
+	 * @return the {@link InetSocketAddress} to the destination host this outgoing connection is supposed to be
+	 *         connected to
+	 */
+	public InetSocketAddress getConnectionAddress() {
+
+		return this.remoteReceiver.getConnectionAddress();
+	}
+
+	/**
+	 * Reports a problem which occurred while establishing the underlying TCP connection to this outgoing connection
+	 * object. Depending on the number of connection retries left, this method will either try to reestablish the TCP
+	 * connection or report an I/O error to all tasks which have queued envelopes for this connection. In the latter
+	 * case all queued envelopes will be dropped and all included buffers will be freed.
+	 * <p>
+	 * This method should only be called by the {@link OutgoingConnectionThread} object.
+	 * 
+	 * @param ioe
+	 *        thrown if an error occurs while reseting the underlying TCP connection
+	 */
+	public void reportConnectionProblem(IOException ioe) {
+
+		// First, write exception to log
+		final long currentTime = System.currentTimeMillis();
+		if (currentTime - this.timstampOfLastRetry >= RETRYINTERVAL) {
+			LOG.error("Cannot connect to " + this.remoteReceiver + ", " + this.retriesLeft + " retries left");
+		}
+
+		synchronized (this.queuedEnvelopes) {
+
+			if (this.selectionKey != null) {
+
+				final SocketChannel socketChannel = (SocketChannel) this.selectionKey.channel();
+				if (socketChannel != null) {
+					try {
+						socketChannel.close();
+					} catch (IOException e) {
+						LOG.debug("Error while trying to close the socket channel to " + this.remoteReceiver);
+					}
+				}
+
+				this.selectionKey.cancel();
+				this.selectionKey = null;
+				this.isConnected = false;
+				this.isSubscribedToWriteEvent = false;
+			}
+
+			if (hasRetriesLeft(currentTime)) {
+				this.connectionThread.triggerConnect(this);
+				this.isConnected = true;
+				this.isSubscribedToWriteEvent = true;
+				return;
+			}
+
+			// Error is fatal
+			LOG.error(ioe);
+
+			// Notify source of current envelope and release buffer
+			if (this.currentEnvelope != null) {
+				if (this.currentEnvelope.getBuffer() != null) {
+					this.currentEnvelope.getBuffer().recycleBuffer();
+					this.currentEnvelope = null;
+				}
+			}
+
+			// Notify all other tasks which are waiting for data to be transmitted
+			final Iterator<Envelope> iter = this.queuedEnvelopes.iterator();
+			while (iter.hasNext()) {
+				final Envelope envelope = iter.next();
+				iter.remove();
+				// Recycle the buffer inside the envelope
+				if (envelope.getBuffer() != null) {
+					envelope.getBuffer().recycleBuffer();
+				}
+			}
+
+			this.queuedEnvelopes.clear();
+		}
+	}
+
+	/**
+	 * Reports an I/O error which occurred while writing data to the TCP connection. As a result of the I/O error the
+	 * connection is closed and the interest keys are canceled. Moreover, the task which queued the currently
+	 * transmitted transfer envelope is notified about the error and the current envelope is dropped. If the current
+	 * envelope contains a buffer, the buffer is freed.
+	 * <p>
+	 * This method should only be called by the {@link OutgoingConnectionThread} object.
+	 * 
+	 * @param ioe
+	 *        thrown if an error occurs while reseting the connection
+	 */
+	public void reportTransmissionProblem(IOException ioe) {
+
+		final SocketChannel socketChannel = (SocketChannel) this.selectionKey.channel();
+
+		// First, write exception to log
+		if (this.currentEnvelope != null) {
+			LOG.error("The connection between " + socketChannel.socket().getLocalAddress() + " and "
+				+ socketChannel.socket().getRemoteSocketAddress()
+				+ " experienced an IOException for transfer envelope " + this.currentEnvelope.getSequenceNumber());
+		} else {
+			LOG.error("The connection between " + socketChannel.socket().getLocalAddress() + " and "
+				+ socketChannel.socket().getRemoteSocketAddress() + " experienced an IOException");
+		}
+
+		// Close the connection and cancel the interest key
+		synchronized (this.queuedEnvelopes) {
+			try {
+				LOG.debug("Closing connection to " + socketChannel.socket().getRemoteSocketAddress());
+				socketChannel.close();
+			} catch (IOException e) {
+				LOG.debug("An error occurred while responding to an IOException");
+				LOG.debug(e);
+			}
+
+			this.selectionKey.cancel();
+
+			// Error is fatal
+			LOG.error(ioe);
+
+			// Trigger new connection if there are more envelopes to be transmitted
+			if (this.queuedEnvelopes.isEmpty()) {
+				this.isConnected = false;
+				this.isSubscribedToWriteEvent = false;
+			} else {
+				this.connectionThread.triggerConnect(this);
+				this.isConnected = true;
+				this.isSubscribedToWriteEvent = true;
+			}
+
+			// We must assume the current envelope is corrupted so we notify the task which created it.
+			if (this.currentEnvelope != null) {
+				if (this.currentEnvelope.getBuffer() != null) {
+					this.currentEnvelope.getBuffer().recycleBuffer();
+					this.currentEnvelope = null;
+				}
+			}
+		}
+	}
+
+	/**
+	 * Checks whether further retries are left for establishing the underlying TCP connection.
+	 * 
+	 * @param currentTime
+	 *        the current system time in milliseconds since January 1st, 1970
+	 * @return <code>true</code> if there are retries left, <code>false</code> otherwise
+	 */
+	private boolean hasRetriesLeft(long currentTime) {
+
+		if (currentTime - this.timstampOfLastRetry >= RETRYINTERVAL) {
+			this.retriesLeft--;
+			this.timstampOfLastRetry = currentTime;
+			if (this.retriesLeft == 0) {
+				return false;
+			}
+		}
+
+		return true;
+	}
+
+	/**
+	 * Writes the content of the current {@link eu.stratosphere.runtime.io.network.envelope.Envelope} object to the underlying TCP connection.
+	 * <p>
+	 * This method should only be called by the {@link OutgoingConnectionThread} object.
+	 * 
+	 * @return <code>true</code> if there is more data from this/other queued envelopes to be written to this channel
+	 * @throws IOException
+	 *         thrown if an error occurs while writing the data to the channel
+	 */
+	public boolean write() throws IOException {
+
+		final WritableByteChannel writableByteChannel = (WritableByteChannel) this.selectionKey.channel();
+
+		if (this.currentEnvelope == null) {
+			synchronized (this.queuedEnvelopes) {
+				if (this.queuedEnvelopes.isEmpty()) {
+					return false;
+				} else {
+					this.currentEnvelope = this.queuedEnvelopes.peek();
+
+					this.writer.setEnvelopeForWriting(this.currentEnvelope);
+				}
+			}
+		}
+
+		if (!this.writer.writeNextChunk(writableByteChannel)) {
+			// Make sure we recycle the attached memory or file buffers correctly
+			if (this.currentEnvelope.getBuffer() != null) {
+				this.currentEnvelope.getBuffer().recycleBuffer();
+			}
+
+			synchronized (this.queuedEnvelopes) {
+				this.queuedEnvelopes.poll();
+				this.currentEnvelope = null;
+			}
+		}
+
+		return true;
+	}
+
+	/**
+	 * Requests to close the underlying TCP connection. The request is ignored if at least one {@link eu.stratosphere.runtime.io.network.envelope.Envelope}
+	 * is queued.
+	 * <p>
+	 * This method should only be called by the {@link OutgoingConnectionThread} object.
+	 * 
+	 * @throws IOException
+	 *         thrown if an error occurs while closing the TCP connection
+	 */
+	public void requestClose() throws IOException {
+
+		synchronized (this.queuedEnvelopes) {
+
+			if (this.queuedEnvelopes.isEmpty()) {
+
+				if (this.isSubscribedToWriteEvent) {
+
+					this.connectionThread.unsubscribeFromWriteEvent(this.selectionKey);
+					this.isSubscribedToWriteEvent = false;
+				}
+			}
+		}
+	}
+
+	/**
+	 * Closes the underlying TCP connection if no more {@link eu.stratosphere.runtime.io.network.envelope.Envelope} objects are in the transmission queue.
+	 * <p>
+	 * This method should only be called by the {@link OutgoingConnectionThread} object.
+	 * 
+	 * @throws IOException
+	 */
+	public void closeConnection() throws IOException {
+
+		synchronized (this.queuedEnvelopes) {
+
+			if (!this.queuedEnvelopes.isEmpty()) {
+				return;
+			}
+
+			if (this.selectionKey != null) {
+
+				final SocketChannel socketChannel = (SocketChannel) this.selectionKey.channel();
+				socketChannel.close();
+				this.selectionKey.cancel();
+				this.selectionKey = null;
+			}
+
+			this.isConnected = false;
+			this.isSubscribedToWriteEvent = false;
+		}
+	}
+
+	/**
+	 * Returns the number of queued {@link eu.stratosphere.runtime.io.network.envelope.Envelope} objects with the given source channel ID.
+	 * 
+	 * @param sourceChannelID
+	 *        the source channel ID to count the queued envelopes for
+	 * @return the number of queued transfer envelopes with the given source channel ID
+	 */
+	public int getNumberOfQueuedEnvelopesFromChannel(final ChannelID sourceChannelID) {
+
+		synchronized (this.queuedEnvelopes) {
+
+			int number = 0;
+
+			final Iterator<Envelope> it = this.queuedEnvelopes.iterator();
+			while (it.hasNext()) {
+				final Envelope te = it.next();
+				if (sourceChannelID.equals(te.getSource())) {
+					number++;
+				}
+			}
+
+			return number;
+		}
+	}
+
+	/**
+	 * Removes all queued {@link eu.stratosphere.runtime.io.network.envelope.Envelope} objects from the transmission which match the given source channel
+	 * ID.
+	 * 
+	 * @param sourceChannelID
+	 *        the source channel ID of the transfered transfer envelopes to be dropped
+	 */
+	public void dropAllQueuedEnvelopesFromChannel(final ChannelID sourceChannelID) {
+
+		synchronized (this.queuedEnvelopes) {
+
+			final Iterator<Envelope> it = this.queuedEnvelopes.iterator();
+			while (it.hasNext()) {
+				final Envelope te = it.next();
+				if (sourceChannelID.equals(te.getSource())) {
+					it.remove();
+					if (te.getBuffer() != null) {
+						te.getBuffer().recycleBuffer();
+					}
+				}
+			}
+		}
+	}
+
+	/**
+	 * Checks whether this outgoing connection object manages an active connection or can be removed by the
+	 * {@link eu.stratosphere.nephele.taskmanager.io.bytebuffered.ChannelManager} object.
+	 * <p>
+	 * This method should only be called by the byte buffered channel manager.
+	 * 
+	 * @return <code>true</code> if this object is no longer manages an active connection and can be removed,
+	 *         <code>false</code> otherwise.
+	 */
+	public boolean canBeRemoved() {
+
+		synchronized (this.queuedEnvelopes) {
+
+			if (this.isConnected) {
+				return false;
+			}
+
+			if (this.currentEnvelope != null) {
+				return false;
+			}
+
+			return this.queuedEnvelopes.isEmpty();
+		}
+	}
+
+	/**
+	 * Sets the selection key representing the interest set of the underlying TCP NIO connection.
+	 * 
+	 * @param selectionKey
+	 *        the selection of the underlying TCP connection
+	 */
+	public void setSelectionKey(SelectionKey selectionKey) {
+		this.selectionKey = selectionKey;
+	}
+
+	/**
+	 * Returns the number of currently queued envelopes which contain a write buffer.
+	 * 
+	 * @return the number of currently queued envelopes which contain a write buffer
+	 */
+	public int getNumberOfQueuedWriteBuffers() {
+
+		int retVal = 0;
+
+		synchronized (this.queuedEnvelopes) {
+
+			final Iterator<Envelope> it = this.queuedEnvelopes.iterator();
+			while (it.hasNext()) {
+
+				final Envelope envelope = it.next();
+				if (envelope.getBuffer() != null) {
+					++retVal;
+				}
+			}
+		}
+
+		return retVal;
+	}
+}


Mime
View raw message