flink-commits mailing list archives

From u..@apache.org
Subject [17/18] flink git commit: [FLINK-986] [FLINK-25] [Distributed runtime] Add initial support for intermediate results
Date Mon, 12 Jan 2015 08:16:25 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionDeploymentDescriptor.java
new file mode 100644
index 0000000..148f8d4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionDeploymentDescriptor.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionType;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * A partition deployment descriptor combines information for a produced intermediate result
+ * partition.
+ */
+public class PartitionDeploymentDescriptor implements IOReadableWritable, Serializable {
+
+	private final IntermediateDataSetID resultId;
+
+	private final IntermediateResultPartitionID partitionId;
+
+	private IntermediateResultPartitionType partitionType;
+
+	private int numberOfQueues;
+
+	public PartitionDeploymentDescriptor() {
+		this.resultId = new IntermediateDataSetID();
+		this.partitionId = new IntermediateResultPartitionID();
+		this.numberOfQueues = -1;
+	}
+
+	public PartitionDeploymentDescriptor(IntermediateDataSetID resultId, IntermediateResultPartitionID partitionId, IntermediateResultPartitionType partitionType, int numberOfQueues) {
+		this.resultId = resultId;
+		this.partitionId = partitionId;
+		this.partitionType = partitionType;
+		this.numberOfQueues = numberOfQueues;
+	}
+
+	// ------------------------------------------------------------------------
+	// Properties
+	// ------------------------------------------------------------------------
+
+	public IntermediateDataSetID getResultId() {
+		return resultId;
+	}
+
+	public IntermediateResultPartitionID getPartitionId() {
+		return partitionId;
+	}
+
+	public IntermediateResultPartitionType getPartitionType() {
+		return partitionType;
+	}
+
+	public int getNumberOfQueues() {
+		return numberOfQueues;
+	}
+
+	// ------------------------------------------------------------------------
+	// Serialization
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		resultId.write(out);
+		partitionId.write(out);
+		out.writeInt(partitionType.ordinal());
+		out.writeInt(numberOfQueues);
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		resultId.read(in);
+		partitionId.read(in);
+		partitionType = IntermediateResultPartitionType.values()[in.readInt()];
+		numberOfQueues = in.readInt();
+	}
+
+	// ------------------------------------------------------------------------
+
+	public static PartitionDeploymentDescriptor fromIntermediateResultPartition(IntermediateResultPartition partition) {
+
+		IntermediateResultPartitionID partitionId = partition.getPartitionId();
+
+		// The produced data is partitioned at runtime among a number of queues.
+		// If no consumers are known at this point, we use a single queue,
+	// otherwise we have a queue for each consumer subtask.
+		int numberOfQueues = 1;
+
+		if (!partition.getConsumers().isEmpty() && !partition.getConsumers().get(0).isEmpty()) {
+			numberOfQueues = partition.getConsumers().get(0).size();
+		}
+
+		return new PartitionDeploymentDescriptor(partition.getIntermediateResult().getId(), partitionId, partition.getIntermediateResult().getResultType(), numberOfQueues);
+	}
+}

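The descriptor above serializes its partition type by enum ordinal and restores it with values() on read. Below is a minimal, self-contained sketch of that round-trip pattern, using plain java.io streams and a hypothetical PartitionType enum rather than the Flink DataOutputView/DataInputView and IntermediateResultPartitionType types:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical, standalone sketch of the ordinal-based enum round trip used by
// PartitionDeploymentDescriptor.write()/read(); not part of the commit.
public class OrdinalRoundTrip {

    enum PartitionType { BLOCKING, PIPELINED }   // stand-in for IntermediateResultPartitionType

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeInt(PartitionType.PIPELINED.ordinal());   // write side: enum -> ordinal
        out.writeInt(4);                                    // e.g. number of queues
        out.flush();

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        PartitionType type = PartitionType.values()[in.readInt()];   // read side: ordinal -> enum
        int numberOfQueues = in.readInt();
        System.out.println(type + " with " + numberOfQueues + " queues");
    }
}

Ordinal encoding keeps the wire format compact, but it ties the format to the declaration order of the enum constants, so reordering or inserting constants changes the meaning of previously written data.
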
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java
new file mode 100644
index 0000000..cdaf289
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.deployment;
+
+import org.apache.flink.core.io.IOReadableWritable;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.executiongraph.Execution;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.executiongraph.ExecutionEdge;
+import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
+import org.apache.flink.runtime.instance.AllocatedSlot;
+import org.apache.flink.runtime.io.network.RemoteAddress;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+/**
+ * A partition info instance contains all information necessary for a reader to create an input
+ * channel to request a partition at runtime.
+ */
+public class PartitionInfo implements IOReadableWritable, Serializable {
+
+	public enum PartitionLocation {
+		LOCAL, REMOTE, UNKNOWN
+	}
+
+	private final IntermediateResultPartitionID partitionId;
+
+	private ExecutionAttemptID producerExecutionId;
+
+	private PartitionLocation producerLocation;
+
+	private RemoteAddress producerAddress; // != null, iff known remote producer
+
+	public PartitionInfo(IntermediateResultPartitionID partitionId, ExecutionAttemptID producerExecutionId, PartitionLocation producerLocation, RemoteAddress producerAddress) {
+		this.partitionId = checkNotNull(partitionId);
+		this.producerExecutionId = checkNotNull(producerExecutionId);
+		this.producerLocation = checkNotNull(producerLocation);
+		this.producerAddress = producerAddress;
+	}
+
+	public PartitionInfo() {
+		this.partitionId = new IntermediateResultPartitionID();
+		this.producerExecutionId = new ExecutionAttemptID();
+		this.producerLocation = PartitionLocation.UNKNOWN;
+		this.producerAddress = null;
+	}
+
+	// ------------------------------------------------------------------------
+	// Properties
+	// ------------------------------------------------------------------------
+
+	public IntermediateResultPartitionID getPartitionId() {
+		return partitionId;
+	}
+
+	public ExecutionAttemptID getProducerExecutionId() {
+		return producerExecutionId;
+	}
+
+	public PartitionLocation getProducerLocation() {
+		return producerLocation;
+	}
+
+	public RemoteAddress getProducerAddress() {
+		return producerAddress;
+	}
+
+	// ------------------------------------------------------------------------
+	// Serialization
+	// ------------------------------------------------------------------------
+
+	@Override
+	public void write(DataOutputView out) throws IOException {
+		partitionId.write(out);
+		producerExecutionId.write(out);
+		out.writeInt(producerLocation.ordinal());
+		if (producerLocation == PartitionLocation.REMOTE) {
+			producerAddress.write(out);
+		}
+	}
+
+	@Override
+	public void read(DataInputView in) throws IOException {
+		partitionId.read(in);
+		producerExecutionId.read(in);
+		producerLocation = PartitionLocation.values()[in.readInt()];
+		if (producerLocation == PartitionLocation.REMOTE) {
+			producerAddress = new RemoteAddress();
+			producerAddress.read(in);
+		}
+	}
+
+	// ------------------------------------------------------------------------
+
+	public static PartitionInfo fromEdge(ExecutionEdge edge, AllocatedSlot consumerSlot) {
+		IntermediateResultPartition partition = edge.getSource();
+		IntermediateResultPartitionID partitionId = partition.getPartitionId();
+
+		// Intermediate result partition producer
+		Execution producer = partition.getProducer().getCurrentExecutionAttempt();
+
+		ExecutionAttemptID producerExecutionId = producer.getAttemptId();
+		RemoteAddress producerAddress = null;
+		PartitionLocation producerLocation = PartitionLocation.UNKNOWN;
+
+		AllocatedSlot producerSlot = producer.getAssignedResource();
+		ExecutionState producerState = producer.getState();
+
+		// The producer needs to be running, otherwise the consumer might request a partition,
+		// which has not been registered yet.
+		if (producerSlot != null && producerState == ExecutionState.RUNNING) {
+			if (producerSlot.getInstance().equals(consumerSlot.getInstance())) {
+				producerLocation = PartitionLocation.LOCAL;
+			}
+			else {
+				producerAddress = new RemoteAddress(producerSlot.getInstance().getInstanceConnectionInfo(),
+						partition.getIntermediateResult().getConnectionIndex());
+
+				producerLocation = PartitionLocation.REMOTE;
+			}
+		}
+
+		return new PartitionInfo(partitionId, producerExecutionId, producerLocation, producerAddress);
+	}
+
+	public static PartitionInfo[] fromEdges(ExecutionEdge[] edges, AllocatedSlot consumerSlot) {
+		// Every edge consumes a different result partition, which might be of
+		// local, remote, or unknown location.
+		PartitionInfo[] partitions = new PartitionInfo[edges.length];
+
+		for (int i = 0; i < edges.length; i++) {
+			partitions[i] = fromEdge(edges[i], consumerSlot);
+		}
+
+		return partitions;
+	}
+}

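PartitionInfo.fromEdge resolves the producer location in three steps: UNKNOWN while the producer is not yet deployed or running, LOCAL when producer and consumer slots sit on the same instance, and REMOTE otherwise (in which case a RemoteAddress is attached). A minimal standalone sketch of that decision, using hypothetical parameters in place of the Flink slot and instance types:

// Hypothetical sketch of the producer-location decision in PartitionInfo.fromEdge;
// the Object parameters stand in for AllocatedSlot.getInstance().
public class ProducerLocationExample {

    enum Location { LOCAL, REMOTE, UNKNOWN }

    static Location resolveProducerLocation(boolean producerDeployedAndRunning,
                                            Object producerInstance, Object consumerInstance) {
        if (!producerDeployedAndRunning) {
            // The producer must be running, otherwise the consumer might request
            // a partition that has not been registered yet.
            return Location.UNKNOWN;
        }
        // Producer and consumer on the same instance: the partition can be read locally.
        // Otherwise the consumer needs the producer's remote address.
        return producerInstance.equals(consumerInstance) ? Location.LOCAL : Location.REMOTE;
    }
}
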
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index 75a3c9b..2432bea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -18,17 +18,19 @@
 
 package org.apache.flink.runtime.deployment;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobKey;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+
 /**
  * A task deployment descriptor contains all the information necessary to deploy a task on a task manager.
  */
@@ -41,7 +43,7 @@ public final class TaskDeploymentDescriptor implements Serializable {
 
 	/** The task's job vertex ID. */
 	private final JobVertexID vertexID;
-	
+
 	/** The ID referencing the attempt to execute the task. */
 	private final ExecutionAttemptID executionId;
 
@@ -51,8 +53,8 @@ public final class TaskDeploymentDescriptor implements Serializable {
 	/** The task's index in the subtask group. */
 	private int indexInSubtaskGroup;
 
-	/** The current number of subtasks. */
-	private int currentNumberOfSubtasks;
+	/** The number of subtasks. */
+	private int numberOfSubtasks;
 
 	/** The configuration of the job the task belongs to. */
 	private Configuration jobConfiguration;
@@ -62,73 +64,44 @@ public final class TaskDeploymentDescriptor implements Serializable {
 
 	/** The name of the class containing the task code to be executed. */
 	private String invokableClassName;
-	/** The list of output gate deployment descriptors. */
-	private List<GateDeploymentDescriptor> outputGates;
 
-	/** The list of input gate deployment descriptors. */
-	private List<GateDeploymentDescriptor> inputGates;
-	
+
+	/** The list of produced intermediate result partition deployment descriptors. */
+	private List<PartitionDeploymentDescriptor> producedPartitions;
+
+	/** The list of consumed intermediate result partitions. */
+	private List<PartitionConsumerDeploymentDescriptor> consumedPartitions;
+
 	private int targetSlotNumber;
 
-	/**
-	 * The list of JAR files required to run this task.
-	 */
+	/** The list of JAR files required to run this task. */
 	private final List<BlobKey> requiredJarFiles;
 
 	/**
 	 * Constructs a task deployment descriptor.
-	 * 
-	 * @param jobID
-	 *        the ID of the job the tasks belongs to
-	 * @param vertexID
-	 *        the task's execution vertex ID
-	 * @param taskName
-	 *        the task's name the task's index in the subtask group
-	 * @param indexInSubtaskGroup
-	 *        he task's index in the subtask group
-	 * @param currentNumberOfSubtasks
-	 *        the current number of subtasks
-	 * @param jobConfiguration
-	 *        the configuration of the job the task belongs to
-	 * @param taskConfiguration
-	 *        the task's configuration object
-	 * @param invokableClassName
-	 *        the class containing the task code to be executed
-	 * @param outputGates
-	 *        list of output gate deployment descriptors
-	 * @param requiredJarFiles
-	 *        list of JAR files required to run this task
 	 */
-	public TaskDeploymentDescriptor(JobID jobID, JobVertexID vertexID, ExecutionAttemptID execuionId,
-			String taskName, int indexInSubtaskGroup, int currentNumberOfSubtasks, 
-			Configuration jobConfiguration, Configuration taskConfiguration,
-			String invokableClassName,
-			List<GateDeploymentDescriptor> outputGates,
-			List<GateDeploymentDescriptor> inputGates,
-			final List<BlobKey> requiredJarFiles, int targetSlotNumber){
-		if (jobID == null || vertexID == null || execuionId == null || taskName == null || indexInSubtaskGroup < 0 ||
-				currentNumberOfSubtasks <= indexInSubtaskGroup || jobConfiguration == null ||
-				taskConfiguration == null || invokableClassName == null || outputGates == null || inputGates == null)
-		{
-			throw new IllegalArgumentException();
-		}
-		
-		if (requiredJarFiles == null) {
-			throw new IllegalArgumentException("Argument requiredJarFiles must not be null");
-		}
-
-		this.jobID = jobID;
-		this.vertexID = vertexID;
-		this.executionId = execuionId;
-		this.taskName = taskName;
+	public TaskDeploymentDescriptor(
+			JobID jobID, JobVertexID vertexID,  ExecutionAttemptID executionId,  String taskName,
+			int indexInSubtaskGroup,  int numberOfSubtasks, Configuration jobConfiguration,
+			Configuration taskConfiguration, String invokableClassName,
+			List<PartitionDeploymentDescriptor> producedPartitions,
+			List<PartitionConsumerDeploymentDescriptor> consumedPartitions,
+			List<BlobKey> requiredJarFiles, int targetSlotNumber){
+
+		this.jobID = checkNotNull(jobID);
+		this.vertexID = checkNotNull(vertexID);
+		this.executionId = checkNotNull(executionId);
+		this.taskName = checkNotNull(taskName);
+		checkArgument(indexInSubtaskGroup >= 0);
 		this.indexInSubtaskGroup = indexInSubtaskGroup;
-		this.currentNumberOfSubtasks = currentNumberOfSubtasks;
-		this.jobConfiguration = jobConfiguration;
-		this.taskConfiguration = taskConfiguration;
-		this.invokableClassName = invokableClassName;
-		this.outputGates = outputGates;
-		this.inputGates = inputGates;
-		this.requiredJarFiles = requiredJarFiles;
+		checkArgument(numberOfSubtasks > indexInSubtaskGroup);
+		this.numberOfSubtasks = numberOfSubtasks;
+		this.jobConfiguration = checkNotNull(jobConfiguration);
+		this.taskConfiguration = checkNotNull(taskConfiguration);
+		this.invokableClassName = checkNotNull(invokableClassName);
+		this.producedPartitions = checkNotNull(producedPartitions);
+		this.consumedPartitions = checkNotNull(consumedPartitions);
+		this.requiredJarFiles = checkNotNull(requiredJarFiles);
 		this.targetSlotNumber = targetSlotNumber;
 	}
 
@@ -141,40 +114,34 @@ public final class TaskDeploymentDescriptor implements Serializable {
 		this.executionId = new ExecutionAttemptID();
 		this.jobConfiguration = new Configuration();
 		this.taskConfiguration = new Configuration();
-		this.outputGates = Collections.emptyList();
-		this.inputGates = Collections.emptyList();
+		this.producedPartitions = new ArrayList<PartitionDeploymentDescriptor>();
+		this.consumedPartitions = new ArrayList<PartitionConsumerDeploymentDescriptor>();
 		this.requiredJarFiles = new ArrayList<BlobKey>();
 	}
 
 	/**
 	 * Returns the ID of the job the tasks belongs to.
-	 * 
-	 * @return the ID of the job the tasks belongs to
 	 */
 	public JobID getJobID() {
-		return this.jobID;
+		return jobID;
 	}
 
 	/**
 	 * Returns the task's execution vertex ID.
-	 * 
-	 * @return the task's execution vertex ID
 	 */
 	public JobVertexID getVertexID() {
-		return this.vertexID;
+		return vertexID;
 	}
-	
+
 	public ExecutionAttemptID getExecutionId() {
 		return executionId;
 	}
 
 	/**
 	 * Returns the task's name.
-	 * 
-	 * @return the task's name
 	 */
 	public String getTaskName() {
-		return this.taskName;
+		return taskName;
 	}
 
 	/**
@@ -183,16 +150,14 @@ public final class TaskDeploymentDescriptor implements Serializable {
 	 * @return the task's index in the subtask group
 	 */
 	public int getIndexInSubtaskGroup() {
-		return this.indexInSubtaskGroup;
+		return indexInSubtaskGroup;
 	}
 
 	/**
 	 * Returns the current number of subtasks.
-	 * 
-	 * @return the current number of subtasks
 	 */
-	public int getCurrentNumberOfSubtasks() {
-		return this.currentNumberOfSubtasks;
+	public int getNumberOfSubtasks() {
+		return numberOfSubtasks;
 	}
 	
 	/**
@@ -206,38 +171,34 @@ public final class TaskDeploymentDescriptor implements Serializable {
 
 	/**
 	 * Returns the configuration of the job the task belongs to.
-	 * 
-	 * @return the configuration of the job the tasks belongs to
 	 */
 	public Configuration getJobConfiguration() {
-		return this.jobConfiguration;
+		return jobConfiguration;
 	}
 
 	/**
 	 * Returns the task's configuration object.
-	 * 
-	 * @return the task's configuration object
 	 */
 	public Configuration getTaskConfiguration() {
-		return this.taskConfiguration;
+		return taskConfiguration;
 	}
 
 	/**
 	 * Returns the name of the class containing the task code to be executed.
-	 * 
-	 * @return The name of the class containing the task code to be executed
 	 */
 	public String getInvokableClassName() {
-		return this.invokableClassName;
+		return invokableClassName;
 	}
 
-	public List<GateDeploymentDescriptor> getOutputGates() {
-		return outputGates;
+	public List<PartitionDeploymentDescriptor> getProducedPartitions() {
+		return producedPartitions;
 	}
-	
-	public List<GateDeploymentDescriptor> getInputGates() {
-		return inputGates;
+
+	public List<PartitionConsumerDeploymentDescriptor> getConsumedPartitions() {
+		return consumedPartitions;
 	}
 
-	public List<BlobKey> getRequiredJarFiles() { return requiredJarFiles; }
-}
\ No newline at end of file
+	public List<BlobKey> getRequiredJarFiles() {
+		return requiredJarFiles;
+	}
+}

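The rewritten constructor validates each argument individually with Guava's checkNotNull/checkArgument instead of one combined IllegalArgumentException. A small hypothetical sketch of the same pattern:

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

// Hypothetical class (not the Flink descriptor) illustrating per-argument validation
// with Guava Preconditions, as used by the new TaskDeploymentDescriptor constructor.
class SubtaskDescriptor {

    private final String taskName;
    private final int indexInSubtaskGroup;
    private final int numberOfSubtasks;

    SubtaskDescriptor(String taskName, int indexInSubtaskGroup, int numberOfSubtasks) {
        this.taskName = checkNotNull(taskName);   // throws NullPointerException if null
        checkArgument(indexInSubtaskGroup >= 0, "Negative subtask index");
        this.indexInSubtaskGroup = indexInSubtaskGroup;
        checkArgument(numberOfSubtasks > indexInSubtaskGroup, "Subtask index out of range");
        this.numberOfSubtasks = numberOfSubtasks;
    }
}

Failing per argument makes it immediately clear which parameter was invalid, whereas the previous combined check threw a message-less IllegalArgumentException covering all conditions at once.
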
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/AbstractTaskEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/AbstractTaskEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/AbstractTaskEvent.java
deleted file mode 100644
index d1806bf..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/AbstractTaskEvent.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.event.task;
-
-/**
- * This type of event can be used to exchange notification messages between
- * different tasks at runtime using the existing communication channels.
- * 
- */
-public abstract class AbstractTaskEvent extends AbstractEvent {
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/EventListener.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/EventListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/EventListener.java
deleted file mode 100644
index f9da415..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/EventListener.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.event.task;
-
-/**
- * This interface must be implemented by all classes which shall
- * be notified about incoming task events.
- * 
- */
-public interface EventListener {
-
-	/**
-	 * Called when a task event has occurred.
-	 * 
-	 * @param event
-	 *        the task event which has occurred
-	 */
-	void eventOccurred(AbstractTaskEvent event);
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/EventNotificationManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/EventNotificationManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/EventNotificationManager.java
deleted file mode 100644
index e8b8eda..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/EventNotificationManager.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.runtime.event.task;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-/**
- * The event notification manager manages the subscription of {@link EventListener} objects to
- * particular event types. Moreover, it handles the dispatching of the events.
- * This class is thread-safe.
- * 
- */
-public class EventNotificationManager {
-
-	/**
-	 * Stores the subscriptions for the individual event types.
-	 */
-	private final Map<Class<? extends AbstractTaskEvent>, List<EventListener>> subscriptions = new HashMap<Class<? extends AbstractTaskEvent>, List<EventListener>>();
-
-	/**
-	 * Subscribes the given event listener object to the specified event type.
-	 * 
-	 * @param eventListener
-	 *        the {@link EventListener} object to create the subscription for
-	 * @param eventType
-	 *        the event type the given listener object wants to be notified about
-	 */
-	public void subscribeToEvent(final EventListener eventListener, final Class<? extends AbstractTaskEvent> eventType) {
-
-		synchronized (this.subscriptions) {
-
-			List<EventListener> subscribers = this.subscriptions.get(eventType);
-			if (subscribers == null) {
-				subscribers = new ArrayList<EventListener>();
-				this.subscriptions.put(eventType, subscribers);
-			}
-			subscribers.add(eventListener);
-		}
-
-	}
-
-	/**
-	 * Removes a subscription of an {@link EventListener} object for the given event type.
-	 * 
-	 * @param eventListener
-	 *        the event listener to remove the subscription for
-	 * @param eventType
-	 *        the event type to remove the subscription for
-	 */
-	public void unsubscribeFromEvent(final EventListener eventListener, final Class<? extends AbstractEvent> eventType) {
-
-		synchronized (this.subscriptions) {
-
-			List<EventListener> subscribers = this.subscriptions.get(eventType);
-			if (subscribers == null) {
-				return;
-			}
-			subscribers.remove(eventListener);
-			if (subscribers.isEmpty()) {
-				this.subscriptions.remove(eventType);
-			}
-		}
-
-	}
-
-	/**
-	 * Delivers a event to all of the registered subscribers.
-	 * 
-	 * @param event The event to deliver.
-	 */
-	public void deliverEvent(AbstractTaskEvent event) {
-
-		synchronized (this.subscriptions) {
-
-			List<EventListener> subscribers = this.subscriptions.get(event.getClass());
-			if (subscribers == null) {
-				return;
-			}
-
-			final Iterator<EventListener> it = subscribers.iterator();
-			while (it.hasNext()) {
-				it.next().eventOccurred(event);
-			}
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/IntegerTaskEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/IntegerTaskEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/IntegerTaskEvent.java
index 3dcb88b..648dacc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/IntegerTaskEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/IntegerTaskEvent.java
@@ -28,7 +28,7 @@ import org.apache.flink.core.memory.DataOutputView;
  * This class provides a simple implementation of an event that holds an integer value.
  * 
  */
-public class IntegerTaskEvent extends AbstractTaskEvent {
+public class IntegerTaskEvent extends TaskEvent {
 
 	/**
 	 * The integer value transported by this integer task event.

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/RuntimeEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/RuntimeEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/RuntimeEvent.java
new file mode 100644
index 0000000..cd19bc4
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/RuntimeEvent.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.event.task;
+
+public abstract class RuntimeEvent extends AbstractEvent {
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StringTaskEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StringTaskEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StringTaskEvent.java
index e68b444..7b6b3d5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StringTaskEvent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/StringTaskEvent.java
@@ -28,7 +28,7 @@ import org.apache.flink.core.memory.DataOutputView;
  * This class provides a simple implementation of an event that holds a string value.
  * 
  */
-public class StringTaskEvent extends AbstractTaskEvent {
+public class StringTaskEvent extends TaskEvent {
 
 	/**
 	 * The string encapsulated by this event.

http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/TaskEvent.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/TaskEvent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/TaskEvent.java
new file mode 100644
index 0000000..9501b95
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/event/task/TaskEvent.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.event.task;
+
+public abstract class TaskEvent extends AbstractEvent {
+}

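Task-level events such as IntegerTaskEvent and StringTaskEvent above now extend the new TaskEvent base class instead of the removed AbstractTaskEvent, while runtime-internal events extend RuntimeEvent. A hypothetical custom event could look like the sketch below; it assumes AbstractEvent exposes the same write(DataOutputView)/read(DataInputView) pair used by the other IOReadableWritable types in this commit:

import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.event.task.TaskEvent;

import java.io.IOException;

// Hypothetical custom task event; the write/read signatures are assumed to match
// the IOReadableWritable contract seen elsewhere in this commit.
public class ProgressTaskEvent extends TaskEvent {

    private int progress;

    public ProgressTaskEvent() {}              // default constructor for deserialization

    public ProgressTaskEvent(int progress) {
        this.progress = progress;
    }

    @Override
    public void write(DataOutputView out) throws IOException {
        out.writeInt(progress);
    }

    @Override
    public void read(DataInputView in) throws IOException {
        progress = in.readInt();
    }
}
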
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
index 647e1ad..c45a941 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/Environment.java
@@ -16,37 +16,32 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.runtime.execution;
 
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.FutureTask;
-
 import akka.actor.ActorRef;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;
-import org.apache.flink.runtime.io.network.channels.ChannelID;
-import org.apache.flink.runtime.io.network.gates.GateID;
-import org.apache.flink.runtime.io.network.gates.InputGate;
-import org.apache.flink.runtime.io.network.gates.OutputGate;
+import org.apache.flink.runtime.io.network.api.reader.BufferReader;
+import org.apache.flink.runtime.io.network.api.writer.BufferWriter;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.memorymanager.MemoryManager;
 
+import java.util.Map;
+import java.util.concurrent.FutureTask;
+
 /**
- * The user code of every Nephele task runs inside an <code>Environment</code> object. The environment provides
- * important services to the task. It keeps track of setting up the communication channels and provides access to input
- * splits, memory manager, etc.
+ * The user code of every task runs inside an <code>Environment</code> object.
+ * The environment provides important services to the task. It keeps track of
+ * setting up the communication channels and provides access to input splits,
+ * memory manager, etc.
  */
 public interface Environment {
-	
+
 	/**
 	 * Returns the ID of the job from the original job graph. It is used by the library cache manager to find the
 	 * required
@@ -82,7 +77,7 @@ public interface Environment {
 	 *
 	 * @return the current number of subtasks the respective task is split into
 	 */
-	int getCurrentNumberOfSubtasks();
+	int getNumberOfSubtasks();
 
 	/**
 	 * Returns the index of this subtask in the subtask group.
@@ -119,120 +114,34 @@ public interface Environment {
 	 */
 	String getTaskName();
 
-	/**
-	 * Returns the next unbound input gate ID or <code>null</code> if no such ID exists
-	 *
-	 * @return the next unbound input gate ID or <code>null</code> if no such ID exists
-	 */
-	GateID getNextUnboundInputGateID();
-
-	/**
-	 * Returns the number of output gates registered with this environment.
-	 *
-	 * @return the number of output gates registered with this environment
-	 */
-	int getNumberOfOutputGates();
-
-	/**
-	 * Returns the number of input gates registered with this environment.
-	 *
-	 * @return the number of input gates registered with this environment
-	 */
-	int getNumberOfInputGates();
-
-	/**
-	 * Returns the number of output channels attached to this environment.
-	 *
-	 * @return the number of output channels attached to this environment
-	 */
-	int getNumberOfOutputChannels();
-
-	/**
-	 * Returns the number of input channels attached to this environment.
-	 *
-	 * @return the number of input channels attached to this environment
-	 */
-	int getNumberOfInputChannels();
-
-	/**
-	 * Creates a new OutputGate and registers it with the Environment.
-	 *
-	 * @return the newly created output gate
-	 */
-	OutputGate createAndRegisterOutputGate();
-
-	/**
-	 * Creates a new InputGate and registers it with the Environment.
-	 */
-	<T extends IOReadableWritable> InputGate<T> createAndRegisterInputGate();
+	String getTaskNameWithSubtasks();
 
 	/**
-	 * Returns the IDs of all output channels connected to this environment.
-	 *
-	 * @return the IDs of all output channels connected to this environment
-	 */
-	Set<ChannelID> getOutputChannelIDs();
-
-	/**
-	 * Returns the IDs of all input channels connected to this environment.
-	 *
-	 * @return the IDs of all input channels connected to this environment
+	 * Returns the ActorRef of the job manager (which also acts as the accumulator protocol proxy).
 	 */
-	Set<ChannelID> getInputChannelIDs();
+	ActorRef getJobManager();
 
 	/**
-	 * Returns the IDs of all output gates connected to this environment.
-	 *
-	 * @return the IDs of all output gates connected to this environment
+	 * Returns the user code class loader
 	 */
-	Set<GateID> getOutputGateIDs();
+	ClassLoader getUserClassLoader();
 
-	/**
-	 * Returns the IDs of all input gates connected to this environment.
-	 *
-	 * @return the IDs of all input gates connected to this environment
-	 */
-	Set<GateID> getInputGateIDs();
+	Map<String, FutureTask<Path>> getCopyTask();
 
-	/**
-	 * Returns the IDs of all the output channels connected to the gate with the given ID.
-	 *
-	 * @param gateID the gate ID
-	 * @return the IDs of all the output channels connected to the gate with the given ID
-	 */
-	Set<ChannelID> getOutputChannelIDsOfGate(GateID gateID);
+	BroadcastVariableManager getBroadcastVariableManager();
 
-	/**
-	 * Returns the IDs of all the input channels connected to the gate with the given ID.
-	 *
-	 * @param gateID the gate ID
-	 * @return the IDs of all the input channels connected to the gate with the given ID
-	 */
-	Set<ChannelID> getInputChannelIDsOfGate(GateID gateID);
+	// ------------------------------------------------------------------------
+	// Runtime result writers and readers
+	// ------------------------------------------------------------------------
+	// The environment sets up buffer-oriented writers and readers, which the
+	// user can use to produce and consume results.
+	// ------------------------------------------------------------------------
 
-	/**
-	 * Returns the proxy object for the accumulator protocol.
-	 */
-	ActorRef getAccumulator();
+	BufferWriter getWriter(int index);
 
-	/**
-	 * Returns the user code class loader
-	 * @return user code class loader
-	 */
-	ClassLoader getUserClassLoader();
+	BufferWriter[] getAllWriters();
 
-	/**
-	 * Returns the buffer provider for this environment.
-	 * <p/>
-	 * The returned buffer provider is used by the output side of the network stack.
-	 *
-	 * @return Buffer provider for the output side of the network stack
-	 * @see org.apache.flink.runtime.io.network.api.RecordWriter
-	 */
-	BufferProvider getOutputBufferProvider();
+	BufferReader getReader(int index);
 
-	Map<String, FutureTask<Path>> getCopyTask();
-	
-	
-	BroadcastVariableManager getBroadcastVariableManager();
+	BufferReader[] getAllReaders();
 }

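The gate- and channel-oriented accessors of Environment are replaced by the buffer-oriented getWriter/getReader family, set up from the deployment descriptors. A rough sketch of how task code could inspect them, using only the method signatures declared in the interface above (the helper class itself is hypothetical):

import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.reader.BufferReader;
import org.apache.flink.runtime.io.network.api.writer.BufferWriter;

// Hypothetical helper (not part of the commit): one writer per produced result
// partition, one reader per consumed intermediate result.
public class TaskIOSetup {

    static void logResultIO(Environment env) {
        BufferWriter[] writers = env.getAllWriters();
        BufferReader[] readers = env.getAllReaders();

        System.out.println(env.getTaskNameWithSubtasks() + ": "
                + writers.length + " writer(s), " + readers.length + " reader(s)");

        // Index-based access mirrors the order of the deployment descriptors,
        // e.g. env.getWriter(0) and env.getReader(0) for the first result on each side.
    }
}
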
http://git-wip-us.apache.org/repos/asf/flink/blob/d908ca19/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
index e91344f..6be0397 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
@@ -18,39 +18,19 @@
 
 package org.apache.flink.runtime.execution;
 
-import java.io.IOException;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.FutureTask;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 import akka.actor.ActorRef;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
-import org.apache.flink.runtime.deployment.GateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.PartitionConsumerDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.PartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.network.Buffer;
-import org.apache.flink.runtime.io.network.bufferprovider.BufferAvailabilityListener;
-import org.apache.flink.runtime.io.network.bufferprovider.BufferProvider;
-import org.apache.flink.runtime.io.network.bufferprovider.GlobalBufferPool;
-import org.apache.flink.runtime.io.network.bufferprovider.LocalBufferPool;
-import org.apache.flink.runtime.io.network.bufferprovider.LocalBufferPoolOwner;
-import org.apache.flink.runtime.io.network.channels.ChannelID;
-import org.apache.flink.runtime.io.network.channels.OutputChannel;
-import org.apache.flink.runtime.io.network.gates.GateID;
-import org.apache.flink.runtime.io.network.gates.InputGate;
-import org.apache.flink.runtime.io.network.gates.OutputGate;
+import org.apache.flink.runtime.io.network.NetworkEnvironment;
+import org.apache.flink.runtime.io.network.api.reader.BufferReader;
+import org.apache.flink.runtime.io.network.api.writer.BufferWriter;
+import org.apache.flink.runtime.io.network.partition.IntermediateResultPartition;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -60,50 +40,38 @@ import org.apache.flink.runtime.taskmanager.Task;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.FutureTask;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import static com.google.common.base.Preconditions.checkElementIndex;
+import static com.google.common.base.Preconditions.checkNotNull;
 
-public class RuntimeEnvironment implements Environment, BufferProvider, LocalBufferPoolOwner, Runnable {
+public class RuntimeEnvironment implements Environment, Runnable {
 
-	/** The log object used for debugging. */
 	private static final Logger LOG = LoggerFactory.getLogger(RuntimeEnvironment.class);
 
 	private static final ThreadGroup TASK_THREADS = new ThreadGroup("Task Threads");
-	
-	/** The interval to sleep in case a communication channel is not yet entirely set up (in milliseconds). */
-	private static final int SLEEPINTERVAL = 100;
-	
-	// --------------------------------------------------------------------------------------------
+
+	/** The ActorRef to the job manager */
+	private final ActorRef jobManager;
 
 	/** The task that owns this environment */
 	private final Task owner;
-	
-	
+
 	/** The job configuration encapsulated in the environment object. */
 	private final Configuration jobConfiguration;
 
 	/** The task configuration encapsulated in the environment object. */
 	private final Configuration taskConfiguration;
-	
-	
+
 	/** ClassLoader for all user code classes */
 	private final ClassLoader userCodeClassLoader;
-	
-	/** Class of the task to run in this environment. */
-	private final Class<? extends AbstractInvokable> invokableClass;
 
 	/** Instance of the class to be run in this environment. */
 	private final AbstractInvokable invokable;
-	
-	
-	/** List of output gates created by the task. */
-	private final ArrayList<OutputGate> outputGates = new ArrayList<OutputGate>();
-
-	/** List of input gates created by the task. */
-	private final ArrayList<InputGate<? extends IOReadableWritable>> inputGates = new ArrayList<InputGate<? extends IOReadableWritable>>();
-
-	/** Unbound input gate IDs which are required for deserializing an environment in the course of an RPC call. */
-	private final Queue<GateID> unboundInputGateIDs = new ArrayDeque<GateID>();
 
 	/** The memory manager of the current environment (currently the one associated with the executing TaskManager). */
 	private final MemoryManager memoryManager;
@@ -111,111 +79,103 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 	/** The I/O manager of the current environment (currently the one associated with the executing TaskManager). */
 	private final IOManager ioManager;
 
-	/** The input split provider that can be queried for new input splits.  */
+	/** The input split provider that can be queried for new input splits. */
 	private final InputSplitProvider inputSplitProvider;
 
-	
 	/** The thread executing the task in the environment. */
 	private Thread executingThread;
 
-	/**
-	 * The ActorRef to the accumulator
-	 */
-	private final ActorRef accumulator;
-
-	private final Map<String,FutureTask<Path>> cacheCopyTasks = new HashMap<String, FutureTask<Path>>();
-	
-	private final BroadcastVariableManager bcVarManager;
-	
-	private LocalBufferPool outputBufferPool;
-	
-	private AtomicBoolean canceled = new AtomicBoolean();
-
-
-	public RuntimeEnvironment(Task owner, TaskDeploymentDescriptor tdd,
-							ClassLoader userCodeClassLoader,
-							MemoryManager memoryManager, IOManager ioManager,
-							InputSplitProvider inputSplitProvider,
-							ActorRef accumulator,
-							BroadcastVariableManager bcVarManager)
-		throws Exception
-	{
-		Preconditions.checkNotNull(owner);
-		Preconditions.checkNotNull(memoryManager);
-		Preconditions.checkNotNull(ioManager);
-		Preconditions.checkNotNull(inputSplitProvider);
-		Preconditions.checkNotNull(accumulator);
-		Preconditions.checkNotNull(userCodeClassLoader);
-		Preconditions.checkNotNull(bcVarManager);
-
-		this.owner = owner;
-
-		this.memoryManager = memoryManager;
-		this.ioManager = ioManager;
-		this.inputSplitProvider = inputSplitProvider;
-		this.accumulator = accumulator;
-		this.bcVarManager = bcVarManager;
-
-		// load and instantiate the invokable class
-		this.userCodeClassLoader = userCodeClassLoader;
-		try {
-			final String className = tdd.getInvokableClassName();
-			this.invokableClass = Class.forName(className, true, userCodeClassLoader).asSubclass(AbstractInvokable.class);
-		}
-		catch (Throwable t) {
-			throw new Exception("Could not load invokable class.", t);
-		}
-		
+	private final BroadcastVariableManager broadcastVariableManager;
+
+	private final Map<String, FutureTask<Path>> cacheCopyTasks = new HashMap<String, FutureTask<Path>>();
+
+	private final AtomicBoolean canceled = new AtomicBoolean();
+
+	private final IntermediateResultPartition[] producedPartitions;
+
+	private final BufferWriter[] writers;
+
+	private final BufferReader[] readers;
+
+	private final Map<IntermediateDataSetID, BufferReader> readersById = new HashMap<IntermediateDataSetID, BufferReader>();
+
+	public RuntimeEnvironment(
+			ActorRef jobManager, Task owner, TaskDeploymentDescriptor tdd, ClassLoader userCodeClassLoader,
+			MemoryManager memoryManager, IOManager ioManager, InputSplitProvider inputSplitProvider,
+			BroadcastVariableManager broadcastVariableManager, NetworkEnvironment networkEnvironment) throws Exception {
+
+		this.owner = checkNotNull(owner);
+
+		this.memoryManager = checkNotNull(memoryManager);
+		this.ioManager = checkNotNull(ioManager);
+		this.inputSplitProvider = checkNotNull(inputSplitProvider);
+		this.jobManager = checkNotNull(jobManager);
+
+		this.broadcastVariableManager = checkNotNull(broadcastVariableManager);
+
 		try {
-			this.invokable = this.invokableClass.newInstance();
-		}
-		catch (Throwable t) {
-			throw new Exception("Could not instantiate the invokable class.", t);
-		}
-		
-		this.jobConfiguration = tdd.getJobConfiguration();
-		this.taskConfiguration = tdd.getTaskConfiguration();
-		
-		this.invokable.setEnvironment(this);
-		
-		// make sure that user classloader is available, because registerInputOutput might call usercode
-		{
-			Thread currentThread = Thread.currentThread();
-			ClassLoader context = currentThread.getContextClassLoader();
-			currentThread.setContextClassLoader(userCodeClassLoader);
+			// Produced intermediate result partitions
+			final List<PartitionDeploymentDescriptor> partitions = tdd.getProducedPartitions();
+
+			this.producedPartitions = new IntermediateResultPartition[partitions.size()];
+			this.writers = new BufferWriter[partitions.size()];
+
+			for (int i = 0; i < this.producedPartitions.length; i++) {
+				this.producedPartitions[i] = IntermediateResultPartition.create(this, i, owner.getJobID(), owner.getExecutionId(), networkEnvironment, partitions.get(i));
+				writers[i] = new BufferWriter(this.producedPartitions[i]);
+			}
+
+			// Consumed intermediate result partitions
+			final List<PartitionConsumerDeploymentDescriptor> consumedPartitions = tdd.getConsumedPartitions();
+
+			this.readers = new BufferReader[consumedPartitions.size()];
+
+			for (int i = 0; i < readers.length; i++) {
+				readers[i] = BufferReader.create(this, networkEnvironment, consumedPartitions.get(i));
+
+				// The readers are organized by key for task updates/channel updates at runtime
+				readersById.put(readers[i].getConsumedResultId(), readers[i]);
+			}
+
+			// ----------------------------------------------------------------
+			// Invokable setup
+			// ----------------------------------------------------------------
+			// Note: This has to be done *after* the readers and writers have
+			// been setup, because the invokable relies on them for I/O.
+			// ----------------------------------------------------------------
+
+			// Load and instantiate the invokable class
+			this.userCodeClassLoader = checkNotNull(userCodeClassLoader);
+			// Class of the task to run in this environment
+			Class<? extends AbstractInvokable> invokableClass;
 			try {
-				this.invokable.registerInputOutput();
+				final String className = tdd.getInvokableClassName();
+				invokableClass = Class.forName(className, true, userCodeClassLoader).asSubclass(AbstractInvokable.class);
 			}
-			finally {
-				currentThread.setContextClassLoader(context);
+			catch (Throwable t) {
+				throw new Exception("Could not load invokable class.", t);
 			}
-		}
 
-		List<GateDeploymentDescriptor> inGates = tdd.getInputGates();
-		List<GateDeploymentDescriptor> outGates = tdd.getOutputGates();
-		
-		
-		if (this.inputGates.size() != inGates.size()) {
-			throw new Exception("The number of readers created in 'registerInputOutput()' "
-					+ "is different than the number of connected incoming edges in the job graph.");
-		}
-		if (this.outputGates.size() != outGates.size()) {
-			throw new Exception("The number of writers created in 'registerInputOutput()' "
-					+ "is different than the number of connected outgoing edges in the job graph.");
-		}
-		
-		for (int i = 0; i < inGates.size(); i++) {
-			this.inputGates.get(i).initializeChannels(inGates.get(i));
+			try {
+				this.invokable = invokableClass.newInstance();
+			}
+			catch (Throwable t) {
+				throw new Exception("Could not instantiate the invokable class.", t);
+			}
+
+			this.jobConfiguration = tdd.getJobConfiguration();
+			this.taskConfiguration = tdd.getTaskConfiguration();
+
+			this.invokable.setEnvironment(this);
+			this.invokable.registerInputOutput();
 		}
-		for (int i = 0; i < outGates.size(); i++) {
-			this.outputGates.get(i).initializeChannels(outGates.get(i));
+		catch (Throwable t) {
+			throw new Exception("Error setting up runtime environment: " + t.getMessage(), t);
 		}
 	}
 
 	/**
-	 * Returns the invokable object that represents the Nephele task.
-	 *
-	 * @return the invokable object that represents the Nephele task
+	 * Returns the task invokable instance.
 	 */
 	public AbstractInvokable getInvokable() {
 		return this.invokable;
@@ -225,471 +185,226 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 	public JobID getJobID() {
 		return this.owner.getJobID();
 	}
-	
+
 	@Override
 	public JobVertexID getJobVertexId() {
 		return this.owner.getVertexID();
 	}
 
 	@Override
-	public GateID getNextUnboundInputGateID() {
-		return this.unboundInputGateIDs.poll();
-	}
-
-	@Override
-	public OutputGate createAndRegisterOutputGate() {
-		OutputGate gate = new OutputGate(getJobID(), new GateID(), getNumberOfOutputGates());
-		this.outputGates.add(gate);
-
-		return gate;
-	}
-
-	@Override
 	public void run() {
-		// quick fail in case the task was cancelled while the tread was started
+		// quick fail in case the task was cancelled while the thread was started
 		if (owner.isCanceledOrFailed()) {
 			owner.cancelingDone();
 			return;
 		}
-		
+
 		try {
 			Thread.currentThread().setContextClassLoader(userCodeClassLoader);
-			this.invokable.invoke();
+			invokable.invoke();
 
 			// Make sure, we enter the catch block when the task has been canceled
-			if (this.owner.isCanceledOrFailed()) {
+			if (owner.isCanceledOrFailed()) {
 				throw new CancelTaskException();
 			}
-			
-			// If there is any unclosed input gate, close it and propagate close operation to corresponding output gate
-			closeInputGates();
 
-			// First, close all output gates to indicate no records will be emitted anymore
-			requestAllOutputGatesToClose();
-
-			// Wait until all input channels are closed
-			waitForInputChannelsToBeClosed();
+			// Finish the produced partitions
+			if (producedPartitions != null) {
+				for (IntermediateResultPartition partition : producedPartitions) {
+					if (partition != null) {
+						partition.finish();
+					}
+				}
+			}
 
-			// Now we wait until all output channels have written out their data and are closed
-			waitForOutputChannelsToBeClosed();
-			
-			if (this.owner.isCanceledOrFailed()) {
+			if (owner.isCanceledOrFailed()) {
 				throw new CancelTaskException();
 			}
-			
+
 			// Finally, switch execution state to FINISHED and report to job manager
 			if (!owner.markAsFinished()) {
-				throw new Exception("Could notify job manager that the task is finished.");
+				throw new Exception("Could *not* notify job manager that the task is finished.");
 			}
 		}
 		catch (Throwable t) {
-			
-			if (!this.owner.isCanceledOrFailed()) {
+			LOG.error("Error during running invokable: " + t.getMessage(), t);
 
+			if (!owner.isCanceledOrFailed()) {
 				// Perform clean up when the task failed and has been not canceled by the user
 				try {
-					this.invokable.cancel();
-				} catch (Throwable t2) {
+					invokable.cancel();
+				}
+				catch (Throwable t2) {
 					LOG.error("Error while canceling the task", t2);
 				}
 			}
 
-			// Release all resources that may currently be allocated by the individual channels
-			releaseAllChannelResources();
-
 			// if we are already set as cancelled or failed (when failure is triggered externally),
 			// mark that the thread is done.
-			if (this.owner.isCanceledOrFailed() || t instanceof CancelTaskException) {
-				this.owner.cancelingDone();
+			if (owner.isCanceledOrFailed() || t instanceof CancelTaskException) {
+				owner.cancelingDone();
 			}
 			else {
-				// failure from inside the task thread. notify the task of teh failure
-				this.owner.markFailed(t);
+				// failure from inside the task thread. notify the task of the failure
+				owner.markFailed(t);
 			}
 		}
-		finally {
-			// Release all resources that may currently be allocated by the individual channels
-			releaseAllChannelResources();
-		}
-	}
-
-	@Override
-	public <T extends IOReadableWritable> InputGate<T> createAndRegisterInputGate() {
-		InputGate<T> gate = new InputGate<T>(getJobID(), new GateID(), getNumberOfInputGates());
-		this.inputGates.add(gate);
-
-		return gate;
-	}
-
-	public int getNumberOfOutputGates() {
-		return this.outputGates.size();
-	}
-
-	@Override
-	public int getNumberOfInputGates() {
-		return this.inputGates.size();
-	}
-
-	@Override
-	public int getNumberOfOutputChannels() {
-		int numberOfOutputChannels = 0;
-		for (int i = 0; i < this.outputGates.size(); ++i) {
-			numberOfOutputChannels += this.outputGates.get(i).getNumChannels();
-		}
-
-		return numberOfOutputChannels;
-	}
-
-	@Override
-	public int getNumberOfInputChannels() {
-		int numberOfInputChannels = 0;
-		for (int i = 0; i < this.inputGates.size(); ++i) {
-			numberOfInputChannels += this.inputGates.get(i).getNumberOfInputChannels();
-		}
-
-		return numberOfInputChannels;
 	}
 
 	/**
-	 * Returns the registered input gate with index <code>pos</code>.
-	 *
-	 * @param pos the index of the input gate to return
-	 * @return the input gate at index <code>pos</code> or <code>null</code> if no such index exists
-	 */
-	public InputGate<? extends IOReadableWritable> getInputGate(final int pos) {
-		if (pos < this.inputGates.size()) {
-			return this.inputGates.get(pos);
-		}
-
-		return null;
-	}
-
-	/**
-	 * Returns the registered output gate with index <code>pos</code>.
-	 *
-	 * @param index the index of the output gate to return
-	 * @return the output gate at index <code>pos</code> or <code>null</code> if no such index exists
-	 */
-	public OutputGate getOutputGate(int index) {
-		if (index < this.outputGates.size()) {
-			return this.outputGates.get(index);
-		}
-
-		return null;
-	}
-
-	/**
-	 * Returns the thread which is assigned to execute the user code.
-	 *
-	 * @return the thread which is assigned to execute the user code
+	 * Returns the thread that is assigned to execute the user code.
 	 */
 	public Thread getExecutingThread() {
 		synchronized (this) {
-
-			if (this.executingThread == null) {
+			if (executingThread == null) {
 				String name = owner.getTaskNameWithSubtasks();
-				this.executingThread = new Thread(TASK_THREADS, this, name);
+
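+				// in debug mode, append the execution attempt ID to the thread name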
+				if (LOG.isDebugEnabled()) {
+					name = name + " (" + owner.getExecutionId() + ")";
+				}
+
+				executingThread = new Thread(TASK_THREADS, this, name);
 			}
 
-			return this.executingThread;
+			return executingThread;
 		}
 	}
-	
+
 	public void cancelExecution() {
 		if (!canceled.compareAndSet(false, true)) {
 			return;
 		}
 
-		LOG.info("Canceling " + owner.getTaskNameWithSubtasks());
+		LOG.info("Canceling {} ({}).", owner.getTaskNameWithSubtasks(), owner.getExecutionId());
 
 		// Request user code to shut down
-		if (this.invokable != null) {
+		if (invokable != null) {
 			try {
-				this.invokable.cancel();
-			} catch (Throwable e) {
-				LOG.error("Error while cancelling the task.", e);
+				invokable.cancel();
+			}
+			catch (Throwable e) {
+				LOG.error("Error while canceling the task.", e);
 			}
 		}
-		
+
 		final Thread executingThread = this.executingThread;
 		if (executingThread != null) {
 			// interrupt the running thread and wait for it to die
 			executingThread.interrupt();
-			
 			try {
 				executingThread.join(5000);
-			} catch (InterruptedException e) {}
-			
+			}
+			catch (InterruptedException e) {
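+				// ignore; if the thread is still alive, we fall through to repeated interrupts below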
+			}
 			if (!executingThread.isAlive()) {
 				return;
 			}
-			
 			// Continuously interrupt the user thread until it changed to state CANCELED
 			while (executingThread != null && executingThread.isAlive()) {
 				LOG.warn("Task " + owner.getTaskNameWithSubtasks() + " did not react to cancelling signal. Sending repeated interrupt.");
-	
 				if (LOG.isDebugEnabled()) {
 					StringBuilder bld = new StringBuilder("Task ").append(owner.getTaskNameWithSubtasks()).append(" is stuck in method:\n");
-					
 					StackTraceElement[] stack = executingThread.getStackTrace();
 					for (StackTraceElement e : stack) {
 						bld.append(e).append('\n');
 					}
 					LOG.debug(bld.toString());
 				}
-				
 				executingThread.interrupt();
-				
 				try {
 					executingThread.join(1000);
-				} catch (InterruptedException e) {}
-			}
-		}
-	}
-
-	/**
-	 * Blocks until all output channels are closed.
-	 *
-	 * @throws InterruptedException thrown if the thread waiting for the channels to be closed is interrupted
-	 */
-	private void waitForOutputChannelsToBeClosed() throws InterruptedException {
-		// Make sure, we leave this method with an InterruptedException when the task has been canceled
-		if (this.owner.isCanceledOrFailed()) {
-			return;
-		}
-
-		for (OutputGate og : this.outputGates) {
-			og.waitForGateToBeClosed();
-		}
-	}
-
-	/**
-	 * Blocks until all input channels are closed.
-	 *
-	 * @throws IOException          thrown if an error occurred while closing the input channels
-	 * @throws InterruptedException thrown if the thread waiting for the channels to be closed is interrupted
-	 */
-	private void waitForInputChannelsToBeClosed() throws IOException, InterruptedException {
-		// Wait for disconnection of all output gates
-		while (!canceled.get()) {
-
-			// Make sure, we leave this method with an InterruptedException when the task has been canceled
-			if (this.owner.isCanceledOrFailed()) {
-				throw new InterruptedException();
-			}
-
-			boolean allClosed = true;
-			for (int i = 0; i < getNumberOfInputGates(); i++) {
-				final InputGate<? extends IOReadableWritable> eig = this.inputGates.get(i);
-				if (!eig.isClosed()) {
-					allClosed = false;
 				}
-			}
-
-			if (allClosed) {
-				break;
-			}
-			else {
-				Thread.sleep(SLEEPINTERVAL);
+				catch (InterruptedException e) {
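+					// ignore and keep interrupting until the user thread terminates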
+				}
 			}
 		}
 	}
 
-	/**
-	 * Closes all input gates which are not already closed.
-	 */
-	private void closeInputGates() throws IOException, InterruptedException {
-		for (int i = 0; i < this.inputGates.size(); i++) {
-			final InputGate<? extends IOReadableWritable> eig = this.inputGates.get(i);
-			// Important: close must be called on each input gate exactly once
-			eig.close();
-		}
-
-	}
-
-	/**
-	 * Requests all output gates to be closed.
-	 */
-	private void requestAllOutputGatesToClose() throws IOException, InterruptedException {
-		for (int i = 0; i < this.outputGates.size(); i++) {
-			this.outputGates.get(i).requestClose();
-		}
+	@Override
+	public ActorRef getJobManager() {
+		return jobManager;
 	}
 
 	@Override
 	public IOManager getIOManager() {
-		return this.ioManager;
+		return ioManager;
 	}
 
 	@Override
 	public MemoryManager getMemoryManager() {
-		return this.memoryManager;
+		return memoryManager;
 	}
 
 	@Override
 	public BroadcastVariableManager getBroadcastVariableManager() {
-		return this.bcVarManager;
-	}
-	
-	@Override
-	public Configuration getTaskConfiguration() {
-		return this.taskConfiguration;
+		return broadcastVariableManager;
 	}
 
 	@Override
-	public Configuration getJobConfiguration() {
-		return this.jobConfiguration;
-	}
+	public BufferWriter getWriter(int index) {
+		checkElementIndex(index, writers.length, "Illegal environment writer request.");
 
-	@Override
-	public int getCurrentNumberOfSubtasks() {
-		return owner.getNumberOfSubtasks();
+		return writers[index];
 	}
 
 	@Override
-	public int getIndexInSubtaskGroup() {
-		return owner.getSubtaskIndex();
+	public BufferWriter[] getAllWriters() {
+		return writers;
 	}
 
 	@Override
-	public String getTaskName() {
-		return owner.getTaskName();
+	public BufferReader getReader(int index) {
+		checkElementIndex(index, readers.length, "Illegal environment reader request.");
+
+		return readers[index];
 	}
 
 	@Override
-	public InputSplitProvider getInputSplitProvider() {
-		return this.inputSplitProvider;
+	public BufferReader[] getAllReaders() {
+		return readers;
 	}
 
-	/**
-	 * Releases the allocated resources (particularly buffer) of input and output channels attached to this task. This
-	 * method should only be called after the respected task has stopped running.
-	 */
-	private void releaseAllChannelResources() {
-		for (int i = 0; i < this.inputGates.size(); i++) {
-			this.inputGates.get(i).releaseAllChannelResources();
-		}
-
-		for (int i = 0; i < this.outputGates.size(); i++) {
-			this.outputGates.get(i).releaseAllChannelResources();
-		}
+	public IntermediateResultPartition[] getProducedPartitions() {
+		return producedPartitions;
 	}
 
-	@Override
-	public Set<ChannelID> getOutputChannelIDs() {
-		Set<ChannelID> ids = new HashSet<ChannelID>();
-
-		for (OutputGate gate : this.outputGates) {
-			for (OutputChannel channel : gate.channels()) {
-				ids.add(channel.getID());
-			}
-		}
-
-		return Collections.unmodifiableSet(ids);
+	public BufferReader getReaderById(IntermediateDataSetID id) {
+		return readersById.get(id);
 	}
 
 	@Override
-	public Set<ChannelID> getInputChannelIDs() {
-		final Set<ChannelID> inputChannelIDs = new HashSet<ChannelID>();
-
-		final Iterator<InputGate<? extends IOReadableWritable>> gateIterator = this.inputGates.iterator();
-		while (gateIterator.hasNext()) {
-
-			final InputGate<? extends IOReadableWritable> outputGate = gateIterator.next();
-			for (int i = 0; i < outputGate.getNumberOfInputChannels(); ++i) {
-				inputChannelIDs.add(outputGate.getInputChannel(i).getID());
-			}
-		}
-
-		return Collections.unmodifiableSet(inputChannelIDs);
+	public Configuration getTaskConfiguration() {
+		return taskConfiguration;
 	}
 
 	@Override
-	public Set<GateID> getInputGateIDs() {
-		final Set<GateID> inputGateIDs = new HashSet<GateID>();
-
-		final Iterator<InputGate<? extends IOReadableWritable>> gateIterator = this.inputGates.iterator();
-		while (gateIterator.hasNext()) {
-			inputGateIDs.add(gateIterator.next().getGateID());
-		}
-
-		return Collections.unmodifiableSet(inputGateIDs);
+	public Configuration getJobConfiguration() {
+		return jobConfiguration;
 	}
 
 	@Override
-	public Set<GateID> getOutputGateIDs() {
-		final Set<GateID> outputGateIDs = new HashSet<GateID>();
-
-		final Iterator<OutputGate> gateIterator = this.outputGates.iterator();
-		while (gateIterator.hasNext()) {
-			outputGateIDs.add(gateIterator.next().getGateID());
-		}
-
-		return Collections.unmodifiableSet(outputGateIDs);
+	public int getNumberOfSubtasks() {
+		return owner.getNumberOfSubtasks();
 	}
 
-
 	@Override
-	public Set<ChannelID> getOutputChannelIDsOfGate(final GateID gateID) {
-		OutputGate outputGate = null;
-		final Iterator<OutputGate> gateIterator = this.outputGates.iterator();
-		while (gateIterator.hasNext()) {
-			final OutputGate candidateGate = gateIterator.next();
-			if (candidateGate.getGateID().equals(gateID)) {
-				outputGate = candidateGate;
-				break;
-			}
-		}
-
-		if (outputGate == null) {
-			throw new IllegalArgumentException("Cannot find output gate with ID " + gateID);
-		}
-
-		final Set<ChannelID> outputChannelIDs = new HashSet<ChannelID>();
-
-		for (int i = 0; i < outputGate.getNumChannels(); ++i) {
-			outputChannelIDs.add(outputGate.getChannel(i).getID());
-		}
-
-		return Collections.unmodifiableSet(outputChannelIDs);
+	public int getIndexInSubtaskGroup() {
+		return owner.getSubtaskIndex();
 	}
 
 	@Override
-	public Set<ChannelID> getInputChannelIDsOfGate(final GateID gateID) {
-		InputGate<? extends IOReadableWritable> inputGate = null;
-		final Iterator<InputGate<? extends IOReadableWritable>> gateIterator = this.inputGates.iterator();
-		while (gateIterator.hasNext()) {
-			final InputGate<? extends IOReadableWritable> candidateGate = gateIterator.next();
-			if (candidateGate.getGateID().equals(gateID)) {
-				inputGate = candidateGate;
-				break;
-			}
-		}
-
-		if (inputGate == null) {
-			throw new IllegalArgumentException("Cannot find input gate with ID " + gateID);
-		}
-
-		final Set<ChannelID> inputChannelIDs = new HashSet<ChannelID>();
-
-		for (int i = 0; i < inputGate.getNumberOfInputChannels(); ++i) {
-			inputChannelIDs.add(inputGate.getInputChannel(i).getID());
-		}
-
-		return Collections.unmodifiableSet(inputChannelIDs);
-	}
-
-	public List<OutputGate> outputGates() {
-		return this.outputGates;
+	public String getTaskName() {
+		return owner.getTaskName();
 	}
 
-	public List<InputGate<? extends IOReadableWritable>> inputGates() {
-		return this.inputGates;
+	@Override
+	public InputSplitProvider getInputSplitProvider() {
+		return inputSplitProvider;
 	}
 
 	@Override
-	public ActorRef getAccumulator() {
-		return accumulator;
+	public String getTaskNameWithSubtasks() {
+		return owner.getTaskNameWithSubtasks();
 	}
 
 	@Override
@@ -698,84 +413,15 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 	}
 
 	public void addCopyTasksForCacheFile(Map<String, FutureTask<Path>> copyTasks) {
-		this.cacheCopyTasks.putAll(copyTasks);
+		cacheCopyTasks.putAll(copyTasks);
 	}
-	
+
 	public void addCopyTaskForCacheFile(String name, FutureTask<Path> copyTask) {
-		this.cacheCopyTasks.put(name, copyTask);
+		cacheCopyTasks.put(name, copyTask);
 	}
 
 	@Override
 	public Map<String, FutureTask<Path>> getCopyTask() {
-		return this.cacheCopyTasks;
-	}
-
-	@Override
-	public BufferProvider getOutputBufferProvider() {
-		return this;
-	}
-
-	// -----------------------------------------------------------------------------------------------------------------
-	//                                            BufferProvider methods
-	// -----------------------------------------------------------------------------------------------------------------
-
-	@Override
-	public Buffer requestBuffer(int minBufferSize) throws IOException {
-		return this.outputBufferPool.requestBuffer(minBufferSize);
-	}
-
-	@Override
-	public Buffer requestBufferBlocking(int minBufferSize) throws IOException, InterruptedException {
-		return this.outputBufferPool.requestBufferBlocking(minBufferSize);
-	}
-
-	@Override
-	public int getBufferSize() {
-		return this.outputBufferPool.getBufferSize();
-	}
-
-	@Override
-	public BufferAvailabilityRegistration registerBufferAvailabilityListener(BufferAvailabilityListener listener) {
-		return this.outputBufferPool.registerBufferAvailabilityListener(listener);
-	}
-
-	// -----------------------------------------------------------------------------------------------------------------
-	//                                       LocalBufferPoolOwner methods
-	// -----------------------------------------------------------------------------------------------------------------
-
-	@Override
-	public int getNumberOfChannels() {
-		return getNumberOfOutputChannels();
-	}
-
-	@Override
-	public void setDesignatedNumberOfBuffers(int numBuffers) {
-		this.outputBufferPool.setNumDesignatedBuffers(numBuffers);
-	}
-
-	@Override
-	public void clearLocalBufferPool() {
-		this.outputBufferPool.destroy();
-	}
-
-	@Override
-	public void registerGlobalBufferPool(GlobalBufferPool globalBufferPool) {
-		if (this.outputBufferPool == null) {
-			this.outputBufferPool = new LocalBufferPool(globalBufferPool, 1);
-		}
-	}
-
-	@Override
-	public void logBufferUtilization() {
-		LOG.info(String.format("\t%s: %d available, %d requested, %d designated",
-				owner.getTaskNameWithSubtasks(),
-				this.outputBufferPool.numAvailableBuffers(),
-				this.outputBufferPool.numRequestedBuffers(),
-				this.outputBufferPool.numDesignatedBuffers()));
-	}
-
-	@Override
-	public void reportAsynchronousEvent() {
-		this.outputBufferPool.reportAsynchronousEvent();
+		return cacheCopyTasks;
 	}
 }

