flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [15/18] flink git commit: [FLINK-986] [FLINK-25] [Distributed runtime] Add initial support for intermediate results
Date Mon, 12 Jan 2015 08:16:23 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
index 1f79067..7de8651 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManagerAsync.java
@@ -18,15 +18,15 @@
 
 package org.apache.flink.runtime.io.disk.iomanager;
 
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.util.EnvironmentInformation;
+
 import java.io.IOException;
 import java.lang.Thread.UncaughtExceptionHandler;
 import java.util.List;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import org.apache.flink.core.memory.MemorySegment;
-import org.apache.flink.runtime.util.EnvironmentInformation;
-
-import com.google.common.base.Preconditions;
+import static com.google.common.base.Preconditions.checkState;
 
 /**
  * A version of the {@link IOManager} that uses asynchronous I/O.
@@ -181,13 +181,13 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
 	public BlockChannelWriter createBlockChannelWriter(FileIOChannel.ID channelID,
 								LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException
 	{
-		Preconditions.checkState(!shutdown, "I/O-Manger is closed.");
+		checkState(!shutdown, "I/O-Manger is closed.");
 		return new AsynchronousBlockWriter(channelID, this.writers[channelID.getThreadNum()].requestQueue, returnQueue);
 	}
 	
 	@Override
-	public BlockChannelWriterWithCallback createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback callback) throws IOException {
-		Preconditions.checkState(!shutdown, "I/O-Manger is closed.");
+	public BlockChannelWriterWithCallback createBlockChannelWriter(FileIOChannel.ID channelID, RequestDoneCallback<MemorySegment> callback) throws IOException {
+		checkState(!shutdown, "I/O-Manger is closed.");
 		return new AsynchronousBlockWriterWithCallback(channelID, this.writers[channelID.getThreadNum()].requestQueue, callback);
 	}
 	
@@ -205,7 +205,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
 	public BlockChannelReader createBlockChannelReader(FileIOChannel.ID channelID,
 										LinkedBlockingQueue<MemorySegment> returnQueue) throws IOException
 	{
-		Preconditions.checkState(!shutdown, "I/O-Manger is closed.");
+		checkState(!shutdown, "I/O-Manger is closed.");
 		return new AsynchronousBlockReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, returnQueue);
 	}
 	
@@ -228,7 +228,7 @@ public class IOManagerAsync extends IOManager implements UncaughtExceptionHandle
 	public BulkBlockChannelReader createBulkBlockChannelReader(FileIOChannel.ID channelID,
 			List<MemorySegment> targetSegments,	int numBlocks) throws IOException
 	{
-		Preconditions.checkState(!shutdown, "I/O-Manger is closed.");
+		checkState(!shutdown, "I/O-Manger is closed.");
 		return new AsynchronousBulkBlockReader(channelID, this.readers[channelID.getThreadNum()].requestQueue, targetSegments, numBlocks);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java
index 1b7adae..69791f4 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IORequest.java
@@ -33,7 +33,6 @@ interface IORequest {
 	public void requestDone(IOException ioex);
 }
 
-
 /**
  * Interface for I/O requests that are handled by the IOManager's reading thread. 
  */
@@ -47,7 +46,6 @@ interface ReadRequest extends IORequest {
 	public void read() throws IOException;
 }
 
-
 /**
  * Interface for I/O requests that are handled by the IOManager's writing thread.
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java
index 78699e2..95f3dc7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/QueuingCallback.java
@@ -26,7 +26,7 @@ import org.apache.flink.core.memory.MemorySegment;
 /**
  * A {@link RequestDoneCallback} that adds the memory segments to a blocking queue.
  */
-public class QueuingCallback implements RequestDoneCallback {
+public class QueuingCallback implements RequestDoneCallback<MemorySegment> {
 
 	private final LinkedBlockingQueue<MemorySegment> queue;
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestDoneCallback.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestDoneCallback.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestDoneCallback.java
index 982343c..f9a0965 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestDoneCallback.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/RequestDoneCallback.java
@@ -20,17 +20,15 @@ package org.apache.flink.runtime.io.disk.iomanager;
 
 import java.io.IOException;
 
-import org.apache.flink.core.memory.MemorySegment;
-
 /**
  * Callback to be executed on completion of an asynchronous I/O request.
- * Depending on success or failure, either method of
- * {@ink #requestSuccessful(MemorySegment)} or {@link #requestFailed(MemorySegment, IOException)}
- * is called.
+ * <p>
+ * Depending on success or failure, either {@link #requestSuccessful(Object)}
+ * or {@link #requestFailed(Object, IOException)} is called.
  */
-public interface RequestDoneCallback {
+public interface RequestDoneCallback<T> {
+
+	void requestSuccessful(T request);
 
-	void requestSuccessful(MemorySegment buffer);
-	
-	void requestFailed(MemorySegment buffer, IOException e);
+	void requestFailed(T buffer, IOException e);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/Buffer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/Buffer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/Buffer.java
deleted file mode 100644
index 60232da..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/Buffer.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.flink.core.memory.MemorySegment;
-
-public class Buffer {
-
-	private final MemorySegment memorySegment;
-
-	private final BufferRecycler recycler;
-
-	// -----------------------------------------------------------------------------------------------------------------
-
-	private final AtomicInteger referenceCounter;
-
-	private int size;
-
-	// -----------------------------------------------------------------------------------------------------------------
-
-	public Buffer(MemorySegment memorySegment, int size, BufferRecycler recycler) {
-		this.memorySegment = memorySegment;
-		this.size = size;
-		this.recycler = recycler;
-
-		// we are the first, so we start with reference count of one
-		this.referenceCounter = new AtomicInteger(1);
-	}
-
-	/**
-	 * @param toDuplicate Buffer instance to duplicate
-	 */
-	private Buffer(Buffer toDuplicate) {
-		if (toDuplicate.referenceCounter.getAndIncrement() == 0) {
-			throw new IllegalStateException("Buffer was released before duplication.");
-		}
-		
-		this.memorySegment = toDuplicate.memorySegment;
-		this.size = toDuplicate.size;
-		this.recycler = toDuplicate.recycler;
-		this.referenceCounter = toDuplicate.referenceCounter;
-	}
-
-	// -----------------------------------------------------------------------------------------------------------------
-
-	public MemorySegment getMemorySegment() {
-		return this.memorySegment;
-	}
-
-	public int size() {
-		return this.size;
-	}
-
-	public void limitSize(int size) {
-		if (size >= 0 && size <= this.memorySegment.size()) {
-			this.size = size;
-		} else {
-			throw new IllegalArgumentException();
-		}
-	}
-
-	public void recycleBuffer() {
-		int refCount = this.referenceCounter.decrementAndGet();
-		if (refCount == 0) {
-			this.recycler.recycle(this.memorySegment);
-		}
-	}
-
-	public Buffer duplicate() {
-		return new Buffer(this);
-	}
-
-	public void copyToBuffer(Buffer destinationBuffer) {
-		if (size() > destinationBuffer.size()) {
-			throw new IllegalArgumentException("Destination buffer is too small to store content of source buffer.");
-		}
-
-		this.memorySegment.copyTo(0, destinationBuffer.memorySegment, 0, size);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/BufferRecycler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/BufferRecycler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/BufferRecycler.java
deleted file mode 100644
index e48ca52..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/BufferRecycler.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network;
-
-import org.apache.flink.core.memory.MemorySegment;
-
-public interface BufferRecycler {
-
-	/**
-	 * Called by {@link org.apache.flink.runtime.io.network.Buffer} to return a {@link MemorySegment} to its original buffer pool.
-	 *
-	 * @param buffer the segment to be recycled
-	 */
-	void recycle(MemorySegment buffer);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
deleted file mode 100644
index c841872..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
+++ /dev/null
@@ -1,692 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network;
-
-import akka.actor.ActorRef;
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.runtime.AbstractID;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.execution.CancelTaskException;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.execution.RuntimeEnvironment;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.instance.InstanceConnectionInfo;
-import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;
-import org.apache.flink.runtime.io.network.bufferprovider.BufferProviderBroker;
-import org.apache.flink.runtime.io.network.bufferprovider.DiscardBufferPool;
-import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
-import org.apache.flink.runtime.io.network.bufferprovider.LocalBufferPoolOwner;
-import org.apache.flink.runtime.io.network.channels.Channel;
-import org.apache.flink.runtime.io.network.channels.ChannelID;
-import org.apache.flink.runtime.io.network.channels.ChannelType;
-import org.apache.flink.runtime.io.network.channels.InputChannel;
-import org.apache.flink.runtime.io.network.channels.OutputChannel;
-import org.apache.flink.runtime.io.network.gates.GateID;
-import org.apache.flink.runtime.io.network.gates.InputGate;
-import org.apache.flink.runtime.io.network.gates.OutputGate;
-import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.util.ExceptionUtils;
-import scala.concurrent.duration.FiniteDuration;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * The channel manager sets up the network buffers and dispatches data between channels.
- */
-public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker {
-
-	private static final Logger LOG = LoggerFactory.getLogger(ChannelManager.class);
-
-	private final ActorRef channelLookup;
-
-	private final InstanceConnectionInfo connectionInfo;
-
-	private final Map<ChannelID, Channel> channels;
-
-	private final Map<AbstractID, LocalBufferPoolOwner> localBuffersPools;
-
-	private final Map<ChannelID, EnvelopeReceiverList> receiverCache;
-
-	private final GlobalBufferPool globalBufferPool;
-
-	private final NetworkConnectionManager networkConnectionManager;
-	
-	private final InetSocketAddress ourAddress;
-	
-	private final DiscardBufferPool discardBufferPool;
-
-	private final FiniteDuration timeout;
-
-	// -----------------------------------------------------------------------------------------------------------------
-
-	public ChannelManager(ActorRef channelLookup, InstanceConnectionInfo connectionInfo, int numNetworkBuffers,
-						int networkBufferSize, NetworkConnectionManager networkConnectionManager,
-						FiniteDuration timeout) throws IOException {
-
-		this.channelLookup= channelLookup;
-		this.connectionInfo = connectionInfo;
-
-		this.timeout = timeout;
-
-		try {
-			this.globalBufferPool = new GlobalBufferPool(numNetworkBuffers, networkBufferSize);
-		} catch (Throwable e) {
-			throw new IOException("Failed to instantiate GlobalBufferPool.", e);
-		}
-
-		this.networkConnectionManager = networkConnectionManager;
-		networkConnectionManager.start(this);
-
-		// management data structures
-		this.channels = new ConcurrentHashMap<ChannelID, Channel>();
-		this.receiverCache = new ConcurrentHashMap<ChannelID, EnvelopeReceiverList>();
-		this.localBuffersPools = new ConcurrentHashMap<AbstractID, LocalBufferPoolOwner>();
-		
-		this.ourAddress = new InetSocketAddress(connectionInfo.address(), connectionInfo.dataPort());
-		
-		// a special pool if the data is to be discarded
-		this.discardBufferPool = new DiscardBufferPool();
-	}
-
-	public void shutdown() throws IOException {
-		this.networkConnectionManager.shutdown();
-
-		this.globalBufferPool.destroy();
-	}
-
-	public GlobalBufferPool getGlobalBufferPool() {
-		return globalBufferPool;
-	}
-
-	public NetworkConnectionManager getNetworkConnectionManager() {
-		return networkConnectionManager;
-	}
-	
-	// -----------------------------------------------------------------------------------------------------------------
-	//                                               Task registration
-	// -----------------------------------------------------------------------------------------------------------------
-
-	/**
-	 * Registers the given task with the channel manager.
-	 *
-	 * @param task the task to be registered
-	 * @throws InsufficientResourcesException thrown if not enough buffers available to safely run this task
-	 */
-	public void register(Task task) throws InsufficientResourcesException {
-		// Check if we can safely run this task with the given buffers
-		ensureBufferAvailability(task);
-
-		RuntimeEnvironment environment = task.getEnvironment();
-
-		// -------------------------------------------------------------------------------------------------------------
-		//                                       Register output channels
-		// -------------------------------------------------------------------------------------------------------------
-
-		environment.registerGlobalBufferPool(this.globalBufferPool);
-
-		if (this.localBuffersPools.containsKey(task.getExecutionId())) {
-			throw new IllegalStateException("Execution " + task.getExecutionId() + " has a previous buffer pool owner");
-		}
-
-		for (OutputGate gate : environment.outputGates()) {
-			// add receiver list hints
-			for (OutputChannel channel : gate.channels()) {
-				// register envelope dispatcher with the channel
-				channel.registerEnvelopeDispatcher(this);
-
-				switch (channel.getChannelType()) {
-					case IN_MEMORY:
-						addReceiverListHint(channel.getID(), channel.getConnectedId());
-						break;
-					case NETWORK:
-						addReceiverListHint(channel.getConnectedId(), channel.getID());
-						break;
-				}
-
-				this.channels.put(channel.getID(), channel);
-			}
-		}
-
-		this.localBuffersPools.put(task.getExecutionId(), environment);
-
-		// -------------------------------------------------------------------------------------------------------------
-		//                                       Register input channels
-		// -------------------------------------------------------------------------------------------------------------
-
-		// register global
-		for (InputGate<?> gate : environment.inputGates()) {
-			gate.registerGlobalBufferPool(this.globalBufferPool);
-
-			for (int i = 0; i < gate.getNumberOfInputChannels(); i++) {
-				InputChannel<? extends IOReadableWritable> channel = gate.getInputChannel(i);
-				channel.registerEnvelopeDispatcher(this);
-
-				if (channel.getChannelType() == ChannelType.IN_MEMORY) {
-					addReceiverListHint(channel.getID(), channel.getConnectedId());
-				}
-
-				this.channels.put(channel.getID(), channel);
-			}
-
-			this.localBuffersPools.put(gate.getGateID(), gate);
-		}
-
-		// the number of channels per buffers has changed after unregistering the task
-		// => redistribute the number of designated buffers of the registered local buffer pools
-		redistributeBuffers();
-	}
-
-	/**
-	 * Unregisters the given task from the channel manager.
-	 *
-	 * @param executionId the ID of the task to be unregistered
-	 * @param task the task to be unregistered
-	 */
-	public void unregister(ExecutionAttemptID executionId, Task task) {
-		final Environment environment = task.getEnvironment();
-		if (environment == null) {
-			return;
-		}
-
-		// destroy and remove OUTPUT channels from registered channels and cache
-		for (ChannelID id : environment.getOutputChannelIDs()) {
-			Channel channel = this.channels.remove(id);
-			if (channel != null) {
-
-				channel.destroy();
-
-				removeFromReceiverCacheAndMaybeCloseTcpConnection(channel);
-			}
-		}
-
-		// destroy and remove INPUT channels from registered channels and cache
-		for (ChannelID id : environment.getInputChannelIDs()) {
-			Channel channel = this.channels.remove(id);
-			if (channel != null) {
-				channel.destroy();
-
-				removeFromReceiverCacheAndMaybeCloseTcpConnection(channel);
-			}
-		}
-
-		// clear and remove INPUT side buffer pools
-		for (GateID id : environment.getInputGateIDs()) {
-			LocalBufferPoolOwner bufferPool = this.localBuffersPools.remove(id);
-			if (bufferPool != null) {
-				bufferPool.clearLocalBufferPool();
-			}
-		}
-
-		// clear and remove OUTPUT side buffer pool
-		LocalBufferPoolOwner bufferPool = this.localBuffersPools.remove(executionId);
-		if (bufferPool != null) {
-			bufferPool.clearLocalBufferPool();
-		}
-
-		// the number of channels per buffers has changed after unregistering the task
-		// => redistribute the number of designated buffers of the registered local buffer pools
-		redistributeBuffers();
-	}
-
-	private void removeFromReceiverCacheAndMaybeCloseTcpConnection(Channel channel) {
-		EnvelopeReceiverList receiver = this.receiverCache.remove(channel.getID());
-
-		if (receiver != null && receiver.hasRemoteReceiver()) {
-			networkConnectionManager.close(receiver.getRemoteReceiver());
-		}
-	}
-
-	/**
-	 * Ensures that the channel manager has enough buffers to execute the given task.
-	 * <p>
-	 * If there is less than one buffer per channel available, an InsufficientResourcesException will be thrown,
-	 * because of possible deadlocks. With more then one buffer per channel, deadlock-freedom is guaranteed.
-	 *
-	 * @param task task to be executed
-	 * @throws InsufficientResourcesException thrown if not enough buffers available to execute the task
-	 */
-	private void ensureBufferAvailability(Task task) throws InsufficientResourcesException {
-		Environment env = task.getEnvironment();
-
-		int numBuffers = this.globalBufferPool.numBuffers();
-		// existing channels + channels of the task
-		int numChannels = this.channels.size() + env.getNumberOfOutputChannels() + env.getNumberOfInputChannels();
-
-		// need at least one buffer per channel
-		if (numChannels > 0 && numBuffers / numChannels < 1) {
-			String msg = String.format("%s has not enough buffers to safely execute %s (%d buffers missing)",
-					this.connectionInfo.getFQDNHostname(), env.getTaskName(), numChannels - numBuffers);
-
-			throw new InsufficientResourcesException(msg);
-		}
-	}
-
-	/**
-	 * Redistributes the buffers among the registered buffer pools. This method is called after each task registration
-	 * and unregistration.
-	 * <p>
-	 * Every registered buffer pool gets buffers according to its number of channels weighted by the current buffer to
-	 * channel ratio.
-	 */
-	private void redistributeBuffers() {
-		if (this.localBuffersPools.isEmpty() | this.channels.size() == 0) {
-			return;
-		}
-
-		int numBuffers = this.globalBufferPool.numBuffers();
-		int numChannels = this.channels.size();
-
-		double buffersPerChannel = numBuffers / (double) numChannels;
-
-		if (buffersPerChannel < 1.0) {
-			throw new RuntimeException("System has not enough buffers to execute tasks.");
-		}
-
-		// redistribute number of designated buffers per buffer pool
-		for (LocalBufferPoolOwner bufferPool : this.localBuffersPools.values()) {
-			int numDesignatedBuffers = (int) Math.ceil(buffersPerChannel * bufferPool.getNumberOfChannels());
-			bufferPool.setDesignatedNumberOfBuffers(numDesignatedBuffers);
-		}
-	}
-
-	// -----------------------------------------------------------------------------------------------------------------
-	//                                           Envelope processing
-	// -----------------------------------------------------------------------------------------------------------------
-
-	private void releaseEnvelope(Envelope envelope) {
-		Buffer buffer = envelope.getBuffer();
-		if (buffer != null) {
-			buffer.recycleBuffer();
-		}
-	}
-
-	private void addReceiverListHint(ChannelID source, ChannelID localReceiver) {
-		EnvelopeReceiverList receiverList = new EnvelopeReceiverList(localReceiver);
-
-		if (this.receiverCache.put(source, receiverList) != null) {
-			LOG.warn("Receiver cache already contained entry for " + source);
-		}
-	}
-
-	private void addReceiverListHint(ChannelID source, RemoteReceiver remoteReceiver) {
-		EnvelopeReceiverList receiverList = new EnvelopeReceiverList(remoteReceiver);
-
-		if (this.receiverCache.put(source, receiverList) != null) {
-			LOG.warn("Receiver cache already contained entry for " + source);
-		}
-	}
-
-	private void generateSenderHint(Envelope envelope, RemoteReceiver receiver) throws IOException {
-		Channel channel = this.channels.get(envelope.getSource());
-		if (channel == null) {
-			LOG.error("Cannot find channel for channel ID " + envelope.getSource());
-			return;
-		}
-
-		// Only generate sender hints for output channels
-		if (channel.isInputChannel()) {
-			return;
-		}
-
-		final ChannelID targetChannelID = channel.getConnectedId();
-		final int connectionIndex = receiver.getConnectionIndex();
-
-		final RemoteReceiver ourAddress = new RemoteReceiver(this.ourAddress, connectionIndex);
-		final Envelope senderHint = SenderHintEvent.createEnvelopeWithEvent(envelope, targetChannelID, ourAddress);
-
-		this.networkConnectionManager.enqueue(senderHint, receiver, true);
-	}
-
-	/**
-	 * Returns the list of receivers for transfer envelopes produced by the channel with the given source channel ID.
-	 *
-	 * @param jobID
-	 *        the ID of the job the given channel ID belongs to
-	 * @param sourceChannelID
-	 *        the source channel ID for which the receiver list shall be retrieved
-	 * @return the list of receivers or <code>null</code> if the receiver could not be determined
-	 * @throws IOException
-	 */
-	private EnvelopeReceiverList getReceiverList(JobID jobID, ChannelID sourceChannelID, boolean reportException) throws IOException {
-		EnvelopeReceiverList receiverList = this.receiverCache.get(sourceChannelID);
-
-		if (receiverList != null) {
-			return receiverList;
-		}
-
-		while (true) {
-			ConnectionInfoLookupResponse lookupResponse;
-			lookupResponse = AkkaUtils.<JobManagerMessages.ConnectionInformation>ask(channelLookup,
-					new JobManagerMessages.LookupConnectionInformation(connectionInfo, jobID,
-							sourceChannelID), timeout).response();
-
-			if (lookupResponse.receiverReady()) {
-				receiverList = new EnvelopeReceiverList(lookupResponse);
-				break;
-			}
-			else if (lookupResponse.receiverNotReady()) {
-				try {
-					Thread.sleep(100);
-				} catch (InterruptedException e) {
-					if (reportException) {
-						throw new IOException("Lookup was interrupted.");
-					} else {
-						return null;
-					}
-				}
-			}
-			else if (lookupResponse.isJobAborting()) {
-				if (reportException) {
-					throw new CancelTaskException();
-				} else {
-					return null;
-				}
-			}
-			else if (lookupResponse.receiverNotFound()) {
-				if (reportException) {
-					throw new IOException("Could not find the receiver for Job " + jobID + ", channel with source id " + sourceChannelID);
-				} else {
-					return null;
-				}
-			}
-			else {
-				throw new IllegalStateException("Unrecognized response to channel lookup.");
-			}
-		}
-
-		this.receiverCache.put(sourceChannelID, receiverList);
-
-		if (LOG.isDebugEnabled()) {
-			LOG.debug(String.format("Receiver for %s: %s [%s])",
-					sourceChannelID,
-					receiverList.hasLocalReceiver() ? receiverList.getLocalReceiver() : receiverList.getRemoteReceiver(),
-					receiverList.hasLocalReceiver() ? "local" : "remote"));
-		}
-
-		return receiverList;
-	}
-
-	/**
-	 * Invalidates the entries identified by the given channel IDs from the receiver lookup cache.
-	 *
-	 * @param channelIDs channel IDs for entries to invalidate
-	 */
-	public void invalidateLookupCacheEntries(Set<ChannelID> channelIDs) {
-		for (ChannelID id : channelIDs) {
-			this.receiverCache.remove(id);
-		}
-	}
-
-	// -----------------------------------------------------------------------------------------------------------------
-	//                                       EnvelopeDispatcher methods
-	// -----------------------------------------------------------------------------------------------------------------
-
-	@Override
-	public void dispatchFromOutputChannel(Envelope envelope) throws IOException, InterruptedException {
-		EnvelopeReceiverList receiverList = getReceiverListForEnvelope(envelope, true);
-
-		Buffer srcBuffer = envelope.getBuffer();
-		Buffer destBuffer = null;
-		
-		boolean success = false;
-		
-		try {
-			if (receiverList.hasLocalReceiver()) {
-				ChannelID receiver = receiverList.getLocalReceiver();
-				Channel channel = this.channels.get(receiver);
-
-				if (channel == null) {
-					throw new LocalReceiverCancelledException(receiver);
-				}
-
-				if (!channel.isInputChannel()) {
-					throw new IOException("Local receiver " + receiver + " is not an input channel.");
-				}
-
-				InputChannel<?> inputChannel = (InputChannel<?>) channel;
-				
-				// copy the buffer into the memory space of the receiver 
-				if (srcBuffer != null) {
-					try {
-						destBuffer = inputChannel.requestBufferBlocking(srcBuffer.size());
-					} catch (InterruptedException e) {
-						throw new IOException(e.getMessage());
-					}
-
-					srcBuffer.copyToBuffer(destBuffer);
-					envelope.setBuffer(destBuffer);
-					srcBuffer.recycleBuffer();
-				}
-				
-				inputChannel.queueEnvelope(envelope);
-				success = true;
-			}
-			else if (receiverList.hasRemoteReceiver()) {
-				RemoteReceiver remoteReceiver = receiverList.getRemoteReceiver();
-
-				// Generate sender hint before sending the first envelope over the network
-				if (envelope.getSequenceNumber() == 0) {
-					generateSenderHint(envelope, remoteReceiver);
-				}
-
-				this.networkConnectionManager.enqueue(envelope, remoteReceiver, false);
-				success = true;
-			}
-		} finally {
-			if (!success) {
-				if (srcBuffer != null) {
-					srcBuffer.recycleBuffer();
-				}
-				if (destBuffer != null) {
-					destBuffer.recycleBuffer();
-				}
-			}
-		}
-	}
-
-	@Override
-	public void dispatchFromInputChannel(Envelope envelope) throws IOException, InterruptedException {
-		// this method sends only events back from input channels to output channels
-		// sanity check that we have no buffer
-		if (envelope.getBuffer() != null) {
-			throw new RuntimeException("Error: This method can only process envelopes without buffers.");
-		}
-		
-		EnvelopeReceiverList receiverList = getReceiverListForEnvelope(envelope, true);
-
-		if (receiverList.hasLocalReceiver()) {
-			ChannelID receiver = receiverList.getLocalReceiver();
-			Channel channel = this.channels.get(receiver);
-
-			if (channel == null) {
-				throw new LocalReceiverCancelledException(receiver);
-			}
-
-			if (channel.isInputChannel()) {
-				throw new IOException("Local receiver " + receiver + " of backward event is not an output channel.");
-			}
-
-			OutputChannel outputChannel = (OutputChannel) channel;
-			outputChannel.queueEnvelope(envelope);
-		}
-		else if (receiverList.hasRemoteReceiver()) {
-			RemoteReceiver remoteReceiver = receiverList.getRemoteReceiver();
-
-			// Generate sender hint before sending the first envelope over the network
-			this.networkConnectionManager.enqueue(envelope, remoteReceiver, envelope.getSequenceNumber() == 0);
-		}
-	}
-
-	/**
-	 * 
-	 */
-	@Override
-	public void dispatchFromNetwork(Envelope envelope) throws IOException, InterruptedException {
-		// ========================================================================================
-		//  IMPORTANT
-		//  
-		//  This method is called by the network I/O thread that reads the incoming TCP 
-		//  connections. This method must have minimal overhead and not throw exception if
-		//  something is wrong with a job or individual transmission, but only when something
-		//  is fundamentally broken in the system.
-		// ========================================================================================
-		
-		// the sender hint event is to let the receiver know where exactly the envelope came from.
-		// the receiver will cache the sender id and its connection info in its local lookup table
-		// that allows the receiver to send envelopes to the sender without first pinging the job manager
-		// for the sender's connection info
-		
-		// Check if the envelope is the special envelope with the sender hint event
-		if (SenderHintEvent.isSenderHintEvent(envelope)) {
-			// Check if this is the final destination of the sender hint event before adding it
-			final SenderHintEvent seh = (SenderHintEvent) envelope.deserializeEvents().get(0);
-			if (this.channels.get(seh.getSource()) != null) {
-				addReceiverListHint(seh.getSource(), seh.getRemoteReceiver());
-				return;
-			}
-		}
-		
-		// try and get the receiver list. if we cannot get it anymore, the task has been cleared
-		// the code frees the envelope on exception, so we need not to anything
-		EnvelopeReceiverList receiverList = getReceiverListForEnvelope(envelope, false);
-		if (receiverList == null) {
-			// receiver is cancelled and cleaned away
-			releaseEnvelope(envelope);
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Dropping envelope for cleaned up receiver.");
-			}
-
-			return;
-		}
-
-		if (!receiverList.hasLocalReceiver() || receiverList.hasRemoteReceiver()) {
-			throw new IOException("Bug in network stack: Envelope dispatched from the incoming network pipe has no local receiver or has a remote receiver");
-		}
-
-		ChannelID localReceiver = receiverList.getLocalReceiver();
-		Channel channel = this.channels.get(localReceiver);
-		
-		// if the channel is null, it means that receiver has been cleared already (cancelled or failed).
-		// release the buffer immediately
-		if (channel == null) {
-			releaseEnvelope(envelope);
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Dropping envelope for cancelled receiver " + localReceiver);
-			}
-		}
-		else {
-			channel.queueEnvelope(envelope);
-		}
-	}
-
-	/**
-	 * 
-	 * Upon an exception, this method frees the envelope.
-	 */
-	private final EnvelopeReceiverList getReceiverListForEnvelope(Envelope envelope, boolean reportException) throws IOException {
-		try {
-			return getReceiverList(envelope.getJobID(), envelope.getSource(), reportException);
-		} catch (IOException e) {
-			releaseEnvelope(envelope);
-			throw e;
-		} catch (CancelTaskException e) {
-			releaseEnvelope(envelope);
-			throw e;
-		} catch (Throwable t) {
-			releaseEnvelope(envelope);
-			ExceptionUtils.rethrow(t, "Error while requesting receiver list.");
-			return null; // silence the compiler
-		}
-	}
-	
-	// -----------------------------------------------------------------------------------------------------------------
-	//                                       BufferProviderBroker methods
-	// -----------------------------------------------------------------------------------------------------------------
-
-	@Override
-	public BufferProvider getBufferProvider(JobID jobID, ChannelID sourceChannelID) throws IOException {
-		EnvelopeReceiverList receiverList = getReceiverList(jobID, sourceChannelID, false);
-		
-		// check if the receiver is already gone
-		if (receiverList == null) {
-			return this.discardBufferPool;
-		}
-
-		if (!receiverList.hasLocalReceiver() || receiverList.hasRemoteReceiver()) {
-			throw new IOException("The destination to be looked up is not a single local endpoint.");
-		}
-		
-
-		ChannelID localReceiver = receiverList.getLocalReceiver();
-		Channel channel = this.channels.get(localReceiver);
-		
-		if (channel == null) {
-			// receiver is already canceled
-			return this.discardBufferPool;
-		}
-
-		if (!channel.isInputChannel()) {
-			throw new IOException("Channel context for local receiver " + localReceiver + " is not an input channel context");
-		}
-
-		return (InputChannel<?>) channel;
-	}
-
-	// -----------------------------------------------------------------------------------------------------------------
-
-	public void logBufferUtilization() {
-		System.out.println("Buffer utilization at " + System.currentTimeMillis());
-
-		System.out.println("\tUnused global buffers: " + this.globalBufferPool.numAvailableBuffers());
-
-		System.out.println("\tLocal buffer pool status:");
-
-		for (LocalBufferPoolOwner bufferPool : this.localBuffersPools.values()) {
-			bufferPool.logBufferUtilization();
-		}
-
-		System.out.println("\tIncoming connections:");
-
-		for (Channel channel : this.channels.values()) {
-			if (channel.isInputChannel()) {
-				((InputChannel<?>) channel).logQueuedEnvelopes();
-			}
-		}
-	}
-	
-	public void verifyAllCachesEmpty() {
-		if (!channels.isEmpty()) {
-			throw new IllegalStateException("Channel manager caches not empty: There are still registered channels.");
-		}
-		if (!localBuffersPools.isEmpty()) {
-			throw new IllegalStateException("Channel manager caches not empty: There are still local buffer pools.");
-		}
-		if (!receiverCache.isEmpty()) {
-			throw new IllegalStateException("Channel manager caches not empty: There are still entries in the receiver cache.");
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionInfoLookupResponse.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionInfoLookupResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionInfoLookupResponse.java
deleted file mode 100644
index 51a0f94..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionInfoLookupResponse.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network;
-
-import java.io.Serializable;
-
-import org.apache.flink.runtime.io.network.channels.ChannelID;
-
-public class ConnectionInfoLookupResponse implements Serializable {
-
-	private static final long serialVersionUID = 3961171754642077522L;
-
-	
-	private enum ReturnCode {
-		NOT_FOUND, FOUND_AND_RECEIVER_READY, FOUND_BUT_RECEIVER_NOT_READY, JOB_IS_ABORTING
-	};
-
-	// was request successful?
-	private ReturnCode returnCode;
-	
-	private RemoteReceiver remoteTarget;
-
-	private ChannelID localTarget;
-	
-	public ConnectionInfoLookupResponse() {}
-
-	public ConnectionInfoLookupResponse(ReturnCode code) {
-		this.returnCode = code;
-		this.remoteTarget = null;
-		this.localTarget = null;
-	}
-	
-	public ConnectionInfoLookupResponse(ReturnCode code, ChannelID localTarget) {
-		this.returnCode = code;
-		this.remoteTarget = null;
-		this.localTarget = localTarget;
-	}
-	
-	public ConnectionInfoLookupResponse(ReturnCode code, RemoteReceiver receiver) {
-		this.returnCode = code;
-		this.remoteTarget = receiver;
-		this.localTarget = null;
-	}
-
-	public RemoteReceiver getRemoteTarget() {
-		return this.remoteTarget;
-	}
-
-	public ChannelID getLocalTarget() {
-		return this.localTarget;
-	}
-
-	public boolean receiverNotFound() {
-		return (this.returnCode == ReturnCode.NOT_FOUND);
-	}
-
-	public boolean receiverNotReady() {
-		return (this.returnCode == ReturnCode.FOUND_BUT_RECEIVER_NOT_READY);
-	}
-
-	public boolean receiverReady() {
-		return (this.returnCode == ReturnCode.FOUND_AND_RECEIVER_READY);
-	}
-
-	public boolean isJobAborting() {
-		return (this.returnCode == ReturnCode.JOB_IS_ABORTING);
-	}
-
-	
-	public static ConnectionInfoLookupResponse createReceiverFoundAndReady(ChannelID targetChannelID) {
-		return new ConnectionInfoLookupResponse(ReturnCode.FOUND_AND_RECEIVER_READY, targetChannelID);
-	}
-
-	public static ConnectionInfoLookupResponse createReceiverFoundAndReady(RemoteReceiver remoteReceiver) {
-		return new ConnectionInfoLookupResponse(ReturnCode.FOUND_AND_RECEIVER_READY, remoteReceiver);
-	}
-
-	public static ConnectionInfoLookupResponse createReceiverNotFound() {
-		return new ConnectionInfoLookupResponse(ReturnCode.NOT_FOUND);
-	}
-
-	public static ConnectionInfoLookupResponse createReceiverNotReady() {
-		return new ConnectionInfoLookupResponse(ReturnCode.FOUND_BUT_RECEIVER_NOT_READY);
-	}
-
-	public static ConnectionInfoLookupResponse createJobIsAborting() {
-		return new ConnectionInfoLookupResponse(ReturnCode.JOB_IS_ABORTING);
-	}
-
-	
-	@Override
-	public String toString() {
-		return this.returnCode.name() + ", local target: " + this.localTarget + ", remoteTarget: " + this.remoteTarget;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
new file mode 100644
index 0000000..4a5536b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
+import org.apache.flink.runtime.io.network.partition.IntermediateResultPartitionProvider;
+
+import java.io.IOException;
+
+/**
+ * The connection manager manages physical connections for the (logical) remote
+ * input channels at runtime.
+ */
+public interface ConnectionManager {
+
+	void start(IntermediateResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) throws IOException;
+
+	/**
+	 * Creates a {@link PartitionRequestClient} instance for the given {@link RemoteAddress}.
+	 */
+	PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException;
+
+	int getNumberOfActiveConnections();
+
+	void shutdown() throws IOException;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/Envelope.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/Envelope.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/Envelope.java
deleted file mode 100644
index 53f403c..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/Envelope.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.io.network.channels.ChannelID;
-import org.apache.flink.runtime.io.network.serialization.DataInputDeserializer;
-import org.apache.flink.runtime.io.network.serialization.DataOutputSerializer;
-import org.apache.flink.runtime.jobgraph.JobID;
-import org.apache.flink.util.InstantiationUtil;
-
-public final class Envelope {
-
-	private final JobID jobID;
-
-	private final ChannelID source;
-
-	private final int sequenceNumber;
-
-	private ByteBuffer serializedEventList;
-
-	private Buffer buffer;
-
-	public Envelope(int sequenceNumber, JobID jobID, ChannelID source) {
-		this.sequenceNumber = sequenceNumber;
-		this.jobID = jobID;
-		this.source = source;
-	}
-
-	private Envelope(Envelope toDuplicate) {
-		this.jobID = toDuplicate.jobID;
-		this.source = toDuplicate.source;
-		this.sequenceNumber = toDuplicate.sequenceNumber;
-		this.serializedEventList = null;
-		this.buffer = null;
-	}
-
-	public Envelope duplicate() {
-		Envelope duplicate = new Envelope(this);
-		if (hasBuffer()) {
-			duplicate.setBuffer(this.buffer.duplicate());
-		}
-
-		return duplicate;
-	}
-
-	public Envelope duplicateWithoutBuffer() {
-		return new Envelope(this);
-	}
-
-	public JobID getJobID() {
-		return this.jobID;
-	}
-
-	public ChannelID getSource() {
-		return this.source;
-	}
-
-	public int getSequenceNumber() {
-		return this.sequenceNumber;
-	}
-
-	public void setEventsSerialized(ByteBuffer serializedEventList) {
-		if (this.serializedEventList != null) {
-			throw new IllegalStateException("Event list has already been set.");
-		}
-
-		this.serializedEventList = serializedEventList;
-	}
-
-	public void serializeEventList(List<? extends AbstractEvent> eventList) {
-		if (this.serializedEventList != null) {
-			throw new IllegalStateException("Event list has already been set.");
-		}
-
-		this.serializedEventList = serializeEvents(eventList);
-	}
-
-	public ByteBuffer getEventsSerialized() {
-		return this.serializedEventList;
-	}
-
-	public List<? extends AbstractEvent> deserializeEvents() {
-		return deserializeEvents(getClass().getClassLoader());
-	}
-
-	public List<? extends AbstractEvent> deserializeEvents(ClassLoader classloader) {
-		if (this.serializedEventList == null) {
-			return Collections.emptyList();
-		}
-
-		try {
-			DataInputDeserializer deserializer = new DataInputDeserializer(this.serializedEventList);
-
-			int numEvents = deserializer.readInt();
-			ArrayList<AbstractEvent> events = new ArrayList<AbstractEvent>(numEvents);
-
-			for (int i = 0; i < numEvents; i++) {
-				String className = deserializer.readUTF();
-				Class<? extends AbstractEvent> clazz;
-				try {
-					clazz = Class.forName(className).asSubclass(AbstractEvent.class);
-				} catch (ClassNotFoundException e) {
-					throw new RuntimeException("Could not load event class '" + className + "'.", e);
-				} catch (ClassCastException e) {
-					throw new RuntimeException("The class '" + className + "' is no valid subclass of '" + AbstractEvent.class.getName() + "'.", e);
-				}
-
-				AbstractEvent evt = InstantiationUtil.instantiate(clazz, AbstractEvent.class);
-				evt.read(deserializer);
-
-				events.add(evt);
-			}
-
-			return events;
-		}
-		catch (IOException e) {
-			throw new RuntimeException("Error while deserializing the events.", e);
-		}
-	}
-
-	public void setBuffer(Buffer buffer) {
-		this.buffer = buffer;
-	}
-
-	public Buffer getBuffer() {
-		return this.buffer;
-	}
-
-	private ByteBuffer serializeEvents(List<? extends AbstractEvent> events) {
-		try {
-			// create the serialized event list
-			DataOutputSerializer serializer = events.size() == 0
-				? new DataOutputSerializer(4)
-				: new DataOutputSerializer(events.size() * 32);
-			serializer.writeInt(events.size());
-
-			for (AbstractEvent evt : events) {
-				serializer.writeUTF(evt.getClass().getName());
-				evt.write(serializer);
-			}
-
-			return serializer.wrapAsByteBuffer();
-		}
-		catch (IOException e) {
-			throw new RuntimeException("Error while serializing the task events.", e);
-		}
-	}
-
-	public boolean hasBuffer() {
-		return this.buffer != null;
-	}
-
-	@Override
-	public String toString() {
-		return String.format("Envelope %d [source id: %s, buffer size: %d, events size: %d]",
-				this.sequenceNumber, this.getSource(), this.buffer == null ? -1 : this.buffer.size(),
-				this.serializedEventList == null ? -1 : this.serializedEventList.remaining());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/EnvelopeDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/EnvelopeDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/EnvelopeDispatcher.java
deleted file mode 100644
index 59cf491..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/EnvelopeDispatcher.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network;
-
-import java.io.IOException;
-
-/**
- * A envelope dispatcher receives {@link Envelope}s and sends them to all of its destinations.
- */
-public interface EnvelopeDispatcher {
-
-	/**
-	 * Dispatches an envelope from an output channel to the receiving input channels (forward flow).
-	 *
-	 * @param envelope envelope to be sent
-	 */
-	void dispatchFromOutputChannel(Envelope envelope) throws IOException, InterruptedException;
-
-	/**
-	 * Dispatches an envelope from an input channel to the receiving output channels (backwards flow).
-	 *
-	 * @param envelope envelope to be sent
-	 */
-	void dispatchFromInputChannel(Envelope envelope) throws IOException, InterruptedException;
-
-	/**
-	 * Dispatches an envelope from an incoming TCP connection.
-	 * <p>
-	 * After an envelope has been constructed from a TCP socket, this method is called to send the envelope to the
-	 * receiving input channel.
-	 *
-	 * @param envelope envelope to be sent
-	 */
-	void dispatchFromNetwork(Envelope envelope) throws IOException, InterruptedException;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/EnvelopeReceiverList.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/EnvelopeReceiverList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/EnvelopeReceiverList.java
deleted file mode 100644
index f6e4982..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/EnvelopeReceiverList.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network;
-
-import java.net.InetAddress;
-
-import org.apache.flink.runtime.io.network.ConnectionInfoLookupResponse;
-import org.apache.flink.runtime.io.network.RemoteReceiver;
-import org.apache.flink.runtime.io.network.channels.ChannelID;
-
-/**
- * A transfer envelope receiver list contains all recipients of a transfer envelope. Their are three different types of
- * receivers: Local receivers identified by {@link ChannelID} objects, remote receivers identified by
- * {@link InetAddress} objects and finally checkpoints which are identified by
- * <p>
- * This class is thread-safe.
- * 
- */
-public class EnvelopeReceiverList {
-
-	private final ChannelID localReceiver;
-
-	private final RemoteReceiver remoteReceiver;
-
-	public EnvelopeReceiverList(ConnectionInfoLookupResponse cilr) {
-		this.localReceiver = cilr.getLocalTarget();
-		this.remoteReceiver = cilr.getRemoteTarget();
-	}
-
-	public EnvelopeReceiverList(ChannelID localReceiver) {
-		this.localReceiver = localReceiver;
-		this.remoteReceiver = null;
-	}
-
-	public EnvelopeReceiverList(RemoteReceiver remoteReceiver) {
-		this.localReceiver = null;
-		this.remoteReceiver = remoteReceiver;
-	}
-
-	public boolean hasLocalReceiver() {
-		return this.localReceiver != null;
-	}
-
-	public boolean hasRemoteReceiver() {
-		return this.remoteReceiver != null;
-	}
-
-	public int getTotalNumberOfReceivers() {
-		return (this.localReceiver == null ? 0 : 1) + (this.remoteReceiver == null ? 0 : 1);
-	}
-
-	public RemoteReceiver getRemoteReceiver() {
-		return this.remoteReceiver;
-	}
-
-	public ChannelID getLocalReceiver() {
-		return this.localReceiver;
-	}
-	
-	@Override
-	public String toString() {
-		return "local receiver: " + this.localReceiver + ", remote receiver: " + this.remoteReceiver;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/InsufficientResourcesException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/InsufficientResourcesException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/InsufficientResourcesException.java
deleted file mode 100644
index df4f4ec..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/InsufficientResourcesException.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network;
-
-/**
- * This exception is thrown by the {@link ChannelManager} to indicate that a task cannot be accepted because
- * there are not enough resources available to safely execute it.
- * 
- */
-public final class InsufficientResourcesException extends Exception {
-
-	/**
-	 * The generated serial version UID.
-	 */
-	private static final long serialVersionUID = -8977049569413215169L;
-
-	/**
-	 * Constructs a new insufficient resources exception.
-	 * 
-	 * @param msg
-	 *        the message describing the exception
-	 */
-	InsufficientResourcesException(final String msg) {
-		super(msg);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
index 264fde6..894db35 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
@@ -18,21 +18,24 @@
 
 package org.apache.flink.runtime.io.network;
 
-import java.io.IOException;
+import org.apache.flink.runtime.io.network.netty.PartitionRequestClient;
+import org.apache.flink.runtime.io.network.partition.IntermediateResultPartitionProvider;
 
-public class LocalConnectionManager implements NetworkConnectionManager {
+import java.io.IOException;
 
-	@Override
-	public void start(ChannelManager channelManager) throws IOException {
-	}
+/**
+ * A connection manager implementation to bypass setup overhead for task managers running in local
+ * execution mode.
+ */
+public class LocalConnectionManager implements ConnectionManager {
 
 	@Override
-	public void enqueue(Envelope envelope, RemoteReceiver receiver, boolean isFirstEnvelope) throws IOException {
+	public void start(IntermediateResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) throws IOException {
 	}
 
 	@Override
-	public void close(RemoteReceiver receiver) {
-
+	public PartitionRequestClient createPartitionRequestClient(RemoteAddress remoteAddress) throws IOException {
+		return null;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalReceiverCancelledException.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalReceiverCancelledException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalReceiverCancelledException.java
deleted file mode 100644
index 84d2d80..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalReceiverCancelledException.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network;
-
-import org.apache.flink.runtime.execution.CancelTaskException;
-import org.apache.flink.runtime.io.network.channels.ChannelID;
-
-
-/**
- *
- */
-public class LocalReceiverCancelledException extends CancelTaskException {
-	private static final long serialVersionUID = 1L;
-
-	private final ChannelID receiver;
-
-	public LocalReceiverCancelledException(ChannelID receiver) {
-		this.receiver = receiver;
-	}
-	
-	
-	public ChannelID getReceiver() {
-		return receiver;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkConnectionManager.java
deleted file mode 100644
index 309c92d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkConnectionManager.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network;
-
-import java.io.IOException;
-
-public interface NetworkConnectionManager {
-
-	public void start(ChannelManager channelManager) throws IOException;
-
-	public void enqueue(Envelope envelope, RemoteReceiver receiver, boolean isFirstEnvelope) throws IOException;
-
-	public void close(RemoteReceiver receiver);
-
-	public int getNumberOfActiveConnections();
-
-	public void shutdown() throws IOException;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
new file mode 100644
index 0000000..aa6c64c
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import akka.actor.ActorRef;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.api.reader.BufferReader;
+import org.apache.flink.runtime.io.network.api.writer.BufferWriter;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
+import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.netty.NettyConfig;
+import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
+import org.apache.flink.runtime.io.network.partition.IntermediateResultPartition;
+import org.apache.flink.runtime.io.network.partition.IntermediateResultPartitionManager;
+import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
+import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.taskmanager.TaskManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.concurrent.duration.FiniteDuration;
+
+import java.io.IOException;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * Network I/O components of each {@link TaskManager} instance.
+ */
+public class NetworkEnvironment {
+
+	private static final Logger LOG = LoggerFactory.getLogger(NetworkEnvironment.class);
+
+	private final ActorRef jobManager;
+
+	private final FiniteDuration jobManagerTimeout;
+
+	private final IntermediateResultPartitionManager partitionManager;
+
+	private final TaskEventDispatcher taskEventDispatcher;
+
+	private final NetworkBufferPool networkBufferPool;
+
+	private final ConnectionManager connectionManager;
+
+	private boolean isShutdown;
+
+	/**
+	 * Initializes all network I/O components.
+	 */
+	public NetworkEnvironment(ActorRef jobManager, FiniteDuration jobManagerTimeout, NetworkEnvironmentConfiguration config) throws IOException {
+		this.jobManager = checkNotNull(jobManager);
+		this.jobManagerTimeout = checkNotNull(jobManagerTimeout);
+
+		this.partitionManager = new IntermediateResultPartitionManager();
+		this.taskEventDispatcher = new TaskEventDispatcher();
+
+		// --------------------------------------------------------------------
+		// Network buffers
+		// --------------------------------------------------------------------
+		try {
+			networkBufferPool = new NetworkBufferPool(config.numNetworkBuffers(), config.networkBufferSize());
+		}
+		catch (Throwable t) {
+			throw new IOException("Failed to instantiate network buffer pool: " + t.getMessage(), t);
+		}
+
+		// --------------------------------------------------------------------
+		// Network connections
+		// --------------------------------------------------------------------
+		final Option<NettyConfig> nettyConfig = config.nettyConfig();
+
+		connectionManager = nettyConfig.isDefined() ? new NettyConnectionManager(nettyConfig.get()) : new LocalConnectionManager();
+
+		try {
+			connectionManager.start(partitionManager, taskEventDispatcher);
+		}
+		catch (Throwable t) {
+			throw new IOException("Failed to instantiate network connection manager: " + t.getMessage(), t);
+		}
+	}
+
+	public ActorRef getJobManager() {
+		return jobManager;
+	}
+
+	public FiniteDuration getJobManagerTimeout() {
+		return jobManagerTimeout;
+	}
+
+	public void registerTask(Task task) throws IOException {
+		final ExecutionAttemptID executionId = task.getExecutionId();
+
+		final IntermediateResultPartition[] producedPartitions = task.getProducedPartitions();
+		final BufferWriter[] writers = task.getWriters();
+
+		if (writers.length != producedPartitions.length) {
+			throw new IllegalStateException("Unequal number of writers and partitions.");
+		}
+
+		for (int i = 0; i < producedPartitions.length; i++) {
+			final IntermediateResultPartition partition = producedPartitions[i];
+			final BufferWriter writer = writers[i];
+
+			// Buffer pool for the partition
+			BufferPool bufferPool = null;
+
+			try {
+				bufferPool = networkBufferPool.createBufferPool(partition.getNumberOfQueues(), false);
+				partition.setBufferPool(bufferPool);
+				partitionManager.registerIntermediateResultPartition(partition);
+			}
+			catch (Throwable t) {
+				if (bufferPool != null) {
+					bufferPool.destroy();
+				}
+
+				if (t instanceof IOException) {
+					throw (IOException) t;
+				}
+				else {
+					throw new IOException(t.getMessage(), t);
+				}
+			}
+
+			// Register writer with task event dispatcher
+			taskEventDispatcher.registerWriterForIncomingTaskEvents(executionId, writer.getPartitionId(), writer);
+		}
+
+		// Setup the buffer pool for each buffer reader
+		final BufferReader[] readers = task.getReaders();
+
+		for (BufferReader reader : readers) {
+			BufferPool bufferPool = null;
+
+			try {
+				bufferPool = networkBufferPool.createBufferPool(reader.getNumberOfInputChannels(), false);
+				reader.setBufferPool(bufferPool);
+			}
+			catch (Throwable t) {
+				if (bufferPool != null) {
+					bufferPool.destroy();
+				}
+
+				if (t instanceof IOException) {
+					throw (IOException) t;
+				}
+				else {
+					throw new IOException(t.getMessage(), t);
+				}
+			}
+		}
+	}
+
+	public void unregisterTask(Task task) {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug("Unregistering task {} ({}) from network environment (state: {}).", task.getTaskNameWithSubtasks(), task.getExecutionId(), task.getExecutionState());
+		}
+
+		final ExecutionAttemptID executionId = task.getExecutionId();
+
+		if (task.isCanceledOrFailed()) {
+			partitionManager.failIntermediateResultPartitions(executionId);
+		}
+
+		taskEventDispatcher.unregisterWriters(executionId);
+
+		final BufferReader[] readers = task.getReaders();
+
+		if (readers != null) {
+			for (BufferReader reader : readers) {
+				try {
+					if (reader != null) {
+						reader.releaseAllResources();
+					}
+				}
+				catch (IOException e) {
+					LOG.error("Error during release of reader resources: " + e.getMessage(), e);
+				}
+			}
+		}
+	}
+
+	public IntermediateResultPartitionManager getPartitionManager() {
+		return partitionManager;
+	}
+
+	public TaskEventDispatcher getTaskEventDispatcher() {
+		return taskEventDispatcher;
+	}
+
+	public ConnectionManager getConnectionManager() {
+		return connectionManager;
+	}
+
+	public NetworkBufferPool getNetworkBufferPool() {
+		return networkBufferPool;
+	}
+
+	/** Reports whether all network resources are released, logging the observed counters either way. */
+	public boolean hasReleasedAllResources() {
+		// Sample each counter exactly once, so that the logged numbers and the returned verdict always agree.
+		final long missingSegments = networkBufferPool.getTotalNumberOfMemorySegments() - networkBufferPool.getNumberOfAvailableMemorySegments();
+		final long bufferPools = networkBufferPool.getNumberOfRegisteredBufferPools();
+		final long connections = connectionManager.getNumberOfActiveConnections();
+		final long writers = taskEventDispatcher.getNumberOfRegisteredWriters();
+
+		String msg = String.format("Network buffer pool: %d missing memory segments. %d registered buffer pools. Connection manager: %d active connections. Task event dispatcher: %d registered writers.",
+				missingSegments, bufferPools, connections, writers);
+		boolean success = missingSegments == 0 && bufferPools == 0 && connections == 0 && writers == 0;
+
+		if (success) {
+			LOG.debug("Network environment did release all resources: " + msg);
+		}
+		else {
+			LOG.error("Network environment did *not* release all resources: " + msg);
+		}
+
+		return success;
+	}
+
+	/** Shuts down the buffer pool, partition manager and connection manager; failures are logged, repeated calls do nothing. */
+	public void shutdown() {
+		if (isShutdown) {
+			return;
+		}
+
+		if (networkBufferPool != null) {
+			try {
+				networkBufferPool.destroy();
+			}
+			catch (Throwable t) {
+				LOG.warn("Network buffer pool did not shut down properly: " + t.getMessage(), t);
+			}
+		}
+
+		try {
+			if (partitionManager != null) {
+				partitionManager.shutdown();
+			}
+		}
+		catch (Throwable t) {
+			LOG.warn("Partition manager did not shut down properly: " + t.getMessage(), t);
+		}
+
+		if (connectionManager != null) {
+			try {
+				connectionManager.shutdown();
+			}
+			catch (Throwable t) {
+				LOG.warn("Network connection manager did not shut down properly: " + t.getMessage(), t);
+			}
+		}
+
+		isShutdown = true;
+	}
+
+	public boolean isShutdown() {
+		return isShutdown;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteAddress.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteAddress.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteAddress.java
new file mode 100644
index 0000000..937055b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteAddress.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.executiongraph.IntermediateResult;
+import org.apache.flink.runtime.instance.InstanceConnectionInfo;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A {@link RemoteAddress} identifies a connection to a remote task manager by
+ * the socket address and a connection index. This allows multiple connections
+ * to be distinguished by their connection index.
+ * <p>
+ * The connection index is assigned by the {@link IntermediateResult} and
+ * ensures that it is safe to multiplex multiple data transfers over the same
+ * physical TCP connection.
+ */
+public class RemoteAddress implements IOReadableWritable, Serializable {
+
+	private InetSocketAddress address;
+
+	private int connectionIndex;
+
+	public RemoteAddress(InstanceConnectionInfo connectionInfo, int connectionIndex) {
+		this(new InetSocketAddress(connectionInfo.address(), connectionInfo.dataPort()), connectionIndex);
+	}
+
+	public RemoteAddress(InetSocketAddress address, int connectionIndex) {
+		this.address = checkNotNull(address);
+		checkArgument(connectionIndex >= 0);
+		this.connectionIndex = connectionIndex;
+	}
+
+	public InetSocketAddress getAddress() {
+		return address;
+	}
+
+	public int getConnectionIndex() {
+		return connectionIndex;
+	}
+
+	@Override
+	public int hashCode() {
+		return address.hashCode() + (31 * connectionIndex);
+	}
+
+	@Override
+	public boolean equals(Object other) {
+		// Guard against null so that equals(null) returns false instead of throwing a NullPointerException.
+		if (other == null || other.getClass() != RemoteAddress.class) {
+			return false;
+		}
+		final RemoteAddress ra = (RemoteAddress) other;
+		// Equal only if both the socket address and the connection index match.
+		if (!ra.getAddress().equals(address) || ra.getConnectionIndex() != connectionIndex) {
+			return false;
+		}
+		return true;
+	}
+
+	@Override
+	public String toString() {
+		return address + " [" + connectionIndex + "]";
+	}
+
+	// ------------------------------------------------------------------------
+	// Serialization
+	// ------------------------------------------------------------------------
+
+	public RemoteAddress() {
+		this.address = null;
+		this.connectionIndex = -1;
+	}
+
+	@Override
+	public void write(final DataOutputView out) throws IOException {
+		final InetAddress ia = address.getAddress();
+		out.writeInt(ia.getAddress().length);
+		out.write(ia.getAddress());
+		out.writeInt(address.getPort());
+
+		out.writeInt(connectionIndex);
+	}
+
+	@Override
+	public void read(final DataInputView in) throws IOException {
+		final byte[] addressBytes = new byte[in.readInt()];
+		in.readFully(addressBytes);
+
+		final InetAddress ia = InetAddress.getByAddress(addressBytes);
+		int port = in.readInt();
+
+		address = new InetSocketAddress(ia, port);
+		connectionIndex = in.readInt();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java
deleted file mode 100644
index 436d07d..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-
-import org.apache.flink.core.io.IOReadableWritable;
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-
-/**
- * Objects of this class uniquely identify a connection to a remote {@link org.apache.flink.runtime.taskmanager.TaskManager}.
- */
-public final class RemoteReceiver implements IOReadableWritable, Serializable {
-
-	private static final long serialVersionUID = 4304924747853162443L;
-
-	/**
-	 * The address of the connection to the remote TaskManager.
-	 */
-	private InetSocketAddress connectionAddress;
-
-	/**
-	 * The index of the connection to the remote TaskManager.
-	 */
-	private int connectionIndex;
-
-	/**
-	 * Constructs a new remote receiver object.
-	 * 
-	 * @param connectionAddress
-	 *        the address of the connection to the remote TaskManager
-	 * @param connectionIndex
-	 *        the index of the connection to the remote TaskManager
-	 */
-	public RemoteReceiver(final InetSocketAddress connectionAddress, final int connectionIndex) {
-		if (connectionAddress == null) {
-			throw new IllegalArgumentException("Argument connectionAddress must not be null");
-		}
-		if (connectionIndex < 0) {
-			throw new IllegalArgumentException("Argument connectionIndex must be a non-negative integer number");
-		}
-
-		this.connectionAddress = connectionAddress;
-		this.connectionIndex = connectionIndex;
-	}
-
-	/**
-	 * Default constructor for serialization/deserialization.
-	 */
-	public RemoteReceiver() {
-		this.connectionAddress = null;
-		this.connectionIndex = -1;
-	}
-
-	/**
-	 * Returns the address of the connection to the remote TaskManager.
-	 * 
-	 * @return the address of the connection to the remote TaskManager
-	 */
-	public InetSocketAddress getConnectionAddress() {
-		return this.connectionAddress;
-	}
-
-	/**
-	 * Returns the index of the connection to the remote TaskManager.
-	 * 
-	 * @return the index of the connection to the remote TaskManager
-	 */
-	public int getConnectionIndex() {
-		return this.connectionIndex;
-	}
-
-
-	@Override
-	public int hashCode() {
-		return this.connectionAddress.hashCode() + (31 * this.connectionIndex);
-	}
-
-
-	@Override
-	public boolean equals(final Object obj) {
-
-		if (!(obj instanceof RemoteReceiver)) {
-			return false;
-		}
-
-		final RemoteReceiver rr = (RemoteReceiver) obj;
-		if (!this.connectionAddress.equals(rr.connectionAddress)) {
-			return false;
-		}
-
-		if (this.connectionIndex != rr.connectionIndex) {
-			return false;
-		}
-
-		return true;
-	}
-
-
-	@Override
-	public void write(final DataOutputView out) throws IOException {
-
-		final InetAddress ia = this.connectionAddress.getAddress();
-		out.writeInt(ia.getAddress().length);
-		out.write(ia.getAddress());
-		out.writeInt(this.connectionAddress.getPort());
-
-		out.writeInt(this.connectionIndex);
-	}
-
-
-	@Override
-	public void read(final DataInputView in) throws IOException {
-		final int addr_length = in.readInt();
-		final byte[] address = new byte[addr_length];
-		in.readFully(address);
-
-		InetAddress ia = InetAddress.getByAddress(address);
-		int port = in.readInt();
-		this.connectionAddress = new InetSocketAddress(ia, port);
-
-		this.connectionIndex = in.readInt();
-	}
-
-
-	@Override
-	public String toString() {
-		return this.connectionAddress + " (" + this.connectionIndex + ")";
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/SenderHintEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/SenderHintEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/SenderHintEvent.java
deleted file mode 100644
index 7cdbabc..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/SenderHintEvent.java
+++ /dev/null
@@ -1,122 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.io.network;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.core.memory.DataInputView;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.event.task.AbstractEvent;
-import org.apache.flink.runtime.io.network.channels.ChannelID;
-
-public final class SenderHintEvent extends AbstractEvent {
-
-	/**
-	 * The sequence number that will be set for transfer envelopes which contain the sender hint event.
-	 */
-	private static final int SENDER_HINT_SEQUENCE_NUMBER = 0;
-
-	private final ChannelID source;
-
-	private final RemoteReceiver remoteReceiver;
-
-	SenderHintEvent(final ChannelID source, final RemoteReceiver remoteReceiver) {
-
-		if (source == null) {
-			throw new IllegalArgumentException("Argument source must not be null");
-		}
-
-		if (remoteReceiver == null) {
-			throw new IllegalArgumentException("Argument remoteReceiver must not be null");
-		}
-
-		this.source = source;
-		this.remoteReceiver = remoteReceiver;
-	}
-
-	public SenderHintEvent() {
-
-		this.source = new ChannelID();
-		this.remoteReceiver = new RemoteReceiver();
-	}
-
-	public ChannelID getSource() {
-
-		return this.source;
-	}
-
-	public RemoteReceiver getRemoteReceiver() {
-
-		return this.remoteReceiver;
-	}
-
-
-	@Override
-	public void write(final DataOutputView out) throws IOException {
-
-		this.source.write(out);
-		this.remoteReceiver.write(out);
-	}
-
-
-	@Override
-	public void read(final DataInputView in) throws IOException {
-
-		this.source.read(in);
-		this.remoteReceiver.read(in);
-	}
-
-	public static Envelope createEnvelopeWithEvent(final Envelope originalEnvelope,
-			final ChannelID source, final RemoteReceiver remoteReceiver) {
-
-		final Envelope envelope = new Envelope(SENDER_HINT_SEQUENCE_NUMBER,
-			originalEnvelope.getJobID(), originalEnvelope.getSource());
-
-		final SenderHintEvent senderEvent = new SenderHintEvent(source, remoteReceiver);
-		envelope.serializeEventList(Arrays.asList(senderEvent));
-
-		return envelope;
-	}
-
-	static boolean isSenderHintEvent(final Envelope envelope) {
-
-		if (envelope.getSequenceNumber() != SENDER_HINT_SEQUENCE_NUMBER) {
-			return false;
-		}
-
-		if (envelope.getBuffer() != null) {
-			return false;
-		}
-
-		List<? extends AbstractEvent> events = envelope.deserializeEvents();
-
-		if (events.size() != 1) {
-			return false;
-		}
-
-		if (!(events.get(0) instanceof SenderHintEvent)) {
-			return false;
-		}
-
-		return true;
-	}
-}


Mime
View raw message