flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hsapu...@apache.org
Subject [5/5] flink git commit: [FLINK-2815] [REFACTOR] Remove Pact from class and file names since it is no longer valid reference
Date Tue, 06 Oct 2015 15:14:25 GMT
[FLINK-2815] [REFACTOR] Remove Pact from class and file names since it is no longer valid reference

Remove Pact word from class and file names in Apache Flink.
Pact was the name used in Stratosphere time to refer to concept of distributed datasets (similar to Flink Dataset). It was used when Pact and Nephele still separate concept.

As part of 0.10.0 release cleanup effort, let's remove the Pact names to avoid confusion.

The PR also contains small cleanups (sorry):
1. Small refactor DataSinkTask and DataSourceTask to follow Java7 generic convention creation new collection. Remove LOG.isDebugEnabled check.
2. Simple cleanup to update MapValue and TypeInformation with Java7 generic convention creation new collection.
3. Combine several exceptions that have same catch operation.

Apologize for the extra changes with PR. But I separated them into different commits for easier review.

Author: hsaputra <hsaputra@apache.org>

Closes #1218 from hsaputra/remove_pact_name and squashes the following commits:

b3c55b4 [hsaputra] Rename RegularTask to BatchTask per review.
e278fac [hsaputra] Address review comments from chiwanpark (good catch).
9f92f33 [hsaputra] Remove Pact from the file names of teh flink-runtime and flink-clients modules.
dbb2175 [hsaputra] Simple cleanup to update MapValue with Java7 generic for new collection. Remove unused imports in CollectionsDataTypeTest.
df2f553 [hsaputra] Use Java7 style of type resolution for new collection.
6403d44 [hsaputra] Remove the word Pact from the Javadoc for ChainedDriver.
0c562f4 [hsaputra] Small refactor on DataSinkTask and DataSourceTask classes to keep up with modern Java practice.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b08669ab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b08669ab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b08669ab

Branch: refs/heads/master
Commit: b08669abf282c52b54c395b85e992edb8ca621d4
Parents: e494c27
Author: hsaputra <hsaputra@apache.org>
Authored: Tue Oct 6 08:14:06 2015 -0700
Committer: hsaputra <hsaputra@apache.org>
Committed: Tue Oct 6 08:14:06 2015 -0700

----------------------------------------------------------------------
 .../apache/flink/client/web/JobJSONServlet.java |  163 ++
 .../flink/client/web/PactJobJSONServlet.java    |  163 --
 .../flink/client/web/WebInterfaceServer.java    |    2 +-
 .../api/common/typeinfo/TypeInformation.java    |    2 +-
 .../java/org/apache/flink/types/MapValue.java   |   18 +-
 .../flink/types/CollectionsDataTypeTest.java    |    6 -
 .../plantranslate/JobGraphGenerator.java        |   30 +-
 .../src/test/resources/logback-test.xml         |    2 +-
 .../broadcast/BroadcastVariableManager.java     |   10 +-
 .../BroadcastVariableMaterialization.java       |   12 +-
 .../concurrent/SolutionSetUpdateBarrier.java    |    7 +-
 .../SolutionSetUpdateBarrierBroker.java         |    7 +-
 .../task/AbstractIterativePactTask.java         |  395 -----
 .../iterative/task/AbstractIterativeTask.java   |  395 +++++
 .../iterative/task/IterationHeadPactTask.java   |  440 -----
 .../iterative/task/IterationHeadTask.java       |  441 ++++++
 .../task/IterationIntermediatePactTask.java     |  131 --
 .../task/IterationIntermediateTask.java         |  131 ++
 .../task/IterationSynchronizationSinkTask.java  |    4 +-
 .../iterative/task/IterationTailPactTask.java   |  140 --
 .../iterative/task/IterationTailTask.java       |  140 ++
 .../jobgraph/tasks/AbstractInvokable.java       |    3 +-
 .../AbstractCachedBuildSideJoinDriver.java      |    2 +-
 .../operators/AbstractOuterJoinDriver.java      |    6 +-
 .../operators/AllGroupCombineDriver.java        |    6 +-
 .../runtime/operators/AllGroupReduceDriver.java |    6 +-
 .../runtime/operators/AllReduceDriver.java      |    6 +-
 .../flink/runtime/operators/BatchTask.java      | 1499 ++++++++++++++++++
 .../flink/runtime/operators/CoGroupDriver.java  |    6 +-
 .../runtime/operators/CoGroupRawDriver.java     |    6 +-
 .../CoGroupWithSolutionSetFirstDriver.java      |   12 +-
 .../CoGroupWithSolutionSetSecondDriver.java     |   12 +-
 .../runtime/operators/CollectorMapDriver.java   |    6 +-
 .../flink/runtime/operators/CrossDriver.java    |    6 +-
 .../flink/runtime/operators/DataSinkTask.java   |   41 +-
 .../flink/runtime/operators/DataSourceTask.java |   58 +-
 .../apache/flink/runtime/operators/Driver.java  |   90 ++
 .../flink/runtime/operators/DriverStrategy.java |   12 +-
 .../flink/runtime/operators/FlatMapDriver.java  |    6 +-
 .../operators/GroupReduceCombineDriver.java     |    6 +-
 .../runtime/operators/GroupReduceDriver.java    |    6 +-
 .../flink/runtime/operators/JoinDriver.java     |    6 +-
 .../JoinWithSolutionSetFirstDriver.java         |   12 +-
 .../JoinWithSolutionSetSecondDriver.java        |   12 +-
 .../flink/runtime/operators/MapDriver.java      |    6 +-
 .../runtime/operators/MapPartitionDriver.java   |    6 +-
 .../flink/runtime/operators/NoOpDriver.java     |    6 +-
 .../flink/runtime/operators/PactDriver.java     |   90 --
 .../runtime/operators/PactTaskContext.java      |   70 -
 .../runtime/operators/ReduceCombineDriver.java  |    6 +-
 .../flink/runtime/operators/ReduceDriver.java   |    6 +-
 .../runtime/operators/RegularPactTask.java      | 1499 ------------------
 .../runtime/operators/ResettableDriver.java     |   44 +
 .../runtime/operators/ResettablePactDriver.java |   44 -
 .../flink/runtime/operators/TaskContext.java    |   70 +
 .../operators/UnionWithTempOperator.java        |    6 +-
 .../chaining/ChainedAllReduceDriver.java        |    8 +-
 .../chaining/ChainedCollectorMapDriver.java     |    8 +-
 .../operators/chaining/ChainedDriver.java       |    8 +-
 .../chaining/ChainedFlatMapDriver.java          |    8 +-
 .../operators/chaining/ChainedMapDriver.java    |    8 +-
 .../chaining/GroupCombineChainedDriver.java     |    8 +-
 .../SynchronousChainedCombineDriver.java        |    8 +-
 .../runtime/operators/util/TaskConfig.java      |    8 +-
 .../TaskDeploymentDescriptorTest.java           |    4 +-
 .../ExecutionGraphDeploymentTest.java           |   16 +-
 .../operators/chaining/ChainTaskTest.java       |   10 +-
 .../operators/drivers/TestTaskContext.java      |    4 +-
 .../testutils/BinaryOperatorTestBase.java       |   20 +-
 .../operators/testutils/DriverTestBase.java     |   20 +-
 .../operators/testutils/TaskTestBase.java       |    4 +-
 .../testutils/UnaryOperatorTestBase.java        |   20 +-
 .../src/test/resources/logback-test.xml         |    2 +-
 .../src/test/resources/logback-test.xml         |    2 +-
 .../flink/tez/runtime/RegularProcessor.java     |    6 +-
 .../org/apache/flink/tez/runtime/TezTask.java   |   18 +-
 .../src/test/resources/logback-test.xml         |    2 +-
 flink-tests/src/test/resources/logback-test.xml |    2 +-
 78 files changed, 3236 insertions(+), 3275 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-clients/src/main/java/org/apache/flink/client/web/JobJSONServlet.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/JobJSONServlet.java b/flink-clients/src/main/java/org/apache/flink/client/web/JobJSONServlet.java
new file mode 100644
index 0000000..77250d3
--- /dev/null
+++ b/flink-clients/src/main/java/org/apache/flink/client/web/JobJSONServlet.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.client.web;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintWriter;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.client.program.PackagedProgram;
+
+
+public class JobJSONServlet extends HttpServlet {
+	
+	/** Serial UID for serialization interoperability. */
+	private static final long serialVersionUID = 558077298726449201L;
+	
+	private static final Logger LOG = LoggerFactory.getLogger(JobJSONServlet.class);
+
+	// ------------------------------------------------------------------------
+
+	private static final String JOB_PARAM_NAME = "job";
+
+	private static final String CLASS_PARAM_NAME = "assemblerClass";
+
+	// ------------------------------------------------------------------------
+
+	private final File jobStoreDirectory; // the directory in which the jobs are stored
+
+	public JobJSONServlet(File jobStoreDirectory) {
+		this.jobStoreDirectory = jobStoreDirectory;
+	}
+
+	@Override
+	protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
+		resp.setContentType("application/json");
+
+		String jobName = req.getParameter(JOB_PARAM_NAME);
+		if (jobName == null) {
+			LOG.warn("Received request without job parameter name.");
+			resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
+			return;
+		}
+
+		// check, if the jar exists
+		File jarFile = new File(jobStoreDirectory, jobName);
+		if (!jarFile.exists()) {
+			LOG.warn("Received request for non-existing jar file.");
+			resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
+			return;
+		}
+
+		// create the pact plan
+		PackagedProgram pactProgram;
+		try {
+			pactProgram = new PackagedProgram(jarFile, req.getParameter(CLASS_PARAM_NAME), new String[0]);
+		}
+		catch (Throwable t) {
+			LOG.info("Instantiating the PactProgram for '" + jarFile.getName() + "' failed.", t);
+			resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
+			resp.getWriter().print(t.getMessage());
+			return;
+		}
+		
+		String jsonPlan = null;
+		String programDescription = null;
+		
+		try {
+			jsonPlan = pactProgram.getPreviewPlan();
+		}
+		catch (Throwable t) {
+			LOG.error("Failed to create json dump of pact program.", t);
+		}
+		
+		try {
+			programDescription = pactProgram.getDescription();
+		}
+		catch (Throwable t) {
+			LOG.error("Failed to create description of pact program.", t);
+		}
+			
+		if (jsonPlan == null && programDescription == null) {
+			resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
+			return;
+		} else {
+			resp.setStatus(HttpServletResponse.SC_OK);
+			PrintWriter wrt = resp.getWriter();
+			wrt.print("{ \"jobname\": \"");
+			wrt.print(jobName);
+			if (jsonPlan != null) {
+				wrt.print("\", \"plan\": ");
+				wrt.println(jsonPlan);
+			}
+			if (programDescription != null) {
+				wrt.print(", \"description\": \"");
+				wrt.print(escapeString(programDescription));
+				wrt.print("\"");
+			}
+			
+			wrt.println("}");
+		}
+	}
+
+	protected String escapeString(String str) {
+		int len = str.length();
+		char[] s = str.toCharArray();
+		StringBuilder sb = new StringBuilder();
+
+		for (int i = 0; i < len; i += 1) {
+			char c = s[i];
+			if ((c == '\\') || (c == '"') || (c == '/')) {
+				sb.append('\\');
+				sb.append(c);
+			}
+			else if (c == '\b') {
+				sb.append("\\b");
+			} else if (c == '\t') {
+				sb.append("\\t");
+			} else if (c == '\n') {
+				sb.append("<br>");
+			} else if (c == '\f') {
+				sb.append("\\f");
+			} else if (c == '\r') {
+				sb.append("\\r");
+			} else if (c == '>') {
+				sb.append("&gt;");
+			} else if (c == '<') {
+				sb.append("&lt;");
+			} else {
+				if (c < ' ') {
+					// Unreadable throw away
+				} else {
+					sb.append(c);
+				}
+			}
+		}
+
+		return sb.toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-clients/src/main/java/org/apache/flink/client/web/PactJobJSONServlet.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/PactJobJSONServlet.java b/flink-clients/src/main/java/org/apache/flink/client/web/PactJobJSONServlet.java
deleted file mode 100644
index 019fc50..0000000
--- a/flink-clients/src/main/java/org/apache/flink/client/web/PactJobJSONServlet.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.client.web;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.PrintWriter;
-
-import javax.servlet.ServletException;
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.client.program.PackagedProgram;
-
-
-public class PactJobJSONServlet extends HttpServlet {
-	
-	/** Serial UID for serialization interoperability. */
-	private static final long serialVersionUID = 558077298726449201L;
-	
-	private static final Logger LOG = LoggerFactory.getLogger(PactJobJSONServlet.class);
-
-	// ------------------------------------------------------------------------
-
-	private static final String JOB_PARAM_NAME = "job";
-
-	private static final String CLASS_PARAM_NAME = "assemblerClass";
-
-	// ------------------------------------------------------------------------
-
-	private final File jobStoreDirectory; // the directory in which the jobs are stored
-
-	public PactJobJSONServlet(File jobStoreDirectory) {
-		this.jobStoreDirectory = jobStoreDirectory;
-	}
-
-	@Override
-	protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
-		resp.setContentType("application/json");
-
-		String jobName = req.getParameter(JOB_PARAM_NAME);
-		if (jobName == null) {
-			LOG.warn("Received request without job parameter name.");
-			resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
-			return;
-		}
-
-		// check, if the jar exists
-		File jarFile = new File(jobStoreDirectory, jobName);
-		if (!jarFile.exists()) {
-			LOG.warn("Received request for non-existing jar file.");
-			resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
-			return;
-		}
-
-		// create the pact plan
-		PackagedProgram pactProgram;
-		try {
-			pactProgram = new PackagedProgram(jarFile, req.getParameter(CLASS_PARAM_NAME), new String[0]);
-		}
-		catch (Throwable t) {
-			LOG.info("Instantiating the PactProgram for '" + jarFile.getName() + "' failed.", t);
-			resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
-			resp.getWriter().print(t.getMessage());
-			return;
-		}
-		
-		String jsonPlan = null;
-		String programDescription = null;
-		
-		try {
-			jsonPlan = pactProgram.getPreviewPlan();
-		}
-		catch (Throwable t) {
-			LOG.error("Failed to create json dump of pact program.", t);
-		}
-		
-		try {
-			programDescription = pactProgram.getDescription();
-		}
-		catch (Throwable t) {
-			LOG.error("Failed to create description of pact program.", t);
-		}
-			
-		if (jsonPlan == null && programDescription == null) {
-			resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
-			return;
-		} else {
-			resp.setStatus(HttpServletResponse.SC_OK);
-			PrintWriter wrt = resp.getWriter();
-			wrt.print("{ \"jobname\": \"");
-			wrt.print(jobName);
-			if (jsonPlan != null) {
-				wrt.print("\", \"plan\": ");
-				wrt.println(jsonPlan);
-			}
-			if (programDescription != null) {
-				wrt.print(", \"description\": \"");
-				wrt.print(escapeString(programDescription));
-				wrt.print("\"");
-			}
-			
-			wrt.println("}");
-		}
-	}
-
-	protected String escapeString(String str) {
-		int len = str.length();
-		char[] s = str.toCharArray();
-		StringBuilder sb = new StringBuilder();
-
-		for (int i = 0; i < len; i += 1) {
-			char c = s[i];
-			if ((c == '\\') || (c == '"') || (c == '/')) {
-				sb.append('\\');
-				sb.append(c);
-			}
-			else if (c == '\b') {
-				sb.append("\\b");
-			} else if (c == '\t') {
-				sb.append("\\t");
-			} else if (c == '\n') {
-				sb.append("<br>");
-			} else if (c == '\f') {
-				sb.append("\\f");
-			} else if (c == '\r') {
-				sb.append("\\r");
-			} else if (c == '>') {
-				sb.append("&gt;");
-			} else if (c == '<') {
-				sb.append("&lt;");
-			} else {
-				if (c < ' ') {
-					// Unreadable throw away
-				} else {
-					sb.append(c);
-				}
-			}
-		}
-
-		return sb.toString();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java b/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java
index bab3cf7..68a0706 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/web/WebInterfaceServer.java
@@ -144,7 +144,7 @@ public class WebInterfaceServer {
 		CliFrontend cli = new CliFrontend(configDir);
 		ServletContextHandler servletContext = new ServletContextHandler(ServletContextHandler.SESSIONS);
 		servletContext.setContextPath("/");
-		servletContext.addServlet(new ServletHolder(new PactJobJSONServlet(uploadDir)), "/pactPlan");
+		servletContext.addServlet(new ServletHolder(new JobJSONServlet(uploadDir)), "/pactPlan");
 		servletContext.addServlet(new ServletHolder(new PlanDisplayServlet(jobManagerWebPort)), "/showPlan");
 		servletContext.addServlet(new ServletHolder(new JobsServlet(uploadDir, tmpDir, "launch.html")), "/jobs");
 		servletContext.addServlet(new ServletHolder(new JobSubmissionServlet(cli, uploadDir, planDumpDir)), "/runJob");

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
index 309c968..07d8544 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/typeinfo/TypeInformation.java
@@ -119,7 +119,7 @@ public abstract class TypeInformation<T> implements Serializable {
 	 */
 	public List<TypeInformation<?>> getGenericParameters() {
 		// Return an empty list as the default implementation
-		return new LinkedList<TypeInformation<?>>();
+		return new LinkedList<>();
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-core/src/main/java/org/apache/flink/types/MapValue.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/types/MapValue.java b/flink-core/src/main/java/org/apache/flink/types/MapValue.java
index ed5b4e1..a6cafd2 100644
--- a/flink-core/src/main/java/org/apache/flink/types/MapValue.java
+++ b/flink-core/src/main/java/org/apache/flink/types/MapValue.java
@@ -31,7 +31,7 @@ import org.apache.flink.util.ReflectionUtil;
 
 /**
  * Generic map base type for PACT programs that implements the Value and Map interfaces.
- * PactMap encapsulates a Java HashMap object.
+ * The {@link MapValue} encapsulates a Java {@link HashMap} object.
  * 
  * @see org.apache.flink.types.Value
  * @see java.util.Map
@@ -56,10 +56,10 @@ public abstract class MapValue<K extends Value, V extends Value> implements Valu
 	 * Initializes the encapsulated map with an empty HashMap.
 	 */
 	public MapValue() {
-		this.keyClass = ReflectionUtil.<K> getTemplateType1(this.getClass());
-		this.valueClass = ReflectionUtil.<V> getTemplateType2(this.getClass());
+		this.keyClass = ReflectionUtil.getTemplateType1(this.getClass());
+		this.valueClass = ReflectionUtil.getTemplateType2(this.getClass());
 
-		this.map = new HashMap<K, V>();
+		this.map = new HashMap<>();
 	}
 
 	/**
@@ -68,10 +68,10 @@ public abstract class MapValue<K extends Value, V extends Value> implements Valu
 	 * @param map Map holding all entries with which the new encapsulated map is filled.
 	 */
 	public MapValue(Map<K, V> map) {
-		this.keyClass = ReflectionUtil.<K> getTemplateType1(this.getClass());
-		this.valueClass = ReflectionUtil.<V> getTemplateType2(this.getClass());
+		this.keyClass = ReflectionUtil.getTemplateType1(this.getClass());
+		this.valueClass = ReflectionUtil.getTemplateType2(this.getClass());
 
-		this.map = new HashMap<K, V>(map);
+		this.map = new HashMap<>(map);
 	}
 
 	@Override
@@ -87,9 +87,7 @@ public abstract class MapValue<K extends Value, V extends Value> implements Valu
 				val.read(in);
 				this.map.put(key, val);
 			}
-		} catch (final InstantiationException e) {
-			throw new RuntimeException(e);
-		} catch (final IllegalAccessException e) {
+		} catch (final InstantiationException | IllegalAccessException e) {
 			throw new RuntimeException(e);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-core/src/test/java/org/apache/flink/types/CollectionsDataTypeTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/types/CollectionsDataTypeTest.java b/flink-core/src/test/java/org/apache/flink/types/CollectionsDataTypeTest.java
index b631799..5c81e4a 100644
--- a/flink-core/src/test/java/org/apache/flink/types/CollectionsDataTypeTest.java
+++ b/flink-core/src/test/java/org/apache/flink/types/CollectionsDataTypeTest.java
@@ -32,12 +32,6 @@ import org.junit.Assert;
 
 import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
 import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.ListValue;
-import org.apache.flink.types.MapValue;
-import org.apache.flink.types.Pair;
-import org.apache.flink.types.StringValue;
 import org.junit.Before;
 import org.junit.Test;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
index 3567fad..c15e47a 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java
@@ -55,10 +55,10 @@ import org.apache.flink.optimizer.util.Utils;
 import org.apache.flink.runtime.io.network.DataExchangeMode;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.iterative.convergence.WorksetEmptyConvergenceCriterion;
-import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
-import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
+import org.apache.flink.runtime.iterative.task.IterationHeadTask;
+import org.apache.flink.runtime.iterative.task.IterationIntermediateTask;
 import org.apache.flink.runtime.iterative.task.IterationSynchronizationSinkTask;
-import org.apache.flink.runtime.iterative.task.IterationTailPactTask;
+import org.apache.flink.runtime.iterative.task.IterationTailTask;
 import org.apache.flink.runtime.jobgraph.JobEdge;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -66,17 +66,17 @@ import org.apache.flink.runtime.jobgraph.InputFormatVertex;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.runtime.operators.CoGroupDriver;
 import org.apache.flink.runtime.operators.CoGroupWithSolutionSetFirstDriver;
 import org.apache.flink.runtime.operators.CoGroupWithSolutionSetSecondDriver;
 import org.apache.flink.runtime.operators.DataSinkTask;
 import org.apache.flink.runtime.operators.DataSourceTask;
 import org.apache.flink.runtime.operators.DriverStrategy;
+import org.apache.flink.runtime.operators.JoinDriver;
 import org.apache.flink.runtime.operators.JoinWithSolutionSetFirstDriver;
 import org.apache.flink.runtime.operators.JoinWithSolutionSetSecondDriver;
-import org.apache.flink.runtime.operators.JoinDriver;
 import org.apache.flink.runtime.operators.NoOpDriver;
-import org.apache.flink.runtime.operators.RegularPactTask;
 import org.apache.flink.runtime.operators.chaining.ChainedDriver;
 import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
 import org.apache.flink.runtime.operators.util.LocalStrategy;
@@ -829,7 +829,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		} else {
 			// create task vertex
 			vertex = new JobVertex(taskName);
-			vertex.setInvokableClass((this.currentIteration != null && node.isOnDynamicPath()) ? IterationIntermediatePactTask.class : RegularPactTask.class);
+			vertex.setInvokableClass((this.currentIteration != null && node.isOnDynamicPath()) ? IterationIntermediateTask.class : BatchTask.class);
 			
 			config = new TaskConfig(vertex.getConfiguration());
 			config.setDriver(ds.getDriverClass());
@@ -854,7 +854,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		final DriverStrategy ds = node.getDriverStrategy();
 		final JobVertex vertex = new JobVertex(taskName);
 		final TaskConfig config = new TaskConfig(vertex.getConfiguration());
-		vertex.setInvokableClass( (this.currentIteration != null && node.isOnDynamicPath()) ? IterationIntermediatePactTask.class : RegularPactTask.class);
+		vertex.setInvokableClass( (this.currentIteration != null && node.isOnDynamicPath()) ? IterationIntermediateTask.class : BatchTask.class);
 		
 		// set user code
 		config.setStubWrapper(node.getProgramOperator().getUserCodeWrapper());
@@ -951,7 +951,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 			}
 			
 			// reset the vertex type to iteration head
-			headVertex.setInvokableClass(IterationHeadPactTask.class);
+			headVertex.setInvokableClass(IterationHeadTask.class);
 			headConfig = new TaskConfig(headVertex.getConfiguration());
 			toReturn = null;
 		} else {
@@ -959,7 +959,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 			// everything else happens in the post visit, after the input (the initial partial solution)
 			// is connected.
 			headVertex = new JobVertex("PartialSolution ("+iteration.getNodeName()+")");
-			headVertex.setInvokableClass(IterationHeadPactTask.class);
+			headVertex.setInvokableClass(IterationHeadTask.class);
 			headConfig = new TaskConfig(headVertex.getConfiguration());
 			headConfig.setDriver(NoOpDriver.class);
 			toReturn = headVertex;
@@ -1019,7 +1019,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 			}
 			
 			// reset the vertex type to iteration head
-			headVertex.setInvokableClass(IterationHeadPactTask.class);
+			headVertex.setInvokableClass(IterationHeadTask.class);
 			headConfig = new TaskConfig(headVertex.getConfiguration());
 			toReturn = null;
 		} else {
@@ -1027,7 +1027,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 			// everything else happens in the post visit, after the input (the initial partial solution)
 			// is connected.
 			headVertex = new JobVertex("IterationHead("+iteration.getNodeName()+")");
-			headVertex.setInvokableClass(IterationHeadPactTask.class);
+			headVertex.setInvokableClass(IterationHeadTask.class);
 			headConfig = new TaskConfig(headVertex.getConfiguration());
 			headConfig.setDriver(NoOpDriver.class);
 			toReturn = headVertex;
@@ -1310,7 +1310,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 		// No following termination criterion
 		if (rootOfStepFunction.getOutgoingChannels().isEmpty()) {
 			
-			rootOfStepFunctionVertex.setInvokableClass(IterationTailPactTask.class);
+			rootOfStepFunctionVertex.setInvokableClass(IterationTailTask.class);
 			
 			tailConfig.setOutputSerializer(bulkNode.getSerializerForIterationChannel());
 		}
@@ -1337,7 +1337,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 				tailConfigOfTerminationCriterion = new TaskConfig(rootOfTerminationCriterionVertex.getConfiguration());
 			}
 			
-			rootOfTerminationCriterionVertex.setInvokableClass(IterationTailPactTask.class);
+			rootOfTerminationCriterionVertex.setInvokableClass(IterationTailTask.class);
 			// Hack
 			tailConfigOfTerminationCriterion.setIsSolutionSetUpdate();
 			tailConfigOfTerminationCriterion.setOutputSerializer(bulkNode.getSerializerForIterationChannel());
@@ -1457,7 +1457,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 				worksetTailConfig.setIsWorksetUpdate();
 				
 				if (hasWorksetTail) {
-					nextWorksetVertex.setInvokableClass(IterationTailPactTask.class);
+					nextWorksetVertex.setInvokableClass(IterationTailTask.class);
 					
 					worksetTailConfig.setOutputSerializer(iterNode.getWorksetSerializer());
 				}
@@ -1481,7 +1481,7 @@ public class JobGraphGenerator implements Visitor<PlanNode> {
 				solutionDeltaConfig.setIsSolutionSetUpdate();
 				
 				if (hasSolutionSetTail) {
-					solutionDeltaVertex.setInvokableClass(IterationTailPactTask.class);
+					solutionDeltaVertex.setInvokableClass(IterationTailTask.class);
 					
 					solutionDeltaConfig.setOutputSerializer(iterNode.getSolutionSetSerializer());
 					

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime-web/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-runtime-web/src/test/resources/logback-test.xml b/flink-runtime-web/src/test/resources/logback-test.xml
index 9d4f644..2235251 100644
--- a/flink-runtime-web/src/test/resources/logback-test.xml
+++ b/flink-runtime-web/src/test/resources/logback-test.xml
@@ -31,7 +31,7 @@
          throw error to test failing scenarios. Logging those would overflow the log. -->
          <!---->
     <logger name="org.apache.flink.runtime.operators.DataSinkTask" level="OFF"/>
-    <logger name="org.apache.flink.runtime.operators.RegularPactTask" level="OFF"/>
+    <logger name="org.apache.flink.runtime.operators.BatchTask" level="OFF"/>
     <logger name="org.apache.flink.runtime.client.JobClient" level="OFF"/>
     <logger name="org.apache.flink.runtime.taskmanager.Task" level="OFF"/>
     <logger name="org.apache.flink.runtime.jobmanager.JobManager" level="OFF"/>

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableManager.java
index 660a62c..7d0454e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableManager.java
@@ -24,7 +24,7 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.runtime.io.network.api.reader.MutableReader;
-import org.apache.flink.runtime.operators.RegularPactTask;
+import org.apache.flink.runtime.operators.BatchTask;
 
 public class BroadcastVariableManager {
 	
@@ -33,7 +33,7 @@ public class BroadcastVariableManager {
 	
 	// --------------------------------------------------------------------------------------------
 	
-	public <T> BroadcastVariableMaterialization<T, ?> materializeBroadcastVariable(String name, int superstep, RegularPactTask<?, ?> holder, 
+	public <T> BroadcastVariableMaterialization<T, ?> materializeBroadcastVariable(String name, int superstep, BatchTask<?, ?> holder,
 			MutableReader<?> reader, TypeSerializerFactory<T> serializerFactory) throws IOException
 	{
 		final BroadcastVariableKey key = new BroadcastVariableKey(holder.getEnvironment().getJobVertexId(), name, superstep);
@@ -77,12 +77,12 @@ public class BroadcastVariableManager {
 	}
 	
 	
-	public void releaseReference(String name, int superstep, RegularPactTask<?, ?> referenceHolder) {
+	public void releaseReference(String name, int superstep, BatchTask<?, ?> referenceHolder) {
 		BroadcastVariableKey key = new BroadcastVariableKey(referenceHolder.getEnvironment().getJobVertexId(), name, superstep);
 		releaseReference(key, referenceHolder);
 	}
 	
-	public void releaseReference(BroadcastVariableKey key, RegularPactTask<?, ?> referenceHolder) {
+	public void releaseReference(BroadcastVariableKey key, BatchTask<?, ?> referenceHolder) {
 		BroadcastVariableMaterialization<?, ?> mat = variables.get(key);
 		
 		// release this reference
@@ -93,7 +93,7 @@ public class BroadcastVariableManager {
 	}
 	
 	
-	public void releaseAllReferencesFromTask(RegularPactTask<?, ?> referenceHolder) {
+	public void releaseAllReferencesFromTask(BatchTask<?, ?> referenceHolder) {
 		// go through all registered variables 
 		for (Map.Entry<BroadcastVariableKey, BroadcastVariableMaterialization<?, ?>> entry : variables.entrySet()) {
 			BroadcastVariableMaterialization<?, ?> mat = entry.getValue();

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java
index c4dd8a9..86e0111 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/broadcast/BroadcastVariableMaterialization.java
@@ -28,7 +28,7 @@ import org.apache.flink.api.common.functions.BroadcastVariableInitializer;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
 import org.apache.flink.runtime.io.network.api.reader.MutableReader;
-import org.apache.flink.runtime.operators.RegularPactTask;
+import org.apache.flink.runtime.operators.BatchTask;
 import org.apache.flink.runtime.operators.util.ReaderIterator;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.slf4j.Logger;
@@ -44,7 +44,7 @@ public class BroadcastVariableMaterialization<T, C> {
 	private static final Logger LOG = LoggerFactory.getLogger(BroadcastVariableMaterialization.class);
 	
 	
-	private final Set<RegularPactTask<?, ?>> references = new HashSet<RegularPactTask<?,?>>();
+	private final Set<BatchTask<?, ?>> references = new HashSet<BatchTask<?,?>>();
 	
 	private final Object materializationMonitor = new Object();
 	
@@ -65,7 +65,7 @@ public class BroadcastVariableMaterialization<T, C> {
 
 	// --------------------------------------------------------------------------------------------
 	
-	public void materializeVariable(MutableReader<?> reader, TypeSerializerFactory<?> serializerFactory, RegularPactTask<?, ?> referenceHolder)
+	public void materializeVariable(MutableReader<?> reader, TypeSerializerFactory<?> serializerFactory, BatchTask<?, ?> referenceHolder)
 			throws MaterializationExpiredException, IOException
 	{
 		Preconditions.checkNotNull(reader);
@@ -156,15 +156,15 @@ public class BroadcastVariableMaterialization<T, C> {
 		}
 	}
 	
-	public boolean decrementReference(RegularPactTask<?, ?> referenceHolder) {
+	public boolean decrementReference(BatchTask<?, ?> referenceHolder) {
 		return decrementReferenceInternal(referenceHolder, true);
 	}
 	
-	public boolean decrementReferenceIfHeld(RegularPactTask<?, ?> referenceHolder) {
+	public boolean decrementReferenceIfHeld(BatchTask<?, ?> referenceHolder) {
 		return decrementReferenceInternal(referenceHolder, false);
 	}
 	
-	private boolean decrementReferenceInternal(RegularPactTask<?, ?> referenceHolder, boolean errorIfNoReference) {
+	private boolean decrementReferenceInternal(BatchTask<?, ?> referenceHolder, boolean errorIfNoReference) {
 		synchronized (references) {
 			if (disposed || references.isEmpty()) {
 				if (errorIfNoReference) {

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrier.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrier.java
index bc11d3f..ffc74d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrier.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrier.java
@@ -18,12 +18,15 @@
 
 package org.apache.flink.runtime.iterative.concurrent;
 
+import org.apache.flink.runtime.iterative.task.IterationHeadTask;
+import org.apache.flink.runtime.iterative.task.IterationTailTask;
+
 import java.util.concurrent.CountDownLatch;
 
 /**
  * Resettable barrier to synchronize the
- * {@link org.apache.flink.runtime.iterative.task.IterationHeadPactTask} and
- * the {@link org.apache.flink.runtime.iterative.task.IterationTailPactTask} in case of
+ * {@link IterationHeadTask} and
+ * the {@link IterationTailTask} in case of
  * iterations that contain a separate solution set tail.
  */
 public class SolutionSetUpdateBarrier {

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrierBroker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrierBroker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrierBroker.java
index abbecde..352a262 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrierBroker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/concurrent/SolutionSetUpdateBarrierBroker.java
@@ -18,10 +18,13 @@
 
 package org.apache.flink.runtime.iterative.concurrent;
 
+import org.apache.flink.runtime.iterative.task.IterationHeadTask;
+import org.apache.flink.runtime.iterative.task.IterationTailTask;
+
 /**
  * Broker to hand over {@link SolutionSetUpdateBarrier} from 
- * {@link org.apache.flink.runtime.iterative.task.IterationHeadPactTask} to
- * {@link org.apache.flink.runtime.iterative.task.IterationTailPactTask}.
+ * {@link IterationHeadTask} to
+ * {@link IterationTailTask}.
  */
 public class SolutionSetUpdateBarrierBroker extends Broker<SolutionSetUpdateBarrier> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
deleted file mode 100644
index efe74f9..0000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
+++ /dev/null
@@ -1,395 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.iterative.task;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.core.fs.Path;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.api.common.aggregators.LongSumAggregator;
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.api.common.operators.util.JoinHashMap;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.core.memory.DataOutputView;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.io.network.api.reader.MutableReader;
-import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
-import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannelBroker;
-import org.apache.flink.runtime.iterative.concurrent.Broker;
-import org.apache.flink.runtime.iterative.concurrent.IterationAggregatorBroker;
-import org.apache.flink.runtime.iterative.concurrent.SolutionSetBroker;
-import org.apache.flink.runtime.iterative.convergence.WorksetEmptyConvergenceCriterion;
-import org.apache.flink.runtime.iterative.io.SolutionSetObjectsUpdateOutputCollector;
-import org.apache.flink.runtime.iterative.io.SolutionSetUpdateOutputCollector;
-import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
-import org.apache.flink.runtime.operators.PactDriver;
-import org.apache.flink.runtime.operators.RegularPactTask;
-import org.apache.flink.runtime.operators.ResettablePactDriver;
-import org.apache.flink.runtime.operators.hash.CompactingHashTable;
-import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.InstantiationUtil;
-import org.apache.flink.util.MutableObjectIterator;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Map;
-import java.util.concurrent.Future;
-
-/**
- * The abstract base class for all tasks able to participate in an iteration.
- */
-public abstract class AbstractIterativePactTask<S extends Function, OT> extends RegularPactTask<S, OT>
-		implements Terminable
-{
-	private static final Logger log = LoggerFactory.getLogger(AbstractIterativePactTask.class);
-	
-	protected LongSumAggregator worksetAggregator;
-
-	protected BlockingBackChannel worksetBackChannel;
-
-	protected boolean isWorksetIteration;
-
-	protected boolean isWorksetUpdate;
-
-	protected boolean isSolutionSetUpdate;
-	
-
-	private RuntimeAggregatorRegistry iterationAggregators;
-
-	private String brokerKey;
-
-	private int superstepNum = 1;
-	
-	private volatile boolean terminationRequested;
-
-	// --------------------------------------------------------------------------------------------
-	// Main life cycle methods that implement the iterative behavior
-	// --------------------------------------------------------------------------------------------
-
-	@Override
-	protected void initialize() throws Exception {
-		super.initialize();
-
-		// check if the driver is resettable
-		if (this.driver instanceof ResettablePactDriver) {
-			final ResettablePactDriver<?, ?> resDriver = (ResettablePactDriver<?, ?>) this.driver;
-			// make sure that the according inputs are not reseted
-			for (int i = 0; i < resDriver.getNumberOfInputs(); i++) {
-				if (resDriver.isInputResettable(i)) {
-					excludeFromReset(i);
-				}
-			}
-		}
-		
-		TaskConfig config = getLastTasksConfig();
-		isWorksetIteration = config.getIsWorksetIteration();
-		isWorksetUpdate = config.getIsWorksetUpdate();
-		isSolutionSetUpdate = config.getIsSolutionSetUpdate();
-
-		if (isWorksetUpdate) {
-			worksetBackChannel = BlockingBackChannelBroker.instance().getAndRemove(brokerKey());
-
-			if (isWorksetIteration) {
-				worksetAggregator = getIterationAggregators().getAggregator(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME);
-
-				if (worksetAggregator == null) {
-					throw new RuntimeException("Missing workset elements count aggregator.");
-				}
-			}
-		}
-	}
-
-	@Override
-	public void run() throws Exception {
-		if (inFirstIteration()) {
-			if (this.driver instanceof ResettablePactDriver) {
-				// initialize the repeatable driver
-				((ResettablePactDriver<?, ?>) this.driver).initialize();
-			}
-		} else {
-			reinstantiateDriver();
-			resetAllInputs();
-			
-			// re-read the iterative broadcast variables
-			for (int i : this.iterativeBroadcastInputs) {
-				final String name = getTaskConfig().getBroadcastInputName(i);
-				readAndSetBroadcastInput(i, name, this.runtimeUdfContext, superstepNum);
-			}
-		}
-
-		// call the parent to execute the superstep
-		super.run();
-		
-		// release the iterative broadcast variables
-		for (int i : this.iterativeBroadcastInputs) {
-			final String name = getTaskConfig().getBroadcastInputName(i);
-			releaseBroadcastVariables(name, superstepNum, this.runtimeUdfContext);
-		}
-	}
-
-	@Override
-	protected void closeLocalStrategiesAndCaches() {
-		try {
-			super.closeLocalStrategiesAndCaches();
-		}
-		finally {
-			if (this.driver instanceof ResettablePactDriver) {
-				final ResettablePactDriver<?, ?> resDriver = (ResettablePactDriver<?, ?>) this.driver;
-				try {
-					resDriver.teardown();
-				} catch (Throwable t) {
-					log.error("Error while shutting down an iterative operator.", t);
-				}
-			}
-		}
-	}
-
-	@Override
-	public DistributedRuntimeUDFContext createRuntimeContext(String taskName) {
-		Environment env = getEnvironment();
-		return new IterativeRuntimeUdfContext(taskName, env.getNumberOfSubtasks(),
-				env.getIndexInSubtaskGroup(), getUserCodeClassLoader(), getExecutionConfig(),
-				env.getDistributedCacheEntries(), this.accumulatorMap);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	// Utility Methods for Iteration Handling
-	// --------------------------------------------------------------------------------------------
-
-	protected boolean inFirstIteration() {
-		return this.superstepNum == 1;
-	}
-
-	protected int currentIteration() {
-		return this.superstepNum;
-	}
-
-	protected void incrementIterationCounter() {
-		this.superstepNum++;
-	}
-
-	public String brokerKey() {
-		if (brokerKey == null) {
-			int iterationId = config.getIterationId();
-			brokerKey = getEnvironment().getJobID().toString() + '#' + iterationId + '#' +
-					getEnvironment().getIndexInSubtaskGroup();
-		}
-		return brokerKey;
-	}
-
-	private void reinstantiateDriver() throws Exception {
-		if (this.driver instanceof ResettablePactDriver) {
-			final ResettablePactDriver<?, ?> resDriver = (ResettablePactDriver<?, ?>) this.driver;
-			resDriver.reset();
-		} else {
-			Class<? extends PactDriver<S, OT>> driverClass = this.config.getDriver();
-			this.driver = InstantiationUtil.instantiate(driverClass, PactDriver.class);
-
-			try {
-				this.driver.setup(this);
-			}
-			catch (Throwable t) {
-				throw new Exception("The pact driver setup for '" + this.getEnvironment().getTaskName() +
-						"' , caused an error: " + t.getMessage(), t);
-			}
-		}
-	}
-
-	public RuntimeAggregatorRegistry getIterationAggregators() {
-		if (this.iterationAggregators == null) {
-			this.iterationAggregators = IterationAggregatorBroker.instance().get(brokerKey());
-		}
-		return this.iterationAggregators;
-	}
-
-	protected void verifyEndOfSuperstepState() throws IOException {
-		// sanity check that there is at least one iterative input reader
-		if (this.iterativeInputs.length == 0 && this.iterativeBroadcastInputs.length == 0) {
-			throw new IllegalStateException("Error: Iterative task without a single iterative input.");
-		}
-
-		for (int inputNum : this.iterativeInputs) {
-			MutableReader<?> reader = this.inputReaders[inputNum];
-
-			if (!reader.isFinished()) {
-				if (reader.hasReachedEndOfSuperstep()) {
-					reader.startNextSuperstep();
-				}
-				else {
-					// need to read and drop all non-consumed data until we reach the end-of-superstep
-					@SuppressWarnings("unchecked")
-					MutableObjectIterator<Object> inIter = (MutableObjectIterator<Object>) this.inputIterators[inputNum];
-					Object o = this.inputSerializers[inputNum].getSerializer().createInstance();
-					while ((o = inIter.next(o)) != null);
-					
-					if (!reader.isFinished()) {
-						// also reset the end-of-superstep state
-						reader.startNextSuperstep();
-					}
-				}
-			}
-		}
-		
-		for (int inputNum : this.iterativeBroadcastInputs) {
-			MutableReader<?> reader = this.broadcastInputReaders[inputNum];
-
-			if (!reader.isFinished()) {
-				
-				// sanity check that the BC input is at the end of the superstep
-				if (!reader.hasReachedEndOfSuperstep()) {
-					throw new IllegalStateException("An iterative broadcast input has not been fully consumed.");
-				}
-				
-				reader.startNextSuperstep();
-			}
-		}
-	}
-
-	@Override
-	public boolean terminationRequested() {
-		return this.terminationRequested;
-	}
-
-	@Override
-	public void requestTermination() {
-		this.terminationRequested = true;
-	}
-
-	@Override
-	public void cancel() throws Exception {
-		requestTermination();
-		super.cancel();
-	}
-
-	// -----------------------------------------------------------------------------------------------------------------
-	// Iteration State Update Handling
-	// -----------------------------------------------------------------------------------------------------------------
-
-	/**
-	 * Creates a new {@link WorksetUpdateOutputCollector}.
-	 * <p>
-	 * This collector is used by {@link IterationIntermediatePactTask} or {@link IterationTailPactTask} to update the
-	 * workset.
-	 * <p>
-	 * If a non-null delegate is given, the new {@link Collector} will write to the solution set and also call
-	 * collect(T) of the delegate.
-	 *
-	 * @param delegate null -OR- the delegate on which to call collect() by the newly created collector
-	 * @return a new {@link WorksetUpdateOutputCollector}
-	 */
-	protected Collector<OT> createWorksetUpdateOutputCollector(Collector<OT> delegate) {
-		DataOutputView outputView = worksetBackChannel.getWriteEnd();
-		TypeSerializer<OT> serializer = getOutputSerializer();
-		return new WorksetUpdateOutputCollector<OT>(outputView, serializer, delegate);
-	}
-
-	protected Collector<OT> createWorksetUpdateOutputCollector() {
-		return createWorksetUpdateOutputCollector(null);
-	}
-
-	/**
-	 * Creates a new solution set update output collector.
-	 * <p>
-	 * This collector is used by {@link IterationIntermediatePactTask} or {@link IterationTailPactTask} to update the
-	 * solution set of workset iterations. Depending on the task configuration, either a fast (non-probing)
-	 * {@link org.apache.flink.runtime.iterative.io.SolutionSetFastUpdateOutputCollector} or normal (re-probing)
-	 * {@link SolutionSetUpdateOutputCollector} is created.
-	 * <p>
-	 * If a non-null delegate is given, the new {@link Collector} will write back to the solution set and also call
-	 * collect(T) of the delegate.
-	 *
-	 * @param delegate null -OR- a delegate collector to be called by the newly created collector
-	 * @return a new {@link org.apache.flink.runtime.iterative.io.SolutionSetFastUpdateOutputCollector} or
-	 * {@link SolutionSetUpdateOutputCollector}
-	 */
-	protected Collector<OT> createSolutionSetUpdateOutputCollector(Collector<OT> delegate) {
-		Broker<Object> solutionSetBroker = SolutionSetBroker.instance();
-		
-		Object ss = solutionSetBroker.get(brokerKey());
-		if (ss instanceof CompactingHashTable) {
-			@SuppressWarnings("unchecked")
-			CompactingHashTable<OT> solutionSet = (CompactingHashTable<OT>) ss;
-			return new SolutionSetUpdateOutputCollector<OT>(solutionSet, delegate);
-		}
-		else if (ss instanceof JoinHashMap) {
-			@SuppressWarnings("unchecked")
-			JoinHashMap<OT> map = (JoinHashMap<OT>) ss;
-			return new SolutionSetObjectsUpdateOutputCollector<OT>(map, delegate);
-		} else {
-			throw new RuntimeException("Unrecognized solution set handle: " + ss);
-		}
-	}
-
-	/**
-	 * @return output serializer of this task
-	 */
-	private TypeSerializer<OT> getOutputSerializer() {
-		TypeSerializerFactory<OT> serializerFactory;
-
-		if ((serializerFactory = getLastTasksConfig().getOutputSerializer(getUserCodeClassLoader())) ==
-				null) {
-			throw new RuntimeException("Missing output serializer for workset update.");
-		}
-
-		return serializerFactory.getSerializer();
-	}
-
-	// -----------------------------------------------------------------------------------------------------------------
-
-	private class IterativeRuntimeUdfContext extends DistributedRuntimeUDFContext implements IterationRuntimeContext {
-
-		public IterativeRuntimeUdfContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader,
-										ExecutionConfig executionConfig, Map<String, Future<Path>> cpTasks,
-										Map<String, Accumulator<?,?>> accumulatorMap) {
-			super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, cpTasks, accumulatorMap);
-		}
-
-		@Override
-		public int getSuperstepNumber() {
-			return AbstractIterativePactTask.this.superstepNum;
-		}
-
-		@Override
-		public <T extends Aggregator<?>> T getIterationAggregator(String name) {
-			return getIterationAggregators().<T>getAggregator(name);
-		}
-
-		@Override
-		@SuppressWarnings("unchecked")
-		public <T extends Value> T getPreviousIterationAggregate(String name) {
-			return (T) getIterationAggregators().getPreviousGlobalAggregate(name);
-		}
-
-		@Override
-		public <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> newAccumulator) {
-			// only add accumulator on first iteration
-			if (inFirstIteration()) {
-				super.addAccumulator(name, newAccumulator);
-			}
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/b08669ab/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
new file mode 100644
index 0000000..215111b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativeTask.java
@@ -0,0 +1,395 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.iterative.task;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.operators.BatchTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.common.operators.util.JoinHashMap;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.io.network.api.reader.MutableReader;
+import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannel;
+import org.apache.flink.runtime.iterative.concurrent.BlockingBackChannelBroker;
+import org.apache.flink.runtime.iterative.concurrent.Broker;
+import org.apache.flink.runtime.iterative.concurrent.IterationAggregatorBroker;
+import org.apache.flink.runtime.iterative.concurrent.SolutionSetBroker;
+import org.apache.flink.runtime.iterative.convergence.WorksetEmptyConvergenceCriterion;
+import org.apache.flink.runtime.iterative.io.SolutionSetObjectsUpdateOutputCollector;
+import org.apache.flink.runtime.iterative.io.SolutionSetUpdateOutputCollector;
+import org.apache.flink.runtime.iterative.io.WorksetUpdateOutputCollector;
+import org.apache.flink.runtime.operators.Driver;
+import org.apache.flink.runtime.operators.ResettableDriver;
+import org.apache.flink.runtime.operators.hash.CompactingHashTable;
+import org.apache.flink.runtime.operators.util.DistributedRuntimeUDFContext;
+import org.apache.flink.runtime.operators.util.TaskConfig;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.MutableObjectIterator;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+/**
+ * The abstract base class for all tasks able to participate in an iteration.
+ */
+public abstract class AbstractIterativeTask<S extends Function, OT> extends BatchTask<S, OT>
+		implements Terminable
+{
+	private static final Logger log = LoggerFactory.getLogger(AbstractIterativeTask.class);
+	
+	protected LongSumAggregator worksetAggregator;
+
+	protected BlockingBackChannel worksetBackChannel;
+
+	protected boolean isWorksetIteration;
+
+	protected boolean isWorksetUpdate;
+
+	protected boolean isSolutionSetUpdate;
+	
+
+	private RuntimeAggregatorRegistry iterationAggregators;
+
+	private String brokerKey;
+
+	private int superstepNum = 1;
+	
+	private volatile boolean terminationRequested;
+
+	// --------------------------------------------------------------------------------------------
+	// Main life cycle methods that implement the iterative behavior
+	// --------------------------------------------------------------------------------------------
+
+	@Override
+	protected void initialize() throws Exception {
+		super.initialize();
+
+		// check if the driver is resettable
+		if (this.driver instanceof ResettableDriver) {
+			final ResettableDriver<?, ?> resDriver = (ResettableDriver<?, ?>) this.driver;
+			// make sure that the according inputs are not reseted
+			for (int i = 0; i < resDriver.getNumberOfInputs(); i++) {
+				if (resDriver.isInputResettable(i)) {
+					excludeFromReset(i);
+				}
+			}
+		}
+		
+		TaskConfig config = getLastTasksConfig();
+		isWorksetIteration = config.getIsWorksetIteration();
+		isWorksetUpdate = config.getIsWorksetUpdate();
+		isSolutionSetUpdate = config.getIsSolutionSetUpdate();
+
+		if (isWorksetUpdate) {
+			worksetBackChannel = BlockingBackChannelBroker.instance().getAndRemove(brokerKey());
+
+			if (isWorksetIteration) {
+				worksetAggregator = getIterationAggregators().getAggregator(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME);
+
+				if (worksetAggregator == null) {
+					throw new RuntimeException("Missing workset elements count aggregator.");
+				}
+			}
+		}
+	}
+
+	@Override
+	public void run() throws Exception {
+		if (inFirstIteration()) {
+			if (this.driver instanceof ResettableDriver) {
+				// initialize the repeatable driver
+				((ResettableDriver<?, ?>) this.driver).initialize();
+			}
+		} else {
+			reinstantiateDriver();
+			resetAllInputs();
+			
+			// re-read the iterative broadcast variables
+			for (int i : this.iterativeBroadcastInputs) {
+				final String name = getTaskConfig().getBroadcastInputName(i);
+				readAndSetBroadcastInput(i, name, this.runtimeUdfContext, superstepNum);
+			}
+		}
+
+		// call the parent to execute the superstep
+		super.run();
+		
+		// release the iterative broadcast variables
+		for (int i : this.iterativeBroadcastInputs) {
+			final String name = getTaskConfig().getBroadcastInputName(i);
+			releaseBroadcastVariables(name, superstepNum, this.runtimeUdfContext);
+		}
+	}
+
+	@Override
+	protected void closeLocalStrategiesAndCaches() {
+		try {
+			super.closeLocalStrategiesAndCaches();
+		}
+		finally {
+			if (this.driver instanceof ResettableDriver) {
+				final ResettableDriver<?, ?> resDriver = (ResettableDriver<?, ?>) this.driver;
+				try {
+					resDriver.teardown();
+				} catch (Throwable t) {
+					log.error("Error while shutting down an iterative operator.", t);
+				}
+			}
+		}
+	}
+
+	@Override
+	public DistributedRuntimeUDFContext createRuntimeContext(String taskName) {
+		Environment env = getEnvironment();
+		return new IterativeRuntimeUdfContext(taskName, env.getNumberOfSubtasks(),
+				env.getIndexInSubtaskGroup(), getUserCodeClassLoader(), getExecutionConfig(),
+				env.getDistributedCacheEntries(), this.accumulatorMap);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Utility Methods for Iteration Handling
+	// --------------------------------------------------------------------------------------------
+
+	protected boolean inFirstIteration() {
+		return this.superstepNum == 1;
+	}
+
+	protected int currentIteration() {
+		return this.superstepNum;
+	}
+
+	protected void incrementIterationCounter() {
+		this.superstepNum++;
+	}
+
+	public String brokerKey() {
+		if (brokerKey == null) {
+			int iterationId = config.getIterationId();
+			brokerKey = getEnvironment().getJobID().toString() + '#' + iterationId + '#' +
+					getEnvironment().getIndexInSubtaskGroup();
+		}
+		return brokerKey;
+	}
+
+	private void reinstantiateDriver() throws Exception {
+		if (this.driver instanceof ResettableDriver) {
+			final ResettableDriver<?, ?> resDriver = (ResettableDriver<?, ?>) this.driver;
+			resDriver.reset();
+		} else {
+			Class<? extends Driver<S, OT>> driverClass = this.config.getDriver();
+			this.driver = InstantiationUtil.instantiate(driverClass, Driver.class);
+
+			try {
+				this.driver.setup(this);
+			}
+			catch (Throwable t) {
+				throw new Exception("The pact driver setup for '" + this.getEnvironment().getTaskName() +
+						"' , caused an error: " + t.getMessage(), t);
+			}
+		}
+	}
+
+	public RuntimeAggregatorRegistry getIterationAggregators() {
+		if (this.iterationAggregators == null) {
+			this.iterationAggregators = IterationAggregatorBroker.instance().get(brokerKey());
+		}
+		return this.iterationAggregators;
+	}
+
+	protected void verifyEndOfSuperstepState() throws IOException {
+		// sanity check that there is at least one iterative input reader
+		if (this.iterativeInputs.length == 0 && this.iterativeBroadcastInputs.length == 0) {
+			throw new IllegalStateException("Error: Iterative task without a single iterative input.");
+		}
+
+		for (int inputNum : this.iterativeInputs) {
+			MutableReader<?> reader = this.inputReaders[inputNum];
+
+			if (!reader.isFinished()) {
+				if (reader.hasReachedEndOfSuperstep()) {
+					reader.startNextSuperstep();
+				}
+				else {
+					// need to read and drop all non-consumed data until we reach the end-of-superstep
+					@SuppressWarnings("unchecked")
+					MutableObjectIterator<Object> inIter = (MutableObjectIterator<Object>) this.inputIterators[inputNum];
+					Object o = this.inputSerializers[inputNum].getSerializer().createInstance();
+					while ((o = inIter.next(o)) != null);
+					
+					if (!reader.isFinished()) {
+						// also reset the end-of-superstep state
+						reader.startNextSuperstep();
+					}
+				}
+			}
+		}
+		
+		for (int inputNum : this.iterativeBroadcastInputs) {
+			MutableReader<?> reader = this.broadcastInputReaders[inputNum];
+
+			if (!reader.isFinished()) {
+				
+				// sanity check that the BC input is at the end of the superstep
+				if (!reader.hasReachedEndOfSuperstep()) {
+					throw new IllegalStateException("An iterative broadcast input has not been fully consumed.");
+				}
+				
+				reader.startNextSuperstep();
+			}
+		}
+	}
+
+	@Override
+	public boolean terminationRequested() {
+		return this.terminationRequested;
+	}
+
+	@Override
+	public void requestTermination() {
+		this.terminationRequested = true;
+	}
+
+	@Override
+	public void cancel() throws Exception {
+		requestTermination();
+		super.cancel();
+	}
+
+	// -----------------------------------------------------------------------------------------------------------------
+	// Iteration State Update Handling
+	// -----------------------------------------------------------------------------------------------------------------
+
+	/**
+	 * Creates a new {@link WorksetUpdateOutputCollector}.
+	 * <p>
+	 * This collector is used by {@link IterationIntermediateTask} or {@link IterationTailTask} to update the
+	 * workset.
+	 * <p>
+	 * If a non-null delegate is given, the new {@link Collector} will write to the solution set and also call
+	 * collect(T) of the delegate.
+	 *
+	 * @param delegate null -OR- the delegate on which to call collect() by the newly created collector
+	 * @return a new {@link WorksetUpdateOutputCollector}
+	 */
+	protected Collector<OT> createWorksetUpdateOutputCollector(Collector<OT> delegate) {
+		DataOutputView outputView = worksetBackChannel.getWriteEnd();
+		TypeSerializer<OT> serializer = getOutputSerializer();
+		return new WorksetUpdateOutputCollector<OT>(outputView, serializer, delegate);
+	}
+
+	protected Collector<OT> createWorksetUpdateOutputCollector() {
+		return createWorksetUpdateOutputCollector(null);
+	}
+
+	/**
+	 * Creates a new solution set update output collector.
+	 * <p>
+	 * This collector is used by {@link IterationIntermediateTask} or {@link IterationTailTask} to update the
+	 * solution set of workset iterations. Depending on the task configuration, either a fast (non-probing)
+	 * {@link org.apache.flink.runtime.iterative.io.SolutionSetFastUpdateOutputCollector} or normal (re-probing)
+	 * {@link SolutionSetUpdateOutputCollector} is created.
+	 * <p>
+	 * If a non-null delegate is given, the new {@link Collector} will write back to the solution set and also call
+	 * collect(T) of the delegate.
+	 *
+	 * @param delegate null -OR- a delegate collector to be called by the newly created collector
+	 * @return a new {@link org.apache.flink.runtime.iterative.io.SolutionSetFastUpdateOutputCollector} or
+	 * {@link SolutionSetUpdateOutputCollector}
+	 */
+	protected Collector<OT> createSolutionSetUpdateOutputCollector(Collector<OT> delegate) {
+		Broker<Object> solutionSetBroker = SolutionSetBroker.instance();
+		
+		Object ss = solutionSetBroker.get(brokerKey());
+		if (ss instanceof CompactingHashTable) {
+			@SuppressWarnings("unchecked")
+			CompactingHashTable<OT> solutionSet = (CompactingHashTable<OT>) ss;
+			return new SolutionSetUpdateOutputCollector<OT>(solutionSet, delegate);
+		}
+		else if (ss instanceof JoinHashMap) {
+			@SuppressWarnings("unchecked")
+			JoinHashMap<OT> map = (JoinHashMap<OT>) ss;
+			return new SolutionSetObjectsUpdateOutputCollector<OT>(map, delegate);
+		} else {
+			throw new RuntimeException("Unrecognized solution set handle: " + ss);
+		}
+	}
+
+	/**
+	 * @return output serializer of this task
+	 */
+	private TypeSerializer<OT> getOutputSerializer() {
+		TypeSerializerFactory<OT> serializerFactory;
+
+		if ((serializerFactory = getLastTasksConfig().getOutputSerializer(getUserCodeClassLoader())) ==
+				null) {
+			throw new RuntimeException("Missing output serializer for workset update.");
+		}
+
+		return serializerFactory.getSerializer();
+	}
+
+	// -----------------------------------------------------------------------------------------------------------------
+
+	private class IterativeRuntimeUdfContext extends DistributedRuntimeUDFContext implements IterationRuntimeContext {
+
+		public IterativeRuntimeUdfContext(String name, int numParallelSubtasks, int subtaskIndex, ClassLoader userCodeClassLoader,
+										ExecutionConfig executionConfig, Map<String, Future<Path>> cpTasks,
+										Map<String, Accumulator<?,?>> accumulatorMap) {
+			super(name, numParallelSubtasks, subtaskIndex, userCodeClassLoader, executionConfig, cpTasks, accumulatorMap);
+		}
+
+		@Override
+		public int getSuperstepNumber() {
+			return AbstractIterativeTask.this.superstepNum;
+		}
+
+		@Override
+		public <T extends Aggregator<?>> T getIterationAggregator(String name) {
+			return getIterationAggregators().<T>getAggregator(name);
+		}
+
+		@Override
+		@SuppressWarnings("unchecked")
+		public <T extends Value> T getPreviousIterationAggregate(String name) {
+			return (T) getIterationAggregators().getPreviousGlobalAggregate(name);
+		}
+
+		@Override
+		public <V, A extends Serializable> void addAccumulator(String name, Accumulator<V, A> newAccumulator) {
+			// only add accumulator on first iteration
+			if (inFirstIteration()) {
+				super.addAccumulator(name, newAccumulator);
+			}
+		}
+	}
+
+}


Mime
View raw message