flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [18/34] Offer buffer-oriented API for I/O (#25)
Date Tue, 10 Jun 2014 19:35:15 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/IncomingConnection.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/IncomingConnection.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/IncomingConnection.java
deleted file mode 100644
index d0fa683..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/IncomingConnection.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import java.io.IOException;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.channels.SelectionKey;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.nephele.taskmanager.bufferprovider.BufferProvider;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.DefaultDeserializer;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.NoBufferAvailableException;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelope;
-import eu.stratosphere.util.StringUtils;
-
-/**
- * This class represents an incoming data connection through which data streams are read and transformed into
- * {@link TransferEnvelope} objects. The source of the data stream is a TCP connection.
- * 
- */
-public class IncomingConnection {
-
-	/**
-	 * The log object used to report debug information and possible errors.
-	 */
-	private static final Log LOG = LogFactory.getLog(IncomingConnection.class);
-
-	/**
-	 * The readable byte channel through which the input data is retrieved.
-	 */
-	private final ReadableByteChannel readableByteChannel;
-
-	/**
-	 * The {@link DefaultDeserializer} used to transform the read bytes into transfer envelopes which can be
-	 * passed on to the respective channels.
-	 */
-	private final DefaultDeserializer deserializer;
-
-	/**
-	 * The byte buffered channel manager which handles and dispatches the received transfer envelopes.
-	 */
-	private final ByteBufferedChannelManager byteBufferedChannelManager;
-
-	public IncomingConnection(ByteBufferedChannelManager byteBufferedChannelManager,
-			ReadableByteChannel readableByteChannel) {
-		this.byteBufferedChannelManager = byteBufferedChannelManager;
-		this.deserializer = new DefaultDeserializer(byteBufferedChannelManager);
-		this.readableByteChannel = readableByteChannel;
-	}
-
-	public void reportTransmissionProblem(SelectionKey key, IOException ioe) {
-
-		LOG.error(StringUtils.stringifyException(ioe));
-
-		try {
-			this.readableByteChannel.close();
-		} catch (IOException e) {
-			LOG.debug("An error occurred while closing the byte channel");
-		}
-
-		// Cancel key
-		if (key != null) {
-			key.cancel();
-		}
-
-		// Recycle read buffer
-		if (this.deserializer.getBuffer() != null) {
-			this.deserializer.getBuffer().recycleBuffer();
-		}
-
-		this.deserializer.reset();
-	}
-
-	public void read() throws IOException, InterruptedException, NoBufferAvailableException {
-
-		this.deserializer.read(this.readableByteChannel);
-
-		final TransferEnvelope transferEnvelope = this.deserializer.getFullyDeserializedTransferEnvelope();
-		if (transferEnvelope != null) {
-
-			final BufferProvider bufferProvider = this.deserializer.getBufferProvider();
-			if (bufferProvider == null) {
-				this.byteBufferedChannelManager.processEnvelopeFromNetwork(transferEnvelope, false);
-			} else {
-				this.byteBufferedChannelManager.processEnvelopeFromNetwork(transferEnvelope, bufferProvider.isShared());
-			}
-		}
-	}
-
-	public boolean isCloseUnexpected() {
-
-		return this.deserializer.hasUnfinishedData();
-	}
-
-	public ReadableByteChannel getReadableByteChannel() {
-		return this.readableByteChannel;
-	}
-
-	public void closeConnection(SelectionKey key) {
-
-		try {
-			this.readableByteChannel.close();
-		} catch (IOException ioe) {
-			LOG.error("On IOException occured while closing the socket: + " + StringUtils.stringifyException(ioe));
-		}
-
-		// Cancel key
-		if (key != null) {
-			key.cancel();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/IncomingConnectionThread.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/IncomingConnectionThread.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/IncomingConnectionThread.java
deleted file mode 100644
index 05c3326..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/IncomingConnectionThread.java
+++ /dev/null
@@ -1,223 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.ServerSocketChannel;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayDeque;
-import java.util.Iterator;
-import java.util.Queue;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.nephele.taskmanager.bufferprovider.BufferAvailabilityListener;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.NoBufferAvailableException;
-import eu.stratosphere.util.StringUtils;
-
-public class IncomingConnectionThread extends Thread {
-
-	private static final Log LOG = LogFactory.getLog(IncomingConnectionThread.class);
-
-	private final ByteBufferedChannelManager byteBufferedChannelManager;
-
-	private final Selector selector;
-
-	private final Queue<SelectionKey> pendingReadEventSubscribeRequests = new ArrayDeque<SelectionKey>();
-
-	private final ServerSocketChannel listeningSocket;
-
-	private static final class IncomingConnectionBufferAvailListener implements BufferAvailabilityListener {
-
-		private final Queue<SelectionKey> pendingReadEventSubscribeRequests;
-
-		private final SelectionKey key;
-
-		private IncomingConnectionBufferAvailListener(final Queue<SelectionKey> pendingReadEventSubscribeRequests,
-				final SelectionKey key) {
-
-			this.pendingReadEventSubscribeRequests = pendingReadEventSubscribeRequests;
-			this.key = key;
-		}
-
-		/**
-		 * {@inheritDoc}
-		 */
-		@Override
-		public void bufferAvailable() {
-
-			synchronized (this.pendingReadEventSubscribeRequests) {
-				this.pendingReadEventSubscribeRequests.add(this.key);
-			}
-		}
-	}
-
-	public IncomingConnectionThread(ByteBufferedChannelManager byteBufferedChannelManager,
-			boolean isListeningThread, InetSocketAddress listeningAddress) throws IOException {
-		super("Incoming Connection Thread");
-
-		this.selector = Selector.open();
-		this.byteBufferedChannelManager = byteBufferedChannelManager;
-
-		if (isListeningThread) {
-			this.listeningSocket = ServerSocketChannel.open();
-			this.listeningSocket.configureBlocking(false);
-			listeningSocket.register(this.selector, SelectionKey.OP_ACCEPT);
-			this.listeningSocket.socket().bind(listeningAddress);
-			LOG.debug("Listening on " + this.listeningSocket.socket().getLocalSocketAddress());
-		} else {
-			this.listeningSocket = null;
-		}
-	}
-
-	@Override
-	public void run() {
-
-		while (!this.isInterrupted()) {
-
-			synchronized (this.pendingReadEventSubscribeRequests) {
-				while (!this.pendingReadEventSubscribeRequests.isEmpty()) {
-					final SelectionKey key = this.pendingReadEventSubscribeRequests.poll();
-					final IncomingConnection incomingConnection = (IncomingConnection) key.attachment();
-					final SocketChannel socketChannel = (SocketChannel) key.channel();
-
-					try {
-						final SelectionKey newKey = socketChannel.register(this.selector, SelectionKey.OP_READ);
-						newKey.attach(incomingConnection);
-					} catch (ClosedChannelException e) {
-						incomingConnection.reportTransmissionProblem(key, e);
-					}
-				}
-			}
-
-			try {
-				this.selector.select(500);
-			} catch (IOException e) {
-				LOG.error(e);
-			}
-
-			final Iterator<SelectionKey> iter = this.selector.selectedKeys().iterator();
-
-			while (iter.hasNext()) {
-				final SelectionKey key = iter.next();
-
-				iter.remove();
-				if (key.isValid()) {
-					if (key.isReadable()) {
-						doRead(key);
-					} else if (key.isAcceptable()) {
-						doAccept(key);
-					} else {
-						LOG.error("Unknown key: " + key);
-					}
-				} else {
-					LOG.error("Received invalid key: " + key);
-				}
-			}
-		}
-
-		// Do cleanup, if necessary
-		if (this.listeningSocket != null) {
-			try {
-				this.listeningSocket.close();
-			} catch (IOException ioe) {
-				// Actually, we can ignore this exception
-				LOG.debug(ioe);
-			}
-		}
-
-		// Finally, close the selector
-		try {
-			this.selector.close();
-		} catch (IOException ioe) {
-			LOG.debug(StringUtils.stringifyException(ioe));
-		}
-	}
-
-	private void doAccept(SelectionKey key) {
-
-		SocketChannel clientSocket = null;
-
-		try {
-			clientSocket = this.listeningSocket.accept();
-			if (clientSocket == null) {
-				LOG.error("Client socket is null");
-				return;
-			}
-		} catch (IOException ioe) {
-			LOG.error(ioe);
-			return;
-		}
-
-		final IncomingConnection incomingConnection = new IncomingConnection(this.byteBufferedChannelManager,
-			clientSocket);
-		SelectionKey clientKey = null;
-		try {
-			clientSocket.configureBlocking(false);
-			clientKey = clientSocket.register(this.selector, SelectionKey.OP_READ);
-			clientKey.attach(incomingConnection);
-		} catch (IOException ioe) {
-			incomingConnection.reportTransmissionProblem(clientKey, ioe);
-		}
-	}
-
-	private void doRead(SelectionKey key) {
-
-		final IncomingConnection incomingConnection = (IncomingConnection) key.attachment();
-		try {
-			incomingConnection.read();
-		} catch (EOFException eof) {
-			if (incomingConnection.isCloseUnexpected()) {
-				final SocketChannel socketChannel = (SocketChannel) key.channel();
-				LOG.error("Connection from " + socketChannel.socket().getRemoteSocketAddress()
-					+ " was closed unexpectedly");
-				incomingConnection.reportTransmissionProblem(key, eof);
-			} else {
-				incomingConnection.closeConnection(key);
-			}
-		} catch (IOException ioe) {
-			incomingConnection.reportTransmissionProblem(key, ioe);
-		} catch (InterruptedException e) {
-			// Nothing to do here
-		} catch (NoBufferAvailableException e) {
-			// There are no buffers available, unsubscribe from read event
-			final SocketChannel socketChannel = (SocketChannel) key.channel();
-			try {
-				final SelectionKey newKey = socketChannel.register(this.selector, 0);
-				newKey.attach(incomingConnection);
-			} catch (ClosedChannelException e1) {
-				incomingConnection.reportTransmissionProblem(key, e1);
-			}
-
-			final BufferAvailabilityListener bal = new IncomingConnectionBufferAvailListener(
-				this.pendingReadEventSubscribeRequests, key);
-			if (!e.getBufferProvider().registerBufferAvailabilityListener(bal)) {
-				// In the meantime, a buffer has become available again, subscribe to read event again
-
-				try {
-					final SelectionKey newKey = socketChannel.register(this.selector, SelectionKey.OP_READ);
-					newKey.attach(incomingConnection);
-				} catch (ClosedChannelException e1) {
-					incomingConnection.reportTransmissionProblem(key, e1);
-				}
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/InputChannelContext.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/InputChannelContext.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/InputChannelContext.java
deleted file mode 100644
index 739ec3d..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/InputChannelContext.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import eu.stratosphere.nephele.taskmanager.bufferprovider.BufferProvider;
-
-public interface InputChannelContext extends ChannelContext, BufferProvider {
-
-	void logQueuedEnvelopes();
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/InputGateContext.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/InputGateContext.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/InputGateContext.java
deleted file mode 100644
index 0b05262..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/InputGateContext.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.taskmanager.bufferprovider.LocalBufferPoolOwner;
-
-public interface InputGateContext extends GateContext {
-
-	InputChannelContext createInputChannelContext(ChannelID channelID, InputChannelContext previousContext);
-	
-	LocalBufferPoolOwner getLocalBufferPoolOwner();
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/InsufficientResourcesException.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/InsufficientResourcesException.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/InsufficientResourcesException.java
deleted file mode 100644
index 87543b8..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/InsufficientResourcesException.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-/**
- * This exception is thrown by the {@link ByteBufferedChannelManager} to indicate that a task cannot be accepted because
- * there are not enough resources available to safely execute it.
- * 
- */
-public final class InsufficientResourcesException extends Exception {
-
-	/**
-	 * The generated serial version UID.
-	 */
-	private static final long serialVersionUID = -8977049569413215169L;
-
-	/**
-	 * Constructs a new insufficient resources exception.
-	 * 
-	 * @param msg
-	 *        the message describing the exception
-	 */
-	InsufficientResourcesException(final String msg) {
-		super(msg);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/NetworkConnectionManager.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/NetworkConnectionManager.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/NetworkConnectionManager.java
deleted file mode 100644
index 6a5cc47..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/NetworkConnectionManager.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.configuration.GlobalConfiguration;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelope;
-
-/**
- * The network connection manager manages incoming and outgoing network connections from and to other hosts.
- * <p>
- * This class is thread-safe.
- * 
- */
-public final class NetworkConnectionManager {
-
-	/**
-	 * The default number of threads dealing with outgoing connections.
-	 */
-	private static final int DEFAULT_NUMBER_OF_OUTGOING_CONNECTION_THREADS = 1;
-
-	/**
-	 * The default number of connection retries before giving up.
-	 */
-	private static final int DEFAULT_NUMBER_OF_CONNECTION_RETRIES = 10;
-
-	/**
-	 * List of active threads dealing with outgoing connections.
-	 */
-	private final List<OutgoingConnectionThread> outgoingConnectionThreads = new CopyOnWriteArrayList<OutgoingConnectionThread>();
-
-	/**
-	 * Thread dealing with incoming connections.
-	 */
-	private final IncomingConnectionThread incomingConnectionThread;
-
-	/**
-	 * Map containing currently active outgoing connections.
-	 */
-	private final ConcurrentMap<RemoteReceiver, OutgoingConnection> outgoingConnections = new ConcurrentHashMap<RemoteReceiver, OutgoingConnection>();
-
-	/**
-	 * The number of connection retries before giving up.
-	 */
-	private final int numberOfConnectionRetries;
-
-	/**
-	 * A buffer provider for read buffers
-	 */
-	private final ByteBufferedChannelManager byteBufferedChannelManager;
-
-	public NetworkConnectionManager(final ByteBufferedChannelManager byteBufferedChannelManager,
-			final InetAddress bindAddress, final int dataPort) throws IOException {
-
-		final Configuration configuration = GlobalConfiguration.getConfiguration();
-
-		this.byteBufferedChannelManager = byteBufferedChannelManager;
-
-		// Start the connection threads
-		final int numberOfOutgoingConnectionThreads = configuration.getInteger(
-			"channel.network.numberOfOutgoingConnectionThreads", DEFAULT_NUMBER_OF_OUTGOING_CONNECTION_THREADS);
-
-		for (int i = 0; i < numberOfOutgoingConnectionThreads; i++) {
-			final OutgoingConnectionThread outgoingConnectionThread = new OutgoingConnectionThread();
-			outgoingConnectionThread.start();
-			this.outgoingConnectionThreads.add(outgoingConnectionThread);
-		}
-
-		this.incomingConnectionThread = new IncomingConnectionThread(
-			this.byteBufferedChannelManager, true, new InetSocketAddress(bindAddress, dataPort));
-		this.incomingConnectionThread.start();
-
-		this.numberOfConnectionRetries = configuration.getInteger("channel.network.numberOfConnectionRetries",
-			DEFAULT_NUMBER_OF_CONNECTION_RETRIES);
-	}
-
-	/**
-	 * Randomly selects one of the active threads dealing with outgoing connections.
-	 * 
-	 * @return one of the active threads dealing with outgoing connections
-	 */
-	private OutgoingConnectionThread getOutgoingConnectionThread() {
-
-		return this.outgoingConnectionThreads.get((int) (this.outgoingConnectionThreads.size() * Math.random()));
-	}
-
-	/**
-	 * Queues an envelope for transfer to a particular target host.
-	 * 
-	 * @param remoteReceiver
-	 *        the address of the remote receiver
-	 * @param transferEnvelope
-	 *        the envelope to be transferred
-	 */
-	public void queueEnvelopeForTransfer(final RemoteReceiver remoteReceiver, final TransferEnvelope transferEnvelope) {
-
-		getOutgoingConnection(remoteReceiver).queueEnvelope(transferEnvelope);
-	}
-
-	/**
-	 * Returns (and possibly creates) the outgoing connection for the given remote receiver.
-	 * 
-	 * @param remoteReceiver
-	 *        the remote receiver identifying the connection target
-	 * @return the outgoing connection object
-	 */
-	private OutgoingConnection getOutgoingConnection(final RemoteReceiver remoteReceiver) {
-
-		OutgoingConnection outgoingConnection = this.outgoingConnections.get(remoteReceiver);
-
-		if (outgoingConnection == null) {
-
-			outgoingConnection = new OutgoingConnection(remoteReceiver, getOutgoingConnectionThread(),
-				this.numberOfConnectionRetries);
-
-			final OutgoingConnection oldEntry = this.outgoingConnections
-				.putIfAbsent(remoteReceiver, outgoingConnection);
-
-			// We had a race, use the old value
-			if (oldEntry != null) {
-				outgoingConnection = oldEntry;
-			}
-		}
-
-		return outgoingConnection;
-	}
-
-	public void shutDown() {
-
-		// Interrupt the threads we started
-		this.incomingConnectionThread.interrupt();
-
-		final Iterator<OutgoingConnectionThread> it = this.outgoingConnectionThreads.iterator();
-		while (it.hasNext()) {
-			it.next().interrupt();
-		}
-	}
-
-	public void logBufferUtilization() {
-
-		System.out.println("\tOutgoing connections:");
-
-		final Iterator<Map.Entry<RemoteReceiver, OutgoingConnection>> it = this.outgoingConnections.entrySet()
-			.iterator();
-
-		while (it.hasNext()) {
-
-			final Map.Entry<RemoteReceiver, OutgoingConnection> entry = it.next();
-			System.out.println("\t\tOC " + entry.getKey() + ": " + entry.getValue().getNumberOfQueuedWriteBuffers());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutgoingConnection.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutgoingConnection.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutgoingConnection.java
deleted file mode 100644
index 94463b6..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutgoingConnection.java
+++ /dev/null
@@ -1,531 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.SocketChannel;
-import java.nio.channels.WritableByteChannel;
-import java.util.ArrayDeque;
-import java.util.Iterator;
-import java.util.Queue;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.DefaultSerializer;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelope;
-
-/**
- * This class represents an outgoing TCP connection through which {@link TransferEnvelope} objects can be sent.
- * {@link TransferEnvelope} objects are received from the {@link ByteBufferedChannelManager} and added to a queue. An
- * additional network thread then takes the envelopes from the queue and transmits them to the respective destination
- * host.
- * 
- */
-public class OutgoingConnection {
-
-	/**
-	 * The log object used to report debug information and possible errors.
-	 */
-	private static final Log LOG = LogFactory.getLog(OutgoingConnection.class);
-
-	/**
-	 * The address this outgoing connection is connected to.
-	 */
-	private final RemoteReceiver remoteReceiver;
-
-	/**
-	 * The outgoing connection thread which actually transmits the queued transfer envelopes.
-	 */
-	private final OutgoingConnectionThread connectionThread;
-
-	/**
-	 * The queue of transfer envelopes to be transmitted.
-	 */
-	private final Queue<TransferEnvelope> queuedEnvelopes = new ArrayDeque<TransferEnvelope>();
-
-	/**
-	 * The {@link DefaultSerializer} object used to transform the envelopes into a byte stream.
-	 */
-	private final DefaultSerializer serializer = new DefaultSerializer();
-
-	/**
-	 * The {@link TransferEnvelope} that is currently processed.
-	 */
-	private TransferEnvelope currentEnvelope = null;
-
-	/**
-	 * Stores whether the underlying TCP connection is established. As this variable is accessed by the byte buffered
-	 * channel manager and the outgoing connection thread, it must be protected by a monitor.
-	 */
-	private boolean isConnected = false;
-
-	/**
-	 * Stores whether the underlying TCP connection is subscribed to the NIO write event. As this variable is accessed by
-	 * the byte buffered channel and the outgoing connection thread, it must be protected by a monitor.
-	 */
-	private boolean isSubscribedToWriteEvent = false;
-
-	/**
-	 * The overall number of connection retries which shall be performed before a connection error is reported.
-	 */
-	private final int numberOfConnectionRetries;
-
-	/**
-	 * The number of connection retries left before an I/O error is reported.
-	 */
-	private int retriesLeft = 0;
-
-	/**
-	 * The timestamp of the last connection retry.
-	 */
-	private long timstampOfLastRetry = 0;
-
-	/**
-	 * The current selection key representing the interest set of the underlying TCP NIO connection. This variable may
-	 * only be accessed by the outgoing connection thread.
-	 */
-	private SelectionKey selectionKey = null;
-
-	/**
-	 * The period of time in milliseconds that shall be waited before a connection attempt is considered to be failed.
-	 */
-	private static long RETRYINTERVAL = 1000L; // 1 second
-
-	/**
-	 * Constructs a new outgoing connection object.
-	 * 
-	 * @param remoteReceiver
-	 *        the address of the destination host this outgoing connection object is supposed to connect to
-	 * @param connectionThread
-	 *        the connection thread which actually handles the network transfer
-	 * @param numberOfConnectionRetries
-	 *        the number of connection retries allowed before an I/O error is reported
-	 */
-	public OutgoingConnection(RemoteReceiver remoteReceiver, OutgoingConnectionThread connectionThread,
-			int numberOfConnectionRetries) {
-
-		this.remoteReceiver = remoteReceiver;
-		this.connectionThread = connectionThread;
-		this.numberOfConnectionRetries = numberOfConnectionRetries;
-	}
-
-	/**
-	 * Adds a new {@link TransferEnvelope} to the queue of envelopes to be transmitted to the destination host of this
-	 * connection.
-	 * <p>
-	 * This method should only be called by the {@link ByteBufferedChannelManager} object.
-	 * 
-	 * @param transferEnvelope
-	 *        the envelope to be added to the transfer queue
-	 */
-	public void queueEnvelope(TransferEnvelope transferEnvelope) {
-
-		synchronized (this.queuedEnvelopes) {
-
-			checkConnection();
-			this.queuedEnvelopes.add(transferEnvelope);
-		}
-	}
-
-	private void checkConnection() {
-
-		synchronized (this.queuedEnvelopes) {
-
-			if (!this.isConnected) {
-
-				this.retriesLeft = this.numberOfConnectionRetries;
-				this.timstampOfLastRetry = System.currentTimeMillis();
-				this.connectionThread.triggerConnect(this);
-				this.isConnected = true;
-				this.isSubscribedToWriteEvent = true;
-			} else {
-
-				if (!this.isSubscribedToWriteEvent) {
-					this.connectionThread.subscribeToWriteEvent(this.selectionKey);
-					this.isSubscribedToWriteEvent = true;
-				}
-			}
-
-		}
-	}
-
-	/**
-	 * Returns the {@link InetSocketAddress} to the destination host this outgoing connection is supposed to be
-	 * connected to.
-	 * <p>
-	 * This method should be called by the {@link OutgoingConnectionThread} object only.
-	 * 
-	 * @return the {@link InetSocketAddress} to the destination host this outgoing connection is supposed to be
-	 *         connected to
-	 */
-	public InetSocketAddress getConnectionAddress() {
-
-		return this.remoteReceiver.getConnectionAddress();
-	}
-
-	/**
-	 * Reports a problem which occurred while establishing the underlying TCP connection to this outgoing connection
-	 * object. Depending on the number of connection retries left, this method will either try to reestablish the TCP
-	 * connection or report an I/O error to all tasks which have queued envelopes for this connection. In the latter
-	 * case all queued envelopes will be dropped and all included buffers will be freed.
-	 * <p>
-	 * This method should only be called by the {@link OutgoingConnectionThread} object.
-	 * 
-	 * @param ioe
-	 *        thrown if an error occurs while resetting the underlying TCP connection
-	 */
-	public void reportConnectionProblem(IOException ioe) {
-
-		// First, write exception to log
-		final long currentTime = System.currentTimeMillis();
-		if (currentTime - this.timstampOfLastRetry >= RETRYINTERVAL) {
-			LOG.error("Cannot connect to " + this.remoteReceiver + ", " + this.retriesLeft + " retries left");
-		}
-
-		synchronized (this.queuedEnvelopes) {
-
-			if (this.selectionKey != null) {
-
-				final SocketChannel socketChannel = (SocketChannel) this.selectionKey.channel();
-				if (socketChannel != null) {
-					try {
-						socketChannel.close();
-					} catch (IOException e) {
-						LOG.debug("Error while trying to close the socket channel to " + this.remoteReceiver);
-					}
-				}
-
-				this.selectionKey.cancel();
-				this.selectionKey = null;
-				this.isConnected = false;
-				this.isSubscribedToWriteEvent = false;
-			}
-
-			if (hasRetriesLeft(currentTime)) {
-				this.connectionThread.triggerConnect(this);
-				this.isConnected = true;
-				this.isSubscribedToWriteEvent = true;
-				return;
-			}
-
-			// Error is fatal
-			LOG.error(ioe);
-
-			// Notify source of current envelope and release buffer
-			if (this.currentEnvelope != null) {
-				if (this.currentEnvelope.getBuffer() != null) {
-					this.currentEnvelope.getBuffer().recycleBuffer();
-					this.currentEnvelope = null;
-				}
-			}
-
-			// Notify all other tasks which are waiting for data to be transmitted
-			final Iterator<TransferEnvelope> iter = this.queuedEnvelopes.iterator();
-			while (iter.hasNext()) {
-				final TransferEnvelope envelope = iter.next();
-				iter.remove();
-				// Recycle the buffer inside the envelope
-				if (envelope.getBuffer() != null) {
-					envelope.getBuffer().recycleBuffer();
-				}
-			}
-
-			this.queuedEnvelopes.clear();
-		}
-	}
-
-	/**
-	 * Reports an I/O error which occurred while writing data to the TCP connection. As a result of the I/O error the
-	 * connection is closed and the interest keys are canceled. Moreover, the task which queued the currently
-	 * transmitted transfer envelope is notified about the error and the current envelope is dropped. If the current
-	 * envelope contains a buffer, the buffer is freed.
-	 * <p>
-	 * This method should only be called by the {@link OutgoingConnectionThread} object.
-	 * 
-	 * @param ioe
-	 *        thrown if an error occurs while resetting the connection
-	 */
-	public void reportTransmissionProblem(IOException ioe) {
-
-		final SocketChannel socketChannel = (SocketChannel) this.selectionKey.channel();
-
-		// First, write exception to log
-		if (this.currentEnvelope != null) {
-			LOG.error("The connection between " + socketChannel.socket().getLocalAddress() + " and "
-				+ socketChannel.socket().getRemoteSocketAddress()
-				+ " experienced an IOException for transfer envelope " + this.currentEnvelope.getSequenceNumber());
-		} else {
-			LOG.error("The connection between " + socketChannel.socket().getLocalAddress() + " and "
-				+ socketChannel.socket().getRemoteSocketAddress() + " experienced an IOException");
-		}
-
-		// Close the connection and cancel the interest key
-		synchronized (this.queuedEnvelopes) {
-			try {
-				LOG.debug("Closing connection to " + socketChannel.socket().getRemoteSocketAddress());
-				socketChannel.close();
-			} catch (IOException e) {
-				LOG.debug("An error occurred while responding to an IOException");
-				LOG.debug(e);
-			}
-
-			this.selectionKey.cancel();
-
-			// Error is fatal
-			LOG.error(ioe);
-
-			// Trigger new connection if there are more envelopes to be transmitted
-			if (this.queuedEnvelopes.isEmpty()) {
-				this.isConnected = false;
-				this.isSubscribedToWriteEvent = false;
-			} else {
-				this.connectionThread.triggerConnect(this);
-				this.isConnected = true;
-				this.isSubscribedToWriteEvent = true;
-			}
-
-			// We must assume the current envelope is corrupted so we notify the task which created it.
-			if (this.currentEnvelope != null) {
-				if (this.currentEnvelope.getBuffer() != null) {
-					this.currentEnvelope.getBuffer().recycleBuffer();
-					this.currentEnvelope = null;
-				}
-			}
-		}
-	}
-
-	/**
-	 * Checks whether further retries are left for establishing the underlying TCP connection.
-	 * 
-	 * @param currentTime
-	 *        the current system time in milliseconds since January 1st, 1970
-	 * @return <code>true</code> if there are retries left, <code>false</code> otherwise
-	 */
-	private boolean hasRetriesLeft(long currentTime) {
-
-		if (currentTime - this.timstampOfLastRetry >= RETRYINTERVAL) {
-			this.retriesLeft--;
-			this.timstampOfLastRetry = currentTime;
-			if (this.retriesLeft == 0) {
-				return false;
-			}
-		}
-
-		return true;
-	}
-
-	/**
-	 * Writes the content of the current {@link TransferEnvelope} object to the underlying TCP connection.
-	 * <p>
-	 * This method should only be called by the {@link OutgoingConnectionThread} object.
-	 * 
-	 * @return <code>true</code> if there is more data from this/other queued envelopes to be written to this channel
-	 * @throws IOException
-	 *         thrown if an error occurs while writing the data to the channel
-	 */
-	public boolean write() throws IOException {
-
-		final WritableByteChannel writableByteChannel = (WritableByteChannel) this.selectionKey.channel();
-
-		if (this.currentEnvelope == null) {
-			synchronized (this.queuedEnvelopes) {
-				if (this.queuedEnvelopes.isEmpty()) {
-					return false;
-				} else {
-					this.currentEnvelope = this.queuedEnvelopes.peek();
-					this.serializer.setTransferEnvelope(this.currentEnvelope);
-				}
-			}
-		}
-
-		if (!this.serializer.write(writableByteChannel)) {
-
-			// Make sure we recycle the attached memory or file buffers correctly
-			if (this.currentEnvelope.getBuffer() != null) {
-				this.currentEnvelope.getBuffer().recycleBuffer();
-			}
-
-			synchronized (this.queuedEnvelopes) {
-				this.queuedEnvelopes.poll();
-				this.currentEnvelope = null;
-			}
-		}
-
-		return true;
-	}
-
-	/**
-	 * Requests to close the underlying TCP connection. The request is ignored if at least one {@link TransferEnvelope}
-	 * is queued.
-	 * <p>
-	 * This method should only be called by the {@link OutgoingConnectionThread} object.
-	 * 
-	 * @throws IOException
-	 *         thrown if an error occurs while closing the TCP connection
-	 */
-	public void requestClose() throws IOException {
-
-		synchronized (this.queuedEnvelopes) {
-
-			if (this.queuedEnvelopes.isEmpty()) {
-
-				if (this.isSubscribedToWriteEvent) {
-
-					this.connectionThread.unsubscribeFromWriteEvent(this.selectionKey);
-					this.isSubscribedToWriteEvent = false;
-				}
-			}
-		}
-	}
-
-	/**
-	 * Closes the underlying TCP connection if no more {@link TransferEnvelope} objects are in the transmission queue.
-	 * <p>
-	 * This method should only be called by the {@link OutgoingConnectionThread} object.
-	 * 
-	 * @throws IOException
-	 */
-	public void closeConnection() throws IOException {
-
-		synchronized (this.queuedEnvelopes) {
-
-			if (!this.queuedEnvelopes.isEmpty()) {
-				return;
-			}
-
-			if (this.selectionKey != null) {
-
-				final SocketChannel socketChannel = (SocketChannel) this.selectionKey.channel();
-				socketChannel.close();
-				this.selectionKey.cancel();
-				this.selectionKey = null;
-			}
-
-			this.isConnected = false;
-			this.isSubscribedToWriteEvent = false;
-		}
-	}
-
-	/**
-	 * Returns the number of queued {@link TransferEnvelope} objects with the given source channel ID.
-	 * 
-	 * @param sourceChannelID
-	 *        the source channel ID to count the queued envelopes for
-	 * @return the number of queued transfer envelopes with the given source channel ID
-	 */
-	public int getNumberOfQueuedEnvelopesFromChannel(final ChannelID sourceChannelID) {
-
-		synchronized (this.queuedEnvelopes) {
-
-			int number = 0;
-
-			final Iterator<TransferEnvelope> it = this.queuedEnvelopes.iterator();
-			while (it.hasNext()) {
-				final TransferEnvelope te = it.next();
-				if (sourceChannelID.equals(te.getSource())) {
-					number++;
-				}
-			}
-
-			return number;
-		}
-	}
-
-	/**
-	 * Removes all queued {@link TransferEnvelope} objects from the transmission which match the given source channel
-	 * ID.
-	 * 
-	 * @param sourceChannelID
-	 *        the source channel ID of the transferred transfer envelopes to be dropped
-	 */
-	public void dropAllQueuedEnvelopesFromChannel(final ChannelID sourceChannelID) {
-
-		synchronized (this.queuedEnvelopes) {
-
-			final Iterator<TransferEnvelope> it = this.queuedEnvelopes.iterator();
-			while (it.hasNext()) {
-				final TransferEnvelope te = it.next();
-				if (sourceChannelID.equals(te.getSource())) {
-					it.remove();
-					if (te.getBuffer() != null) {
-						te.getBuffer().recycleBuffer();
-					}
-				}
-			}
-		}
-	}
-
-	/**
-	 * Checks whether this outgoing connection object manages an active connection or can be removed by the
-	 * {@link ByteBufferedChannelManager} object.
-	 * <p>
-	 * This method should only be called by the byte buffered channel manager.
-	 * 
-	 * @return <code>true</code> if this object no longer manages an active connection and can be removed,
-	 *         <code>false</code> otherwise.
-	 */
-	public boolean canBeRemoved() {
-
-		synchronized (this.queuedEnvelopes) {
-
-			if (this.isConnected) {
-				return false;
-			}
-
-			if (this.currentEnvelope != null) {
-				return false;
-			}
-
-			return this.queuedEnvelopes.isEmpty();
-		}
-	}
-
-	/**
-	 * Sets the selection key representing the interest set of the underlying TCP NIO connection.
-	 * 
-	 * @param selectionKey
-	 *        the selection key of the underlying TCP connection
-	 */
-	public void setSelectionKey(SelectionKey selectionKey) {
-		this.selectionKey = selectionKey;
-	}
-
-	/**
-	 * Returns the number of currently queued envelopes which contain a write buffer.
-	 * 
-	 * @return the number of currently queued envelopes which contain a write buffer
-	 */
-	public int getNumberOfQueuedWriteBuffers() {
-
-		int retVal = 0;
-
-		synchronized (this.queuedEnvelopes) {
-
-			final Iterator<TransferEnvelope> it = this.queuedEnvelopes.iterator();
-			while (it.hasNext()) {
-
-				final TransferEnvelope envelope = it.next();
-				if (envelope.getBuffer() != null) {
-					++retVal;
-				}
-			}
-		}
-
-		return retVal;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutgoingConnectionThread.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutgoingConnectionThread.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutgoingConnectionThread.java
deleted file mode 100644
index ef03a9c..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutgoingConnectionThread.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.SelectionKey;
-import java.nio.channels.Selector;
-import java.nio.channels.SocketChannel;
-import java.util.ArrayDeque;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Queue;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.util.StringUtils;
-
-public class OutgoingConnectionThread extends Thread {
-
-	/**
-	 * The minimum time a TCP connection must be idle before it is closed.
-	 */
-	private static final long MIN_IDLE_TIME_BEFORE_CLOSE = 80000L; // 80 seconds
-
-	private static final Log LOG = LogFactory.getLog(OutgoingConnectionThread.class);
-
-	private final Selector selector;
-
-	private final Queue<OutgoingConnection> pendingConnectionRequests = new ArrayDeque<OutgoingConnection>();
-
-	private final Queue<SelectionKey> pendingWriteEventSubscribeRequests = new ArrayDeque<SelectionKey>();
-
-	private final Map<OutgoingConnection, Long> connectionsToClose = new HashMap<OutgoingConnection, Long>();
-
-	public OutgoingConnectionThread() throws IOException {
-		super("Outgoing Connection Thread");
-
-		this.selector = Selector.open();
-	}
-
-
-	@Override
-	public void run() {
-
-		while (!isInterrupted()) {
-
-			synchronized (this.pendingConnectionRequests) {
-
-				if (!this.pendingConnectionRequests.isEmpty()) {
-
-					final OutgoingConnection outgoingConnection = this.pendingConnectionRequests.poll();
-					try {
-						final SocketChannel socketChannel = SocketChannel.open();
-						socketChannel.configureBlocking(false);
-						final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT);
-						socketChannel.connect(outgoingConnection.getConnectionAddress());
-						key.attach(outgoingConnection);
-					} catch (final IOException ioe) {
-						// IOException is reported by separate thread to avoid deadlocks
-						final Runnable reporterThread = new Runnable() {
-
-							@Override
-							public void run() {
-								outgoingConnection.reportConnectionProblem(ioe);
-							}
-						};
-						new Thread(reporterThread).start();
-					}
-				}
-			}
-
-			synchronized (this.pendingWriteEventSubscribeRequests) {
-
-				if (!this.pendingWriteEventSubscribeRequests.isEmpty()) {
-					final SelectionKey oldSelectionKey = this.pendingWriteEventSubscribeRequests.poll();
-					final OutgoingConnection outgoingConnection = (OutgoingConnection) oldSelectionKey.attachment();
-					final SocketChannel socketChannel = (SocketChannel) oldSelectionKey.channel();
-
-					try {
-						final SelectionKey newSelectionKey = socketChannel.register(this.selector, SelectionKey.OP_READ
-							| SelectionKey.OP_WRITE);
-						newSelectionKey.attach(outgoingConnection);
-						outgoingConnection.setSelectionKey(newSelectionKey);
-					} catch (final IOException ioe) {
-						// IOException is reported by separate thread to avoid deadlocks
-						final Runnable reporterThread = new Runnable() {
-
-							@Override
-							public void run() {
-								outgoingConnection.reportTransmissionProblem(ioe);
-							}
-						};
-						new Thread(reporterThread).start();
-					}
-				}
-			}
-
-			synchronized (this.connectionsToClose) {
-
-				final Iterator<Map.Entry<OutgoingConnection, Long>> closeIt = this.connectionsToClose.entrySet()
-					.iterator();
-				final long now = System.currentTimeMillis();
-				while (closeIt.hasNext()) {
-
-					final Map.Entry<OutgoingConnection, Long> entry = closeIt.next();
-					if ((entry.getValue().longValue() + MIN_IDLE_TIME_BEFORE_CLOSE) < now) {
-						final OutgoingConnection outgoingConnection = entry.getKey();
-						closeIt.remove();
-						// Create new thread to close connection to avoid deadlocks
-						final Runnable closeThread = new Runnable() {
-
-							@Override
-							public void run() {
-								try {
-									outgoingConnection.closeConnection();
-								} catch (IOException ioe) {
-									outgoingConnection.reportTransmissionProblem(ioe);
-								}
-							}
-						};
-
-						new Thread(closeThread).start();
-					}
-
-				}
-			}
-
-			try {
-				this.selector.select(10);
-			} catch (IOException e) {
-				LOG.error(e);
-			}
-
-			final Iterator<SelectionKey> iter = this.selector.selectedKeys().iterator();
-
-			while (iter.hasNext()) {
-				final SelectionKey key = iter.next();
-
-				iter.remove();
-				if (key.isValid()) {
-					if (key.isConnectable()) {
-						doConnect(key);
-					} else {
-						if (key.isReadable()) {
-							doRead(key);
-							// A read will always result in an exception, so the write key will not be valid anymore
-							continue;
-						}
-						if (key.isWritable()) {
-							doWrite(key);
-						}
-					}
-				} else {
-					LOG.error("Received invalid key: " + key);
-				}
-			}
-		}
-
-		// Finally, try to close the selector
-		try {
-			this.selector.close();
-		} catch (IOException ioe) {
-			LOG.debug(StringUtils.stringifyException(ioe));
-		}
-	}
-
-	private void doConnect(SelectionKey key) {
-
-		final OutgoingConnection outgoingConnection = (OutgoingConnection) key.attachment();
-		final SocketChannel socketChannel = (SocketChannel) key.channel();
-		try {
-			while (!socketChannel.finishConnect()) {
-				try {
-					Thread.sleep(100);
-				} catch (InterruptedException e1) {
-					LOG.error(e1);
-				}
-			}
-
-			final SelectionKey channelKey = socketChannel.register(selector, SelectionKey.OP_WRITE
-				| SelectionKey.OP_READ);
-			outgoingConnection.setSelectionKey(channelKey);
-			channelKey.attach(outgoingConnection);
-
-		} catch (IOException ioe) {
-			outgoingConnection.reportConnectionProblem(ioe);
-		}
-	}
-
-	private void doWrite(SelectionKey key) {
-
-		final OutgoingConnection outgoingConnection = (OutgoingConnection) key.attachment();
-
-		try {
-
-			if (!outgoingConnection.write()) {
-				// Try to close the connection
-				outgoingConnection.requestClose();
-			}
-
-		} catch (IOException ioe) {
-			outgoingConnection.reportTransmissionProblem(ioe);
-		}
-	}
-
-	private void doRead(SelectionKey key) {
-
-		final SocketChannel socketChannel = (SocketChannel) key.channel();
-		final OutgoingConnection outgoingConnection = (OutgoingConnection) key.attachment();
-		final ByteBuffer buf = ByteBuffer.allocate(8);
-
-		try {
-
-			if (socketChannel.read(buf) == -1) {
-				outgoingConnection.reportTransmissionProblem(new IOException(
-					"Read unexpected EOF from channel"));
-			} else {
-				LOG.error("Outgoing connection read real data from channel");
-			}
-		} catch (IOException ioe) {
-			outgoingConnection.reportTransmissionProblem(ioe);
-		}
-	}
-
-	public void triggerConnect(OutgoingConnection outgoingConnection) {
-
-		synchronized (this.pendingConnectionRequests) {
-			this.pendingConnectionRequests.add(outgoingConnection);
-		}
-	}
-
-	public void unsubscribeFromWriteEvent(SelectionKey selectionKey) throws IOException {
-
-		final SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
-		final OutgoingConnection outgoingConnection = (OutgoingConnection) selectionKey.attachment();
-
-		final SelectionKey newSelectionKey = socketChannel.register(this.selector, SelectionKey.OP_READ);
-		newSelectionKey.attach(outgoingConnection);
-		outgoingConnection.setSelectionKey(newSelectionKey);
-
-		synchronized (this.connectionsToClose) {
-			this.connectionsToClose.put(outgoingConnection, Long.valueOf(System.currentTimeMillis()));
-		}
-	}
-
-	public void subscribeToWriteEvent(SelectionKey selectionKey) {
-
-		synchronized (this.pendingWriteEventSubscribeRequests) {
-			this.pendingWriteEventSubscribeRequests.add(selectionKey);
-		}
-		synchronized (this.connectionsToClose) {
-			this.connectionsToClose.remove((OutgoingConnection) selectionKey.attachment());
-		}
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutputChannelContext.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutputChannelContext.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutputChannelContext.java
deleted file mode 100644
index 7d8a571..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutputChannelContext.java
+++ /dev/null
@@ -1,17 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-public interface OutputChannelContext extends ChannelContext {
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutputChannelForwardingChain.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutputChannelForwardingChain.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutputChannelForwardingChain.java
deleted file mode 100644
index d9a16b7..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutputChannelForwardingChain.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import java.io.IOException;
-import java.util.Queue;
-import java.util.concurrent.LinkedBlockingDeque;
-
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelope;
-
-public final class OutputChannelForwardingChain {
-
-	private final Queue<AbstractEvent> incomingEventQueue = new LinkedBlockingDeque<AbstractEvent>();
-
-	private final AbstractOutputChannelForwarder first;
-
-	private final AbstractOutputChannelForwarder last;
-
-	public OutputChannelForwardingChain(final AbstractOutputChannelForwarder first,
-			final AbstractOutputChannelForwarder last) {
-
-		if (first == null) {
-			throw new IllegalArgumentException("Argument first must not be null");
-		}
-
-		if (last == null) {
-			throw new IllegalArgumentException("Argument last must not be null");
-		}
-
-		this.first = first;
-		this.last = last;
-	}
-
-	public void pushEnvelope(final TransferEnvelope transferEnvelope) throws IOException, InterruptedException {
-
-		this.first.push(transferEnvelope);
-	}
-
-	public TransferEnvelope pullEnvelope() {
-
-		return this.last.pull();
-	}
-
-	public void processEvent(final AbstractEvent event) {
-
-		this.first.processEvent(event);
-	}
-
-	public boolean anyForwarderHasDataLeft() throws IOException, InterruptedException {
-
-		return this.first.hasDataLeft();
-	}
-
-	public void destroy() {
-
-		this.first.destroy();
-	}
-
-	public void processQueuedEvents() {
-
-		AbstractEvent event = this.incomingEventQueue.poll();
-		while (event != null) {
-
-			this.first.processEvent(event);
-			event = this.incomingEventQueue.poll();
-		}
-	}
-
-	void offerEvent(final AbstractEvent event) {
-		this.incomingEventQueue.offer(event);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutputGateContext.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutputGateContext.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutputGateContext.java
deleted file mode 100644
index fb2022a..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/OutputGateContext.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import eu.stratosphere.nephele.io.channels.ChannelID;
-
-public interface OutputGateContext extends GateContext {
-
-	OutputChannelContext createOutputChannelContext(ChannelID channelID, OutputChannelContext previousContext,
-			boolean isReceiverRunning, boolean mergeSpillBuffers);
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/ReceiverNotFoundEvent.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/ReceiverNotFoundEvent.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/ReceiverNotFoundEvent.java
deleted file mode 100644
index 0b8f351..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/ReceiverNotFoundEvent.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-import eu.stratosphere.nephele.event.task.EventList;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelope;
-
-/**
- * An unknown receiver event can be used by the framework to inform a sender task that the delivery of a
- * {@link TransferEnvelope} has failed since the receiver could not be found.
- * 
- */
-public final class ReceiverNotFoundEvent extends AbstractEvent {
-
-	/**
-	 * The sequence number that will be set for transfer envelopes which contain the receiver not found event.
-	 */
-	private static final int RECEIVER_NOT_FOUND_SEQUENCE_NUMBER = 0;
-
-	/**
-	 * The ID of the receiver which could not be found
-	 */
-	private ChannelID receiverID;
-
-	/**
-	 * The sequence number of the envelope this event refers to
-	 */
-	private int sequenceNumber;
-
-	/**
-	 * Constructs a new unknown receiver event.
-	 * 
-	 * @param receiverID
-	 *        the ID of the receiver which could not be found
-	 * @param sequenceNumber
-	 *        the sequence number of the envelope this event refers to
-	 */
-	public ReceiverNotFoundEvent(final ChannelID receiverID, final int sequenceNumber) {
-
-		if (receiverID == null) {
-			throw new IllegalArgumentException("Argument unknownReceiverID must not be null");
-		}
-
-		if (sequenceNumber < 0) {
-			throw new IllegalArgumentException("Argument sequenceNumber must be non-negative");
-		}
-
-		this.receiverID = receiverID;
-		this.sequenceNumber = sequenceNumber;
-	}
-
-	/**
-	 * Default constructor for serialization/deserialization.
-	 */
-	public ReceiverNotFoundEvent() {
-
-		this.receiverID = new ChannelID();
-	}
-
-	/**
-	 * Returns the ID of the receiver which could not be found.
-	 * 
-	 * @return the ID of the receiver which could not be found
-	 */
-	public ChannelID getReceiverID() {
-
-		return this.receiverID;
-	}
-
-	/**
-	 * Returns the sequence number of the envelope this event refers to.
-	 * 
-	 * @return the sequence number of the envelope this event refers to
-	 */
-	public int getSequenceNumber() {
-
-		return this.sequenceNumber;
-	}
-
-
-	@Override
-	public void write(final DataOutput out) throws IOException {
-
-		this.receiverID.write(out);
-		out.writeInt(this.sequenceNumber);
-	}
-
-
-	@Override
-	public void read(final DataInput in) throws IOException {
-
-		this.receiverID.read(in);
-		this.sequenceNumber = in.readInt();
-	}
-
-	/**
-	 * Creates a transfer envelope which only contains a ReceiverNotFoundEvent.
-	 * 
-	 * @param jobID
-	 *        the ID of the job the event relates to.
-	 * @param receiver
-	 *        the channel ID of the receiver that could not be found
-	 * @param sequenceNumber
-	 *        the sequence number of the transfer envelope which caused the creation of this event
-	 * @return a transfer envelope which only contains a ReceiverNotFoundEvent
-	 */
-	public static TransferEnvelope createEnvelopeWithEvent(final JobID jobID, final ChannelID receiver,
-			final int sequenceNumber) {
-
-		final TransferEnvelope transferEnvelope = new TransferEnvelope(RECEIVER_NOT_FOUND_SEQUENCE_NUMBER, jobID,
-			receiver);
-
-		final ReceiverNotFoundEvent unknownReceiverEvent = new ReceiverNotFoundEvent(receiver, sequenceNumber);
-		transferEnvelope.addEvent(unknownReceiverEvent);
-
-		return transferEnvelope;
-	}
-
-	/**
-	 * Checks if the given envelope only contains a ReceiverNotFoundEvent.
-	 * 
-	 * @param transferEnvelope
-	 *        the envelope to be checked
-	 * @return <code>true</code> if the envelope only contains a ReceiverNotFoundEvent, <code>false</code> otherwise
-	 */
-	public static boolean isReceiverNotFoundEvent(final TransferEnvelope transferEnvelope) {
-
-		if (transferEnvelope.getSequenceNumber() != RECEIVER_NOT_FOUND_SEQUENCE_NUMBER) {
-			return false;
-		}
-
-		if (transferEnvelope.getBuffer() != null) {
-			return false;
-		}
-
-		final EventList eventList = transferEnvelope.getEventList();
-		if (eventList == null) {
-			return false;
-		}
-
-		if (eventList.size() != 1) {
-			return false;
-		}
-
-		if (!(eventList.get(0) instanceof ReceiverNotFoundEvent)) {
-			return false;
-		}
-
-		return true;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/RemoteReceiver.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/RemoteReceiver.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/RemoteReceiver.java
deleted file mode 100644
index db4e412..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/RemoteReceiver.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.net.UnknownHostException;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.util.StringUtils;
-
-/**
- * Objects of this class uniquely identify a connection to a remote {@link TaskManager}.
- * 
- */
-public final class RemoteReceiver implements IOReadableWritable {
-
-	/**
-	 * The address of the connection to the remote {@link TaskManager}.
-	 */
-	private InetSocketAddress connectionAddress;
-
-	/**
-	 * The index of the connection to the remote {@link TaskManager}.
-	 */
-	private int connectionIndex;
-
-	/**
-	 * Constructs a new remote receiver object.
-	 * 
-	 * @param connectionAddress
-	 *        the address of the connection to the remote {@link TaskManager}
-	 * @param connectionIndex
-	 *        the index of the connection to the remote {@link TaskManager}
-	 */
-	public RemoteReceiver(final InetSocketAddress connectionAddress, final int connectionIndex) {
-
-		if (connectionAddress == null) {
-			throw new IllegalArgumentException("Argument connectionAddress must not be null");
-		}
-
-		if (connectionIndex < 0) {
-			throw new IllegalArgumentException("Argument connectionIndex must be a non-negative integer number");
-		}
-
-		this.connectionAddress = connectionAddress;
-		this.connectionIndex = connectionIndex;
-	}
-
-	/**
-	 * Default constructor for serialization/deserialization.
-	 */
-	public RemoteReceiver() {
-		this.connectionAddress = null;
-		this.connectionIndex = -1;
-	}
-
-	/**
-	 * Returns the address of the connection to the remote {@link TaskManager}.
-	 * 
-	 * @return the address of the connection to the remote {@link TaskManager}
-	 */
-	public InetSocketAddress getConnectionAddress() {
-
-		return this.connectionAddress;
-	}
-
-	/**
-	 * Returns the index of the connection to the remote {@link TaskManager}.
-	 * 
-	 * @return the index of the connection to the remote {@link TaskManager}
-	 */
-	public int getConnectionIndex() {
-
-		return this.connectionIndex;
-	}
-
-
-	@Override
-	public int hashCode() {
-
-		return this.connectionAddress.hashCode() + (31 * this.connectionIndex);
-	}
-
-
-	@Override
-	public boolean equals(final Object obj) {
-
-		if (!(obj instanceof RemoteReceiver)) {
-			return false;
-		}
-
-		final RemoteReceiver rr = (RemoteReceiver) obj;
-		if (!this.connectionAddress.equals(rr.connectionAddress)) {
-			return false;
-		}
-
-		if (this.connectionIndex != rr.connectionIndex) {
-			return false;
-		}
-
-		return true;
-	}
-
-
-	@Override
-	public void write(final DataOutput out) throws IOException {
-
-		final InetAddress ia = this.connectionAddress.getAddress();
-		out.writeInt(ia.getAddress().length);
-		out.write(ia.getAddress());
-		out.writeInt(this.connectionAddress.getPort());
-
-		out.writeInt(this.connectionIndex);
-	}
-
-
-	@Override
-	public void read(final DataInput in) throws IOException {
-
-		final int addr_length = in.readInt();
-		final byte[] address = new byte[addr_length];
-		in.readFully(address);
-
-		InetAddress ia = null;
-		try {
-			ia = InetAddress.getByAddress(address);
-		} catch (UnknownHostException uhe) {
-			throw new IOException(StringUtils.stringifyException(uhe));
-		}
-		final int port = in.readInt();
-		this.connectionAddress = new InetSocketAddress(ia, port);
-
-		this.connectionIndex = in.readInt();
-	}
-
-
-	@Override
-	public String toString() {
-
-		return this.connectionAddress + " (" + this.connectionIndex + ")";
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/SenderHintEvent.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/SenderHintEvent.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/SenderHintEvent.java
deleted file mode 100644
index a6aebb1..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/SenderHintEvent.java
+++ /dev/null
@@ -1,119 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-import eu.stratosphere.nephele.event.task.EventList;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelope;
-
-public final class SenderHintEvent extends AbstractEvent {
-
-	/**
-	 * The sequence number that will be set for transfer envelopes which contain the sender hint event.
-	 */
-	private static final int SENDER_HINT_SEQUENCE_NUMBER = 0;
-
-	private final ChannelID source;
-
-	private final RemoteReceiver remoteReceiver;
-
-	SenderHintEvent(final ChannelID source, final RemoteReceiver remoteReceiver) {
-
-		if (source == null) {
-			throw new IllegalArgumentException("Argument source must not be null");
-		}
-
-		if (remoteReceiver == null) {
-			throw new IllegalArgumentException("Argument remoteReceiver must not be null");
-		}
-
-		this.source = source;
-		this.remoteReceiver = remoteReceiver;
-	}
-
-	public SenderHintEvent() {
-
-		this.source = new ChannelID();
-		this.remoteReceiver = new RemoteReceiver();
-	}
-
-	public ChannelID getSource() {
-
-		return this.source;
-	}
-
-	public RemoteReceiver getRemoteReceiver() {
-
-		return this.remoteReceiver;
-	}
-
-
-	@Override
-	public void write(final DataOutput out) throws IOException {
-
-		this.source.write(out);
-		this.remoteReceiver.write(out);
-	}
-
-
-	@Override
-	public void read(final DataInput in) throws IOException {
-
-		this.source.read(in);
-		this.remoteReceiver.read(in);
-	}
-
-	public static TransferEnvelope createEnvelopeWithEvent(final TransferEnvelope originalEnvelope,
-			final ChannelID source, final RemoteReceiver remoteReceiver) {
-
-		final TransferEnvelope transferEnvelope = new TransferEnvelope(SENDER_HINT_SEQUENCE_NUMBER,
-			originalEnvelope.getJobID(), originalEnvelope.getSource());
-
-		final SenderHintEvent senderEvent = new SenderHintEvent(source, remoteReceiver);
-		transferEnvelope.addEvent(senderEvent);
-
-		return transferEnvelope;
-	}
-
-	static boolean isSenderHintEvent(final TransferEnvelope transferEnvelope) {
-
-		if (transferEnvelope.getSequenceNumber() != SENDER_HINT_SEQUENCE_NUMBER) {
-			return false;
-		}
-
-		if (transferEnvelope.getBuffer() != null) {
-			return false;
-		}
-
-		final EventList eventList = transferEnvelope.getEventList();
-		if (eventList == null) {
-			return false;
-		}
-
-		if (eventList.size() != 1) {
-			return false;
-		}
-
-		if (!(eventList.get(0) instanceof SenderHintEvent)) {
-			return false;
-		}
-
-		return true;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/TaskContext.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/TaskContext.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/TaskContext.java
deleted file mode 100644
index 6c41a4f..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/TaskContext.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import eu.stratosphere.nephele.io.GateID;
-import eu.stratosphere.nephele.taskmanager.bufferprovider.LocalBufferPoolOwner;
-
-public interface TaskContext extends LocalBufferPoolOwner {
-
-	OutputGateContext createOutputGateContext(GateID gateID);
-
-	InputGateContext createInputGateContext(GateID gateID);
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/UnexpectedEnvelopeEvent.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/UnexpectedEnvelopeEvent.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/UnexpectedEnvelopeEvent.java
deleted file mode 100644
index b7d59c6..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/bytebuffered/UnexpectedEnvelopeEvent.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.bytebuffered;
-
-import java.io.DataInput;
-import java.io.DataOutput;
-import java.io.IOException;
-
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-
-/**
- * This event is sent by an {@link InputChannelContext}. It indicates that the input channel context has received a
- * {@link TransferEnvelope} with a lower sequence number than expected. The typical reason for this is that data is
- * being replayed from a checkpoint. With the help of this event it is possible to request the sender to skip sending
- * transfer envelopes up to the given expected sequence number.
- * 
- */
-public final class UnexpectedEnvelopeEvent extends AbstractEvent {
-
-	/**
-	 * The expected sequence number.
-	 */
-	private int expectedSequenceNumber;
-
-	/**
-	 * Constructs a new unexpected envelope event.
-	 * 
-	 * @param expectedSequenceNumber
-	 *        the expected sequence number
-	 */
-	public UnexpectedEnvelopeEvent(final int expectedSequenceNumber) {
-
-		if (expectedSequenceNumber < 0) {
-			throw new IllegalArgumentException("Argument expectedSequenceNumber must be non-negative.");
-		}
-
-		this.expectedSequenceNumber = expectedSequenceNumber;
-	}
-
-	/**
-	 * Default constructor for serialization/deserialization.
-	 */
-	public UnexpectedEnvelopeEvent() {
-	}
-
-	/**
-	 * Returns the expected sequence number.
-	 * 
-	 * @return the expected sequence number
-	 */
-	public int getExpectedSequenceNumber() {
-
-		return this.expectedSequenceNumber;
-	}
-
-
-	@Override
-	public void write(final DataOutput out) throws IOException {
-
-		out.writeInt(this.expectedSequenceNumber);
-	}
-
-
-	@Override
-	public void read(final DataInput in) throws IOException {
-
-		this.expectedSequenceNumber = in.readInt();
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/ExecutorThreadFactory.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/ExecutorThreadFactory.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/ExecutorThreadFactory.java
deleted file mode 100644
index ed845e8..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/ExecutorThreadFactory.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-package eu.stratosphere.nephele.taskmanager.runtime;
-
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicInteger;
-
-public class ExecutorThreadFactory implements ThreadFactory {
-	
-	public static final ExecutorThreadFactory INSTANCE = new ExecutorThreadFactory();
-
-	private static final String THREAD_NAME = "Nephele Executor Thread ";
-	
-	private final AtomicInteger threadNumber = new AtomicInteger(1);
-	
-	
-	private ExecutorThreadFactory() {}
-	
-	
-	public Thread newThread(Runnable target) {
-		Thread t = new Thread(target, THREAD_NAME + threadNumber.getAndIncrement());
-		t.setDaemon(true);
-		return t;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/ForwardingBarrier.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/ForwardingBarrier.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/ForwardingBarrier.java
deleted file mode 100644
index 4a104d7..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/ForwardingBarrier.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.runtime;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.AbstractOutputChannelForwarder;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.UnexpectedEnvelopeEvent;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelope;
-
-public final class ForwardingBarrier extends AbstractOutputChannelForwarder {
-
-	private static final Log LOG = LogFactory.getLog(ForwardingBarrier.class);
-
-	private final ChannelID outputChannelID;
-
-	private int forwardingBarrier = -1;
-
-	public ForwardingBarrier(final ChannelID outputChannelID, final AbstractOutputChannelForwarder next) {
-		super(next);
-
-		if (next == null) {
-			throw new IllegalArgumentException("Argument next must not be null");
-		}
-
-		this.outputChannelID = outputChannelID;
-	}
-
-
-	@Override
-	public void push(final TransferEnvelope transferEnvelope) throws IOException, InterruptedException {
-
-		if (transferEnvelope.getSequenceNumber() < this.forwardingBarrier) {
-			recycleTransferEnvelope(transferEnvelope);
-			return;
-		}
-
-		getNext().push(transferEnvelope);
-	}
-
-
-	@Override
-	public void processEvent(final AbstractEvent event) {
-
-		if (event instanceof UnexpectedEnvelopeEvent) {
-
-			final UnexpectedEnvelopeEvent uee = (UnexpectedEnvelopeEvent) event;
-			if (uee.getExpectedSequenceNumber() > this.forwardingBarrier) {
-				this.forwardingBarrier = uee.getExpectedSequenceNumber();
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("Setting forwarding barrier to sequence number " + this.forwardingBarrier
-						+ " for output channel " + this.outputChannelID);
-				}
-			}
-		}
-
-		getNext().processEvent(event);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/352c1b99/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeDispatcher.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeDispatcher.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeDispatcher.java
deleted file mode 100644
index 5f14743..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeDispatcher.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.runtime;
-
-import java.io.IOException;
-
-import eu.stratosphere.nephele.taskmanager.bytebuffered.AbstractOutputChannelForwarder;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelope;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelopeDispatcher;
-
-public final class RuntimeDispatcher extends AbstractOutputChannelForwarder {
-
-	private final TransferEnvelopeDispatcher dispatcher;
-
-	public RuntimeDispatcher(final TransferEnvelopeDispatcher dispatcher) {
-		super(null);
-
-		this.dispatcher = dispatcher;
-	}
-
-
-	@Override
-	public void push(final TransferEnvelope transferEnvelope) throws IOException, InterruptedException {
-
-		this.dispatcher.processEnvelopeFromOutputChannel(transferEnvelope);
-	}
-}


Mime
View raw message