flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [1/2] git commit: [FLINK-998] Close network connections when idle
Date Tue, 29 Jul 2014 23:03:46 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/master bef5e5487 -> a45c14b6d


[FLINK-998] Close network connections when idle

1. Close idle TCP connections

Tasks enqueue envelopes to be dispatched via NettyConnectionManager. The
first task to enqueue an envelope triggers the establishment of a new
TCP connection to the target task manager. Usually, there are multiple
connections between the same task managers in order to avoid deadlocks.

This commit addresses the following problem: idle connections were never
closed. As a result, the number of TCP connections increased
monotonically over time. Idle connections are now closed after a
configurable amount of time, see 'taskmanager.net.closeAfterIdleForMs'.

2. Enqueue envelopes directly

TCP connections are handled by Netty's IO loop. Every enqueue operation
needs to ensure that the envelope to be enqueued is handed over to the
respective IO thread in a thread-safe manner. Previously, this was done
via Netty's user events.

This commit addresses the following problem: every enqueue operation
resulted in the creation of an IO event loop task to hand over the
envelope. Envelopes are now added directly to the connection's queue
(an ArrayDeque guarded by synchronizing on the channel), which
drastically improves the throughput of enqueue operations. The user
event is now only fired to trigger processing if the queue was empty
before the envelope was added.

3. Added unit tests for OutboundConnectionQueue and adapted existing
tests to cover concurrent enqueue and close

4. Removed the configurable low and high water mark arguments from
NettyConnectionManager; both are now derived from the buffer size

This closes #76.


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/52512636
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/52512636
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/52512636

Branch: refs/heads/master
Commit: 52512636444902497e47ccbfb1cabaffb3e23343
Parents: bef5e54
Author: uce <u.celebi@fu-berlin.de>
Authored: Thu Jul 17 16:32:06 2014 +0200
Committer: uce <u.celebi@fu-berlin.de>
Committed: Wed Jul 30 01:03:07 2014 +0200

----------------------------------------------------------------------
 .../flink/configuration/ConfigConstants.java    |  20 +-
 .../io/network/LocalConnectionManager.java      |   9 +-
 .../io/network/NetworkConnectionManager.java    |   7 +-
 .../runtime/io/network/RemoteReceiver.java      |   2 +-
 .../network/netty/NettyConnectionManager.java   | 108 ++--
 .../network/netty/OutboundConnectionQueue.java  | 164 +++--
 .../flink/runtime/taskmanager/TaskManager.java  |  13 +-
 .../netty/NettyConnectionManagerTest.java       | 272 +++++----
 .../netty/OutboundConnectionQueueTest.java      | 606 +++++++++++++++++++
 9 files changed, 988 insertions(+), 213 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/52512636/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
index 0c0800a..20a7019 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java
@@ -117,14 +117,9 @@ public final class ConfigConstants {
 	public static final String TASK_MANAGER_NET_NUM_OUT_THREADS_KEY = "taskmanager.net.numOutThreads";
 
 	/**
-	 * The low water mark used in NettyConnectionManager for the Bootstrap.
+	 * The minimum time in ms a channel must be idle before it is closed.
 	 */
-	public static final String TASK_MANAGER_NET_NETTY_LOW_WATER_MARK = "taskmanager.net.nettyLowWaterMark";
-
-	/**
-	 * The high water mark used in NettyConnectionManager for the Bootstrap.
-	 */
-	public static final String TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK = "taskmanager.net.nettyHighWaterMark";
+	public static final String TASK_MANAGER_NET_CLOSE_AFTER_IDLE_FOR_MS_KEY = "taskmanager.net.closeAfterIdleForMs";
 	
 	/**
 	 * Parameter for the interval in which the TaskManager sends the periodic heart beat messages
@@ -354,16 +349,9 @@ public final class ConfigConstants {
 	public static final int DEFAULT_TASK_MANAGER_NET_NUM_OUT_THREADS = -1;
 
 	/**
-	 * Default low water mark used in NettyConnectionManager for the Bootstrap. If set to -1, NettyConnectionManager
-	 * will use half of the network buffer size as the low water mark.
-	 */
-	public static final int DEFAULT_TASK_MANAGER_NET_NETTY_LOW_WATER_MARK = -1;
-
-	/**
-	 * Default high water mark used in NettyConnectionManager for the Bootstrap. If set to -1, NettyConnectionManager
-	 * will use the network buffer size as the high water mark.
+	 * Default minimum time in ms a channel must be idle before it is closed.
 	 */
-	public static final int DEFAULT_TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK = -1;
+	public static final int DEFAULT_TASK_MANAGER_NET_CLOSE_AFTER_IDLE_FOR_MS = 10000;
 
 	/**
 	 * The default interval for TaskManager heart beats (2000 msecs).

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/52512636/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
index 645f71e..612d6d5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
@@ -16,11 +16,14 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.network;
 
 import java.io.IOException;
 
+/**
+ * Local dummy network connection manager used when the task manager
+ * runs in local mode without starting up the remote network stack.
+ */
 public class LocalConnectionManager implements NetworkConnectionManager {
 
 	@Override
@@ -32,6 +35,10 @@ public class LocalConnectionManager implements NetworkConnectionManager {
 	}
 
 	@Override
+	public void close(RemoteReceiver receiver) {
+	}
+
+	@Override
 	public void shutdown() throws IOException {
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/52512636/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkConnectionManager.java
index 7e3e0b1..ceef2c7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkConnectionManager.java
@@ -16,16 +16,21 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.network;
 
 import java.io.IOException;
 
+/**
+ * The network connection manager is responsible for dispatching envelopes
+ * to remote receivers.
+ */
 public interface NetworkConnectionManager {
 
 	public void start(ChannelManager channelManager) throws IOException;
 
 	public void enqueue(Envelope envelope, RemoteReceiver receiver) throws IOException;
 
+	public void close(RemoteReceiver receiver);
+
 	public void shutdown() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/52512636/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java
index 23d1205..8815ea5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/RemoteReceiver.java
@@ -31,7 +31,7 @@ import org.apache.flink.core.memory.DataOutputView;
  * Objects of this class uniquely identify a connection to a remote {@link TaskManager}.
  * 
  */
-public final class RemoteReceiver implements IOReadableWritable {
+public class RemoteReceiver implements IOReadableWritable {
 
 	/**
 	 * The address of the connection to the remote {@link TaskManager}.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/52512636/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
index 073893e..7e9af6f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.network.netty;
 
 import io.netty.bootstrap.Bootstrap;
@@ -31,7 +30,6 @@ import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.flink.runtime.io.network.ChannelManager;
@@ -70,12 +68,19 @@ public class NettyConnectionManager implements NetworkConnectionManager {
 
 	private final int highWaterMark;
 
+	private final int closeAfterIdleForMs;
+
 	private ServerBootstrap in;
 
 	private Bootstrap out;
 
-	public NettyConnectionManager(InetAddress bindAddress, int bindPort, int bufferSize, int numInThreads,
-								int numOutThreads, int lowWaterMark, int highWaterMark) {
+	public NettyConnectionManager(
+			InetAddress bindAddress,
+			int bindPort,
+			int bufferSize,
+			int numInThreads,
+			int numOutThreads,
+			int closeAfterIdleForMs) {
 
 		this.bindAddress = bindAddress;
 		this.bindPort = bindPort;
@@ -87,14 +92,19 @@ public class NettyConnectionManager implements NetworkConnectionManager {
 		this.numInThreads = (numInThreads == -1) ? defaultNumThreads : numInThreads;
 		this.numOutThreads = (numOutThreads == -1) ? defaultNumThreads : numOutThreads;
 
-		this.lowWaterMark = (lowWaterMark == -1) ? bufferSize / 2 : lowWaterMark;
-		this.highWaterMark = (highWaterMark == -1) ? bufferSize : highWaterMark;
+		this.lowWaterMark = bufferSize / 2;
+		this.highWaterMark = bufferSize;
+
+		this.closeAfterIdleForMs = closeAfterIdleForMs;
 	}
 
 	@Override
 	public void start(ChannelManager channelManager) throws IOException {
-		LOG.info(String.format("Starting with %d incoming and %d outgoing connection threads.", numInThreads, numOutThreads));
-		LOG.info(String.format("Setting low water mark to %d and high water mark to %d bytes.", lowWaterMark, highWaterMark));
+		LOG.info(String.format("Starting with %d incoming and %d outgoing connection threads.",
+				numInThreads, numOutThreads));
+		LOG.info(String.format("Setting low water mark to %d and high water mark to %d bytes.",
+				lowWaterMark, highWaterMark));
+		LOG.info(String.format("Close channels after idle for %d ms.", closeAfterIdleForMs));
 
 		final BufferProviderBroker bufferProviderBroker = channelManager;
 		final EnvelopeDispatcher envelopeDispatcher = channelManager;
@@ -102,16 +112,16 @@ public class NettyConnectionManager implements NetworkConnectionManager {
 		int numHeapArenas = 0;
 		int numDirectArenas = numInThreads + numOutThreads;
 		int pageSize = bufferSize << 1;
-		int chunkSize = 16 * 1 << 20; // 16 MB
+		int chunkSize = 16 << 20; // 16 MB
 
 		// shift pageSize maxOrder times to get to chunkSize
-		int maxOrder = (int) (Math.log(chunkSize/pageSize) / Math.log(2));
+		int maxOrder = (int) (Math.log(chunkSize / pageSize) / Math.log(2));
 
 		PooledByteBufAllocator pooledByteBufAllocator =
 				new PooledByteBufAllocator(true, numHeapArenas, numDirectArenas, pageSize, maxOrder);
 
 		String msg = String.format("Instantiated PooledByteBufAllocator with direct arenas: %d, heap arenas: %d, " +
-				"page size (bytes): %d, chunk size (bytes): %d.",
+						"page size (bytes): %d, chunk size (bytes): %d.",
 				numDirectArenas, numHeapArenas, pageSize, (pageSize << maxOrder));
 		LOG.info(msg);
 
@@ -188,7 +198,7 @@ public class NettyConnectionManager implements NetworkConnectionManager {
 		// 2) a channel that is in buildup (sometimes) -> attach to the future and wait for the actual channel
 		// 3) not yet existing -> establish the channel
 
-		final Object entry = this.outConnections.get(receiver);
+		final Object entry = outConnections.get(receiver);
 		final OutboundConnectionQueue channel;
 
 		if (entry != null) {
@@ -206,14 +216,14 @@ public class NettyConnectionManager implements NetworkConnectionManager {
 			// We create a "buildup future" and atomically add it to the map.
 			// Only the thread that really added it establishes the channel.
 			// The others need to wait on that original establisher's future.
-			ChannelInBuildup inBuildup = new ChannelInBuildup(this.out, receiver);
-			Object old = this.outConnections.putIfAbsent(receiver, inBuildup);
+			ChannelInBuildup inBuildup = new ChannelInBuildup(out, receiver, this, closeAfterIdleForMs);
+			Object old = outConnections.putIfAbsent(receiver, inBuildup);
 
 			if (old == null) {
-				this.out.connect(receiver.getConnectionAddress()).addListener(inBuildup);
+				out.connect(receiver.getConnectionAddress()).addListener(inBuildup);
 				channel = inBuildup.waitForChannel();
 
-				Object previous = this.outConnections.put(receiver, channel);
+				Object previous = outConnections.put(receiver, channel);
 
 				if (inBuildup != previous) {
 					throw new IOException("Race condition during channel build up.");
@@ -227,7 +237,19 @@ public class NettyConnectionManager implements NetworkConnectionManager {
 			}
 		}
 
-		channel.enqueue(envelope);
+		if (!channel.enqueue(envelope)) {
+			// The channel has been closed, try again.
+			LOG.debug("Retry enqueue on channel: " + channel + ".");
+
+			// This will either establish a new connection or use the
+			// one, which has been established in the mean time.
+			enqueue(envelope, receiver);
+		}
+	}
+
+	@Override
+	public void close(RemoteReceiver receiver) {
+		outConnections.remove(receiver);
 	}
 
 	@Override
@@ -256,9 +278,9 @@ public class NettyConnectionManager implements NetworkConnectionManager {
 	private String getNonZeroNumQueuedEnvelopes() {
 		StringBuilder str = new StringBuilder();
 
-		str.append(String.format("==== %d outgoing connections ===\n", this.outConnections.size()));
+		str.append(String.format("==== %d outgoing connections ===\n", outConnections.size()));
 
-		for (Map.Entry<RemoteReceiver, Object> entry : this.outConnections.entrySet()) {
+		for (Map.Entry<RemoteReceiver, Object> entry : outConnections.entrySet()) {
 			RemoteReceiver receiver = entry.getKey();
 
 			Object value = entry.getValue();
@@ -294,41 +316,52 @@ public class NettyConnectionManager implements NetworkConnectionManager {
 
 		private final RemoteReceiver receiver;
 
-		private ChannelInBuildup(Bootstrap out, RemoteReceiver receiver) {
+		private final NetworkConnectionManager connectionManager;
+
+		private final int closeAfterIdleMs;
+
+		private ChannelInBuildup(
+				Bootstrap out,
+				RemoteReceiver receiver,
+				NetworkConnectionManager connectionManager,
+				int closeAfterIdleMs) {
+
 			this.out = out;
 			this.receiver = receiver;
+			this.connectionManager = connectionManager;
+			this.closeAfterIdleMs = closeAfterIdleMs;
 		}
 
 		private void handInChannel(OutboundConnectionQueue c) {
-			synchronized (this.lock) {
-				this.channel = c;
-				this.lock.notifyAll();
+			synchronized (lock) {
+				channel = c;
+				lock.notifyAll();
 			}
 		}
 
-		private void notifyOfError(Throwable error) {
-			synchronized (this.lock) {
-				this.error = error;
-				this.lock.notifyAll();
+		private void notifyOfError(Throwable t) {
+			synchronized (lock) {
+				error = t;
+				lock.notifyAll();
 			}
 		}
 
 		private OutboundConnectionQueue waitForChannel() throws IOException {
-			synchronized (this.lock) {
-				while (this.error == null && this.channel == null) {
+			synchronized (lock) {
+				while (error == null && channel == null) {
 					try {
-						this.lock.wait(2000);
+						lock.wait(2000);
 					} catch (InterruptedException e) {
 						throw new RuntimeException("Channel buildup interrupted.");
 					}
 				}
 			}
 
-			if (this.error != null) {
+			if (error != null) {
 				throw new IOException("Connecting the channel failed: " + error.getMessage(), error);
 			}
 
-			return this.channel;
+			return channel;
 		}
 
 		@Override
@@ -338,13 +371,14 @@ public class NettyConnectionManager implements NetworkConnectionManager {
 					LOG.debug(String.format("Channel %s connected", future.channel()));
 				}
 
-				handInChannel(new OutboundConnectionQueue(future.channel()));
+				handInChannel(new OutboundConnectionQueue(
+						future.channel(), receiver, connectionManager, closeAfterIdleMs));
 			}
-			else if (this.numRetries > 0) {
-				LOG.debug(String.format("Connection request did not succeed, retrying (%d attempts left)", this.numRetries));
+			else if (numRetries > 0) {
+				LOG.debug(String.format("Connection request did not succeed, retrying (%d attempts left)", numRetries));
 
-				this.out.connect(this.receiver.getConnectionAddress()).addListener(this);
-				this.numRetries--;
+				out.connect(receiver.getConnectionAddress()).addListener(this);
+				numRetries--;
 			}
 			else {
 				if (future.getClass() != null) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/52512636/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/OutboundConnectionQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/OutboundConnectionQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/OutboundConnectionQueue.java
index 621a259..eb76be5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/OutboundConnectionQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/OutboundConnectionQueue.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.network.netty;
 
 import io.netty.channel.Channel;
@@ -24,74 +23,137 @@ import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
-
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.IdleStateHandler;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.flink.runtime.io.network.Envelope;
+import org.apache.flink.runtime.io.network.NetworkConnectionManager;
+import org.apache.flink.runtime.io.network.RemoteReceiver;
 
 import java.util.ArrayDeque;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.Queue;
+import java.util.concurrent.TimeUnit;
 
-public class OutboundConnectionQueue extends ChannelInboundHandlerAdapter implements ChannelFutureListener {
+public class OutboundConnectionQueue extends ChannelInboundHandlerAdapter {
+
+	private static enum QueueEvent {
+		TRIGGER_WRITE
+	}
 
 	private static final Log LOG = LogFactory.getLog(OutboundConnectionQueue.class);
 
+	private final ChannelWriteListener writeListener = new ChannelWriteListener();
+
 	private final Channel channel;
 
-	private final ArrayDeque<Envelope> queuedEnvelopes = new ArrayDeque<Envelope>();
+	private final Queue<Envelope> queuedEnvelopes = new ArrayDeque<Envelope>();
 
-	private final AtomicInteger numQueuedEnvelopes = new AtomicInteger(0);
+	private final RemoteReceiver receiver;
+
+	private final NetworkConnectionManager connectionManager;
+
+	// Flag to indicate whether a channel close was requested. It is set once an
+	// idle event is received while no envelopes are queued. After a successful
+	// close request, enqueue returns false.
+	private boolean hasRequestedClose = false;
+
+	public OutboundConnectionQueue(
+			Channel channel,
+			RemoteReceiver receiver,
+			NetworkConnectionManager connectionManager,
+			int closeAfterIdleForMs) {
 
-	public OutboundConnectionQueue(Channel channel) {
 		this.channel = channel;
+		this.receiver = receiver;
+		this.connectionManager = connectionManager;
 
-		channel.pipeline().addFirst(this);
+		channel.pipeline().addFirst("Outbound Connection Queue", this);
+		channel.pipeline().addFirst("Idle State Handler",
+				new IdleStateHandler(0, 0, closeAfterIdleForMs, TimeUnit.MILLISECONDS));
 	}
 
 	/**
-	 * Enqueues an envelope to be sent later.
+	 * Enqueues an envelope to be sent.
 	 * <p/>
 	 * This method is always invoked by the task thread that wants the envelope sent.
+	 * <p/>
+	 * If this method returns <code>false</code>, the channel cannot be used to enqueue
+	 * envelopes any more and the caller needs to establish a new connection to the target.
+	 * The current envelope needs to be enqueued at the new channel.
 	 *
-	 * @param env The envelope to be sent.
+	 * @param env the envelope to be sent
+	 * @return true, if successfully enqueued or false, if the channel was requested to be closed
 	 */
-	public void enqueue(Envelope env) {
-		// the user event trigger ensure thread-safe hand-over of the envelope
-		this.channel.pipeline().fireUserEventTriggered(env);
-	}
+	public boolean enqueue(Envelope env) {
+		boolean triggerWrite;
 
-	@Override
-	public void userEventTriggered(ChannelHandlerContext ctx, Object envelopeToEnqueue) throws Exception {
-		boolean triggerWrite = this.queuedEnvelopes.isEmpty();
+		synchronized (channel) {
+			if (hasRequestedClose) {
+				// The caller has to ensure that the envelope gets queued to
+				// a new channel.
+				return false;
+			}
+
+			// Initiate envelope processing, after the queue state has
+			// changed from empty to non-empty.
+			triggerWrite = queuedEnvelopes.isEmpty();
 
-		this.queuedEnvelopes.addLast((Envelope) envelopeToEnqueue);
-		this.numQueuedEnvelopes.incrementAndGet();
+			queuedEnvelopes.add(env);
+		}
 
 		if (triggerWrite) {
-			writeAndFlushNextEnvelopeIfPossible();
+			channel.pipeline().fireUserEventTriggered(QueueEvent.TRIGGER_WRITE);
 		}
-	}
 
-	@Override
-	public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
-		writeAndFlushNextEnvelopeIfPossible();
+		return true;
 	}
 
 	@Override
-	public void operationComplete(ChannelFuture future) throws Exception {
-		if (future.isSuccess()) {
+	public void userEventTriggered(ChannelHandlerContext ctx, Object event) throws Exception {
+		if (event.getClass() == QueueEvent.class) {
+			// Initiate envelope processing, after the queue state has
+			// changed from empty to non-empty.
 			writeAndFlushNextEnvelopeIfPossible();
 		}
-		else if (future.cause() != null) {
-			exceptionOccurred(future.cause());
+		else if (event.getClass() == IdleStateEvent.class) {
+			// Channel idle => try to close
+			boolean closeConnection = false;
+
+			// Only close the connection, if there are no queued envelopes. We have
+			// to ensure that there is no race between closing the channel and
+			// enqueuing a new envelope.
+			synchronized (channel) {
+				if (queuedEnvelopes.isEmpty() && !hasRequestedClose) {
+
+					hasRequestedClose = true;
+
+					closeConnection = true;
+
+					// Notify the connection manager that this channel has been
+					// closed.
+					connectionManager.close(receiver);
+				}
+			}
+
+			if (closeConnection) {
+				ctx.close().addListener(new ChannelCloseListener());
+			}
 		}
 		else {
-			exceptionOccurred(new Exception("Envelope send aborted."));
+			throw new IllegalStateException("Triggered unknown event.");
 		}
 	}
 
+	@Override
+	public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
+		writeAndFlushNextEnvelopeIfPossible();
+	}
+
 	public int getNumQueuedEnvelopes() {
-		return this.numQueuedEnvelopes.intValue();
+		synchronized (channel) {
+			return queuedEnvelopes.size();
+		}
 	}
 
 	@Override
@@ -100,16 +162,48 @@ public class OutboundConnectionQueue extends ChannelInboundHandlerAdapter implem
 	}
 
 	private void writeAndFlushNextEnvelopeIfPossible() {
-		if (this.channel.isWritable() && !this.queuedEnvelopes.isEmpty()) {
-			Envelope nextEnvelope = this.queuedEnvelopes.pollFirst();
-			this.numQueuedEnvelopes.decrementAndGet();
+		Envelope nextEnvelope = null;
+
+		synchronized (channel) {
+			if (channel.isWritable() && !queuedEnvelopes.isEmpty()) {
+				nextEnvelope = queuedEnvelopes.poll();
+			}
+		}
+
+		if (nextEnvelope != null) {
+			channel.writeAndFlush(nextEnvelope).addListener(writeListener);
+		}
+	}
+
+	private class ChannelWriteListener implements ChannelFutureListener {
+
+		@Override
+		public void operationComplete(ChannelFuture future) throws Exception {
+			if (future.isSuccess()) {
+				writeAndFlushNextEnvelopeIfPossible();
+			}
+			else {
+				exceptionOccurred(future.cause() == null
+						? new Exception("Envelope send aborted.")
+						: future.cause());
+			}
+		}
+	}
+
+	private class ChannelCloseListener implements ChannelFutureListener {
 
-			this.channel.writeAndFlush(nextEnvelope).addListener(this);
+		@Override
+		public void operationComplete(ChannelFuture future) throws Exception {
+			if (!future.isSuccess()) {
+				exceptionOccurred(future.cause() == null
+						? new Exception("Close failed.")
+						: future.cause());
+			}
 		}
 	}
 
 	private void exceptionOccurred(Throwable t) throws Exception {
-		LOG.error(String.format("An exception occurred in Channel %s: %s", this.channel, t.getMessage()));
+		LOG.error(String.format("Exception in Channel %s: %s", channel, t.getMessage()));
 		throw new Exception(t);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/52512636/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
index 69d4869..7521218 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskManager.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.taskmanager;
 
 import java.io.File;
@@ -318,17 +317,13 @@ public class TaskManager implements TaskOperationProtocol {
 							ConfigConstants.TASK_MANAGER_NET_NUM_OUT_THREADS_KEY,
 							ConfigConstants.DEFAULT_TASK_MANAGER_NET_NUM_OUT_THREADS);
 
-					int lowWaterMark = GlobalConfiguration.getInteger(
-							ConfigConstants.TASK_MANAGER_NET_NETTY_LOW_WATER_MARK,
-							ConfigConstants.DEFAULT_TASK_MANAGER_NET_NETTY_LOW_WATER_MARK);
-
-					int highWaterMark = GlobalConfiguration.getInteger(
-							ConfigConstants.TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK,
-							ConfigConstants.DEFAULT_TASK_MANAGER_NET_NETTY_HIGH_WATER_MARK);
+					int closeAfterIdleForMs = GlobalConfiguration.getInteger(
+							ConfigConstants.TASK_MANAGER_NET_CLOSE_AFTER_IDLE_FOR_MS_KEY,
+							ConfigConstants.DEFAULT_TASK_MANAGER_NET_CLOSE_AFTER_IDLE_FOR_MS);
 
 					networkConnectionManager = new NettyConnectionManager(
 							localInstanceConnectionInfo.address(), localInstanceConnectionInfo.dataPort(),
-							bufferSize, numInThreads, numOutThreads, lowWaterMark, highWaterMark);
+							bufferSize, numInThreads, numOutThreads, closeAfterIdleForMs);
 					break;
 			}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/52512636/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
index dfb654d..cfb6ae5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManagerTest.java
@@ -16,18 +16,16 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.io.network.netty;
 
-import org.junit.Assert;
-
 import org.apache.flink.runtime.io.network.ChannelManager;
 import org.apache.flink.runtime.io.network.Envelope;
 import org.apache.flink.runtime.io.network.RemoteReceiver;
 import org.apache.flink.runtime.io.network.channels.ChannelID;
-import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
 import org.apache.flink.runtime.jobgraph.JobID;
+import org.junit.Assert;
 import org.mockito.Matchers;
+import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
@@ -38,158 +36,206 @@ import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
 
 public class NettyConnectionManagerTest {
 
 	private final static long RANDOM_SEED = 520346508276087l;
 
-	private final static Random random = new Random(RANDOM_SEED);
+	private Random rand = new Random(RANDOM_SEED);
 
-	private final static int BIND_PORT = 20000;
+	private NettyConnectionManager senderManager;
 
-	private final static int BUFFER_SIZE = 32 * 1024;
+	private NettyConnectionManager receiverManager;
 
-	public void testEnqueueRaceAndDeadlockFreeMultipleChannels() throws Exception {
-		Integer[][] configs = new Integer[][]{
-				{64, 4096, 1, 1, 1},
-				{128, 2048, 1, 1, 1},
-				{256, 1024, 1, 1, 1},
-				{512, 512, 1, 1, 1},
-				{64, 4096, 4, 1, 1},
-				{128, 2048, 4, 1, 1},
-				{256, 1024, 4, 1, 1},
-				{512, 512, 4, 1, 1},
-				{64, 4096, 4, 2, 2},
-				{128, 2048, 4, 2, 2},
-				{256, 1024, 4, 2, 2},
-				{512, 512, 4, 2, 2}
-		};
+	private ChannelManager channelManager;
 
-		for (Integer[] params : configs) {
-			System.out.println(String.format("Running %s with config: %d sub tasks, %d envelopes to send per subtasks, "
-					+ "%d num channels, %d num in threads, %d num out threads.",
-					"testEnqueueRaceAndDeadlockFreeMultipleChannels", params[0], params[1], params[2], params[3], params[4]));
+	private RemoteReceiver[] receivers;
 
-			long start = System.currentTimeMillis();
-			doTestEnqueueRaceAndDeadlockFreeMultipleChannels(params[0], params[1], params[2], params[3], params[4]);
-			long end = System.currentTimeMillis();
+	private CountDownLatch receivedAllEnvelopesLatch;
 
-			System.out.println(String.format("Runtime: %d ms.", (end - start)));
-		}
-	}
+	private void initTest(
+			int numProducers,
+			final int numEnvelopesPerProducer,
+			int numInThreads,
+			int numOutThreads,
+			int closeAfterIdleForMs) throws Exception {
 
-	private void doTestEnqueueRaceAndDeadlockFreeMultipleChannels(
-			int numSubtasks, final int numToSendPerSubtask, int numChannels, int numInThreads, int numOutThreads)
-			throws Exception {
+		final InetAddress bindAddress = InetAddress.getLocalHost();
+		final int bindPort = 20000;
+		final int bufferSize = 32 * 1024;
 
-		final InetAddress localhost = InetAddress.getLocalHost();
-		final CountDownLatch latch = new CountDownLatch(numSubtasks);
+		senderManager = Mockito.spy(new NettyConnectionManager(
+				bindAddress, bindPort, bufferSize, numInThreads, numOutThreads, closeAfterIdleForMs));
 
-		// --------------------------------------------------------------------
-		// setup
-		// --------------------------------------------------------------------
-		ChannelManager channelManager = mock(ChannelManager.class);
-		doAnswer(new VerifyEnvelopes(latch, numToSendPerSubtask))
-				.when(channelManager).dispatchFromNetwork(Matchers.<Envelope>anyObject());
+		receiverManager = new NettyConnectionManager(
+				bindAddress, bindPort + 1, bufferSize, numInThreads, numOutThreads, closeAfterIdleForMs);
 
-		final NettyConnectionManager senderConnManager = new NettyConnectionManager(localhost, BIND_PORT, BUFFER_SIZE,
-				numInThreads, numOutThreads, -1, -1);
-		senderConnManager.start(channelManager);
+		channelManager = Mockito.mock(ChannelManager.class);
 
-		NettyConnectionManager receiverConnManager = new NettyConnectionManager(localhost, BIND_PORT + 1, BUFFER_SIZE,
-				numInThreads, numOutThreads, -1, -1);
-		receiverConnManager.start(channelManager);
+		senderManager.start(channelManager);
+		receiverManager.start(channelManager);
+
+		receivers = new RemoteReceiver[numProducers];
+		for (int i = 0; i < numProducers; i++) {
+			receivers[i] = new RemoteReceiver(new InetSocketAddress(bindPort + 1), i);
+		}
 
 		// --------------------------------------------------------------------
-		// start sender threads
-		// --------------------------------------------------------------------
-		RemoteReceiver[] receivers = new RemoteReceiver[numChannels];
 
-		for (int i = 0; i < numChannels; i++) {
-			receivers[i] = new RemoteReceiver(new InetSocketAddress(localhost, BIND_PORT + 1), i);
-		}
+		receivedAllEnvelopesLatch = new CountDownLatch(numProducers);
 
-		for (int i = 0; i < numSubtasks; i++) {
-			final RemoteReceiver receiver = receivers[random.nextInt(numChannels)];
+		final ConcurrentMap<ChannelID, Integer> receivedSequenceNums =
+				new ConcurrentHashMap<ChannelID, Integer>();
 
-			final AtomicInteger seqNum = new AtomicInteger(0);
-			final JobID jobId = new JobID();
-			final ChannelID channelId = new ChannelID();
+		// Verifies that the sequence numbers of each producer are received
+		// in ascending incremental order. In addition, manages a latch to
+		// allow synchronization after all envelopes have been received.
+		Mockito.doAnswer(new Answer<Void>() {
+			@Override
+			public Void answer(InvocationOnMock invocation) throws Throwable {
+				final Envelope env = (Envelope) invocation.getArguments()[0];
 
-			new Thread(new Runnable() {
-				@Override
-				public void run() {
-					// enqueue envelopes with ascending seq numbers
-					while (seqNum.get() < numToSendPerSubtask) {
-						try {
-							Envelope env = new Envelope(seqNum.getAndIncrement(), jobId, channelId);
-							senderConnManager.enqueue(env, receiver);
-						} catch (IOException e) {
-							throw new RuntimeException("Unexpected exception while enqueuing envelope.");
-						}
-					}
+				final int currentSeqNum = env.getSequenceNumber();
+				final ChannelID cid = env.getSource();
+
+				if (currentSeqNum < 0 || currentSeqNum >= numEnvelopesPerProducer) {
+					Assert.fail("Received more envelopes than expected from " + cid);
+				}
+
+				Integer previousSeqNum = receivedSequenceNums.put(cid, currentSeqNum);
+
+				if (previousSeqNum != null) {
+					String errMsg = String.format("Received %s with unexpected sequence number.", env);
+					Assert.assertEquals(errMsg, previousSeqNum + 1, currentSeqNum);
 				}
-			}).start();
-		}
 
-		latch.await();
+				if (currentSeqNum == numEnvelopesPerProducer - 1) {
+					receivedAllEnvelopesLatch.countDown();
+				}
 
-		senderConnManager.shutdown();
-		receiverConnManager.shutdown();
+				return null;
+			}
+		}).when(channelManager).dispatchFromNetwork(Matchers.any(Envelope.class));
 	}
 
-	/**
-	 * Verifies correct ordering of received envelopes (per envelope source channel ID).
-	 */
-	private class VerifyEnvelopes implements Answer<Void> {
+	private void finishTest() throws IOException {
+		senderManager.shutdown();
+		receiverManager.shutdown();
+	}
 
-		private final ConcurrentMap<ChannelID, Integer> received = new ConcurrentHashMap<ChannelID, Integer>();
+	// ------------------------------------------------------------------------
+
+	// Verifies that there are no race conditions or dead locks with
+	// concurrent enqueues and closes.
+	public void testConcurrentEnqueueAndClose() throws Exception {
+		Integer[][] configs = new Integer[][]{
+				// No close after idle
+				{64, 4096, 1, 1, 1, 0, 0},
+				{128, 2048, 1, 1, 1, 0, 0},
+				{256, 1024, 1, 1, 1, 0, 0},
+				{512, 512, 1, 1, 1, 0, 0},
+
+				{64, 4096, 4, 1, 1, 0, 0},
+				{128, 2048, 4, 1, 1, 0, 0},
+				{256, 1024, 4, 1, 1, 0, 0},
+				{512, 512, 4, 1, 1, 0, 0},
+
+				{64, 4096, 4, 2, 2, 0, 0},
+				{128, 2048, 4, 2, 2, 0, 0},
+				{256, 1024, 4, 2, 2, 0, 0},
+				{512, 512, 4, 2, 2, 0, 0},
+				// Note: these need plenty of heap space for the threads
+				{1024, 256, 4, 2, 2, 0, 0},
+				{2048, 128, 4, 2, 2, 0, 0},
+				{4096, 64, 4, 2, 2, 0, 0},
+
+				// With close after idle
+				{4, 1024, 1, 1, 1, 40, 80},
+				{8, 1024, 1, 1, 1, 40, 80},
+				{16, 1024, 1, 1, 1, 40, 80},
+				{32, 1024, 1, 1, 1, 40, 80},
+				{64, 1024, 1, 1, 1, 40, 80},
+
+				{16, 1024, 4, 1, 1, 40, 80},
+				{32, 1024, 4, 1, 1, 40, 80},
+				{64, 1024, 4, 1, 1, 40, 80},
+
+				{16, 1024, 4, 2, 2, 40, 80},
+				{32, 1024, 4, 2, 2, 40, 80},
+				{64, 1024, 4, 2, 2, 40, 80}
+		};
 
-		private final CountDownLatch latch;
+		for (Integer[] params : configs) {
+			System.out.println(String.format("Running testConcurrentEnqueueAndClose with config: " +
+							"%d producers, %d envelopes per producer, %d num channels, " +
+							"%d num in threads, %d num out threads, " +
+							"%d ms min sleep time, %d ms max sleep time.",
+					params[0], params[1], params[2], params[3], params[4], params[5], params[6]));
 
-		private final int numExpectedEnvelopesPerSubtask;
+			long start = System.currentTimeMillis();
+			doTestConcurrentEnqueueAndClose(params[0], params[1], params[2], params[3], params[4], params[5], params[6]);
+			long end = System.currentTimeMillis();
 
-		private VerifyEnvelopes(CountDownLatch latch, int numExpectedEnvelopesPerSubtask) {
-			this.latch = latch;
-			this.numExpectedEnvelopesPerSubtask = numExpectedEnvelopesPerSubtask;
+			System.out.println(String.format("Runtime: %d ms.", (end - start)));
 		}
+	}
 
-		@Override
-		public Void answer(InvocationOnMock invocation) throws Throwable {
-			Envelope env = (Envelope) invocation.getArguments()[0];
+	private void doTestConcurrentEnqueueAndClose(
+			int numProducers,
+			final int numEnvelopesPerProducer,
+			final int numChannels,
+			int numInThreads,
+			int numOutThreads,
+			final int minSleepTimeMs,
+			final int maxSleepTimeMs) throws Exception {
+
+		// The idle time before a close is requested is 1/4th of the min sleep
+		// time of each producer. Depending on the number of concurrent producers
+		// and number of envelopes to send per producer, this will result in a
+		// variable number of close requests.
+		initTest(numProducers, numEnvelopesPerProducer, numInThreads, numOutThreads, minSleepTimeMs / 4);
 
-			ChannelID channelId = env.getSource();
-			int seqNum = env.getSequenceNumber();
+		// --------------------------------------------------------------------
 
-			if (seqNum == 0) {
-				Integer previousSeqNum = this.received.putIfAbsent(channelId, seqNum);
+		Runnable[] producers = new Runnable[numProducers];
 
-				String msg = String.format("Received envelope from %s before, but current seq num is 0", channelId);
-				Assert.assertNull(msg, previousSeqNum);
-			}
-			else {
-				boolean isExpectedPreviousSeqNum = this.received.replace(channelId, seqNum - 1, seqNum);
+		for (int i = 0; i < numProducers; i++) {
+			producers[i] = new Runnable() {
+				@Override
+				public void run() {
+					final JobID jid = new JobID();
+					final ChannelID cid = new ChannelID();
+					final RemoteReceiver receiver = receivers[rand.nextInt(numChannels)];
 
-				String msg = String.format("Received seq num %d from %s, but previous was not %d.",
-						seqNum, channelId, seqNum - 1);
-				Assert.assertTrue(msg, isExpectedPreviousSeqNum);
-			}
+					for (int sequenceNum = 0; sequenceNum < numEnvelopesPerProducer; sequenceNum++) {
+						try {
+							senderManager.enqueue(new Envelope(sequenceNum, jid, cid), receiver);
 
-			// count down the latch if all envelopes received for this source
-			if (seqNum == numExpectedEnvelopesPerSubtask - 1) {
-				this.latch.countDown();
-			}
+							int sleepTime = rand.nextInt((maxSleepTimeMs - minSleepTimeMs) + 1) + minSleepTimeMs;
+							Thread.sleep(sleepTime);
+						} catch (Exception e) {
+							throw new RuntimeException(e);
+						}
+					}
+				}
+			};
+		}
 
-			return null;
+		for (int i = 0; i < numProducers; i++) {
+			new Thread(producers[i], "Producer " + i).start();
 		}
+
+		// --------------------------------------------------------------------
+
+		while (receivedAllEnvelopesLatch.getCount() != 0) {
+			receivedAllEnvelopesLatch.await();
+		}
+
+		finishTest();
 	}
 
 	private void runAllTests() throws Exception {
-		testEnqueueRaceAndDeadlockFreeMultipleChannels();
+		testConcurrentEnqueueAndClose();
 
 		System.out.println("Done.");
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/52512636/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/OutboundConnectionQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/OutboundConnectionQueueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/OutboundConnectionQueueTest.java
new file mode 100644
index 0000000..1f4d98a
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/OutboundConnectionQueueTest.java
@@ -0,0 +1,606 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.embedded.EmbeddedChannel;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.timeout.IdleStateEvent;
+import org.apache.flink.runtime.io.network.Envelope;
+import org.apache.flink.runtime.io.network.NetworkConnectionManager;
+import org.apache.flink.runtime.io.network.RemoteReceiver;
+import org.apache.flink.runtime.io.network.channels.ChannelID;
+import org.apache.flink.runtime.jobgraph.JobID;
+import org.junit.Assert;
+import org.mockito.Mockito;
+import org.powermock.reflect.Whitebox;
+
+import java.net.InetAddress;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Random;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
+
+/** Tests idle closing, envelope queuing and concurrent enqueue/close of {@link OutboundConnectionQueue}; run via {@link #main}. */
+public class OutboundConnectionQueueTest {
+
+	private final static long RANDOM_SEED = 520346508276087L;
+
+	private final Object lock = new Object();
+
+	private Channel channel;
+
+	private NetworkConnectionManager connectionManager;
+
+	private RemoteReceiver receiver;
+
+	private OutboundConnectionQueue queue;
+
+	private TestControlHandler controller;
+
+	private TestVerificationHandler verifier;
+
+	private Throwable exception;
+
+	private void initTest(boolean autoTriggerWrite) {
+		controller = new TestControlHandler(autoTriggerWrite);
+		verifier = new TestVerificationHandler();
+
+		channel = Mockito.spy(new EmbeddedChannel(new ChannelInboundHandlerAdapter() {
+			@Override
+			public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+				exception = cause;
+				super.exceptionCaught(ctx, cause);
+			}
+		}));
+
+		connectionManager = Mockito.mock(NetworkConnectionManager.class);
+
+		receiver = Mockito.mock(RemoteReceiver.class);
+
+		queue = new OutboundConnectionQueue(channel, receiver, connectionManager, 0);
+
+		channel.pipeline().addFirst("Test Control Handler", controller);
+		channel.pipeline().addFirst("Test Verification Handler", verifier);
+
+		exception = null;
+
+		// The testing pipeline looks as follows:
+		// - Test Verification Handler [OUT]
+		// - Test Control Handler [IN]
+		// - Idle State Handler [IN/OUT] [added by OutboundConnectionQueue]
+		// - Outbound queue (SUT) [IN] [added by OutboundConnectionQueue]
+		// - Exception setter [IN] [EmbeddedChannel constructor]
+	}
+
+	/**
+	 * Verifies that the channel is closed after an idle event, when
+	 * there are no queued envelopes.
+	 */
+	public void testClose() throws Exception {
+		initTest(false);
+
+		JobID jid = new JobID();
+		ChannelID cid = new ChannelID();
+
+		Assert.assertTrue(queue.enqueue(new Envelope(1, jid, cid)));
+		Assert.assertTrue(queue.enqueue(new Envelope(2, jid, cid)));
+		Assert.assertTrue(queue.enqueue(new Envelope(3, jid, cid)));
+
+		controller.triggerWrite();
+
+		controller.fireIdle();
+
+		verifier.waitForClose();
+
+		verifier.verifyEnvelopeReceived(cid, 3);
+
+		Mockito.verify(connectionManager, Mockito.times(1)).close(Mockito.any(RemoteReceiver.class));
+	}
+
+	/**
+	 * Verifies that the channel is not closed while there are queued
+	 * envelopes.
+	 */
+	public void testCloseWithQueuedEnvelopes() throws Exception {
+		initTest(true);
+
+		final JobID jid = new JobID();
+		final ChannelID cid = new ChannelID();
+		final CountDownLatch sync = verifier.waitForEnvelopes(3, cid);
+
+		// Make channel not writable => envelopes are queued
+		Mockito.when(channel.isWritable()).thenReturn(false);
+
+		Assert.assertTrue(queue.enqueue(new Envelope(1, jid, cid)));
+		Assert.assertTrue(queue.enqueue(new Envelope(2, jid, cid)));
+		Assert.assertTrue(queue.enqueue(new Envelope(3, jid, cid)));
+
+		// Verify idle event doesn't close channel
+		controller.fireIdle();
+
+		Mockito.verify(connectionManager, Mockito.times(0)).close(Mockito.any(RemoteReceiver.class));
+
+		boolean hasRequestedClose = Whitebox.getInternalState(queue, "hasRequestedClose");
+		Assert.assertFalse("Close request while envelope in flight.", hasRequestedClose);
+
+		// Change writability of channel back to writable
+		Mockito.when(channel.isWritable()).thenReturn(true);
+		channel.pipeline().fireChannelWritabilityChanged();
+
+		// Wait for the processing of queued envelopes
+		while (sync.getCount() != 0) {
+			sync.await();
+		}
+
+		verifier.verifyEnvelopeReceived(cid, 3);
+
+		// Now close again
+		controller.fireIdle();
+		verifier.waitForClose();
+
+		Mockito.verify(connectionManager, Mockito.times(1)).close(Mockito.any(RemoteReceiver.class));
+	}
+
+	/**
+	 * Verifies that envelopes are delegated back to the connection
+	 * manager after a close.
+	 */
+	public void testEnqueueAfterClose() throws Exception {
+		initTest(true);
+
+		// Immediately close the channel
+		controller.fireIdle();
+		verifier.waitForClose();
+
+		Assert.assertFalse(queue.enqueue(new Envelope(1, new JobID(), new ChannelID())));
+	}
+
+	/**
+	 * Verifies that multiple idle events are handled correctly.
+	 */
+	public void testMultipleIdleEvents() throws Exception {
+		initTest(true);
+
+		controller.fireIdle();
+		verifier.waitForClose();
+
+		controller.fireIdle();
+
+		// Second close should not cause an exception in the
+		// verification handler.
+		Assert.assertNull(exception);
+	}
+
+	/**
+	 * Verifies that unknown user events throw an exception.
+	 */
+	public void testUnknownUserEvent() throws Exception {
+		initTest(true);
+
+		Assert.assertNull(exception);
+
+		controller.context.fireUserEventTriggered("Unknown user event");
+
+		Assert.assertNotNull(exception);
+		Assert.assertTrue(exception instanceof IllegalStateException);
+	}
+
+	// ------------------------------------------------------------------------
+
+	public void testConcurrentEnqueueAndClose() throws Exception {
+		Integer[][] configs = new Integer[][]{
+				{1, 512, 0, 0},
+				{1, 512, 40, 80},
+				{2, 512, 40, 80},
+				{4, 512, 40, 80},
+				{8, 512, 40, 80},
+				{32, 512, 40, 80},
+				{128, 512, 40, 80},
+				{256, 512, 40, 80},
+				{512, 512, 40, 80}
+		};
+
+		for (Integer[] params : configs) {
+			System.out.println(String.format(
+					"Running %s with config: %d producers, %d envelopes to send per producer, " +
+							"%d ms min sleep time, %d ms max sleep time.", "testConcurrentEnqueueAndClose",
+					params[0], params[1], params[2], params[3]));
+
+			long start = System.currentTimeMillis();
+			doTestConcurrentEnqueueAndClose(params[0], params[1], params[2], params[3]);
+			long end = System.currentTimeMillis();
+
+			System.out.println(String.format("Runtime: %d ms.", (end - start)));
+		}
+	}
+
+	/**
+	 * Verifies that concurrent enqueue and close events are handled
+	 * correctly.
+	 */
+	private void doTestConcurrentEnqueueAndClose(
+			final int numProducers,
+			final int numEnvelopesPerProducer,
+			final int minSleepTimeMs,
+			final int maxSleepTimeMs) throws Exception {
+
+		final InetAddress bindHost = InetAddress.getLocalHost();
+		final int bindPort = 20000;
+
+		// Create the collaborators here, so that this test does not depend on initTest().
+		connectionManager = Mockito.mock(NetworkConnectionManager.class);
+		receiver = Mockito.mock(RemoteReceiver.class);
+
+		// Testing concurrent enqueue and close requires real TCP channels,
+		// because Netty's testing EmbeddedChannel does not implement the
+		// same threading model as the NioEventLoopGroup (for example there
+		// is no difference between being IN and OUTSIDE of the event loop
+		// thread).
+
+		final ServerBootstrap in = new ServerBootstrap();
+		in.group(new NioEventLoopGroup(1))
+				.channel(NioServerSocketChannel.class)
+				.localAddress(bindHost, bindPort)
+				.childHandler(new ChannelInitializer<SocketChannel>() {
+					@Override
+					public void initChannel(SocketChannel channel) throws Exception {
+						channel.pipeline()
+								.addLast(new ChannelInboundHandlerAdapter());
+					}
+				});
+
+		final Bootstrap out = new Bootstrap();
+		out.group(new NioEventLoopGroup(1))
+				.channel(NioSocketChannel.class)
+				.handler(new ChannelInitializer<SocketChannel>() {
+					@Override
+					public void initChannel(SocketChannel channel) throws Exception {
+						channel.pipeline()
+								.addLast(new ChannelOutboundHandlerAdapter());
+					}
+				})
+				.option(ChannelOption.TCP_NODELAY, false)
+				.option(ChannelOption.SO_KEEPALIVE, true);
+
+		in.bind().sync();
+
+		// --------------------------------------------------------------------
+
+		// The testing pipeline looks as follows:
+		// - Test Verification Handler [OUT]
+		// - Test Control Handler [IN]
+		// - Idle State Handler [IN/OUT] [added by OutboundConnectionQueue]
+		// - Outbound queue (SUT) [IN] [added by OutboundConnectionQueue]
+
+		channel = out.connect(bindHost, bindPort).sync().channel();
+
+		queue = new OutboundConnectionQueue(channel, receiver, connectionManager, 0);
+
+		controller = new TestControlHandler(true);
+		verifier = new TestVerificationHandler();
+
+		channel.pipeline().addFirst("Test Control Handler", controller);
+		channel.pipeline().addFirst("Test Verification Handler", verifier);
+
+		// --------------------------------------------------------------------
+
+		final Random rand = new Random(RANDOM_SEED);
+
+		// Every producer works on their local reference of the queue and only
+		// updates it to the new channel when enqueue returns false, which
+		// should only happen if the channel has been closed.
+		final ConcurrentMap<ChannelID, OutboundConnectionQueue> producerQueues =
+				new ConcurrentHashMap<ChannelID, OutboundConnectionQueue>();
+
+		final ChannelID[] ids = new ChannelID[numProducers];
+
+		for (int i = 0; i < numProducers; i++) {
+			ids[i] = new ChannelID();
+			producerQueues.put(ids[i], queue);
+		}
+
+		final CountDownLatch receivedAllEnvelopesLatch =
+				verifier.waitForEnvelopes(numEnvelopesPerProducer - 1, ids);
+
+		final List<Channel> closedChannels = new ArrayList<Channel>();
+
+		// --------------------------------------------------------------------
+
+		final Runnable closer = new Runnable() {
+			@Override
+			public void run() {
+				while (receivedAllEnvelopesLatch.getCount() != 0) {
+					try {
+						controller.fireIdle();
+
+						// Test two idle events arriving "closely" after each other
+						if (rand.nextBoolean()) {
+							controller.fireIdle();
+						}
+
+						Thread.sleep(minSleepTimeMs / 2);
+					} catch (InterruptedException e) {
+						e.printStackTrace();
+					}
+				}
+			}
+		};
+
+		final Runnable[] producers = new Runnable[numProducers];
+
+		for (int i = 0; i < numProducers; i++) {
+			final int index = i;
+
+			producers[i] = new Runnable() {
+				@Override
+				public void run() {
+					final JobID jid = new JobID();
+					final ChannelID cid = ids[index];
+
+					for (int j = 0; j < numEnvelopesPerProducer; j++) {
+						OutboundConnectionQueue localQueue = producerQueues.get(cid);
+
+						try {
+							// This code path is handled by the NetworkConnectionManager
+							// in production to enqueue the envelope either to the current
+							// channel or a new one if it was closed.
+							while (!localQueue.enqueue(new Envelope(j, jid, cid))) {
+								synchronized (lock) {
+									if (localQueue == queue) {
+										closedChannels.add(channel);
+
+										channel = out.connect(bindHost, bindPort).sync().channel();
+										queue = new OutboundConnectionQueue(channel, receiver, connectionManager, 0);
+
+										channel.pipeline().addFirst("Test Control Handler", controller);
+										channel.pipeline().addFirst("Test Verification Handler", verifier);
+									}
+
+									// Read the shared queue field only while holding the lock.
+									localQueue = queue;
+								}
+								producerQueues.put(cid, localQueue);
+							}
+
+							int sleepTime = rand.nextInt((maxSleepTimeMs - minSleepTimeMs) + 1) + minSleepTimeMs;
+							Thread.sleep(sleepTime);
+						} catch (InterruptedException e) {
+							throw new RuntimeException(e);
+						}
+					}
+				}
+			};
+		}
+
+		for (int i = 0; i < numProducers; i++) {
+			new Thread(producers[i], "Producer " + i).start();
+		}
+
+		final Thread closerThread = new Thread(closer, "Closer");
+		closerThread.start();
+
+		// --------------------------------------------------------------------
+
+		while (receivedAllEnvelopesLatch.getCount() != 0) {
+			receivedAllEnvelopesLatch.await();
+		}
+
+		// Stop the closer before the final close and the event loop shutdown.
+		closerThread.join();
+
+		// Final close, if the last close didn't make it.
+		synchronized (lock) {
+			if (channel != null) {
+				controller.fireIdle();
+			}
+		}
+
+		verifier.waitForClose();
+
+		// If the producers do not sleep after each envelope, the close
+		// should not make it through and no channel should have been
+		// added to the list of closed channels
+		if (minSleepTimeMs == 0 && maxSleepTimeMs == 0) {
+			Assert.assertEquals(0, closedChannels.size());
+		}
+
+		for (Channel ch : closedChannels) {
+			Assert.assertFalse(ch.isOpen());
+		}
+
+		System.out.println(closedChannels.size() + " channels were closed during execution.");
+		out.group().shutdownGracefully().sync();
+		in.group().shutdownGracefully().sync();
+	}
+
+	// ------------------------------------------------------------------------
+
+	// Handler to control the flow of Netty's custom user events. Used
+	// to fire idle events and manually trigger write events.
+	@ChannelHandler.Sharable
+	private static class TestControlHandler extends ChannelInboundHandlerAdapter {
+
+		private final Queue<Object> events = new ArrayDeque<Object>();
+
+		private final boolean forwardUserEvents;
+
+		private ChannelHandlerContext context;
+
+		private TestControlHandler(boolean forwardUserEvents) {
+			this.forwardUserEvents = forwardUserEvents;
+		}
+
+		@Override
+		public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+			synchronized (this) {
+				context = ctx;
+			}
+		}
+
+		@Override
+		public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+			if (forwardUserEvents) {
+				ctx.fireUserEventTriggered(evt);
+			}
+			else {
+				events.add(evt);
+			}
+		}
+
+		public void triggerWrite() {
+			synchronized (this) {
+				if (!events.isEmpty()) {
+					context.fireUserEventTriggered(events.remove());
+				}
+			}
+		}
+
+		public void fireIdle() {
+			synchronized (this) {
+				context.fireUserEventTriggered(IdleStateEvent.ALL_IDLE_STATE_EVENT);
+			}
+		}
+	}
+
+	// Handler to verify writes and close events.
+	@ChannelHandler.Sharable
+	private static class TestVerificationHandler extends ChannelOutboundHandlerAdapter {
+
+		private final Map<ChannelID, Integer> receivedSequenceNums = new HashMap<ChannelID, Integer>();
+
+		private final Map<ChannelID, Integer> expectedSequenceNums = new HashMap<ChannelID, Integer>();
+
+		private final Map<ChannelID, CountDownLatch> envelopeLatches = new HashMap<ChannelID, CountDownLatch>();
+
+		private volatile CountDownLatch closeLatch;
+
+		@Override
+		public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+			closeLatch = new CountDownLatch(1);
+			super.handlerAdded(ctx);
+		}
+
+		@Override
+		public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+			if (closeLatch.getCount() == 0) {
+				throw new IllegalStateException("Write on closed channel.");
+			}
+
+			if (msg.getClass() == Envelope.class) {
+				Envelope env = (Envelope) msg;
+
+				final int currentSeqNum = env.getSequenceNumber();
+				final ChannelID source = env.getSource();
+
+				Integer previousSeqNum = receivedSequenceNums.put(source, currentSeqNum);
+
+				if (previousSeqNum != null) {
+					String errMsg = String.format("Received %s with unexpected sequence number.", env);
+					Assert.assertEquals(errMsg, previousSeqNum + 1, currentSeqNum);
+				}
+
+				promise.setSuccess();
+
+				Integer expectedSeqNum = expectedSequenceNums.get(source);
+				if (expectedSeqNum != null && expectedSeqNum.equals(currentSeqNum)) {
+					envelopeLatches.remove(source).countDown();
+				}
+			}
+		}
+
+		@Override
+		public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
+			if (closeLatch.getCount() == 0) {
+				throw new IllegalStateException("Received multiple close events.");
+			}
+
+			super.close(ctx, promise);
+
+			closeLatch.countDown();
+		}
+
+		public void waitForClose() throws InterruptedException {
+			while (closeLatch.getCount() != 0) {
+				closeLatch.await();
+			}
+		}
+
+		/**
+		 * Verifies that envelope with expected sequence number has been
+		 * processed by the handler (or no envelope has been received by
+		 * the given source, if expected sequence number is null).
+		 */
+		public void verifyEnvelopeReceived(ChannelID source, Integer expectedSequenceNum) {
+			final boolean hasReceived = receivedSequenceNums.containsKey(source);
+
+			if (expectedSequenceNum == null) {
+				Assert.assertFalse("Received unexpected envelope from channel " + source, hasReceived);
+			}
+			else {
+				Assert.assertTrue("Did not receive any envelope from channel " + source, hasReceived);
+				Assert.assertEquals(expectedSequenceNum, receivedSequenceNums.get(source));
+			}
+		}
+
+		public CountDownLatch waitForEnvelopes(Integer expectedSequenceNum, ChannelID... ids) {
+			CountDownLatch latch = new CountDownLatch(ids.length);
+
+			for (ChannelID id : ids) {
+				expectedSequenceNums.put(id, expectedSequenceNum);
+				envelopeLatches.put(id, latch);
+			}
+
+			return latch;
+		}
+	}
+
+	// --------------------------------------------------------------------
+
+	public void runAllTests() throws Exception {
+		testClose();
+		testCloseWithQueuedEnvelopes();
+		testEnqueueAfterClose();
+		testUnknownUserEvent();
+		testMultipleIdleEvents();
+		testConcurrentEnqueueAndClose();
+	}
+
+	public static void main(String[] args) throws Exception {
+		new OutboundConnectionQueueTest().runAllTests();
+	}
+}


Mime
View raw message