flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [17/30] Offer buffer-oriented API for I/O (#25)
Date Mon, 09 Jun 2014 18:30:52 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeInputChannelContext.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeInputChannelContext.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeInputChannelContext.java
deleted file mode 100644
index c1553d8..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeInputChannelContext.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.runtime;
-
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.Iterator;
-import java.util.Queue;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-import eu.stratosphere.nephele.io.channels.Buffer;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.io.channels.ChannelType;
-import eu.stratosphere.nephele.io.channels.bytebuffered.AbstractByteBufferedInputChannel;
-import eu.stratosphere.nephele.io.channels.bytebuffered.BufferOrEvent;
-import eu.stratosphere.nephele.io.channels.bytebuffered.ByteBufferedInputChannelBroker;
-import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.taskmanager.bufferprovider.BufferAvailabilityListener;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.InputChannelContext;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.ReceiverNotFoundEvent;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelope;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelopeDispatcher;
-
-
-final class RuntimeInputChannelContext implements InputChannelContext, ByteBufferedInputChannelBroker {
-
-	private static final Log LOG = LogFactory.getLog(RuntimeInputChannelContext.class);
-
-	private final RuntimeInputGateContext inputGateContext;
-
-	private final AbstractByteBufferedInputChannel<?> byteBufferedInputChannel;
-
-	private final TransferEnvelopeDispatcher transferEnvelopeDispatcher;
-
-	private final Queue<TransferEnvelope> queuedEnvelopes = new ArrayDeque<TransferEnvelope>();
-	
-	private Iterator<AbstractEvent> pendingEvents;
-
-	private int lastReceivedEnvelope = -1;
-
-	private boolean destroyCalled = false;
-
-	RuntimeInputChannelContext(final RuntimeInputGateContext inputGateContext,
-			final TransferEnvelopeDispatcher transferEnvelopeDispatcher,
-			final AbstractByteBufferedInputChannel<?> byteBufferedInputChannel) {
-
-		this.inputGateContext = inputGateContext;
-		this.transferEnvelopeDispatcher = transferEnvelopeDispatcher;
-		this.byteBufferedInputChannel = byteBufferedInputChannel;
-		this.byteBufferedInputChannel.setInputChannelBroker(this);
-	}
-
-
-	@Override
-	public BufferOrEvent getNextBufferOrEvent() throws IOException {
-		// return pending events first
-		if (this.pendingEvents != null) {
-			// if the field is not null, it must always have a next value!
-			BufferOrEvent next = new BufferOrEvent(this.pendingEvents.next());
-			if (!this.pendingEvents.hasNext()) {
-				this.pendingEvents = null;
-			}
-			return next;
-		}
-
-		// if no events are pending, get the next buffer
-		TransferEnvelope nextEnvelope;
-		synchronized (this.queuedEnvelopes) {
-			if (this.queuedEnvelopes.isEmpty()) {
-				return null;
-			}
-			nextEnvelope = this.queuedEnvelopes.poll();
-		}
-
-		// schedule events as pending, because events come always after the buffer!
-		if (nextEnvelope.getEventList() != null) {
-			Iterator<AbstractEvent> events = nextEnvelope.getEventList().iterator();
-			if (events.hasNext()) {
-				this.pendingEvents = events;
-			}
-		}
-		
-		// get the buffer, if there is one
-		if (nextEnvelope.getBuffer() != null) {
-			return new BufferOrEvent(nextEnvelope.getBuffer());
-		}
-		else if (this.pendingEvents != null) {
-			// if the field is not null, it must always have a next value!
-			BufferOrEvent next = new BufferOrEvent(this.pendingEvents.next());
-			if (!this.pendingEvents.hasNext()) {
-				this.pendingEvents = null;
-			}
-			
-			return next;
-		}
-		else {
-			// no buffer and no events, this should be an error
-			throw new IOException("Received an envelope with neither data nor events.");
-		}
-	}
-
-	@Override
-	public void transferEventToOutputChannel(AbstractEvent event) throws IOException, InterruptedException {
-		TransferEnvelope ephemeralTransferEnvelope = new TransferEnvelope(0, getJobID(), getChannelID());
-		ephemeralTransferEnvelope.addEvent(event);
-		
-		this.transferEnvelopeDispatcher.processEnvelopeFromInputChannel(ephemeralTransferEnvelope);
-	}
-
-	@Override
-	public void queueTransferEnvelope(TransferEnvelope transferEnvelope) {
-
-		if (ReceiverNotFoundEvent.isReceiverNotFoundEvent(transferEnvelope)) {
-			return;
-		}
-		
-		// The sequence number of the envelope to be queued
-		final int sequenceNumber = transferEnvelope.getSequenceNumber();
-
-		synchronized (this.queuedEnvelopes) {
-
-			if (this.destroyCalled) {
-				final Buffer buffer = transferEnvelope.getBuffer();
-				if (buffer != null) {
-					buffer.recycleBuffer();
-				}
-				return;
-			}
-
-			final int expectedSequenceNumber = this.lastReceivedEnvelope + 1;
-			if (sequenceNumber != expectedSequenceNumber) {
-				// This is a problem, now we are actually missing some data
-				this.byteBufferedInputChannel.reportIOException(new IOException("Expected data packet "
-						+ expectedSequenceNumber + " but received " + sequenceNumber));
-				
-				// notify that something (an exception) is available
-				this.byteBufferedInputChannel.notifyGateThatInputIsAvailable();
-
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("Input channel " + getChannelName() + " expected envelope " + expectedSequenceNumber
-						+ " but received " + sequenceNumber);
-				}
-
-				// rescue the buffer
-				final Buffer buffer = transferEnvelope.getBuffer();
-				if (buffer != null) {
-					buffer.recycleBuffer();
-				}
-			} else {
-
-				this.queuedEnvelopes.add(transferEnvelope);
-				this.lastReceivedEnvelope = sequenceNumber;
-
-				// Notify the channel about the new data. notify as much as there is (buffer plus once per event)
-				if (transferEnvelope.getBuffer() != null) {
-					this.byteBufferedInputChannel.notifyGateThatInputIsAvailable();
-				}
-				if (transferEnvelope.getEventList() != null) {
-					for (int i = 0; i < transferEnvelope.getEventList().size(); i++) {
-						this.byteBufferedInputChannel.notifyGateThatInputIsAvailable();
-					}
-				}
-			}
-		}
-	}
-
-	@Override
-	public ChannelID getChannelID() {
-		return this.byteBufferedInputChannel.getID();
-	}
-
-	@Override
-	public ChannelID getConnectedChannelID() {
-		return this.byteBufferedInputChannel.getConnectedChannelID();
-	}
-
-	@Override
-	public JobID getJobID() {
-		return this.byteBufferedInputChannel.getJobID();
-	}
-
-	@Override
-	public boolean isInputChannel() {
-		return this.byteBufferedInputChannel.isInputChannel();
-	}
-
-	@Override
-	public void destroy() {
-		final Queue<Buffer> buffersToRecycle = new ArrayDeque<Buffer>();
-
-		synchronized (this.queuedEnvelopes) {
-			this.destroyCalled = true;
-
-			while (!this.queuedEnvelopes.isEmpty()) {
-				final TransferEnvelope envelope = this.queuedEnvelopes.poll();
-				if (envelope.getBuffer() != null) {
-					buffersToRecycle.add(envelope.getBuffer());
-				}
-			}
-		}
-
-		while (!buffersToRecycle.isEmpty()) {
-			buffersToRecycle.poll().recycleBuffer();
-		}
-	}
-
-	@Override
-	public void logQueuedEnvelopes() {
-		int numberOfQueuedEnvelopes = 0;
-		int numberOfQueuedEnvelopesWithMemoryBuffers = 0;
-		int numberOfQueuedEnvelopesWithFileBuffers = 0;
-
-		synchronized (this.queuedEnvelopes) {
-
-			final Iterator<TransferEnvelope> it = this.queuedEnvelopes.iterator();
-			while (it.hasNext()) {
-
-				final TransferEnvelope envelope = it.next();
-				++numberOfQueuedEnvelopes;
-				final Buffer buffer = envelope.getBuffer();
-				if (buffer == null) {
-					continue;
-				}
-
-				if (buffer.isBackedByMemory()) {
-					++numberOfQueuedEnvelopesWithMemoryBuffers;
-				} else {
-					++numberOfQueuedEnvelopesWithFileBuffers;
-				}
-			}
-		}
-
-		System.out.println("\t\t" + getChannelName() + ": " + numberOfQueuedEnvelopes + " ("
-			+ numberOfQueuedEnvelopesWithMemoryBuffers + ", " + numberOfQueuedEnvelopesWithFileBuffers + ")");
-
-	}
-
-	@Override
-	public Buffer requestEmptyBuffer(int minimumSizeOfBuffer) throws IOException {
-		return this.inputGateContext.requestEmptyBuffer(minimumSizeOfBuffer);
-	}
-
-	@Override
-	public Buffer requestEmptyBufferBlocking(int minimumSizeOfBuffer) throws IOException, InterruptedException {
-		return this.inputGateContext.requestEmptyBufferBlocking(minimumSizeOfBuffer);
-	}
-
-	@Override
-	public int getMaximumBufferSize() {
-		return this.inputGateContext.getMaximumBufferSize();
-	}
-
-	@Override
-	public boolean isShared() {
-		return this.inputGateContext.isShared();
-	}
-
-	@Override
-	public void reportAsynchronousEvent() {
-		this.inputGateContext.reportAsynchronousEvent();
-	}
-
-	@Override
-	public ChannelType getType() {
-		return this.byteBufferedInputChannel.getType();
-	}
-
-	/**
-	 * Constructs and returns a human-readable name of this channel used for debugging.
-	 * 
-	 * @return a human-readable name of this channel used for debugging
-	 */
-	private String getChannelName() {
-		StringBuilder sb = new StringBuilder(this.inputGateContext.getTaskName());
-		sb.append(' ');
-		sb.append('(');
-		sb.append(this.byteBufferedInputChannel.getChannelIndex());
-		sb.append(',');
-		sb.append(' ');
-		sb.append(this.byteBufferedInputChannel.getID());
-		sb.append(')');
-		return sb.toString();
-	}
-
-	@Override
-	public boolean registerBufferAvailabilityListener(final BufferAvailabilityListener bufferAvailabilityListener) {
-		return this.inputGateContext.registerBufferAvailabilityListener(bufferAvailabilityListener);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeInputGateContext.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeInputGateContext.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeInputGateContext.java
deleted file mode 100644
index 7e2d492..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeInputGateContext.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.runtime;
-
-import java.io.IOException;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.io.GateID;
-import eu.stratosphere.nephele.io.InputGate;
-import eu.stratosphere.nephele.io.channels.AbstractInputChannel;
-import eu.stratosphere.nephele.io.channels.Buffer;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.io.channels.bytebuffered.AbstractByteBufferedInputChannel;
-import eu.stratosphere.nephele.taskmanager.bufferprovider.BufferAvailabilityListener;
-import eu.stratosphere.nephele.taskmanager.bufferprovider.BufferProvider;
-import eu.stratosphere.nephele.taskmanager.bufferprovider.LocalBufferPool;
-import eu.stratosphere.nephele.taskmanager.bufferprovider.LocalBufferPoolOwner;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.InputChannelContext;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.InputGateContext;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelopeDispatcher;
-
-final class RuntimeInputGateContext implements BufferProvider, InputGateContext, LocalBufferPoolOwner {
-
-	private final String taskName;
-
-	private final LocalBufferPool localBufferPool;
-
-	private final TransferEnvelopeDispatcher transferEnvelopeDispatcher;
-
-	private final InputGate<? extends IOReadableWritable> inputGate;
-
-	RuntimeInputGateContext(final String taskName, final TransferEnvelopeDispatcher transferEnvelopeDispatcher,
-			final InputGate<? extends IOReadableWritable> inputGate) {
-
-		this.taskName = taskName;
-		this.localBufferPool = new LocalBufferPool(1, false);
-
-		this.transferEnvelopeDispatcher = transferEnvelopeDispatcher;
-		this.inputGate = inputGate;
-	}
-
-	@Override
-	public Buffer requestEmptyBuffer(final int minimumSizeOfBuffer) throws IOException {
-
-		return this.localBufferPool.requestEmptyBuffer(minimumSizeOfBuffer);
-	}
-
-
-	@Override
-	public Buffer requestEmptyBufferBlocking(final int minimumSizeOfBuffer) throws IOException, InterruptedException {
-
-		final Buffer buffer = this.localBufferPool.requestEmptyBuffer(minimumSizeOfBuffer);
-		if (buffer != null) {
-			return buffer;
-		}
-
-		return this.localBufferPool.requestEmptyBufferBlocking(minimumSizeOfBuffer);
-	}
-
-
-	@Override
-	public int getMaximumBufferSize() {
-
-		return this.localBufferPool.getMaximumBufferSize();
-	}
-
-
-	@Override
-	public boolean isShared() {
-
-		return this.localBufferPool.isShared();
-	}
-
-
-	@Override
-	public void reportAsynchronousEvent() {
-
-		this.localBufferPool.reportAsynchronousEvent();
-	}
-
-	@Override
-	public int getNumberOfChannels() {
-
-		return this.inputGate.getNumberOfInputChannels();
-	}
-
-
-	@Override
-	public void setDesignatedNumberOfBuffers(int numberOfBuffers) {
-
-		this.localBufferPool.setDesignatedNumberOfBuffers(numberOfBuffers);
-	}
-
-
-	@Override
-	public void clearLocalBufferPool() {
-
-		this.localBufferPool.destroy();
-	}
-
-
-	@Override
-	public void logBufferUtilization() {
-
-		final int ava = this.localBufferPool.getNumberOfAvailableBuffers();
-		final int req = this.localBufferPool.getRequestedNumberOfBuffers();
-		final int des = this.localBufferPool.getDesignatedNumberOfBuffers();
-
-		System.out
-			.println("\t\tInput gate " + this.inputGate.getIndex() + " of " + this.taskName + ": " + ava
-				+ " available, " + req + " requested, " + des + " designated");
-	}
-
-
-	@Override
-	public GateID getGateID() {
-
-		return this.inputGate.getGateID();
-	}
-
-
-	@Override
-	public InputChannelContext createInputChannelContext(final ChannelID channelID,
-			final InputChannelContext previousContext) {
-
-		AbstractInputChannel<? extends IOReadableWritable> channel = null;
-		for (int i = 0; i < this.inputGate.getNumberOfInputChannels(); ++i) {
-			AbstractInputChannel<? extends IOReadableWritable> candidateChannel = this.inputGate.getInputChannel(i);
-			if (candidateChannel.getID().equals(channelID)) {
-				channel = candidateChannel;
-				break;
-			}
-		}
-
-		if (channel == null) {
-			throw new IllegalArgumentException("Cannot find input channel with ID " + channelID);
-		}
-
-		if (!(channel instanceof AbstractByteBufferedInputChannel)) {
-			throw new IllegalStateException("Channel with ID" + channelID
-				+ " is not of type AbstractByteBufferedInputChannel");
-		}
-
-		return new RuntimeInputChannelContext(this, this.transferEnvelopeDispatcher,
-			(AbstractByteBufferedInputChannel<? extends IOReadableWritable>) channel);
-	}
-
-
-	@Override
-	public LocalBufferPoolOwner getLocalBufferPoolOwner() {
-
-		return this;
-	}
-
-	/**
-	 * Returns the name of the task this gate belongs to.
-	 * 
-	 * @return the name of the task this gate belongs to
-	 */
-	String getTaskName() {
-
-		return this.taskName;
-	}
-
-
-	@Override
-	public boolean registerBufferAvailabilityListener(final BufferAvailabilityListener bufferAvailabilityListener) {
-
-		return this.localBufferPool.registerBufferAvailabilityListener(bufferAvailabilityListener);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeOutputChannelBroker.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeOutputChannelBroker.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeOutputChannelBroker.java
deleted file mode 100644
index 41fc7ff..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeOutputChannelBroker.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.runtime;
-
-import java.io.IOException;
-
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
-import eu.stratosphere.nephele.io.channels.Buffer;
-import eu.stratosphere.nephele.io.channels.bytebuffered.AbstractByteBufferedOutputChannel;
-import eu.stratosphere.nephele.io.channels.bytebuffered.ByteBufferedChannelCloseEvent;
-import eu.stratosphere.nephele.io.channels.bytebuffered.ByteBufferedOutputChannelBroker;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.AbstractOutputChannelForwarder;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.OutputChannelForwardingChain;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.ReceiverNotFoundEvent;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelope;
-
-final class RuntimeOutputChannelBroker extends AbstractOutputChannelForwarder implements
-		ByteBufferedOutputChannelBroker {
-
-	/**
-	 * The byte buffered output channel this context belongs to.
-	 */
-	private final AbstractByteBufferedOutputChannel<?> byteBufferedOutputChannel;
-
-	/**
-	 * The buffer provider this channel broker to obtain buffers from.
-	 */
-	private final RuntimeOutputGateContext outputGateContext;
-
-	/**
-	 * The forwarding chain along which the created transfer envelopes will be pushed.
-	 */
-	private OutputChannelForwardingChain forwardingChain;
-
-	/**
-	 * Points to the {@link TransferEnvelope} object that will be passed to the framework upon
-	 * the next <code>releaseWriteBuffers</code> call.
-	 */
-	private TransferEnvelope outgoingTransferEnvelope = null;
-
-	/**
-	 * Stores whether the receiver has acknowledged the close request from this channel.
-	 */
-	private boolean closeAcknowledgmentReceived = false;
-
-	/**
-	 * Stores the last sequence number of the transfer envelope for which the receiver could not be found.
-	 */
-	private int lastSequenceNumberWithReceiverNotFound = -1;
-
-	/**
-	 * The sequence number for the next {@link TransferEnvelope} to be created.
-	 */
-	private int sequenceNumber = 0;
-
-	RuntimeOutputChannelBroker(final RuntimeOutputGateContext outputGateContext,
-			final AbstractByteBufferedOutputChannel<?> byteBufferedOutputChannel,
-			final AbstractOutputChannelForwarder next) {
-
-		super(next);
-
-		if (next == null) {
-			throw new IllegalArgumentException("Argument next must not be null");
-		}
-
-		this.outputGateContext = outputGateContext;
-		this.byteBufferedOutputChannel = byteBufferedOutputChannel;
-		this.byteBufferedOutputChannel.setByteBufferedOutputChannelBroker(this);
-	}
-
-	public void setForwardingChain(final OutputChannelForwardingChain forwardingChain) {
-		this.forwardingChain = forwardingChain;
-	}
-
-
-	@Override
-	public boolean hasDataLeft() throws IOException, InterruptedException {
-
-		if (this.closeAcknowledgmentReceived) {
-			return getNext().hasDataLeft();
-		}
-
-		if ((this.lastSequenceNumberWithReceiverNotFound + 1) == this.sequenceNumber) {
-			return getNext().hasDataLeft();
-		}
-
-		return true;
-	}
-
-
-	@Override
-	public void processEvent(final AbstractEvent event) {
-
-		if (event instanceof ByteBufferedChannelCloseEvent) {
-			this.closeAcknowledgmentReceived = true;
-		} else if (event instanceof ReceiverNotFoundEvent) {
-			this.lastSequenceNumberWithReceiverNotFound = ((ReceiverNotFoundEvent) event).getSequenceNumber();
-		} else if (event instanceof AbstractTaskEvent) {
-			throw new IllegalStateException("Received synchronous task event " + event);
-		}
-
-		getNext().processEvent(event);
-	}
-
-
-	@Override
-	public Buffer requestEmptyWriteBuffer() throws InterruptedException, IOException {
-
-		if (this.outgoingTransferEnvelope == null) {
-			this.outgoingTransferEnvelope = createNewOutgoingTransferEnvelope();
-		}
-
-		final int uncompressedBufferSize = calculateBufferSize();
-
-		return this.outputGateContext.requestEmptyBufferBlocking(uncompressedBufferSize);
-	}
-
-	/**
-	 * Creates a new {@link TransferEnvelope} object. The method assigns
-	 * and increases the sequence number. Moreover, it will look up the list of receivers for this transfer envelope.
-	 * This method will block until the lookup is completed.
-	 * 
-	 * @return a new {@link TransferEnvelope} object containing the correct sequence number and receiver list
-	 */
-	private TransferEnvelope createNewOutgoingTransferEnvelope() {
-
-		final TransferEnvelope transferEnvelope = new TransferEnvelope(this.sequenceNumber++,
-			this.byteBufferedOutputChannel.getJobID(),
-			this.byteBufferedOutputChannel.getID());
-
-		return transferEnvelope;
-	}
-
-	/**
-	 * Calculates the recommended size of the next buffer to be
-	 * handed to the attached channel object in bytes.
-	 * 
-	 * @return the recommended size of the next buffer in bytes
-	 */
-	private int calculateBufferSize() {
-
-		// TODO: Include latency considerations
-		return this.outputGateContext.getMaximumBufferSize();
-	}
-
-
-	@Override
-	public void releaseWriteBuffer(final Buffer buffer) throws IOException, InterruptedException {
-
-		// Check for events
-		this.forwardingChain.processQueuedEvents();
-
-		if (this.outgoingTransferEnvelope == null) {
-			throw new IllegalStateException("Cannot find transfer envelope for channel with ID "
-				+ this.byteBufferedOutputChannel.getID());
-		}
-
-		// Consistency check
-		if (this.outgoingTransferEnvelope.getBuffer() != null) {
-			throw new IllegalStateException("Channel " + this.byteBufferedOutputChannel.getID()
-				+ " has already a buffer attached");
-		}
-		buffer.flip();
-		this.outgoingTransferEnvelope.setBuffer(buffer);
-
-		this.forwardingChain.pushEnvelope(this.outgoingTransferEnvelope);
-		this.outgoingTransferEnvelope = null;
-	}
-
-
-	@Override
-	public boolean hasDataLeftToTransmit() throws IOException, InterruptedException {
-
-		// Check for events
-		this.forwardingChain.processQueuedEvents();
-
-		return this.forwardingChain.anyForwarderHasDataLeft();
-	}
-
-
-	@Override
-	public void transferEventToInputChannel(final AbstractEvent event) throws IOException, InterruptedException {
-
-		if (this.outgoingTransferEnvelope != null) {
-			this.outgoingTransferEnvelope.addEvent(event);
-		} else {
-
-			final TransferEnvelope ephemeralTransferEnvelope = createNewOutgoingTransferEnvelope();
-			ephemeralTransferEnvelope.addEvent(event);
-
-			this.forwardingChain.pushEnvelope(ephemeralTransferEnvelope);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeOutputChannelContext.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeOutputChannelContext.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeOutputChannelContext.java
deleted file mode 100644
index 01d8ba2..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeOutputChannelContext.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.runtime;
-
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.io.channels.ChannelType;
-import eu.stratosphere.nephele.io.channels.bytebuffered.AbstractByteBufferedOutputChannel;
-import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.AbstractOutputChannelContext;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.OutputChannelForwardingChain;
-
-public final class RuntimeOutputChannelContext extends AbstractOutputChannelContext {
-
-	private final AbstractByteBufferedOutputChannel<?> byteBufferedOutputChannel;
-
-	RuntimeOutputChannelContext(final AbstractByteBufferedOutputChannel<?> byteBufferedOutputChannel,
-			final OutputChannelForwardingChain forwardingChain) {
-		super(forwardingChain);
-
-		this.byteBufferedOutputChannel = byteBufferedOutputChannel;
-	}
-
-
-	@Override
-	public boolean isInputChannel() {
-
-		return false;
-	}
-
-
-	@Override
-	public ChannelID getChannelID() {
-
-		return this.byteBufferedOutputChannel.getID();
-	}
-
-
-	@Override
-	public ChannelID getConnectedChannelID() {
-
-		return this.byteBufferedOutputChannel.getConnectedChannelID();
-	}
-
-
-	@Override
-	public JobID getJobID() {
-
-		return this.byteBufferedOutputChannel.getJobID();
-	}
-
-
-	@Override
-	public ChannelType getType() {
-
-		return this.byteBufferedOutputChannel.getType();
-	}
-
-
-	@Override
-	protected void processEventAsynchronously(final AbstractEvent event) {
-
-		this.byteBufferedOutputChannel.processEvent(event);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeOutputGateContext.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeOutputGateContext.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeOutputGateContext.java
deleted file mode 100644
index 72e5047..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeOutputGateContext.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.runtime;
-
-import java.io.IOException;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.io.AbstractID;
-import eu.stratosphere.nephele.io.GateID;
-import eu.stratosphere.nephele.io.OutputGate;
-import eu.stratosphere.nephele.io.channels.AbstractOutputChannel;
-import eu.stratosphere.nephele.io.channels.Buffer;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.io.channels.bytebuffered.AbstractByteBufferedOutputChannel;
-import eu.stratosphere.nephele.taskmanager.bufferprovider.BufferAvailabilityListener;
-import eu.stratosphere.nephele.taskmanager.bufferprovider.BufferProvider;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.AbstractOutputChannelForwarder;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.OutputChannelContext;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.OutputChannelForwardingChain;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.OutputGateContext;
-
-final class RuntimeOutputGateContext implements BufferProvider, OutputGateContext {
-
-	private final RuntimeTaskContext taskContext;
-
-	private final OutputGate<? extends IOReadableWritable> outputGate;
-
-	RuntimeOutputGateContext(final RuntimeTaskContext taskContext, final OutputGate<? extends IOReadableWritable> outputGate) {
-
-		this.taskContext = taskContext;
-		this.outputGate = outputGate;
-	}
-
-	AbstractID getFileOwnerID() {
-
-		return this.taskContext.getFileOwnerID();
-	}
-
-
-	@Override
-	public int getMaximumBufferSize() {
-
-		return this.taskContext.getMaximumBufferSize();
-	}
-
-
-	@Override
-	public Buffer requestEmptyBuffer(int minimumSizeOfBuffer) throws IOException {
-
-		return this.taskContext.requestEmptyBuffer(minimumSizeOfBuffer);
-	}
-
-
-	@Override
-	public Buffer requestEmptyBufferBlocking(int minimumSizeOfBuffer) throws IOException, InterruptedException {
-
-		Buffer buffer = this.taskContext.requestEmptyBuffer(minimumSizeOfBuffer);
-
-		// No memory-based buffer available
-		if (buffer == null) {
-			// Wait until a memory-based buffer is available
-			buffer = this.taskContext.requestEmptyBufferBlocking(minimumSizeOfBuffer);
-		}
-
-		return buffer;
-	}
-
-
-	@Override
-	public boolean isShared() {
-
-		return this.taskContext.isShared();
-	}
-
-
-	@Override
-	public void reportAsynchronousEvent() {
-
-		this.taskContext.reportAsynchronousEvent();
-	}
-
-
-	@Override
-	public GateID getGateID() {
-
-		return this.outputGate.getGateID();
-	}
-
-
-	@Override
-	public OutputChannelContext createOutputChannelContext(ChannelID channelID, OutputChannelContext previousContext,
-			boolean isReceiverRunning, boolean mergeSpillBuffers) {
-
-		if (previousContext != null) {
-			throw new IllegalStateException("Found previous output context for channel " + channelID);
-		}
-
-		AbstractOutputChannel<? extends IOReadableWritable> channel = null;
-		for (int i = 0; i < this.outputGate.getNumberOfOutputChannels(); ++i) {
-			AbstractOutputChannel<? extends IOReadableWritable> candidateChannel = this.outputGate.getOutputChannel(i);
-			if (candidateChannel.getID().equals(channelID)) {
-				channel = candidateChannel;
-				break;
-			}
-		}
-
-		if (channel == null) {
-			throw new IllegalArgumentException("Cannot find output channel with ID " + channelID);
-		}
-
-		if (!(channel instanceof AbstractByteBufferedOutputChannel)) {
-			throw new IllegalStateException("Channel with ID" + channelID
-				+ " is not of type AbstractByteBufferedOutputChannel");
-		}
-
-		// The output channel for this context
-		final AbstractByteBufferedOutputChannel<? extends IOReadableWritable> outputChannel = (AbstractByteBufferedOutputChannel<? extends IOReadableWritable>) channel;
-
-		// Construct the forwarding chain
-		RuntimeOutputChannelBroker outputChannelBroker;
-		AbstractOutputChannelForwarder last;
-		// Construction for in-memory and network channels
-		final RuntimeDispatcher runtimeDispatcher = new RuntimeDispatcher(
-			this.taskContext.getTransferEnvelopeDispatcher());
-		/*
-		 * final SpillingBarrier spillingBarrier = new SpillingBarrier(isReceiverRunning, mergeSpillBuffers,
-		 * runtimeDispatcher);
-		 * final ForwardingBarrier forwardingBarrier = new ForwardingBarrier(channelID, spillingBarrier);
-		 */
-		final ForwardingBarrier forwardingBarrier = new ForwardingBarrier(channelID, runtimeDispatcher);
-		outputChannelBroker = new RuntimeOutputChannelBroker(this, outputChannel, forwardingBarrier);
-		last = runtimeDispatcher;
-
-		final OutputChannelForwardingChain forwardingChain = new OutputChannelForwardingChain(outputChannelBroker, last);
-
-		// Set forwarding chain for broker
-		outputChannelBroker.setForwardingChain(forwardingChain);
-
-		return new RuntimeOutputChannelContext(outputChannel, forwardingChain);
-	}
-
-
-	@Override
-	public boolean registerBufferAvailabilityListener(final BufferAvailabilityListener bufferAvailabilityListener) {
-
-		return this.taskContext.registerBufferAvailabilityListener(bufferAvailabilityListener);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeTask.java
deleted file mode 100644
index 5a44530..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeTask.java
+++ /dev/null
@@ -1,346 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.runtime;
-
-import java.util.Iterator;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import eu.stratosphere.configuration.Configuration;
-import eu.stratosphere.nephele.execution.Environment;
-import eu.stratosphere.nephele.execution.ExecutionListener;
-import eu.stratosphere.nephele.execution.ExecutionObserver;
-import eu.stratosphere.nephele.execution.ExecutionState;
-import eu.stratosphere.nephele.execution.ExecutionStateTransition;
-import eu.stratosphere.nephele.execution.RuntimeEnvironment;
-import eu.stratosphere.nephele.executiongraph.ExecutionVertexID;
-import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.profiling.TaskManagerProfiler;
-import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
-import eu.stratosphere.nephele.taskmanager.Task;
-import eu.stratosphere.nephele.taskmanager.TaskManager;
-import eu.stratosphere.nephele.taskmanager.bufferprovider.LocalBufferPoolOwner;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.TaskContext;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelopeDispatcher;
-import eu.stratosphere.nephele.template.AbstractInvokable;
-import eu.stratosphere.util.StringUtils;
-
-public final class RuntimeTask implements Task, ExecutionObserver {
-
-	/**
-	 * The log object used for debugging.
-	 */
-	private static final Log LOG = LogFactory.getLog(RuntimeTask.class);
-
-	private final ExecutionVertexID vertexID;
-
-	private final RuntimeEnvironment environment;
-
-	private final TaskManager taskManager;
-
-	/**
-	 * Stores whether the task has been canceled.
-	 */
-	private volatile boolean isCanceled = false;
-
-	/**
-	 * The current execution state of the task
-	 */
-	private volatile ExecutionState executionState = ExecutionState.STARTING;
-
-	private Queue<ExecutionListener> registeredListeners = new ConcurrentLinkedQueue<ExecutionListener>();
-
-	public RuntimeTask(final ExecutionVertexID vertexID, final RuntimeEnvironment environment,
-			final TaskManager taskManager) {
-
-		this.vertexID = vertexID;
-		this.environment = environment;
-		this.taskManager = taskManager;
-
-		this.environment.setExecutionObserver(this);
-	}
-
-
-	@Override
-	public void executionStateChanged(final ExecutionState newExecutionState, final String optionalMessage) {
-
-		// Check the state transition
-		ExecutionStateTransition.checkTransition(false, getTaskName(), this.executionState, newExecutionState);
-
-		// Make sure the reason for a transition to FAILED appears in the log files
-		if (newExecutionState == ExecutionState.FAILED) {
-			LOG.error(optionalMessage);
-		}
-
-		// Notify all listener objects
-		final Iterator<ExecutionListener> it = this.registeredListeners.iterator();
-		while (it.hasNext()) {
-			it.next().executionStateChanged(this.environment.getJobID(), this.vertexID, newExecutionState,
-				optionalMessage);
-		}
-
-		// Store the new execution state
-		this.executionState = newExecutionState;
-
-		// Finally propagate the state change to the job manager
-		this.taskManager.executionStateChanged(this.environment.getJobID(), this.vertexID, newExecutionState,
-			optionalMessage);
-	}
-
-	/**
-	 * Returns the name of the task associated with this observer object.
-	 * 
-	 * @return the name of the task associated with this observer object
-	 */
-	private String getTaskName() {
-
-		return this.environment.getTaskName() + " (" + (this.environment.getIndexInSubtaskGroup() + 1) + "/"
-			+ this.environment.getCurrentNumberOfSubtasks() + ")";
-	}
-
-
-	@Override
-	public void userThreadStarted(final Thread userThread) {
-
-		// Notify the listeners
-		final Iterator<ExecutionListener> it = this.registeredListeners.iterator();
-		while (it.hasNext()) {
-			it.next().userThreadStarted(this.environment.getJobID(), this.vertexID, userThread);
-		}
-	}
-
-
-	@Override
-	public void userThreadFinished(final Thread userThread) {
-
-		// Notify the listeners
-		final Iterator<ExecutionListener> it = this.registeredListeners.iterator();
-		while (it.hasNext()) {
-			it.next().userThreadFinished(this.environment.getJobID(), this.vertexID, userThread);
-		}
-	}
-
-	/**
-	 * Registers the {@link ExecutionListener} object for this task. This object
-	 * will be notified about important events during the task execution.
-	 * 
-	 * @param executionListener
-	 *        the object to be notified for important events during the task execution
-	 */
-
-	public void registerExecutionListener(final ExecutionListener executionListener) {
-
-		this.registeredListeners.add(executionListener);
-	}
-
-	/**
-	 * Unregisters the {@link ExecutionListener} object for this environment. This object
-	 * will no longer be notified about important events during the task execution.
-	 * 
-	 * @param executionListener
-	 *        the lister object to be unregistered
-	 */
-
-	public void unregisterExecutionListener(final ExecutionListener executionListener) {
-
-		this.registeredListeners.remove(executionListener);
-	}
-
-
-	@Override
-	public void markAsFailed() {
-
-		executionStateChanged(ExecutionState.FAILED, "Execution thread died unexpectedly");
-	}
-
-
-	@Override
-	public void cancelExecution() {
-
-		cancelOrKillExecution(true);
-	}
-
-
-	@Override
-	public void killExecution() {
-
-		cancelOrKillExecution(false);
-	}
-
-	/**
-	 * Cancels or kills the task.
-	 * 
-	 * @param cancel
-	 *        <code>true/code> if the task shall be canceled, <code>false</code> if it shall be killed
-	 */
-	private void cancelOrKillExecution(final boolean cancel) {
-
-		final Thread executingThread = this.environment.getExecutingThread();
-
-		if (executingThread == null) {
-			return;
-		}
-
-		if (this.executionState != ExecutionState.RUNNING && this.executionState != ExecutionState.FINISHING) {
-			return;
-		}
-
-		LOG.info((cancel ? "Canceling " : "Killing ") + this.environment.getTaskNameWithIndex());
-
-		if (cancel) {
-			this.isCanceled = true;
-			// Change state
-			executionStateChanged(ExecutionState.CANCELING, null);
-
-			// Request user code to shut down
-			try {
-				final AbstractInvokable invokable = this.environment.getInvokable();
-				if (invokable != null) {
-					invokable.cancel();
-				}
-			} catch (Throwable e) {
-				LOG.error(StringUtils.stringifyException(e));
-			}
-		}
-
-		// Continuously interrupt the user thread until it changed to state CANCELED
-		while (true) {
-
-			executingThread.interrupt();
-
-			if (!executingThread.isAlive()) {
-				break;
-			}
-
-			try {
-				executingThread.join(1000);
-			} catch (InterruptedException e) {}
-			
-			if (!executingThread.isAlive()) {
-				break;
-			}
-
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Sending repeated " + (cancel == true ? "canceling" : "killing") + " signal to " +
-					this.environment.getTaskName() + " with state " + this.executionState);
-			}
-		}
-	}
-
-
-	@Override
-	public void startExecution() {
-
-		final Thread thread = this.environment.getExecutingThread();
-		thread.start();
-	}
-
-
-	@Override
-	public boolean isCanceled() {
-
-		return this.isCanceled;
-	}
-
-
-	@Override
-	public boolean isTerminated() {
-
-		final Thread executingThread = this.environment.getExecutingThread();
-		if (executingThread.getState() == Thread.State.TERMINATED) {
-			return true;
-		}
-
-		return false;
-	}
-
-
-	@Override
-	public Environment getEnvironment() {
-
-		return this.environment;
-	}
-
-	/**
-	 * Returns the runtime environment associated with this task.
-	 * 
-	 * @return the runtime environment associated with this task
-	 */
-	public RuntimeEnvironment getRuntimeEnvironment() {
-
-		return this.environment;
-	}
-
-
-	@Override
-	public JobID getJobID() {
-
-		return this.environment.getJobID();
-	}
-
-
-	@Override
-	public ExecutionVertexID getVertexID() {
-
-		return this.vertexID;
-	}
-
-
-	@Override
-	public void registerProfiler(final TaskManagerProfiler taskManagerProfiler, final Configuration jobConfiguration) {
-
-		taskManagerProfiler.registerExecutionListener(this, jobConfiguration);
-	}
-
-
-	@Override
-	public void unregisterMemoryManager(final MemoryManager memoryManager) {
-
-		if (memoryManager != null) {
-			memoryManager.releaseAll(this.environment.getInvokable());
-		}
-	}
-
-
-	@Override
-	public void unregisterProfiler(final TaskManagerProfiler taskManagerProfiler) {
-
-		if (taskManagerProfiler != null) {
-			taskManagerProfiler.unregisterExecutionListener(this.vertexID);
-		}
-	}
-
-
-	@Override
-	public TaskContext createTaskContext(final TransferEnvelopeDispatcher transferEnvelopeDispatcher,
-			final LocalBufferPoolOwner previousBufferPoolOwner) {
-
-		if (previousBufferPoolOwner != null) {
-			throw new IllegalStateException("Vertex " + this.vertexID + " has a previous buffer pool owner");
-		}
-
-		return new RuntimeTaskContext(this, transferEnvelopeDispatcher);
-	}
-
-
-	@Override
-	public ExecutionState getExecutionState() {
-
-		return this.executionState;
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeTaskContext.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeTaskContext.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeTaskContext.java
deleted file mode 100644
index 7f3a9a0..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/runtime/RuntimeTaskContext.java
+++ /dev/null
@@ -1,211 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.runtime;
-
-import java.io.IOException;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.execution.RuntimeEnvironment;
-import eu.stratosphere.nephele.io.AbstractID;
-import eu.stratosphere.nephele.io.GateID;
-import eu.stratosphere.nephele.io.InputGate;
-import eu.stratosphere.nephele.io.OutputGate;
-import eu.stratosphere.nephele.io.channels.Buffer;
-import eu.stratosphere.nephele.taskmanager.bufferprovider.BufferAvailabilityListener;
-import eu.stratosphere.nephele.taskmanager.bufferprovider.BufferProvider;
-import eu.stratosphere.nephele.taskmanager.bufferprovider.LocalBufferPool;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.InputGateContext;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.OutputGateContext;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.TaskContext;
-import eu.stratosphere.nephele.taskmanager.transferenvelope.TransferEnvelopeDispatcher;
-
-public final class RuntimeTaskContext implements BufferProvider, TaskContext {
-
-	private final LocalBufferPool localBufferPool;
-
-	private final RuntimeTask task;
-
-	private final int numberOfOutputChannels;
-
-	private final TransferEnvelopeDispatcher transferEnvelopeDispatcher;
-
-	RuntimeTaskContext(final RuntimeTask task, final TransferEnvelopeDispatcher transferEnvelopeDispatcher) {
-
-		this.localBufferPool = new LocalBufferPool(1, false);
-		this.task = task;
-
-		final RuntimeEnvironment environment = task.getRuntimeEnvironment();
-
-		// Compute number of output input channels
-		int nooc = 0;
-		for (int i = 0; i < environment.getNumberOfOutputGates(); ++i) {
-			final OutputGate<? extends IOReadableWritable> outputGate = environment.getOutputGate(i);
-			if (outputGate.isBroadcast()) {
-				++nooc;
-			} else {
-				nooc += outputGate.getNumberOfOutputChannels();
-			}
-		}
-		this.numberOfOutputChannels = nooc;
-
-		this.transferEnvelopeDispatcher = transferEnvelopeDispatcher;
-	}
-
-	TransferEnvelopeDispatcher getTransferEnvelopeDispatcher() {
-
-		return this.transferEnvelopeDispatcher;
-	}
-
-
-
-	@Override
-	public Buffer requestEmptyBuffer(final int minimumSizeOfBuffer) throws IOException {
-
-		return this.localBufferPool.requestEmptyBuffer(minimumSizeOfBuffer);
-	}
-
-
-	@Override
-	public Buffer requestEmptyBufferBlocking(int minimumSizeOfBuffer) throws IOException,
-			InterruptedException {
-
-		return this.localBufferPool.requestEmptyBufferBlocking(minimumSizeOfBuffer);
-	}
-
-
-	@Override
-	public int getMaximumBufferSize() {
-
-		return this.localBufferPool.getMaximumBufferSize();
-	}
-
-
-	@Override
-	public void clearLocalBufferPool() {
-
-		// Clear the buffer cache
-		this.localBufferPool.destroy();
-
-	}
-
-
-	@Override
-	public boolean isShared() {
-
-		return false;
-	}
-
-
-	@Override
-	public void logBufferUtilization() {
-
-		final int ava = this.localBufferPool.getNumberOfAvailableBuffers();
-		final int req = this.localBufferPool.getRequestedNumberOfBuffers();
-		final int des = this.localBufferPool.getDesignatedNumberOfBuffers();
-
-		final RuntimeEnvironment environment = this.task.getRuntimeEnvironment();
-
-		System.out.println("\t\t" + environment.getTaskNameWithIndex() + ": " + ava + " available, " + req
-			+ " requested, " + des + " designated");
-	}
-
-
-
-	@Override
-	public void reportAsynchronousEvent() {
-
-		this.localBufferPool.reportAsynchronousEvent();
-	}
-
-
-	@Override
-	public int getNumberOfChannels() {
-
-		return this.numberOfOutputChannels;
-	}
-
-
-	@Override
-	public void setDesignatedNumberOfBuffers(int numberOfBuffers) {
-
-		this.localBufferPool.setDesignatedNumberOfBuffers(numberOfBuffers);
-	}
-
-	AbstractID getFileOwnerID() {
-
-		return this.task.getVertexID();
-	}
-
-
-	@Override
-	public OutputGateContext createOutputGateContext(final GateID gateID) {
-
-		if (gateID == null) {
-			throw new IllegalArgumentException("Argument gateID must not be null");
-		}
-
-		OutputGate<? extends IOReadableWritable> outputGate = null;
-		final RuntimeEnvironment re = this.task.getRuntimeEnvironment();
-		for (int i = 0; i < re.getNumberOfOutputGates(); ++i) {
-			final OutputGate<? extends IOReadableWritable> candidateGate = re.getOutputGate(i);
-			if (candidateGate.getGateID().equals(gateID)) {
-				outputGate = candidateGate;
-				break;
-			}
-		}
-
-		if (outputGate == null) {
-			throw new IllegalStateException("Cannot find output gate with ID " + gateID);
-		}
-
-		return new RuntimeOutputGateContext(this, outputGate);
-	}
-
-
-	@Override
-	public InputGateContext createInputGateContext(final GateID gateID) {
-
-		if (gateID == null) {
-			throw new IllegalArgumentException("Argument gateID must not be null");
-		}
-
-		InputGate<? extends IOReadableWritable> inputGate = null;
-		final RuntimeEnvironment re = this.task.getRuntimeEnvironment();
-		for (int i = 0; i < re.getNumberOfInputGates(); ++i) {
-			final InputGate<? extends IOReadableWritable> candidateGate = re.getInputGate(i);
-			if (candidateGate.getGateID().equals(gateID)) {
-				inputGate = candidateGate;
-				break;
-			}
-		}
-
-		if (inputGate == null) {
-			throw new IllegalStateException("Cannot find input gate with ID " + gateID);
-		}
-
-		return new RuntimeInputGateContext(re.getTaskNameWithIndex(), this.transferEnvelopeDispatcher, inputGate);
-	}
-
-	public LocalBufferPool getLocalBufferPool() {
-
-		return this.localBufferPool;
-	}
-
-
-	@Override
-	public boolean registerBufferAvailabilityListener(final BufferAvailabilityListener bufferAvailabilityListener) {
-
-		return this.localBufferPool.registerBufferAvailabilityListener(bufferAvailabilityListener);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/AbstractDeserializer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/AbstractDeserializer.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/AbstractDeserializer.java
deleted file mode 100644
index 3a810f6..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/AbstractDeserializer.java
+++ /dev/null
@@ -1,355 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.transferenvelope;
-
-import java.io.EOFException;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.ReadableByteChannel;
-
-import eu.stratosphere.nephele.event.task.EventList;
-import eu.stratosphere.nephele.io.channels.Buffer;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.io.channels.DefaultDeserializer;
-import eu.stratosphere.nephele.jobgraph.JobID;
-
-public abstract class AbstractDeserializer {
-
-	private enum DeserializationState {
-		NOTDESERIALIZED,
-		SEQUENCENUMBERDESERIALIZED,
-		JOBIDDESERIALIZED,
-		SOURCEDESERIALIZED,
-		NOTIFICATIONSDESERIALIZED,
-		FULLYDESERIALIZED
-	};
-
-	private static final int SIZEOFINT = 4;
-
-	private TransferEnvelope transferEnvelope = null;
-
-	private DeserializationState deserializationState = DeserializationState.NOTDESERIALIZED;
-
-	private final DefaultDeserializer<ChannelID> channelIDDeserializationBuffer = new DefaultDeserializer<ChannelID>(
-			ChannelID.class, true);
-
-	private final DefaultDeserializer<JobID> jobIDDeserializationBuffer = new DefaultDeserializer<JobID>(
-			JobID.class, true);
-
-	private final DefaultDeserializer<EventList> notificationListDeserializationBuffer = new DefaultDeserializer<EventList>(
-			EventList.class, true);
-
-	private final ByteBuffer tempBuffer = ByteBuffer.allocate(8); // TODO: Make this configurable
-
-	private boolean bufferExistanceDeserialized = false;
-
-	private boolean eventListExistanceDeserialized = false;
-
-	private boolean sequenceNumberDeserializationStarted = false;
-
-	private int sizeOfBuffer = -1;
-
-	private int deserializedSequenceNumber = -1;
-
-	private Buffer buffer = null;
-
-	private JobID deserializedJobID = null;
-
-	private ChannelID deserializedSourceID = null;
-
-	private EventList deserializedEventList = null;
-
-	public void read(ReadableByteChannel readableByteChannel) throws IOException, NoBufferAvailableException {
-
-		while (true) {
-
-			// System.out.println("INCOMING State: " + this.deserializationState);
-
-			boolean waitingForMoreData = false;
-
-			switch (deserializationState) {
-			case NOTDESERIALIZED:
-				waitingForMoreData = readSequenceNumber(readableByteChannel);
-				break;
-			case SEQUENCENUMBERDESERIALIZED:
-				waitingForMoreData = readID(readableByteChannel);
-				break;
-			case JOBIDDESERIALIZED:
-				waitingForMoreData = readID(readableByteChannel);
-				break;
-			case SOURCEDESERIALIZED:
-				waitingForMoreData = readNotificationList(readableByteChannel);
-				break;
-			case NOTIFICATIONSDESERIALIZED:
-				waitingForMoreData = readBuffer(readableByteChannel);
-				break;
-			case FULLYDESERIALIZED:
-				return;
-			}
-
-			if (waitingForMoreData) {
-				return;
-			}
-
-		}
-	}
-
-	protected final ByteBuffer getTempBuffer() {
-		return this.tempBuffer;
-	}
-
-	protected void setBuffer(final Buffer buffer) {
-		this.buffer = buffer;
-	}
-
-	protected int getSizeOfBuffer() {
-		return this.sizeOfBuffer;
-	}
-
-	protected JobID getDeserializedJobID() {
-		return this.deserializedJobID;
-	}
-
-	protected ChannelID getDeserializedSourceID() {
-		return this.deserializedSourceID;
-	}
-
-	private boolean readSequenceNumber(ReadableByteChannel readableByteChannel) throws IOException {
-
-		if (!this.sequenceNumberDeserializationStarted) {
-			this.tempBuffer.position(0);
-			this.tempBuffer.limit(SIZEOFINT);
-			this.sequenceNumberDeserializationStarted = true;
-		}
-
-		if (readableByteChannel.read(this.tempBuffer) == -1) {
-
-			if (this.tempBuffer.position() == 0) {
-				// Regular end of stream
-				throw new EOFException();
-			} else {
-				throw new IOException("Unexpected end of stream while deserializing the sequence number");
-			}
-		}
-
-		if (!this.tempBuffer.hasRemaining()) {
-
-			this.deserializedSequenceNumber = byteBufferToInteger(this.tempBuffer, 0);
-			if (this.deserializedSequenceNumber < 0) {
-				throw new IOException("Received invalid sequence number: " + this.deserializedSequenceNumber);
-			}
-
-			this.deserializationState = DeserializationState.SEQUENCENUMBERDESERIALIZED;
-			this.sequenceNumberDeserializationStarted = false;
-			this.transferEnvelope = null;
-			this.sizeOfBuffer = -1;
-			this.bufferExistanceDeserialized = false;
-			this.eventListExistanceDeserialized = false;
-			this.tempBuffer.clear();
-			this.buffer = null;
-			this.jobIDDeserializationBuffer.clear();
-			this.channelIDDeserializationBuffer.clear();
-			this.deserializedEventList = null;
-			return false;
-		}
-
-		return true;
-	}
-
-	private boolean readID(ReadableByteChannel readableByteChannel) throws IOException {
-
-		if (this.deserializationState == DeserializationState.SEQUENCENUMBERDESERIALIZED) {
-
-			this.deserializedJobID = this.jobIDDeserializationBuffer.readData(null, readableByteChannel);
-			if (this.deserializedJobID == null) {
-				return true;
-			}
-
-			this.deserializationState = DeserializationState.JOBIDDESERIALIZED;
-
-		} else {
-
-			this.deserializedSourceID = this.channelIDDeserializationBuffer.readData(null, readableByteChannel);
-			if (this.deserializedSourceID == null) {
-				return true;
-			}
-
-			this.deserializationState = DeserializationState.SOURCEDESERIALIZED;
-		}
-
-		return false;
-	}
-
-	private boolean readNotificationList(ReadableByteChannel readableByteChannel) throws IOException {
-
-		if (!this.eventListExistanceDeserialized) {
-			this.tempBuffer.position(0);
-			this.tempBuffer.limit(1);
-			readableByteChannel.read(this.tempBuffer);
-
-			if (this.tempBuffer.hasRemaining()) {
-				return true;
-			}
-
-			this.eventListExistanceDeserialized = true;
-			final boolean eventListFollows = (this.tempBuffer.get(0) == (byte) 1);
-			this.tempBuffer.clear();
-
-			if (!eventListFollows) {
-				// No event list here
-				this.transferEnvelope = new TransferEnvelope(this.deserializedSequenceNumber, this.deserializedJobID,
-					this.deserializedSourceID, this.deserializedEventList);
-				this.deserializationState = DeserializationState.NOTIFICATIONSDESERIALIZED;
-				return false;
-			}
-		}
-
-		this.deserializedEventList = this.notificationListDeserializationBuffer.readData(null, readableByteChannel);
-		if (this.deserializedEventList == null) {
-			return true;
-		} else {
-			this.transferEnvelope = new TransferEnvelope(this.deserializedSequenceNumber, this.deserializedJobID,
-				this.deserializedSourceID, this.deserializedEventList);
-			this.deserializationState = DeserializationState.NOTIFICATIONSDESERIALIZED;
-			return false;
-		}
-	}
-
-	/**
-	 * Read the buffer's actual data from the stream.
-	 * 
-	 * @param readableByteChannel
-	 *        the stream to read the buffer data from
-	 * @return <code>true</code> if more buffer data need to be read from the stream, <code>false</code> otherwise
-	 * @throws IOException
-	 *         thrown if an I/O error occurred while reading data from the stream
-	 * @throws NoBufferAvailableException
-	 *         thrown if the deserialization process could not be continued due to a lack of buffers
-	 */
-	protected abstract boolean readBufferData(ReadableByteChannel readableByteChannel) throws IOException,
-			NoBufferAvailableException;
-
-	private boolean readBuffer(final ReadableByteChannel readableByteChannel) throws IOException,
-			NoBufferAvailableException {
-
-		if (!this.bufferExistanceDeserialized) {
-
-			this.tempBuffer.position(0);
-			this.tempBuffer.limit(1);
-
-			final int bytesRead = readableByteChannel.read(this.tempBuffer);
-			if (bytesRead == -1) {
-				if (this.tempBuffer.get(0) == 0 && this.tempBuffer.position() == 1) { // Regular end, no
-					// buffer will follow
-					throw new EOFException();
-				} else {
-					throw new IOException("Deserialization error: Expected at least "
-						+ this.tempBuffer.remaining() + " more bytes to follow");
-				}
-			} else if (bytesRead == 0) {
-				try {
-					Thread.sleep(50);
-				} catch (InterruptedException e) {
-				}
-			}
-
-			if (!this.tempBuffer.hasRemaining()) {
-				this.bufferExistanceDeserialized = true;
-				this.tempBuffer.position(0);
-				this.tempBuffer.limit(SIZEOFINT);
-				if (this.tempBuffer.get(0) == 0) {
-					// No buffer will follow, we are done
-					this.transferEnvelope.setBuffer(null);
-					this.deserializationState = DeserializationState.FULLYDESERIALIZED;
-					return false;
-				}
-			} else {
-				return true;
-			}
-		}
-
-		if (this.sizeOfBuffer < 0) {
-
-			// We need to deserialize the size of the buffer
-			final int bytesRead = readableByteChannel.read(this.tempBuffer);
-			if (bytesRead == -1) {
-				throw new IOException("Deserialization error: Expected at least " + this.tempBuffer.remaining()
-					+ " more bytes to follow");
-			}
-
-			if (!this.tempBuffer.hasRemaining()) {
-				this.sizeOfBuffer = byteBufferToInteger(this.tempBuffer, 0);
-				// System.out.println("INCOMING: Buffer size is " + this.sizeOfBuffer);
-
-				if (this.sizeOfBuffer <= 0) {
-					throw new IOException("Invalid buffer size: " + this.sizeOfBuffer);
-				}
-			} else {
-				return true;
-			}
-		}
-
-		if (readBufferData(readableByteChannel)) {
-			return true;
-		}
-
-		this.transferEnvelope.setBuffer(this.buffer);
-		this.deserializationState = DeserializationState.FULLYDESERIALIZED;
-		return false;
-	}
-
-	public TransferEnvelope getFullyDeserializedTransferEnvelope() {
-
-		if (this.deserializationState == DeserializationState.FULLYDESERIALIZED) {
-			this.deserializationState = DeserializationState.NOTDESERIALIZED;
-			return this.transferEnvelope;
-		}
-
-		return null;
-	}
-
-	public Buffer getBuffer() {
-		return this.buffer;
-	}
-
-	public void reset() {
-		this.deserializationState = DeserializationState.NOTDESERIALIZED;
-		this.sequenceNumberDeserializationStarted = false;
-	}
-
-	public boolean hasUnfinishedData() {
-
-		if (this.deserializationState != DeserializationState.NOTDESERIALIZED) {
-			return true;
-		}
-
-		return this.channelIDDeserializationBuffer.hasUnfinishedData();
-	}
-
-	private int byteBufferToInteger(ByteBuffer byteBuffer, int offset) throws IOException {
-
-		int integer = 0;
-
-		if ((offset + SIZEOFINT) > byteBuffer.limit()) {
-			throw new IOException("Cannot convert byte buffer to integer, not enough data in byte buffer ("
-				+ byteBuffer.limit() + ")");
-		}
-
-		for (int i = 0; i < SIZEOFINT; ++i) {
-			integer |= (byteBuffer.get((offset + SIZEOFINT - 1) - i) & 0xff) << (i << 3);
-		}
-
-		return integer;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/AbstractSerializer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/AbstractSerializer.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/AbstractSerializer.java
deleted file mode 100644
index bdbc592..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/AbstractSerializer.java
+++ /dev/null
@@ -1,274 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.transferenvelope;
-
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.WritableByteChannel;
-
-import eu.stratosphere.core.io.IOReadableWritable;
-import eu.stratosphere.nephele.event.task.EventList;
-import eu.stratosphere.nephele.io.AbstractID;
-import eu.stratosphere.nephele.io.channels.Buffer;
-import eu.stratosphere.nephele.io.channels.SerializationBuffer;
-
-public abstract class AbstractSerializer {
-
-	private enum SerializationState {
-		NOTSERIALIZED,
-		SEQUENCENUMBERSERIALIZED,
-		JOBIDSERIALIZED,
-		SOURCESERIALIZED,
-		NOTIFICATIONSSERIALIZED,
-		FULLYSERIALIZED
-	};
-
-	private final static int SIZEOFINT = 4;
-
-	private TransferEnvelope transferEnvelope = null;
-
-	private SerializationState serializationState;
-
-	private final SerializationBuffer<IOReadableWritable> serializationBuffer = new SerializationBuffer<IOReadableWritable>();
-
-	private final ByteBuffer tempBuffer = ByteBuffer.allocate(64); // TODO: Make this configurable
-
-	private boolean serializationStarted = false;
-
-	private boolean bufferExistanceSerialized = false;
-
-	private boolean eventListExistanceSerialized = false;
-
-	public final void setTransferEnvelope(TransferEnvelope transferEnvelope) {
-
-		this.transferEnvelope = transferEnvelope;
-		reset();
-	}
-
-	protected final SerializationBuffer<IOReadableWritable> getSerializationBuffer() {
-
-		return this.serializationBuffer;
-	}
-
-	protected final ByteBuffer getTempBuffer() {
-		return this.tempBuffer;
-	}
-
-	public final boolean write(WritableByteChannel writableByteChannel) throws IOException {
-
-		while (true) {
-
-			boolean moreDataFollows = false;
-
-			// System.out.println("OUTGOING State: " + this.serializationState);
-
-			switch (serializationState) {
-			case NOTSERIALIZED:
-				moreDataFollows = writeSequenceNumber(writableByteChannel, this.transferEnvelope.getSequenceNumber());
-				break;
-			case SEQUENCENUMBERSERIALIZED:
-				moreDataFollows = writeID(writableByteChannel, this.transferEnvelope.getJobID());
-				break;
-			case JOBIDSERIALIZED:
-				moreDataFollows = writeID(writableByteChannel, this.transferEnvelope.getSource());
-				break;
-			case SOURCESERIALIZED:
-				moreDataFollows = writeNotification(writableByteChannel, this.transferEnvelope.getEventList());
-				break;
-			case NOTIFICATIONSSERIALIZED:
-				moreDataFollows = writeBuffer(writableByteChannel, this.transferEnvelope.getBuffer());
-				break;
-			case FULLYSERIALIZED:
-				return false;
-			}
-
-			if (moreDataFollows) {
-				return true;
-			}
-		}
-	}
-
-	private boolean writeSequenceNumber(WritableByteChannel writableByteChannel, int sequenceNumber) throws IOException {
-
-		if (sequenceNumber < 0) {
-			throw new IOException("Invalid sequence number: " + sequenceNumber);
-		}
-
-		if (!this.serializationStarted) {
-			this.tempBuffer.clear();
-			integerToByteBuffer(sequenceNumber, 0, this.tempBuffer);
-			this.serializationStarted = true;
-		}
-
-		if (writableByteChannel.write(this.tempBuffer) == -1) {
-			throw new IOException("Unexpected end of stream while serializing the sequence number");
-		}
-
-		if (!this.tempBuffer.hasRemaining()) {
-			this.serializationState = SerializationState.SEQUENCENUMBERSERIALIZED;
-			this.serializationStarted = false;
-			return false;
-		}
-
-		return true;
-	}
-
-	private boolean writeID(WritableByteChannel writableByteChannel, AbstractID id) throws IOException {
-
-		if (!writeIOReadableWritable(writableByteChannel, id)) {
-			// We're done, all the data has been written to the channel
-			if (this.serializationState == SerializationState.SEQUENCENUMBERSERIALIZED) {
-				// System.out.println("OUTGOING Serialized source: " + channelID);
-				this.serializationState = SerializationState.JOBIDSERIALIZED;
-			} else {
-				// System.out.println("OUTGOING Serialized target: " + channelID);
-				this.serializationState = SerializationState.SOURCESERIALIZED;
-			}
-			return false;
-		}
-
-		return true;
-	}
-
-	private boolean writeIOReadableWritable(WritableByteChannel writableByteChannel,
-			IOReadableWritable ioReadableWritable) throws IOException {
-
-		if (!this.serializationStarted) {
-			this.serializationBuffer.clear();
-			this.serializationBuffer.serialize(ioReadableWritable);
-			this.serializationStarted = true;
-		}
-
-		if (this.serializationBuffer.dataLeftFromPreviousSerialization()) {
-			this.serializationBuffer.read(writableByteChannel);
-		} else {
-			this.serializationStarted = false;
-			return false;
-		}
-
-		return true;
-	}
-
-	private boolean writeNotification(WritableByteChannel writableByteChannel, EventList notificationList)
-			throws IOException {
-
-		if (!this.eventListExistanceSerialized) {
-			this.tempBuffer.position(0);
-			if (notificationList == null) {
-				this.tempBuffer.put(0, (byte) 0);
-			} else {
-				this.tempBuffer.put(0, (byte) 1);
-			}
-			this.tempBuffer.limit(1);
-
-			writableByteChannel.write(this.tempBuffer);
-			if (this.tempBuffer.hasRemaining()) {
-				return true;
-			}
-
-			this.eventListExistanceSerialized = true;
-		}
-
-		if (notificationList != null) {
-			if (writeIOReadableWritable(writableByteChannel, notificationList)) {
-				return true;
-			}
-		}
-
-		this.serializationState = SerializationState.NOTIFICATIONSSERIALIZED;
-		return false;
-	}
-
-	public void reset() {
-		this.serializationState = SerializationState.NOTSERIALIZED;
-		this.serializationStarted = false;
-		this.bufferExistanceSerialized = false;
-		this.eventListExistanceSerialized = false;
-	}
-
-	private boolean writeBuffer(WritableByteChannel writableByteChannel, Buffer buffer) throws IOException {
-
-		while (true) {
-
-			if (!this.bufferExistanceSerialized) {
-
-				if (!this.serializationStarted) {
-					this.tempBuffer.position(0);
-
-					if (buffer == null) {
-						this.tempBuffer.put(0, (byte) 0);
-						this.tempBuffer.limit(1);
-					} else {
-						this.tempBuffer.put(0, (byte) 1);
-						// System.out.println("OUTGOING: Buffer size is " + buffer.size());
-						integerToByteBuffer(buffer.size(), 1, this.tempBuffer);
-					}
-					this.serializationStarted = true;
-				}
-
-				if (this.tempBuffer.hasRemaining()) {
-					if (writableByteChannel.write(tempBuffer) == 0) {
-						return true;
-					}
-				} else {
-					this.bufferExistanceSerialized = true;
-					this.serializationStarted = false;
-					if (buffer == null) {
-						// That's it, we're done. No buffer will follow
-						this.serializationState = SerializationState.FULLYSERIALIZED;
-						return false;
-					}
-				}
-
-			} else {
-
-				if (!writeBufferData(writableByteChannel, buffer)) {
-					this.serializationState = SerializationState.FULLYSERIALIZED;
-					return false;
-				}
-
-				return true;
-			}
-		}
-	}
-
-	/**
-	 * Writes the buffer's actual data.
-	 * 
-	 * @param writableByteChannel
-	 *        the channel to write the buffer data to
-	 * @param buffer
-	 *        the buffer whose data shall be written
-	 * @return <code>true</code> if the buffer has more data to be written, <code>false</code> otherwise
-	 * @throws IOException
-	 *         thrown if an I/O error occurs while writing the buffer's data
-	 */
-	protected abstract boolean writeBufferData(WritableByteChannel writableByteChannel, Buffer buffer)
-			throws IOException;
-
-	private void integerToByteBuffer(int integerToSerialize, int offset, ByteBuffer byteBuffer) throws IOException {
-
-		if ((offset + SIZEOFINT) > byteBuffer.capacity()) {
-			throw new IOException("Cannot convert integer to byte buffer, buffer is too small (" + byteBuffer.limit()
-				+ ", required " + (offset + SIZEOFINT) + ")");
-		}
-
-		byteBuffer.limit(offset + SIZEOFINT);
-
-		for (int i = 0; i < SIZEOFINT; ++i) {
-			final int shift = i << 3; // i * 8
-			byteBuffer.put((offset + SIZEOFINT - 1) - i, (byte) ((integerToSerialize & (0xff << shift)) >>> shift));
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/CapacityConstrainedArrayQueue.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/CapacityConstrainedArrayQueue.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/CapacityConstrainedArrayQueue.java
deleted file mode 100644
index 9e9ac66..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/CapacityConstrainedArrayQueue.java
+++ /dev/null
@@ -1,322 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.transferenvelope;
-
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.Queue;
-
-/**
- * This class implements a simple, capacity-constrained queue. The implementation is highlighted by its small
- * overhead in terms of memory consumption and management.
- * <p>
- * This class is not thread-safe.
- * 
- * @param <E>
- *        the type of the elements stored in this queue
- */
-public final class CapacityConstrainedArrayQueue<E> implements Queue<E> {
-
-	/**
-	 * The maximum capacity supported by this type of queue.
-	 */
-	private static final int MAX_CAPACITY = 128;
-
-	/**
-	 * The array storing the actual elements of the queue.
-	 */
-	private final E[] elements;
-
-	/**
-	 * Index to the current head of the queue.
-	 */
-	private byte head = 0;
-
-	/**
-	 * Index to the current tail of the queue.
-	 */
-	private byte tail = 0;
-
-	/**
-	 * The current size of the queue.
-	 */
-	private byte size = 0;
-
-	/**
-	 * Constructs a new capacity-constrained array queue.
-	 * 
-	 * @param capacity
-	 *        the capacity limit of the queue
-	 */
-	@SuppressWarnings("unchecked")
-	public CapacityConstrainedArrayQueue(final int capacity) {
-
-		if (capacity > MAX_CAPACITY) {
-			throw new IllegalArgumentException("This queue does only support capacities up to " + MAX_CAPACITY);
-		}
-
-		this.elements = (E[]) new Object[capacity];
-	}
-
-
-	@Override
-	public boolean addAll(final Collection<? extends E> c) {
-
-		throw new UnsupportedOperationException("addAll is not supported on this type of queue");
-	}
-
-	/**
-	 * Checks if there is capacity left in the queue.
-	 * 
-	 * @return <code>true</code> if there is capacity left in the queue, <code>false</code> otherwise
-	 */
-	private boolean capacityLeft() {
-
-		return ((this.elements.length - this.size) > 0);
-	}
-
-	/**
-	 * Increments the head of the queue.
-	 */
-	private void incrementHead() {
-
-		if (++this.head == this.elements.length) {
-			this.head = 0;
-		}
-	}
-
-	/**
-	 * Increments the tail of the queue.
-	 */
-	private void incrementTail() {
-
-		if (++this.tail == this.elements.length) {
-			this.tail = 0;
-		}
-	}
-
-
-	@Override
-	public void clear() {
-
-		this.head = 0;
-		this.tail = 0;
-		this.size = 0;
-	}
-
-
-	@Override
-	public boolean containsAll(final Collection<?> c) {
-
-		throw new UnsupportedOperationException("containsAll is not supported on this type of queue");
-	}
-
-
-	@Override
-	public boolean isEmpty() {
-
-		return (this.size == 0);
-	}
-
-
-	@Override
-	public boolean removeAll(final Collection<?> c) {
-
-		throw new UnsupportedOperationException("removeAll is not supported on this type of queue");
-	}
-
-
-	@Override
-	public boolean retainAll(final Collection<?> c) {
-
-		throw new UnsupportedOperationException("retainAll is not supported on this type of queue");
-	}
-
-
-	@Override
-	public Object[] toArray() {
-
-		throw new UnsupportedOperationException("toArray is not supported on this type of queue");
-	}
-
-
-	@Override
-	public <T> T[] toArray(T[] a) {
-
-		throw new UnsupportedOperationException("toArray is not supported on this type of queue");
-	}
-
-
-	@Override
-	public boolean add(final E arg0) {
-
-		throw new UnsupportedOperationException("add is not supported on this type of queue");
-	}
-
-
-	@Override
-	public boolean contains(final Object arg0) {
-
-		throw new UnsupportedOperationException("contains is not supported on this type of queue");
-	}
-
-
-	@Override
-	public E element() {
-
-		throw new UnsupportedOperationException("element is not supported on this type of queue");
-	}
-
-
-	@Override
-	public Iterator<E> iterator() {
-
-		return new CapacityConstrainedArrayQueueIterator(this.head);
-	}
-
-
-	@Override
-	public boolean offer(final E arg0) {
-
-		if (!capacityLeft()) {
-			return false;
-		}
-
-		this.elements[this.tail] = arg0;
-		incrementTail();
-		++this.size;
-
-		return true;
-	}
-
-
-	@Override
-	public E peek() {
-
-		if (isEmpty()) {
-			return null;
-		}
-
-		return this.elements[this.head];
-	}
-
-
-	@Override
-	public E poll() {
-
-		if (isEmpty()) {
-			return null;
-		}
-
-		final E retVal = this.elements[this.head];
-		incrementHead();
-		--this.size;
-
-		return retVal;
-	}
-
-
-	@Override
-	public E remove() {
-
-		final E retVal = poll();
-		if (retVal == null) {
-			throw new NoSuchElementException();
-		}
-
-		return retVal;
-	}
-
-
-	@Override
-	public boolean remove(final Object arg0) {
-
-		throw new UnsupportedOperationException("remove is not supported on this type of queue");
-	}
-
-
-	@Override
-	public int size() {
-
-		return this.size;
-	}
-
-	/**
-	 * This class implements an iterator for the capacity-constrained array queue.
-	 * 
-	 */
-	private final class CapacityConstrainedArrayQueueIterator implements Iterator<E> {
-
-		/**
-		 * The current position of the iterator.
-		 */
-		private byte pos;
-
-		/**
-		 * Counter how many this the position index has been modified.
-		 */
-		private byte count = 0;
-
-		/**
-		 * Constructs a new capacity-constrained array queue iterator.
-		 * 
-		 * @param startPos
-		 *        the start position of the iterator
-		 */
-		private CapacityConstrainedArrayQueueIterator(final byte startPos) {
-			this.pos = startPos;
-		}
-
-		/**
-		 * {@inheritDoc}
-		 */
-		@Override
-		public boolean hasNext() {
-
-			if (this.count < size) {
-				return true;
-			}
-
-			return false;
-		}
-
-		/**
-		 * {@inheritDoc}
-		 */
-		@Override
-		public E next() {
-
-			final E retVal = elements[this.pos];
-
-			if (++this.pos == elements.length) {
-				this.pos = 0;
-			}
-
-			++this.count;
-
-			return retVal;
-		}
-
-		/**
-		 * {@inheritDoc}
-		 */
-		@Override
-		public void remove() {
-
-			throw new UnsupportedOperationException("remove is not supported by this iterator");
-		}
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/DefaultDeserializer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/DefaultDeserializer.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/DefaultDeserializer.java
deleted file mode 100644
index 4ff6965..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/DefaultDeserializer.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.transferenvelope;
-
-import java.io.IOException;
-import java.nio.channels.ReadableByteChannel;
-
-import eu.stratosphere.nephele.io.channels.Buffer;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.jobgraph.JobID;
-import eu.stratosphere.nephele.taskmanager.bufferprovider.BufferProvider;
-import eu.stratosphere.nephele.taskmanager.bufferprovider.BufferProviderBroker;
-
-public final class DefaultDeserializer extends AbstractDeserializer {
-
-	private final BufferProviderBroker bufferProviderBroker;
-
-	private BufferProvider bufferProvider = null;
-
-	private JobID lastDeserializedJobID = null;
-
-	private ChannelID lastDeserializedSourceID = null;
-
-	public DefaultDeserializer(final BufferProviderBroker bufferProviderBroker) {
-		this.bufferProviderBroker = bufferProviderBroker;
-	}
-
-
-	@Override
-	protected boolean readBufferData(final ReadableByteChannel readableByteChannel) throws IOException,
-			NoBufferAvailableException {
-
-		if (getBuffer() == null) {
-
-			// Find buffer provider for this channel
-			if (!getDeserializedJobID().equals(this.lastDeserializedJobID)
-				|| !getDeserializedSourceID().equals(this.lastDeserializedSourceID)) {
-
-				try {
-					this.bufferProvider = this.bufferProviderBroker.getBufferProvider(getDeserializedJobID(),
-						getDeserializedSourceID());
-				} catch (InterruptedException e) {
-					return true;
-				}
-
-				this.lastDeserializedJobID = getDeserializedJobID();
-				this.lastDeserializedSourceID = getDeserializedSourceID();
-			}
-
-			final Buffer buf = this.bufferProvider.requestEmptyBuffer(getSizeOfBuffer());
-
-			if (buf == null) {
-				throw new NoBufferAvailableException(this.bufferProvider);
-			}
-
-			setBuffer(buf);
-
-		} else {
-
-			final Buffer buffer = getBuffer();
-
-			final int bytesWritten = buffer.write(readableByteChannel);
-
-			if (!buffer.hasRemaining()) {
-				// We are done, the buffer has been fully read
-				buffer.flip();
-				return false;
-			} else {
-				if (bytesWritten == -1) {
-					throw new IOException("Deserialization error: Expected at least " + buffer.remaining()
-						+ " more bytes to follow");
-				}
-			}
-		}
-
-		return true;
-	}
-
-	public BufferProvider getBufferProvider() {
-
-		return this.bufferProvider;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/DefaultSerializer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/DefaultSerializer.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/DefaultSerializer.java
deleted file mode 100644
index 34be592..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/DefaultSerializer.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.transferenvelope;
-
-import java.io.IOException;
-import java.nio.channels.WritableByteChannel;
-
-import eu.stratosphere.nephele.io.channels.Buffer;
-
-/**
- * This class is the default implementation to serialize a {@link TransferEnvelope} into a byte stream. In case the
- * transfer envelope contains a buffer, this implementation copies the buffer's data into the byte stream.
- * 
- */
-public class DefaultSerializer extends AbstractSerializer {
-
-
-	@Override
-	protected boolean writeBufferData(final WritableByteChannel writableByteChannel, final Buffer buffer)
-			throws IOException {
-
-		buffer.writeTo(writableByteChannel);
-
-		return buffer.hasRemaining();
-	}
-
-}


Mime
View raw message