flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [16/30] Offer buffer-oriented API for I/O (#25)
Date Mon, 09 Jun 2014 18:30:51 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/NoBufferAvailableException.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/NoBufferAvailableException.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/NoBufferAvailableException.java
deleted file mode 100644
index fe25837..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/NoBufferAvailableException.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.transferenvelope;
-
-import eu.stratosphere.nephele.taskmanager.bufferprovider.BufferProvider;
-
-/**
- * This exception is thrown to indicate that the deserialization process of a {@link TransferEnvelope} could not be
- * continued because a {@link Buffer} to store the envelope's content is currently not available.
- * 
- */
-public final class NoBufferAvailableException extends Exception {
-
-	/**
-	 * Generated serial UID.
-	 */
-	private static final long serialVersionUID = -9164212953646457026L;
-
-	/**
-	 * The buffer provider which could not deliver a buffer.
-	 */
-	private final BufferProvider bufferProvider;
-
-	/**
-	 * Constructs a new exception.
-	 * 
-	 * @param bufferProvider
-	 *        the buffer provider which could not deliver a buffer
-	 */
-	NoBufferAvailableException(final BufferProvider bufferProvider) {
-		this.bufferProvider = bufferProvider;
-	}
-
-	/**
-	 * Returns the buffer provider which could not deliver a buffer.
-	 * 
-	 * @return the buffer provider which could not deliver a buffer
-	 */
-	public BufferProvider getBufferProvider() {
-		return this.bufferProvider;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/TransferEnvelope.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/TransferEnvelope.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/TransferEnvelope.java
deleted file mode 100644
index 32c56c5..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/TransferEnvelope.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.transferenvelope;
-
-import java.io.IOException;
-
-import eu.stratosphere.nephele.event.task.AbstractEvent;
-import eu.stratosphere.nephele.event.task.EventList;
-import eu.stratosphere.nephele.io.channels.Buffer;
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.jobgraph.JobID;
-
-public final class TransferEnvelope {
-
-	private final JobID jobID;
-
-	private final ChannelID source;
-
-	private final int sequenceNumber;
-
-	private EventList eventList;
-
-	private Buffer buffer = null;
-
-	public TransferEnvelope(int sequenceNumber, JobID jobID, ChannelID source) {
-		this(sequenceNumber, jobID, source, null);
-	}
-
-	public TransferEnvelope(int sequenceNumber, JobID jobID, ChannelID source, EventList eventList) {
-
-		this.sequenceNumber = sequenceNumber;
-		this.jobID = jobID;
-		this.source = source;
-		this.eventList = eventList;
-	}
-
-	public JobID getJobID() {
-		return this.jobID;
-	}
-
-	public ChannelID getSource() {
-		return this.source;
-	}
-
-	public void addEvent(AbstractEvent event) {
-
-		if (this.eventList == null) {
-			this.eventList = new EventList();
-		}
-
-		this.eventList.add(event);
-	}
-
-	public EventList getEventList() {
-
-		return this.eventList;
-	}
-
-	public int getSequenceNumber() {
-		return this.sequenceNumber;
-	}
-
-	public void setBuffer(Buffer buffer) {
-		this.buffer = buffer;
-	}
-
-	public Buffer getBuffer() {
-		return this.buffer;
-	}
-
-	public TransferEnvelope duplicate() throws IOException, InterruptedException {
-
-		final TransferEnvelope duplicatedTransferEnvelope = new TransferEnvelope(this.sequenceNumber, this.jobID,
-			this.source, this.eventList); // No need to duplicate event list
-
-		if (this.buffer != null) {
-			duplicatedTransferEnvelope.buffer = this.buffer.duplicate();
-		} else {
-			duplicatedTransferEnvelope.buffer = null;
-		}
-
-		return duplicatedTransferEnvelope;
-	}
-
-	public TransferEnvelope duplicateWithoutBuffer() {
-
-		final TransferEnvelope duplicatedTransferEnvelope = new TransferEnvelope(this.sequenceNumber, this.jobID,
-			this.source, this.eventList); // No need to duplicate event list
-
-		duplicatedTransferEnvelope.buffer = null;
-
-		return duplicatedTransferEnvelope;
-	}
-
-	@Override
-	public boolean equals(final Object obj) {
-
-		if (!(obj instanceof TransferEnvelope)) {
-			return false;
-		}
-
-		final TransferEnvelope te = (TransferEnvelope) obj;
-
-		if (!this.jobID.equals(te.jobID)) {
-			return false;
-		}
-
-		if (!this.source.equals(te.source)) {
-			return false;
-		}
-
-		if (this.sequenceNumber != te.sequenceNumber) {
-			return false;
-		}
-
-		if (this.buffer == null) {
-			if (te.buffer != null) {
-				return false;
-			}
-			// Both are null
-		} else {
-			if (te.buffer == null) {
-				return false;
-			}
-			// Both are non-null
-			if (!this.buffer.equals(te.buffer)) {
-				return false;
-			}
-		}
-
-		if (this.eventList == null) {
-			if (te.eventList != null) {
-				return false;
-			}
-			// Both are null
-		} else {
-			if (te.eventList == null) {
-				return false;
-			}
-			// Both are non-null
-			if (!this.eventList.equals(te.eventList)) {
-				return false;
-			}
-		}
-
-		return true;
-	}
-
-	@Override
-	public int hashCode() {
-
-		return (31 * this.sequenceNumber * this.jobID.hashCode());
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/TransferEnvelopeDispatcher.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/TransferEnvelopeDispatcher.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/TransferEnvelopeDispatcher.java
deleted file mode 100644
index 7e1fb87..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/TransferEnvelopeDispatcher.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.transferenvelope;
-
-import java.io.IOException;
-
-/**
- * A transfer envelope dispatcher receives {@link TransferEnvelopes} and sends them to all of this destinations.
- * 
- */
-public interface TransferEnvelopeDispatcher {
-
-	/**
-	 * Processes a transfer envelope from an output channel. The method may block until the system has allocated enough
-	 * resources to further process the envelope.
-	 * 
-	 * @param transferEnvelope
-	 *        the transfer envelope to be processed
-	 */
-	void processEnvelopeFromOutputChannel(TransferEnvelope transferEnvelope) throws IOException, InterruptedException;
-
-	void processEnvelopeFromInputChannel(TransferEnvelope transferEnvelope) throws IOException, InterruptedException;
-
-	void processEnvelopeFromNetwork(TransferEnvelope transferEnvelope, boolean freeSourceBuffer) throws IOException,
-			InterruptedException;
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/TransferEnvelopeReceiverList.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/TransferEnvelopeReceiverList.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/TransferEnvelopeReceiverList.java
deleted file mode 100644
index 678897a..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/taskmanager/transferenvelope/TransferEnvelopeReceiverList.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.taskmanager.transferenvelope;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import eu.stratosphere.nephele.io.channels.ChannelID;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.ConnectionInfoLookupResponse;
-import eu.stratosphere.nephele.taskmanager.bytebuffered.RemoteReceiver;
-
-/**
- * A transfer envelope receiver list contains all recipients of a transfer envelope. Their are three different types of
- * receivers: Local receivers identified by {@link ChannelID} objects, remote receivers identified by
- * {@link InetAddress} objects and finally checkpoints which are identified by
- * <p>
- * This class is thread-safe.
- * 
- */
-public class TransferEnvelopeReceiverList {
-
-	private final List<ChannelID> localReceivers;
-
-	private final List<RemoteReceiver> remoteReceivers;
-
-	public TransferEnvelopeReceiverList(final ConnectionInfoLookupResponse cilr) {
-
-		this.localReceivers = Collections.unmodifiableList(cilr.getLocalTargets());
-		this.remoteReceivers = Collections.unmodifiableList(cilr.getRemoteTargets());
-	}
-
-	public TransferEnvelopeReceiverList(final ChannelID localReceiver) {
-
-		final List<ChannelID> lr = new ArrayList<ChannelID>(1);
-		lr.add(localReceiver);
-
-		this.localReceivers = Collections.unmodifiableList(lr);
-		this.remoteReceivers = Collections.emptyList();
-	}
-
-	public TransferEnvelopeReceiverList(final RemoteReceiver remoteReceiver) {
-
-		final List<RemoteReceiver> rr = new ArrayList<RemoteReceiver>(1);
-		rr.add(remoteReceiver);
-
-		this.localReceivers = Collections.emptyList();
-		this.remoteReceivers = Collections.unmodifiableList(rr);
-	}
-
-	public boolean hasLocalReceivers() {
-
-		return (!this.localReceivers.isEmpty());
-	}
-
-	public boolean hasRemoteReceivers() {
-
-		return (!this.remoteReceivers.isEmpty());
-	}
-
-	public int getTotalNumberOfReceivers() {
-
-		return (this.localReceivers.size() + this.remoteReceivers.size());
-	}
-
-	public List<RemoteReceiver> getRemoteReceivers() {
-
-		return this.remoteReceivers;
-	}
-
-	public List<ChannelID> getLocalReceivers() {
-
-		return this.localReceivers;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/util/BufferPoolConnector.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/util/BufferPoolConnector.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/util/BufferPoolConnector.java
new file mode 100644
index 0000000..ff1d2be
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/util/BufferPoolConnector.java
@@ -0,0 +1,45 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.nephele.util;
+
+import eu.stratosphere.core.memory.MemorySegment;
+import eu.stratosphere.runtime.io.BufferRecycler;
+
+import java.util.Queue;
+
+public class BufferPoolConnector implements BufferRecycler {
+
+	/**
+	 * Reference to the memory pool the byte buffer was originally taken from.
+	 */
+	private final Queue<MemorySegment> memoryPool;
+
+	/**
+	 * Constructs a new buffer pool connector
+	 *
+	 * @param bufferPool
+	 *        a reference to the memory pool the byte buffer was originally taken from
+	 */
+	public BufferPoolConnector(final Queue<MemorySegment> bufferPool) {
+		this.memoryPool = bufferPool;
+	}
+
+	@Override
+	public void recycle(final MemorySegment buffer) {
+		synchronized (this.memoryPool) {
+			this.memoryPool.add(buffer);
+			this.memoryPool.notify();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/util/TaskUtils.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/util/TaskUtils.java b/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/util/TaskUtils.java
deleted file mode 100644
index 40f6167..0000000
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/nephele/util/TaskUtils.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
- * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations under the License.
- **********************************************************************************************************************/
-
-package eu.stratosphere.nephele.util;
-
-import eu.stratosphere.nephele.annotations.Stateless;
-import eu.stratosphere.nephele.template.AbstractInvokable;
-
-/**
- * This class implements several convenience methods to determine properties of Nephele task classes.
- * 
- */
-public class TaskUtils {
-
-	/**
-	 * Private constructor, so class cannot be instantiated.
-	 */
-	private TaskUtils() {
-	}
-
-	/**
-	 * Checks if a task is declared to be stateless.
-	 * 
-	 * @param taskClass
-	 *        the class of the task to check
-	 * @return <code>true</code> if the given class is declared to be stateless, <code>false</code> otherwise
-	 */
-	public static boolean isStateless(final Class<? extends AbstractInvokable> taskClass) {
-
-		return taskClass.isAnnotationPresent(Stateless.class);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/io/FakeOutputTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/io/FakeOutputTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/io/FakeOutputTask.java
index 1795776..ced186b 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/io/FakeOutputTask.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/io/FakeOutputTask.java
@@ -13,7 +13,7 @@
 
 package eu.stratosphere.pact.runtime.iterative.io;
 
-import eu.stratosphere.nephele.io.MutableRecordReader;
+import eu.stratosphere.runtime.io.api.MutableRecordReader;
 import eu.stratosphere.nephele.template.AbstractOutputTask;
 import eu.stratosphere.types.Record;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationHeadPactTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationHeadPactTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationHeadPactTask.java
index 2d725b8..c39e3ef 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationHeadPactTask.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationHeadPactTask.java
@@ -25,7 +25,6 @@ import eu.stratosphere.api.common.typeutils.TypeComparator;
 import eu.stratosphere.api.common.typeutils.TypeComparatorFactory;
 import eu.stratosphere.api.common.typeutils.TypeSerializer;
 import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
-import eu.stratosphere.core.io.IOReadableWritable;
 import eu.stratosphere.core.memory.DataInputView;
 import eu.stratosphere.core.memory.MemorySegment;
 import eu.stratosphere.nephele.io.AbstractRecordWriter;
@@ -78,13 +77,13 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
 
 	private Collector<X> finalOutputCollector;
 
-	private List<AbstractRecordWriter<?>> finalOutputWriters;
+	private List<BufferWriter> finalOutputWriters;
 
 	private TypeSerializerFactory<Y> feedbackTypeSerializer;
 
 	private TypeSerializerFactory<X> solutionTypeSerializer;
 
-	private RecordWriter<?> toSync;
+	private BufferWriter toSync;
 
 	private int initialSolutionSetInput; // undefined for bulk iterations
 
@@ -108,7 +107,7 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
 
 		// at this time, the outputs to the step function are created
 		// add the outputs for the final solution
-		this.finalOutputWriters = new ArrayList<AbstractRecordWriter<?>>();
+		this.finalOutputWriters = new ArrayList<BufferWriter>();
 		final TaskConfig finalOutConfig = this.config.getIterationHeadFinalOutputConfig();
 		this.finalOutputCollector = RegularPactTask.getOutputCollector(this, finalOutConfig,
 			this.userCodeClassLoader, this.finalOutputWriters, finalOutConfig.getNumOutputs());
@@ -122,7 +121,7 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
 			throw new Exception("Error: Inconsistent head task setup - wrong mapping of output gates.");
 		}
 		// now, we can instantiate the sync gate
-		this.toSync = new RecordWriter<IOReadableWritable>(this, IOReadableWritable.class);
+		this.toSync = new BufferWriter(this);
 	}
 
 	/**
@@ -203,6 +202,8 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
 
 	@Override
 	public void run() throws Exception {
+		// initialize the serializers (one per channel) of the record writers
+		RegularPactTask.initOutputWriters(this.finalOutputWriters);
 
 		final String brokerKey = brokerKey();
 		final int workerIndex = getEnvironment().getIndexInSubtaskGroup();
@@ -330,6 +331,8 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
 				streamOutFinalOutputBulk(new InputViewIterator<X>(superstepResult, this.solutionTypeSerializer.getSerializer()));
 			}
 
+			this.finalOutputCollector.close();
+
 		} finally {
 			// make sure we unregister everything from the broker:
 			// - backchannel
@@ -388,7 +391,8 @@ public class IterationHeadPactTask<X, Y, S extends Function, OT> extends Abstrac
 		if (log.isInfoEnabled()) {
 			log.info(formatLogString("sending " + WorkerDoneEvent.class.getSimpleName() + " to sync"));
 		}
-		this.toSync.publishEvent(event);
+
+		this.toSync.broadcastEvent(event);
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationIntermediatePactTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationIntermediatePactTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationIntermediatePactTask.java
index 4ada733..5bdb3aa 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationIntermediatePactTask.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationIntermediatePactTask.java
@@ -19,7 +19,10 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import eu.stratosphere.api.common.functions.Function;
-import eu.stratosphere.nephele.io.AbstractRecordWriter;
+import eu.stratosphere.runtime.io.api.BufferWriter;
+import eu.stratosphere.pact.runtime.iterative.concurrent.BlockingBackChannel;
+import eu.stratosphere.runtime.io.channels.EndOfSuperstepEvent;
+import eu.stratosphere.pact.runtime.iterative.event.TerminationEvent;
 import eu.stratosphere.pact.runtime.iterative.io.WorksetUpdateOutputCollector;
 import eu.stratosphere.util.Collector;
 
@@ -107,7 +110,7 @@ public class IterationIntermediatePactTask<S extends Function, OT> extends Abstr
 	}
 
 	private void sendEndOfSuperstep() throws IOException, InterruptedException {
-		for (AbstractRecordWriter<?> eventualOutput : eventualOutputs) {
+		for (BufferWriter eventualOutput : this.eventualOutputs) {
 			eventualOutput.sendEndOfSuperstep();
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationSynchronizationSinkTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationSynchronizationSinkTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationSynchronizationSinkTask.java
index 35786a7..94f9b9f 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationSynchronizationSinkTask.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationSynchronizationSinkTask.java
@@ -28,7 +28,7 @@ import eu.stratosphere.api.common.aggregators.AggregatorWithName;
 import eu.stratosphere.api.common.aggregators.ConvergenceCriterion;
 import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
 import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
-import eu.stratosphere.nephele.io.MutableRecordReader;
+import eu.stratosphere.runtime.io.api.MutableRecordReader;
 import eu.stratosphere.nephele.template.AbstractOutputTask;
 import eu.stratosphere.nephele.types.IntegerRecord;
 import eu.stratosphere.pact.runtime.iterative.event.AllWorkersDoneEvent;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationTailPactTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationTailPactTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationTailPactTask.java
index 5cfe173..859a62d 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationTailPactTask.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/iterative/task/IterationTailPactTask.java
@@ -106,6 +106,7 @@ public class IterationTailPactTask<S extends Function, OT> extends AbstractItera
 				// aggregate workset update element count
 				long numCollected = worksetUpdateOutputCollector.getElementsCollectedAndReset();
 				worksetAggregator.aggregate(numCollected);
+
 			}
 
 			if (log.isInfoEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputCollector.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputCollector.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputCollector.java
index f1c945d..a91a59a 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputCollector.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputCollector.java
@@ -19,7 +19,7 @@ import java.util.Collections;
 import java.util.List;
 
 import eu.stratosphere.api.common.typeutils.TypeSerializer;
-import eu.stratosphere.nephele.io.AbstractRecordWriter;
+import eu.stratosphere.runtime.io.api.RecordWriter;
 import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
 import eu.stratosphere.util.Collector;
 
@@ -30,7 +30,7 @@ import eu.stratosphere.util.Collector;
 public class OutputCollector<T> implements Collector<T>
 {	
 	// list of writers
-	protected AbstractRecordWriter<SerializationDelegate<T>>[] writers; 
+	protected RecordWriter<SerializationDelegate<T>>[] writers;
 
 	private final SerializationDelegate<T> delegate;
 
@@ -43,10 +43,10 @@ public class OutputCollector<T> implements Collector<T>
 	 * @param writers List of all writers.
 	 */
 	@SuppressWarnings("unchecked")
-	public OutputCollector(List<AbstractRecordWriter<SerializationDelegate<T>>> writers, TypeSerializer<T> serializer)
+	public OutputCollector(List<RecordWriter<SerializationDelegate<T>>> writers, TypeSerializer<T> serializer)
 	{
 		this.delegate = new SerializationDelegate<T>(serializer);
-		this.writers = (AbstractRecordWriter<SerializationDelegate<T>>[]) writers.toArray(new AbstractRecordWriter[writers.size()]);
+		this.writers = (RecordWriter<SerializationDelegate<T>>[]) writers.toArray(new RecordWriter[writers.size()]);
 	}
 	
 	/**
@@ -56,14 +56,14 @@ public class OutputCollector<T> implements Collector<T>
 	 */
 
 	@SuppressWarnings("unchecked")
-	public void addWriter(AbstractRecordWriter<SerializationDelegate<T>> writer)
+	public void addWriter(RecordWriter<SerializationDelegate<T>> writer)
 	{
 		// avoid using the array-list here to reduce one level of object indirection
 		if (this.writers == null) {
-			this.writers = new AbstractRecordWriter[] {writer};
+			this.writers = new RecordWriter[] {writer};
 		}
 		else {
-			AbstractRecordWriter<SerializationDelegate<T>>[] ws = new AbstractRecordWriter[this.writers.length + 1];
+			RecordWriter<SerializationDelegate<T>>[] ws = new RecordWriter[this.writers.length + 1];
 			System.arraycopy(this.writers, 0, ws, 0, this.writers.length);
 			ws[this.writers.length] = writer;
 			this.writers = ws;
@@ -79,7 +79,7 @@ public class OutputCollector<T> implements Collector<T>
 		this.delegate.setInstance(record);
 		try {
 			for (int i = 0; i < writers.length; i++) {
-				this.writers[i].emit(this.delegate);	
+				this.writers[i].emit(this.delegate);
 			}
 		}
 		catch (IOException e) {
@@ -90,19 +90,24 @@ public class OutputCollector<T> implements Collector<T>
 		}
 	}
 
-	/*
-	 * (non-Javadoc)
-	 * @see eu.stratosphere.pact.common.stub.Collector#close()
-	 */
 	@Override
 	public void close() {
+		for (RecordWriter<?> writer : writers) {
+			try {
+				writer.flush();
+			} catch (IOException e) {
+				throw new RuntimeException(e.getMessage(), e);
+			} catch (InterruptedException e) {
+				throw new RuntimeException(e.getMessage(), e);
+			}
+		}
 	}
 
 	/**
 	 * List of writers that are associated with this output collector
 	 * @return list of writers
 	 */
-	public List<AbstractRecordWriter<SerializationDelegate<T>>> getWriters() {
+	public List<RecordWriter<SerializationDelegate<T>>> getWriters() {
 		return Collections.unmodifiableList(Arrays.asList(this.writers));
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputEmitter.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputEmitter.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputEmitter.java
index ebc7ac8..46e3249 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputEmitter.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/OutputEmitter.java
@@ -15,7 +15,7 @@ package eu.stratosphere.pact.runtime.shipping;
 
 import eu.stratosphere.api.common.distributions.DataDistribution;
 import eu.stratosphere.api.common.typeutils.TypeComparator;
-import eu.stratosphere.nephele.io.ChannelSelector;
+import eu.stratosphere.runtime.io.api.ChannelSelector;
 import eu.stratosphere.pact.runtime.plugable.SerializationDelegate;
 
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputCollector.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputCollector.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputCollector.java
index 852fd3a..4394483 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputCollector.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputCollector.java
@@ -18,7 +18,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
-import eu.stratosphere.nephele.io.AbstractRecordWriter;
+import eu.stratosphere.runtime.io.api.RecordWriter;
 import eu.stratosphere.types.Record;
 import eu.stratosphere.util.Collector;
 
@@ -27,38 +27,37 @@ import eu.stratosphere.util.Collector;
  * The OutputCollector tracks to which writers a deep-copy must be given and which not.
  */
 public class RecordOutputCollector implements Collector<Record>
-{	
+{
 	// list of writers
-	protected AbstractRecordWriter<Record>[] writers; 
-	
+	protected RecordWriter<Record>[] writers;
+
 	/**
-	 * Initializes the output collector with a set of writers. 
-	 * To specify for a writer that it must be fed with a deep-copy, set the bit in the copy flag bit mask to 1 that 
+	 * Initializes the output collector with a set of writers.
+	 * To specify for a writer that it must be fed with a deep-copy, set the bit in the copy flag bit mask to 1 that
 	 * corresponds to the position of the writer within the {@link List}.
-	 * 
+	 *
 	 * @param writers List of all writers.
 	 */
 	@SuppressWarnings("unchecked")
-	public RecordOutputCollector(List<AbstractRecordWriter<Record>> writers) {
-		
-		this.writers = (AbstractRecordWriter<Record>[]) writers.toArray(new AbstractRecordWriter[writers.size()]);
+	public RecordOutputCollector(List<RecordWriter<Record>> writers) {
+
+		this.writers = (RecordWriter<Record>[]) writers.toArray(new RecordWriter[writers.size()]);
 	}
-	
+
 	/**
 	 * Adds a writer to the OutputCollector.
-	 * 
+	 *
 	 * @param writer The writer to add.
 	 */
-
 	@SuppressWarnings("unchecked")
-	public void addWriter(AbstractRecordWriter<Record> writer)
+	public void addWriter(RecordWriter<Record> writer)
 	{
 		// avoid using the array-list here to reduce one level of object indirection
 		if (this.writers == null) {
-			this.writers = new AbstractRecordWriter[] {writer};
+			this.writers = new RecordWriter[] {writer};
 		}
 		else {
-			AbstractRecordWriter<Record>[] ws = new AbstractRecordWriter[this.writers.length + 1];
+			RecordWriter<Record>[] ws = new RecordWriter[this.writers.length + 1];
 			System.arraycopy(this.writers, 0, ws, 0, this.writers.length);
 			ws[this.writers.length] = writer;
 			this.writers = ws;
@@ -74,7 +73,7 @@ public class RecordOutputCollector implements Collector<Record>
 	{
 		try {
 			for (int i = 0; i < writers.length; i++) {
-				this.writers[i].emit(record);	
+				this.writers[i].emit(record);
 			}
 		}
 		catch (IOException e) {
@@ -85,19 +84,24 @@ public class RecordOutputCollector implements Collector<Record>
 		}
 	}
 
-	/*
-	 * (non-Javadoc)
-	 * @see eu.stratosphere.pact.common.stub.Collector#close()
-	 */
 	@Override
 	public void close() {
+		for (RecordWriter<?> writer : writers) {
+			try {
+				writer.flush();
+			} catch (IOException e) {
+				throw new RuntimeException(e.getMessage(), e);
+			} catch (InterruptedException e) {
+				throw new RuntimeException(e.getMessage(), e);
+			}
+		}
 	}
 
 	/**
 	 * List of writers that are associated with this output collector
 	 * @return list of writers
 	 */
-	public List<AbstractRecordWriter<Record>> getWriters() {
+	public List<RecordWriter<Record>> getWriters() {
 		return Collections.unmodifiableList(Arrays.asList(writers));
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputEmitter.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputEmitter.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputEmitter.java
index 047c1bf..ba352eb 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputEmitter.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/shipping/RecordOutputEmitter.java
@@ -16,6 +16,8 @@ package eu.stratosphere.pact.runtime.shipping;
 import eu.stratosphere.api.common.distributions.DataDistribution;
 import eu.stratosphere.api.common.typeutils.TypeComparator;
 import eu.stratosphere.nephele.io.ChannelSelector;
+import eu.stratosphere.runtime.io.api.ChannelSelector;
+import eu.stratosphere.pact.runtime.plugable.pactrecord.RecordComparator;
 import eu.stratosphere.types.Key;
 import eu.stratosphere.types.Record;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSinkTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSinkTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSinkTask.java
index 250efcc..cb3e782 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSinkTask.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSinkTask.java
@@ -15,6 +15,7 @@ package eu.stratosphere.pact.runtime.task;
 
 import java.io.IOException;
 
+import eu.stratosphere.pact.runtime.task.chaining.ExceptionInChainedStubException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -29,10 +30,11 @@ import eu.stratosphere.core.fs.FileSystem;
 import eu.stratosphere.core.fs.FileSystem.WriteMode;
 import eu.stratosphere.core.fs.Path;
 import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.nephele.execution.CancelTaskException;
 import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
-import eu.stratosphere.nephele.io.MutableReader;
-import eu.stratosphere.nephele.io.MutableRecordReader;
-import eu.stratosphere.nephele.io.MutableUnionRecordReader;
+import eu.stratosphere.runtime.io.api.MutableReader;
+import eu.stratosphere.runtime.io.api.MutableRecordReader;
+import eu.stratosphere.runtime.io.api.MutableUnionRecordReader;
 import eu.stratosphere.nephele.template.AbstractOutputTask;
 import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate;
 import eu.stratosphere.pact.runtime.sort.UnilateralSortMerger;
@@ -183,11 +185,16 @@ public class DataSinkTask<IT> extends AbstractOutputTask {
 			}
 		}
 		catch (Exception ex) {
+			ex = ExceptionInChainedStubException.exceptionUnwrap(ex);
+
+			if (ex instanceof CancelTaskException) {
+				// forward canceling exception
+				throw ex;
+			}
 			// drop, if the task was canceled
-			if (!this.taskCanceled) {
-				if (LOG.isErrorEnabled()) {
+			else if (!this.taskCanceled) {
+				if (LOG.isErrorEnabled())
 					LOG.error(getLogString("Error in user code: " + ex.getMessage()), ex);
-				}
 				throw ex;
 			}
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSourceTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSourceTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSourceTask.java
index 47a8218..af176b9 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSourceTask.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/DataSourceTask.java
@@ -17,7 +17,10 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 
+import eu.stratosphere.pact.runtime.task.chaining.ExceptionInChainedStubException;
+import eu.stratosphere.runtime.io.api.BufferWriter;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -27,6 +30,7 @@ import eu.stratosphere.api.common.typeutils.TypeSerializer;
 import eu.stratosphere.api.common.typeutils.TypeSerializerFactory;
 import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.core.io.InputSplit;
+import eu.stratosphere.nephele.execution.CancelTaskException;
 import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
 import eu.stratosphere.nephele.template.AbstractInputTask;
 import eu.stratosphere.pact.runtime.shipping.OutputCollector;
@@ -41,13 +45,15 @@ import eu.stratosphere.util.Collector;
  * DataSourceTask which is executed by a Nephele task manager. The task reads data and uses an 
  * {@link InputFormat} to create records from the input.
  * 
- * @see eu.stratosphere.api.io.InputFormat
+ * @see eu.stratosphere.api.common.io.InputFormat
  */
 public class DataSourceTask<OT> extends AbstractInputTask<InputSplit> {
 	
 	// Obtain DataSourceTask Logger
 	private static final Log LOG = LogFactory.getLog(DataSourceTask.class);
 
+	private List<BufferWriter> eventualOutputs;
+
 	// Output collector
 	private Collector<OT> output;
 
@@ -116,6 +122,9 @@ public class DataSourceTask<OT> extends AbstractInputTask<InputSplit> {
 		final TypeSerializer<OT> serializer = this.serializerFactory.getSerializer();
 		
 		try {
+			// initialize the serializers (one per channel) of the record writers
+			RegularPactTask.initOutputWriters(this.eventualOutputs);
+
 			// start all chained tasks
 			RegularPactTask.openChainedTasks(this.chainedTasks, this);
 			
@@ -249,8 +258,14 @@ public class DataSourceTask<OT> extends AbstractInputTask<InputSplit> {
 			
 			RegularPactTask.cancelChainedTasks(this.chainedTasks);
 			
-			// drop exception, if the task was canceled
-			if (!this.taskCanceled) {
+			ex = ExceptionInChainedStubException.exceptionUnwrap(ex);
+
+			if (ex instanceof CancelTaskException) {
+				// forward canceling exception
+				throw ex;
+			}
+			else if (!this.taskCanceled) {
+				// drop exception, if the task was canceled
 				RegularPactTask.logAndThrowException(ex, this);
 			}
 		}
@@ -315,7 +330,8 @@ public class DataSourceTask<OT> extends AbstractInputTask<InputSplit> {
 	 */
 	private void initOutputs(ClassLoader cl) throws Exception {
 		this.chainedTasks = new ArrayList<ChainedDriver<?, ?>>();
-		this.output = RegularPactTask.initOutputs(this, cl, this.config, this.chainedTasks, null);
+		this.eventualOutputs = new ArrayList<BufferWriter>();
+		this.output = RegularPactTask.initOutputs(this, cl, this.config, this.chainedTasks, this.eventualOutputs);
 	}
 	
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java
index 7aed8c0..b01799a 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/RegularPactTask.java
@@ -1,5 +1,5 @@
 /***********************************************************************************************************************
- * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
  *
  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
  * the License. You may obtain a copy of the License at
@@ -13,14 +13,6 @@
 
 package eu.stratosphere.pact.runtime.task;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 import eu.stratosphere.api.common.accumulators.Accumulator;
 import eu.stratosphere.api.common.accumulators.AccumulatorHelper;
 import eu.stratosphere.api.common.distributions.DataDistribution;
@@ -34,15 +26,15 @@ import eu.stratosphere.configuration.ConfigConstants;
 import eu.stratosphere.configuration.Configuration;
 import eu.stratosphere.configuration.GlobalConfiguration;
 import eu.stratosphere.core.io.IOReadableWritable;
+import eu.stratosphere.nephele.execution.CancelTaskException;
 import eu.stratosphere.nephele.execution.Environment;
 import eu.stratosphere.nephele.execution.librarycache.LibraryCacheManager;
-import eu.stratosphere.nephele.io.AbstractRecordWriter;
-import eu.stratosphere.nephele.io.BroadcastRecordWriter;
-import eu.stratosphere.nephele.io.ChannelSelector;
-import eu.stratosphere.nephele.io.MutableReader;
-import eu.stratosphere.nephele.io.MutableRecordReader;
-import eu.stratosphere.nephele.io.MutableUnionRecordReader;
-import eu.stratosphere.nephele.io.RecordWriter;
+import eu.stratosphere.runtime.io.api.ChannelSelector;
+import eu.stratosphere.runtime.io.api.RecordWriter;
+import eu.stratosphere.runtime.io.api.MutableReader;
+import eu.stratosphere.runtime.io.api.MutableRecordReader;
+import eu.stratosphere.runtime.io.api.MutableUnionRecordReader;
+import eu.stratosphere.runtime.io.api.BufferWriter;
 import eu.stratosphere.nephele.services.accumulators.AccumulatorEvent;
 import eu.stratosphere.nephele.services.iomanager.IOManager;
 import eu.stratosphere.nephele.services.memorymanager.MemoryManager;
@@ -71,18 +63,22 @@ import eu.stratosphere.types.Record;
 import eu.stratosphere.util.Collector;
 import eu.stratosphere.util.InstantiationUtil;
 import eu.stratosphere.util.MutableObjectIterator;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
 
 /**
  * The abstract base class for all tasks. Encapsulated common behavior and implements the main life-cycle
  * of the user code.
  */
 public class RegularPactTask<S extends Function, OT> extends AbstractTask implements PactTaskContext<S, OT> {
-	
+
 	protected static final Log LOG = LogFactory.getLog(RegularPactTask.class);
-	
-	private static final boolean USE_BROARDCAST_WRITERS = GlobalConfiguration.getBoolean(
-		ConfigConstants.USE_MULTICAST_FOR_BROADCAST, ConfigConstants.DEFAULT_USE_MULTICAST_FOR_BROADCAST);
-	
+
 	// --------------------------------------------------------------------------------------------
 
 	/**
@@ -95,7 +91,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 	 * The instantiated user code of this task's main operator (driver). May be null if the operator has no udf.
 	 */
 	protected S stub;
-	
+
 	/**
 	 * The udf's runtime context.
 	 */
@@ -111,13 +107,13 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 	 * The output writers for the data that this task forwards to the next task. The latest driver (the central, if no chained
 	 * drivers exist, otherwise the last chained driver) produces its output to these writers.
 	 */
-	protected List<AbstractRecordWriter<?>> eventualOutputs;
-	
+	protected List<BufferWriter> eventualOutputs;
+
 	/**
 	 * The input readers to this task.
 	 */
 	protected MutableReader<?>[] inputReaders;
-	
+
 	/**
 	 * The input readers for the configured broadcast variables for this task.
 	 */
@@ -127,7 +123,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 	 * The inputs reader, wrapped in an iterator. Prior to the local strategies, etc...
 	 */
 	protected MutableObjectIterator<?>[] inputIterators;
-	
+
 	/**
 	 * The input readers for the configured broadcast variables, wrapped in an iterator. 
 	 * Prior to the local strategies, etc...
@@ -142,18 +138,18 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 	 * The local strategies that are applied on the inputs.
 	 */
 	protected volatile CloseableInputProvider<?>[] localStrategies;
-	
+
 	/**
 	 * The optional temp barriers on the inputs for dead-lock breaking. Are
 	 * optionally resettable.
 	 */
 	protected volatile TempBarrier<?>[] tempBarriers;
-	
+
 	/**
 	 * The resettable inputs in the case where no temp barrier is needed.
 	 */
 	protected volatile SpillingResettableMutableObjectIterator<?>[] resettableInputs;
-	
+
 	/**
 	 * The inputs to the operator. Return the readers' data after the application of the local strategy
 	 * and the temp-table barrier.
@@ -189,24 +185,24 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 	 * A list of chained drivers, if there are any.
 	 */
 	protected ArrayList<ChainedDriver<?, ?>> chainedTasks;
-	
+
 	/**
 	 * Certain inputs may be excluded from resetting. For example, the initial partial solution
 	 * in an iteration head must not be reseted (it is read through the back channel), when all
 	 * others are reseted.
 	 */
 	private boolean[] excludeFromReset;
-	
+
 	/**
 	 * Flag indicating for each input whether it is cached and can be reseted.
 	 */
 	private boolean[] inputIsCached;
-			
+
 	/**
 	 * flag indicating for each input whether it must be asynchronously materialized.
 	 */
 	private boolean[] inputIsAsyncMaterialized;
-	
+
 	/**
 	 * The amount of memory per input that is dedicated to the materialization.
 	 */
@@ -283,16 +279,18 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 	 */
 	@Override
 	public void invoke() throws Exception {
-		
-		if (LOG.isDebugEnabled()) {
+
+		if (LOG.isDebugEnabled())
 			LOG.debug(formatLogString("Start task code."));
-		}
-		
+
 		// whatever happens in this scope, make sure that the local strategies are cleaned up!
 		// note that the initialization of the local strategies is in the try-finally block as well,
 		// so that the thread that creates them catches its own errors that may happen in that process.
 		// this is especially important, since there may be asynchronous closes (such as through canceling).
 		try {
+			// initialize the serializers (one per channel) of the record writers
+			initOutputWriters(this.eventualOutputs);
+
 			// initialize the remaining data structures on the input and trigger the local processing
 			// the local processing includes building the dams / caches
 			try {
@@ -352,23 +350,23 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 				throw new RuntimeException("Initializing the input processing failed" +
 					e.getMessage() == null ? "." : ": " + e.getMessage(), e);
 			}
-			
+
 			if (!this.running) {
 				if (LOG.isDebugEnabled()) {
 					LOG.debug(formatLogString("Task cancelled before task code was started."));
 				}
 				return;
 			}
-			
+
 			// pre main-function initialization
 			initialize();
-			
+
 			// read the broadcast variables
 			for (int i = 0; i < this.config.getNumBroadcastInputs(); i++) {
 				final String name = this.config.getBroadcastInputName(i);
 				readAndSetBroadcastInput(i, name, this.runtimeUdfContext);
 			}
-	
+
 			// the work goes here
 			run();
 		}
@@ -376,7 +374,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 			// clean up in any case!
 			closeLocalStrategiesAndCaches();
 		}
-		
+
 		if (this.running) {
 			if (LOG.isDebugEnabled()) {
 				LOG.debug(formatLogString("Finished task code."));
@@ -387,15 +385,14 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 			}
 		}
 	}
-	
+
 	@Override
 	public void cancel() throws Exception {
 		this.running = false;
-		
-		if (LOG.isDebugEnabled()) {
+
+		if (LOG.isDebugEnabled())
 			LOG.debug(formatLogString("Cancelling task code"));
-		}
-		
+
 		try {
 			if (this.driver != null) {
 				this.driver.cancel();
@@ -434,7 +431,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 		// instantiate the UDF
 		try {
 			final Class<? super S> userCodeFunctionType = this.driver.getStubType();
-			// if the class is null, the driver has no user code 
+			// if the class is null, the driver has no user code
 			if (userCodeFunctionType != null) {
 				this.stub = initStub(userCodeFunctionType);
 			}
@@ -462,7 +459,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 		}
 		context.setBroadcastVariable(bcVarName, collection);
 	}
-	
+
 	protected void run() throws Exception {
 		// ---------------------------- Now, the actual processing starts ------------------------
 		// check for asynchronous canceling
@@ -483,7 +480,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 				throw new Exception("The data preparation for task '" + this.getEnvironment().getTaskName() +
 					"' , caused an error: " + t.getMessage(), t);
 			}
-			
+
 			// check for canceling
 			if (!this.running) {
 				return;
@@ -517,7 +514,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 
 			// close all chained tasks letting them report failure
 			RegularPactTask.closeChainedTasks(this.chainedTasks, this);
-			
+
 			// Collect the accumulators of all involved UDFs and send them to the
 			// JobManager. close() has been called earlier for all involved UDFs
 			// (using this.stub.close() and closeChainedTasks()), so UDFs can no longer
@@ -539,8 +536,14 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 
 			RegularPactTask.cancelChainedTasks(this.chainedTasks);
 
-			// drop exception, if the task was canceled
-			if (this.running) {
+			ex = ExceptionInChainedStubException.exceptionUnwrap(ex);
+
+			if (ex instanceof CancelTaskException) {
+				// forward canceling exception
+				throw ex;
+			}
+			else if (this.running) {
+				// throw only if task was not cancelled. in the case of canceling, exceptions are expected 
 				RegularPactTask.logAndThrowException(ex, this);
 			}
 		}
@@ -553,9 +556,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 	 * This method is called at the end of a task, receiving the accumulators of
 	 * the task and the chained tasks. It merges them into a single map of
 	 * accumulators and sends them to the JobManager.
-	 * 
-	 * @param env
-	 * @param accumulators
+	 *
 	 * @param chainedTasks
 	 *          Each chained task might have accumulators which will be merged
 	 *          with the accumulators of the stub.
@@ -566,7 +567,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 		// We can merge here the accumulators from the stub and the chained
 		// tasks. Type conflicts can occur here if counters with same name but
 		// different type were used.
-		
+
 		for (ChainedDriver<?, ?> chainedTask : chainedTasks) {
 			Map<String, Accumulator<?, ?>> chainedAccumulators = chainedTask.getStub().getRuntimeContext().getAllAccumulators();
 			AccumulatorHelper.mergeInto(accumulators, chainedAccumulators);
@@ -632,8 +633,6 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 		}
 	}
 
-	
-
 	// --------------------------------------------------------------------------------------------
 	//                                 Task Setup and Teardown
 	// --------------------------------------------------------------------------------------------
@@ -665,12 +664,12 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 
 		chainedTasks.get(numChained - 1).setOutputCollector(newOutputCollector);
 	}
-	
+
 	public TaskConfig getLastTasksConfig() {
 		int numChained = this.chainedTasks.size();
 		return (numChained == 0) ? config : chainedTasks.get(numChained - 1).getTaskConfig();
 	}
-	
+
 	protected S initStub(Class<? super S> stubSuperClass) throws Exception {
 		try {
 			S stub = config.<S>getStubWrapper(this.userCodeClassLoader).getUserCodeObject(stubSuperClass, this.userCodeClassLoader);
@@ -687,7 +686,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 			throw new Exception("The stub class is not a proper subclass of " + stubSuperClass.getName(), ccex);
 		}
 	}
-	
+
 	/**
 	 * Creates the record readers for the number of inputs as defined by {@link #getNumTaskInputs()}.
 	 *
@@ -697,9 +696,9 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 	protected void initInputReaders() throws Exception {
 		final int numInputs = getNumTaskInputs();
 		final MutableReader<?>[] inputReaders = new MutableReader[numInputs];
-		
+
 		int numGates = 0;
-		
+
 		for (int i = 0; i < numInputs; i++) {
 			//  ---------------- create the input readers ---------------------
 			// in case where a logical input unions multiple physical inputs, create a union reader
@@ -720,13 +719,13 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 			}
 		}
 		this.inputReaders = inputReaders;
-		
+
 		// final sanity check
 		if (numGates != this.config.getNumInputs()) {
 			throw new Exception("Illegal configuration: Number of input gates and group sizes are not consistent.");
 		}
 	}
-	
+
 	/**
 	 * Creates the record readers for the extra broadcast inputs as configured by {@link TaskConfig#getNumBroadcastInputs()}.
 	 *
@@ -776,7 +775,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 				final TypeComparatorFactory<?> comparatorFactory = this.config.getDriverComparator(i, this.userCodeClassLoader);
 				this.inputComparators[i] = comparatorFactory.createComparator();
 			}
-			
+
 			this.inputIterators[i] = createInputIterator(this.inputReaders[i], this.inputSerializers[i]);
 		}
 	}
@@ -796,34 +795,34 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 			this.broadcastInputIterators[i] = createInputIterator(this.broadcastInputReaders[i], this.broadcastInputSerializers[i]);
 		}
 	}
-	
+
 	/**
-	 * 
+	 *
 	 * NOTE: This method must be invoked after the invocation of {@code #initInputReaders()} and
 	 * {@code #initInputSerializersAndComparators(int)}!
-	 * 
+	 *
 	 * @param numInputs
 	 */
 	protected void initLocalStrategies(int numInputs) throws Exception {
-		
+
 		final MemoryManager memMan = getMemoryManager();
 		final IOManager ioMan = getIOManager();
-		
+
 		this.localStrategies = new CloseableInputProvider[numInputs];
 		this.inputs = new MutableObjectIterator[numInputs];
 		this.excludeFromReset = new boolean[numInputs];
 		this.inputIsCached = new boolean[numInputs];
 		this.inputIsAsyncMaterialized = new boolean[numInputs];
 		this.materializationMemory = new int[numInputs];
-		
+
 		// set up the local strategies first, such that the can work before any temp barrier is created
 		for (int i = 0; i < numInputs; i++) {
 			initInputLocalStrategy(i);
 		}
-		
+
 		// we do another loop over the inputs, because we want to instantiate all
 		// sorters, etc before requesting the first input (as this call may block)
-		
+
 		// we have two types of materialized inputs, and both are replayable (can act as a cache)
 		// The first variant materializes in a different thread and hence
 		// acts as a pipeline breaker. this one should only be there, if a pipeline breaker is needed.
@@ -831,15 +830,15 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 		// in a pipelined fashion.
 		this.resettableInputs = new SpillingResettableMutableObjectIterator[numInputs];
 		this.tempBarriers = new TempBarrier[numInputs];
-		
+
 		for (int i = 0; i < numInputs; i++) {
 			final int memoryPages;
 			final boolean async = this.config.isInputAsynchronouslyMaterialized(i);
 			final boolean cached =  this.config.isInputCached(i);
-			
+
 			this.inputIsAsyncMaterialized[i] = async;
 			this.inputIsCached[i] = cached;
-			
+
 			if (async || cached) {
 				memoryPages = memMan.computeNumberOfPages(this.config.getInputMaterializationMemory(i));
 				if (memoryPages <= 0) {
@@ -849,7 +848,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 			} else {
 				memoryPages = 0;
 			}
-			
+
 			if (async) {
 				@SuppressWarnings({ "unchecked", "rawtypes" })
 				TempBarrier<?> barrier = new TempBarrier(this, getInput(i), this.inputSerializers[i], memMan, ioMan, memoryPages);
@@ -865,7 +864,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 			}
 		}
 	}
-	
+
 	protected void resetAllInputs() throws Exception {
 		// close all local-strategies. they will either get re-initialized, or we have
 		// read them now and their data is cached
@@ -875,10 +874,10 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 				this.localStrategies[i] = null;
 			}
 		}
-		
+
 		final MemoryManager memMan = getMemoryManager();
 		final IOManager ioMan = getIOManager();
-		
+
 		// reset the caches, or re-run the input local strategy
 		for (int i = 0; i < this.inputs.length; i++) {
 			if (this.excludeFromReset[i]) {
@@ -892,7 +891,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 			} else {
 				// make sure the input is not available directly, but are lazily fetched again
 				this.inputs[i] = null;
-				
+
 				if (this.inputIsCached[i]) {
 					if (this.tempBarriers[i] != null) {
 						this.inputs[i] = this.tempBarriers[i].getIterator();
@@ -908,10 +907,10 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 					if (this.tempBarriers[i] != null) {
 						this.tempBarriers[i].close();
 					}
-					
+
 					// recreate the local strategy
 					initInputLocalStrategy(i);
-					
+
 					if (this.inputIsAsyncMaterialized[i]) {
 						final int pages = this.materializationMemory[i];
 						@SuppressWarnings({ "unchecked", "rawtypes" })
@@ -924,17 +923,17 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 			}
 		}
 	}
-	
+
 	protected void excludeFromReset(int inputNum) {
 		this.excludeFromReset[inputNum] = true;
 	}
-	
+
 	private void initInputLocalStrategy(int inputNum) throws Exception {
 		// check if there is already a strategy
 		if (this.localStrategies[inputNum] != null) {
 			throw new IllegalStateException();
 		}
-		
+
 		// now set up the local strategy
 		final LocalStrategy localStrategy = this.config.getInputLocalStrategy(inputNum);
 		if (localStrategy != null) {
@@ -960,7 +959,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 				if (inputNum != 0) {
 					throw new IllegalStateException("Performing combining sort outside a (group)reduce task!");
 				}
-				
+
 				// instantiate ourselves a combiner. we should not use the stub, because the sort and the
 				// subsequent (group)reduce would otherwise share it multi-threaded
 				final Class<S> userCodeFunctionType = this.driver.getStubType();
@@ -986,7 +985,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 					this.config.getMemoryInput(inputNum), this.config.getFilehandlesInput(inputNum),
 					this.config.getSpillingThresholdInput(inputNum));
 				cSorter.setUdfConfiguration(this.config.getStubParameters());
-				
+
 				// set the input to null such that it will be lazily fetched from the input strategy
 				this.inputs[inputNum] = null;
 				this.localStrategies[inputNum] = cSorter;
@@ -999,7 +998,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 			this.inputs[inputNum] = this.inputIterators[inputNum];
 		}
 	}
-	
+
 	private <T> TypeComparator<T> getLocalStrategyComparator(int inputNum) throws Exception {
 		TypeComparatorFactory<T> compFact = this.config.getInputComparator(inputNum, this.userCodeClassLoader);
 		if (compFact == null) {
@@ -1030,21 +1029,21 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 //		final MutableObjectIterator<?> iter = new ReaderIterator(reader, serializer);
 //		return iter;
 	}
-	
+
 	protected int getNumTaskInputs() {
 		return this.driver.getNumberOfInputs();
 	}
-	
+
 	/**
 	 * Creates a writer for each output. Creates an OutputCollector which forwards its input to all writers.
 	 * The output collector applies the configured shipping strategies for each writer.
 	 */
 	protected void initOutputs() throws Exception {
 		this.chainedTasks = new ArrayList<ChainedDriver<?, ?>>();
-		this.eventualOutputs = new ArrayList<AbstractRecordWriter<?>>();
+		this.eventualOutputs = new ArrayList<BufferWriter>();
 		this.output = initOutputs(this, this.userCodeClassLoader, this.config, this.chainedTasks, this.eventualOutputs);
 	}
-	
+
 	public RuntimeUDFContext createRuntimeContext(String taskName) {
 		Environment env = getEnvironment();
 		return new RuntimeUDFContext(taskName, env.getCurrentNumberOfSubtasks(), env.getIndexInSubtaskGroup(), env.getCopyTask());
@@ -1054,61 +1053,52 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 	//                                   Task Context Signature
 	// -------------------------------------------------------------------------------------------
 
-
 	@Override
 	public TaskConfig getTaskConfig() {
 		return this.config;
 	}
 
-
 	@Override
 	public ClassLoader getUserCodeClassLoader() {
 		return this.userCodeClassLoader;
 	}
 
-
 	@Override
 	public MemoryManager getMemoryManager() {
 		return getEnvironment().getMemoryManager();
 	}
 
-
 	@Override
 	public IOManager getIOManager() {
 		return getEnvironment().getIOManager();
 	}
 
-
 	@Override
 	public S getStub() {
 		return this.stub;
 	}
 
-
 	@Override
 	public Collector<OT> getOutputCollector() {
 		return this.output;
 	}
 
-
 	@Override
 	public AbstractInvokable getOwningNepheleTask() {
 		return this;
 	}
 
-
 	@Override
 	public String formatLogString(String message) {
 		return constructLogString(message, getEnvironment().getTaskName(), this);
 	}
 
-
 	@Override
 	public <X> MutableObjectIterator<X> getInput(int index) {
 		if (index < 0 || index > this.driver.getNumberOfInputs()) {
 			throw new IndexOutOfBoundsException();
 		}
-		
+
 		// check for lazy assignment from input strategies
 		if (this.inputs[index] != null) {
 			@SuppressWarnings("unchecked")
@@ -1235,19 +1225,19 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 	 *
 	 * @return The OutputCollector that data produced in this task is submitted to.
 	 */
-	public static <T> Collector<T> getOutputCollector(AbstractInvokable task, TaskConfig config, ClassLoader cl, List<AbstractRecordWriter<?>> eventualOutputs, int numOutputs)
+	public static <T> Collector<T> getOutputCollector(AbstractInvokable task, TaskConfig config, ClassLoader cl, List<BufferWriter> eventualOutputs, int numOutputs)
 	throws Exception
 	{
 		if (numOutputs <= 0) {
 			throw new Exception("BUG: The task must have at least one output");
 		}
-		
+
 		// get the factory for the serializer
 		final TypeSerializerFactory<T> serializerFactory = config.getOutputSerializer(cl);
 
 		// special case the Record
 		if (serializerFactory.getDataType().equals(Record.class)) {
-			final List<AbstractRecordWriter<Record>> writers = new ArrayList<AbstractRecordWriter<Record>>(numOutputs);
+			final List<RecordWriter<Record>> writers = new ArrayList<RecordWriter<Record>>(numOutputs);
 
 			// create a writer for each output
 			for (int i = 0; i < numOutputs; i++) {
@@ -1267,18 +1257,10 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 					oe = new RecordOutputEmitter(strategy, comparator, distribution);
 				}
 
-				if (strategy == ShipStrategyType.BROADCAST && USE_BROARDCAST_WRITERS) {
-					if (task instanceof AbstractTask) {
-						writers.add(new BroadcastRecordWriter<Record>((AbstractTask) task, Record.class));
-					} else if (task instanceof AbstractInputTask<?>) {
-						writers.add(new BroadcastRecordWriter<Record>((AbstractInputTask<?>) task, Record.class));
-					}
-				} else {
-					if (task instanceof AbstractTask) {
-						writers.add(new RecordWriter<Record>((AbstractTask) task, Record.class, oe));
-					} else if (task instanceof AbstractInputTask<?>) {
-						writers.add(new RecordWriter<Record>((AbstractInputTask<?>) task, Record.class, oe));
-					}
+				if (task instanceof AbstractTask) {
+					writers.add(new RecordWriter<Record>((AbstractTask) task, oe));
+				} else if (task instanceof AbstractInputTask<?>) {
+					writers.add(new RecordWriter<Record>((AbstractInputTask<?>) task, oe));
 				}
 			}
 			if (eventualOutputs != null) {
@@ -1291,9 +1273,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 		}
 		else {
 			// generic case
-			final List<AbstractRecordWriter<SerializationDelegate<T>>> writers = new ArrayList<AbstractRecordWriter<SerializationDelegate<T>>>(numOutputs);
-			@SuppressWarnings("unchecked") // uncritical, simply due to broken generics
-			final Class<SerializationDelegate<T>> delegateClazz = (Class<SerializationDelegate<T>>) (Class<?>) SerializationDelegate.class;
+			final List<RecordWriter<SerializationDelegate<T>>> writers = new ArrayList<RecordWriter<SerializationDelegate<T>>>(numOutputs);
 
 			// create a writer for each output
 			for (int i = 0; i < numOutputs; i++)
@@ -1314,18 +1294,10 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 					oe = new OutputEmitter<T>(strategy, comparator, dataDist);
 				}
 
-				if (strategy == ShipStrategyType.BROADCAST && USE_BROARDCAST_WRITERS) {
-					if (task instanceof AbstractTask) {
-						writers.add(new BroadcastRecordWriter<SerializationDelegate<T>>((AbstractTask) task, delegateClazz));
-					} else if (task instanceof AbstractInputTask<?>) {
-						writers.add(new BroadcastRecordWriter<SerializationDelegate<T>>((AbstractInputTask<?>) task, delegateClazz));
-					}
-				} else {
-					if (task instanceof AbstractTask) {
-						writers.add(new RecordWriter<SerializationDelegate<T>>((AbstractTask) task, delegateClazz, oe));
-					} else if (task instanceof AbstractInputTask<?>) {
-						writers.add(new RecordWriter<SerializationDelegate<T>>((AbstractInputTask<?>) task, delegateClazz, oe));
-					}
+				if (task instanceof AbstractTask) {
+					writers.add(new RecordWriter<SerializationDelegate<T>>((AbstractTask) task, oe));
+				} else if (task instanceof AbstractInputTask<?>) {
+					writers.add(new RecordWriter<SerializationDelegate<T>>((AbstractInputTask<?>) task, oe));
 				}
 			}
 			if (eventualOutputs != null) {
@@ -1341,7 +1313,7 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 	 */
 	@SuppressWarnings("unchecked")
 	public static <T> Collector<T> initOutputs(AbstractInvokable nepheleTask, ClassLoader cl, TaskConfig config,
-					List<ChainedDriver<?, ?>> chainedTasksTarget, List<AbstractRecordWriter<?>> eventualOutputs)
+					List<ChainedDriver<?, ?>> chainedTasksTarget, List<BufferWriter> eventualOutputs)
 	throws Exception
 	{
 		final int numOutputs = config.getNumOutputs();
@@ -1391,6 +1363,12 @@ public class RegularPactTask<S extends Function, OT> extends AbstractTask implem
 		// instantiate the output collector the default way from this configuration
 		return getOutputCollector(nepheleTask , config, cl, eventualOutputs, numOutputs);
 	}
+
+	public static void initOutputWriters(List<BufferWriter> writers) {
+		for (BufferWriter writer : writers) {
+			((RecordWriter<?>) writer).initializeSerializers();
+		}
+	}
 	
 	// --------------------------------------------------------------------------------------------
 	//                                  User Code LifeCycle

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/chaining/ExceptionInChainedStubException.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/chaining/ExceptionInChainedStubException.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/chaining/ExceptionInChainedStubException.java
index 568bb7d..786928b 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/chaining/ExceptionInChainedStubException.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/chaining/ExceptionInChainedStubException.java
@@ -47,4 +47,12 @@ public class ExceptionInChainedStubException extends RuntimeException
 	public Exception getWrappedException() {
 		return exception;
 	}
+
+	public static Exception exceptionUnwrap(Exception e) {
+		if (e instanceof ExceptionInChainedStubException) {
+			return exceptionUnwrap(((ExceptionInChainedStubException) e).getWrappedException());
+		}
+
+		return e;
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/chaining/SynchronousChainedCombineDriver.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/chaining/SynchronousChainedCombineDriver.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/chaining/SynchronousChainedCombineDriver.java
index c5c26eb..814eb62 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/chaining/SynchronousChainedCombineDriver.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/chaining/SynchronousChainedCombineDriver.java
@@ -165,6 +165,8 @@ public class SynchronousChainedCombineDriver<T> extends ChainedDriver<T, T> {
 		} catch (Exception e) {
 			throw new ExceptionInChainedStubException(this.taskName, e);
 		}
+
+		this.outputCollector.close();
 	}
 
 	private void sortAndCombine() throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/ReaderIterator.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/ReaderIterator.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/ReaderIterator.java
index c25a106..b02850c 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/ReaderIterator.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/ReaderIterator.java
@@ -16,7 +16,7 @@ package eu.stratosphere.pact.runtime.task.util;
 import java.io.IOException;
 
 import eu.stratosphere.api.common.typeutils.TypeSerializer;
-import eu.stratosphere.nephele.io.MutableReader;
+import eu.stratosphere.runtime.io.api.MutableReader;
 import eu.stratosphere.pact.runtime.plugable.DeserializationDelegate;
 import eu.stratosphere.util.MutableObjectIterator;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/RecordReaderIterator.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/RecordReaderIterator.java b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/RecordReaderIterator.java
index 7e3ce5f..b087c2c 100644
--- a/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/RecordReaderIterator.java
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/pact/runtime/task/util/RecordReaderIterator.java
@@ -15,7 +15,7 @@ package eu.stratosphere.pact.runtime.task.util;
 
 import java.io.IOException;
 
-import eu.stratosphere.nephele.io.MutableReader;
+import eu.stratosphere.runtime.io.api.MutableReader;
 import eu.stratosphere.types.Record;
 import eu.stratosphere.util.MutableObjectIterator;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/Buffer.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/Buffer.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/Buffer.java
new file mode 100644
index 0000000..c192cb9
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/Buffer.java
@@ -0,0 +1,93 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+package eu.stratosphere.runtime.io;
+
+import eu.stratosphere.core.memory.MemorySegment;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class Buffer {
+
+	private final MemorySegment memorySegment;
+
+	private final BufferRecycler recycler;
+
+	// -----------------------------------------------------------------------------------------------------------------
+
+	private final AtomicInteger referenceCounter;
+
+	private int size;
+
+	// -----------------------------------------------------------------------------------------------------------------
+
+	public Buffer(MemorySegment memorySegment, int size, BufferRecycler recycler) {
+		this.memorySegment = memorySegment;
+		this.size = size;
+		this.recycler = recycler;
+
+		// we are the first, so we start with reference count of one
+		this.referenceCounter = new AtomicInteger(1);
+	}
+
+	/**
+	 * NOTE: Requires that the reference counter was increased prior to the constructor call!
+	 *
+	 * @param toDuplicate Buffer instance to duplicate
+	 */
+	private Buffer(Buffer toDuplicate) {
+		if (toDuplicate.referenceCounter.getAndIncrement() == 0) {
+			throw new IllegalStateException("Buffer was released before duplication.");
+		}
+		
+		this.memorySegment = toDuplicate.memorySegment;
+		this.size = toDuplicate.size;
+		this.recycler = toDuplicate.recycler;
+		this.referenceCounter = toDuplicate.referenceCounter;
+	}
+
+	// -----------------------------------------------------------------------------------------------------------------
+
+	public MemorySegment getMemorySegment() {
+		return this.memorySegment;
+	}
+
+	public int size() {
+		return this.size;
+	}
+
+	public void limitSize(int size) {
+		if (size >= 0 && size <= this.memorySegment.size()) {
+			this.size = size;
+		} else {
+			throw new IllegalArgumentException();
+		}
+	}
+
+	public void recycleBuffer() {
+		if (this.referenceCounter.decrementAndGet() == 0) {
+			this.recycler.recycle(this.memorySegment);
+		}
+	}
+
+	public Buffer duplicate() {
+		return new Buffer(this);
+	}
+
+	public void copyToBuffer(Buffer destinationBuffer) {
+		if (size() > destinationBuffer.size()) {
+			throw new IllegalArgumentException("Destination buffer is too small to store content of source buffer.");
+		}
+
+		this.memorySegment.copyTo(0, destinationBuffer.memorySegment, 0, size);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/BufferRecycler.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/BufferRecycler.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/BufferRecycler.java
new file mode 100644
index 0000000..88f9edb
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/BufferRecycler.java
@@ -0,0 +1,26 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io;
+
+import eu.stratosphere.core.memory.MemorySegment;
+
+public interface BufferRecycler {
+
+	/**
+	 * Called by {@link eu.stratosphere.runtime.io.Buffer} to return a {@link MemorySegment} to its original buffer pool.
+	 *
+	 * @param buffer the segment to be recycled
+	 */
+	void recycle(MemorySegment buffer);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/2db78a8d/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git a/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/AbstractRecordReader.java b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/AbstractRecordReader.java
new file mode 100644
index 0000000..d71695c
--- /dev/null
+++ b/stratosphere-runtime/src/main/java/eu/stratosphere/runtime/io/api/AbstractRecordReader.java
@@ -0,0 +1,98 @@
+/***********************************************************************************************************************
+ * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu)
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/
+
+package eu.stratosphere.runtime.io.api;
+
+import eu.stratosphere.nephele.event.task.AbstractTaskEvent;
+import eu.stratosphere.nephele.event.task.EventListener;
+import eu.stratosphere.nephele.event.task.EventNotificationManager;
+
+/**
+ * This is an abstract base class for a record reader, either dealing with mutable or immutable records,
+ * and dealing with reads from single gates (single end points) or multiple gates (union).
+ */
+public abstract class AbstractRecordReader implements ReaderBase {
+	
+	
+	private final EventNotificationManager eventHandler = new EventNotificationManager();
+	
+	private int numEventsUntilEndOfSuperstep = -1;
+	
+	private int endOfSuperstepEventsCount;
+	
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Subscribes the listener object to receive events of the given type.
+	 * 
+	 * @param eventListener
+	 *        the listener object to register
+	 * @param eventType
+	 *        the type of event to register the listener for
+	 */
+	@Override
+	public void subscribeToEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType) {
+		this.eventHandler.subscribeToEvent(eventListener, eventType);
+	}
+
+	/**
+	 * Removes the subscription for events of the given type for the listener object.
+	 * 
+	 * @param eventListener The listener object to cancel the subscription for.
+	 * @param eventType The type of the event to cancel the subscription for.
+	 */
+	@Override
+	public void unsubscribeFromEvent(EventListener eventListener, Class<? extends AbstractTaskEvent> eventType) {
+		this.eventHandler.unsubscribeFromEvent(eventListener, eventType);
+	}
+	
+	
+	protected void handleEvent(AbstractTaskEvent evt) {
+		this.eventHandler.deliverEvent(evt);
+	}
+	
+	@Override
+	public void setIterative(int numEventsUntilEndOfSuperstep) {
+		this.numEventsUntilEndOfSuperstep = numEventsUntilEndOfSuperstep;
+	}
+
+	@Override
+	public void startNextSuperstep() {
+		if (this.numEventsUntilEndOfSuperstep == -1) {
+			throw new IllegalStateException("Called 'startNextSuperstep()' in a non-iterative reader.");
+		}
+		else if (endOfSuperstepEventsCount < numEventsUntilEndOfSuperstep) {
+			throw new IllegalStateException("Premature 'startNextSuperstep()'. Not yet reached the end-of-superstep.");
+		}
+		this.endOfSuperstepEventsCount = 0;
+	}
+	
+	@Override
+	public boolean hasReachedEndOfSuperstep() {
+		return endOfSuperstepEventsCount== numEventsUntilEndOfSuperstep;
+	}
+	
+	protected boolean incrementEndOfSuperstepEventAndCheck() {
+		if (numEventsUntilEndOfSuperstep == -1) {
+			throw new IllegalStateException("Received EndOfSuperstep event in a non-iterative reader.");
+		}
+		
+		endOfSuperstepEventsCount++;
+		
+		if (endOfSuperstepEventsCount > numEventsUntilEndOfSuperstep) {
+			throw new IllegalStateException("Received EndOfSuperstep events beyond the number to indicate the end of the superstep");
+		}
+		
+		return endOfSuperstepEventsCount== numEventsUntilEndOfSuperstep;
+	}
+}


Mime
View raw message