flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [5/5] flink git commit: [FLINK-1681] [tests] Remove outdated 'nephele' iteration tests.
Date Sun, 30 Aug 2015 21:44:40 GMT
[FLINK-1681] [tests] Remove outdated 'nephele' iteration tests.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a7a57ebe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a7a57ebe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a7a57ebe

Branch: refs/heads/master
Commit: a7a57ebea6d8f60abba4fe2559af05d316112ca4
Parents: 0ba5355
Author: Stephan Ewen <sewen@apache.org>
Authored: Sun Aug 30 18:29:12 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Sun Aug 30 22:39:17 2015 +0200

----------------------------------------------------------------------
 .../apache/flink/test/util/TestBaseUtils.java   |  45 +
 .../BulkIterationWithAllReducerITCase.java      |   6 +-
 .../test/iterative/DanglingPageRankITCase.java  | 392 ++++++++-
 .../flink/test/iterative/PageRankITCase.java    |  54 --
 .../test/iterative/nephele/ConfigUtils.java     |  64 --
 .../ConnectedComponentsNepheleITCase.java       | 837 -------------------
 .../nephele/DanglingPageRankNepheleITCase.java  |  75 --
 ...nglingPageRankWithCombinerNepheleITCase.java |  63 --
 .../IterationWithChainingNepheleITCase.java     | 296 -------
 .../test/iterative/nephele/JobGraphUtils.java   | 106 ---
 .../CustomCompensatableDanglingPageRank.java    | 315 -------
 ...mpensatableDanglingPageRankWithCombiner.java | 329 --------
 .../CustomCompensatableDotProductCoGroup.java   | 130 ---
 .../CustomCompensatableDotProductMatch.java     |  80 --
 .../CustomCompensatingMap.java                  |  82 --
 .../CustomImprovedAdjacencyListInputFormat.java |  66 --
 ...stomImprovedDanglingPageRankInputFormat.java |  66 --
 .../CustomPageWithRankOutFormat.java            |  45 -
 .../CustomRankCombiner.java                     |  57 --
 .../types/VertexWithAdjacencyList.java          |  83 --
 .../VertexWithAdjacencyListComparator.java      | 148 ----
 ...ertexWithAdjacencyListComparatorFactory.java |  39 -
 .../VertexWithAdjacencyListSerializer.java      | 112 ---
 ...ertexWithAdjacencyListSerializerFactory.java |  56 --
 .../types/VertexWithRank.java                   |  65 --
 .../types/VertexWithRankAndDangling.java        |  76 --
 .../VertexWithRankAndDanglingComparator.java    | 153 ----
 ...texWithRankAndDanglingComparatorFactory.java |  39 -
 .../VertexWithRankAndDanglingSerializer.java    |  84 --
 ...texWithRankAndDanglingSerializerFactory.java |  56 --
 .../types/VertexWithRankComparator.java         | 151 ----
 .../types/VertexWithRankComparatorFactory.java  |  39 -
 ...xWithAdjacencyListPairComparatorFactory.java |  91 --
 ...ngToVertexWithRankPairComparatorFactory.java |  91 --
 .../types/VertexWithRankSerializer.java         |  81 --
 .../types/VertexWithRankSerializerFactory.java  |  56 --
 .../danglingpagerank/AsciiLongArrayView.java    | 166 ----
 .../nephele/danglingpagerank/BooleanValue.java  |  57 --
 .../CompensatableDanglingPageRank.java          | 295 -------
 .../CompensatableDotProductCoGroup.java         | 137 ---
 .../CompensatableDotProductMatch.java           | 102 ---
 .../danglingpagerank/CompensatingMap.java       |  88 --
 .../DanglingPageGenerateRankInputFormat.java    |  62 --
 .../DiffL1NormConvergenceCriterion.java         |  44 -
 .../ImprovedAdjacencyListInputFormat.java       |  74 --
 .../ImprovedDanglingPageRankInputFormat.java    |  73 --
 .../nephele/danglingpagerank/LongArrayView.java |  88 --
 .../nephele/danglingpagerank/PageRankStats.java | 124 ---
 .../PageRankStatsAggregator.java                |  84 --
 .../danglingpagerank/PageWithRankOutFormat.java |  47 --
 .../test/recordJobs/graph/DanglingPageRank.java | 105 ---
 .../test/recordJobs/graph/SimplePageRank.java   | 194 -----
 .../graph/pageRankUtil/AsciiLongArrayView.java  | 163 ----
 .../DanglingPageRankInputFormat.java            |  78 --
 .../DiffL1NormConvergenceCriterion.java         |  44 -
 .../graph/pageRankUtil/DotProductCoGroup.java   | 129 ---
 .../graph/pageRankUtil/DotProductMatch.java     |  63 --
 .../ImprovedAdjacencyListInputFormat.java       |  76 --
 .../graph/pageRankUtil/LongArrayView.java       |  89 --
 .../graph/pageRankUtil/PageRankStats.java       | 105 ---
 .../pageRankUtil/PageRankStatsAggregator.java   |  73 --
 .../pageRankUtil/PageWithRankOutFormat.java     |  51 --
 62 files changed, 412 insertions(+), 6727 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
----------------------------------------------------------------------
diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
index ce02267..87fab25 100644
--- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
+++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java
@@ -588,4 +588,49 @@ public class TestBaseUtils extends TestLogger {
 
 		return IOUtils.toString(is, connection.getContentEncoding() != null ? connection.getContentEncoding() : "UTF-8");
 	}
+	
+	public static class TupleComparator<T extends Tuple> implements Comparator<T> {
+
+		@Override
+		public int compare(T o1, T o2) {
+			if (o1 == null || o2 == null) {
+				throw new IllegalArgumentException("Cannot compare null tuples");
+			}
+			else if (o1.getArity() != o2.getArity()) {
+				return o1.getArity() - o2.getArity();
+			}
+			else {
+				for (int i = 0; i < o1.getArity(); i++) {
+					Object val1 = o1.getField(i);
+					Object val2 = o2.getField(i);
+					
+					int cmp;
+					if (val1 != null && val2 != null) {
+						cmp = compareValues(val1, val2);
+					}
+					else {
+						cmp = val1 == null ? (val2 == null ? 0 : -1) : 1;
+					}
+					
+					if (cmp != 0) {
+						return cmp;
+					}
+				}
+				
+				return 0;
+			}
+		}
+		
+		@SuppressWarnings("unchecked")
+		private static <X extends Comparable<X>> int compareValues(Object o1, Object o2) {
+			if (o1 instanceof Comparable && o2 instanceof Comparable) {
+				X c1 = (X) o1;
+				X c2 = (X) o2;
+				return c1.compareTo(c2);
+			}
+			else {
+				throw new IllegalArgumentException("Cannot compare tuples with non comparable elements");
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java
index f4f2c18..d55a63f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/BulkIterationWithAllReducerITCase.java
@@ -64,10 +64,8 @@ public class BulkIterationWithAllReducerITCase extends JavaProgramTestBase {
 		
 		@Override
 		public void open(Configuration parameters) {
-			Collection<Integer> bc = getRuntimeContext().getBroadcastVariable("bc");
-			synchronized (bc) {
-				this.bcValue = bc.isEmpty() ? null : bc.iterator().next();
-			}
+			List<Integer> bc = getRuntimeContext().getBroadcastVariable("bc");
+			this.bcValue = bc.isEmpty() ? null : bc.get(0);
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/DanglingPageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/DanglingPageRankITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/DanglingPageRankITCase.java
index e2d095d..53496e2 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/DanglingPageRankITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/DanglingPageRankITCase.java
@@ -18,35 +18,373 @@
 
 package org.apache.flink.test.iterative;
 
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.test.iterative.nephele.DanglingPageRankNepheleITCase;
-import org.apache.flink.test.recordJobs.graph.DanglingPageRank;
-import org.apache.flink.test.util.RecordAPITestBase;
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichCoGroupFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
 
-public class DanglingPageRankITCase extends RecordAPITestBase {
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
-	protected String pagesPath;
-	protected String edgesPath;
-	protected String resultPath;
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+
+@RunWith(Parameterized.class)
+@SuppressWarnings({"serial", "unchecked"})
+public class DanglingPageRankITCase extends MultipleProgramsTestBase {
+
+	private static final String AGGREGATOR_NAME = "pagerank.aggregator";
+	
+	
+	public DanglingPageRankITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Test
+	public void testDanglingPageRank() {
+		try {
+			final int NUM_ITERATIONS = 25;
+			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Tuple2<Long, Boolean>> vertices = env.fromElements(
+					new Tuple2<>(1L, false),
+					new Tuple2<>(2L, false),
+					new Tuple2<>(5L, false),
+					new Tuple2<>(3L, true),
+					new Tuple2<>(4L, false)
+			);
+
+			DataSet<PageWithLinks> edges = env.fromElements(
+					new PageWithLinks(2L, new long[] { 1 }),
+					new PageWithLinks(5L, new long[] { 2, 4 }),
+					new PageWithLinks(4L, new long[] { 3, 2 }),
+					new PageWithLinks(1L, new long[] { 4, 2, 3 })
+			);
+			
+			
+			final long numVertices = vertices.count();
+			final long numDanglingVertices = vertices
+					.filter(
+							new FilterFunction<Tuple2<Long, Boolean>>() {
+								@Override
+								public boolean filter(Tuple2<Long, Boolean> value) {
+									return value.f1;
+								}
+							})
+					.count();
+			
+			
+			DataSet<PageWithRankAndDangling> verticesWithInitialRank = vertices
+					.map(new MapFunction<Tuple2<Long, Boolean>, PageWithRankAndDangling>() {
+						
+						@Override
+						public PageWithRankAndDangling map(Tuple2<Long, Boolean> value) {
+							return new PageWithRankAndDangling(value.f0, 1.0 / numVertices, value.f1);
+						}
+					});
+			
+			IterativeDataSet<PageWithRankAndDangling> iteration = verticesWithInitialRank.iterate(NUM_ITERATIONS);
+
+			iteration.getAggregators().registerAggregationConvergenceCriterion(
+					AGGREGATOR_NAME,
+					new PageRankStatsAggregator(),
+					new DiffL1NormConvergenceCriterion());
+			
+			DataSet<PageWithRank> partialRanks = iteration.join(edges).where("pageId").equalTo("pageId").with(
+					new FlatJoinFunction<PageWithRankAndDangling, PageWithLinks, PageWithRank>() {
+						
+						@Override
+						public void join(PageWithRankAndDangling page,
+											PageWithLinks links,
+											Collector<PageWithRank> out)  {
+							
+							double rankToDistribute = page.rank / (double) links.targets.length;
+							PageWithRank output = new PageWithRank(0L, rankToDistribute);
+
+							for (long target : links.targets) {
+								output.pageId = target;
+								out.collect(output);
+							}
+						}
+					}
+			);
+			
+			DataSet<PageWithRankAndDangling> newRanks = 
+				iteration.coGroup(partialRanks).where("pageId").equalTo("pageId").with(
+					new RichCoGroupFunction<PageWithRankAndDangling, PageWithRank, PageWithRankAndDangling>() {
+
+						private static final double BETA = 0.85;
+
+						private final double randomJump = (1.0 - BETA) / numVertices;
+						private PageRankStatsAggregator aggregator;
+						private double danglingRankFactor;
+						
+						@Override
+						public void open(Configuration parameters) throws Exception {
+							int currentIteration = getIterationRuntimeContext().getSuperstepNumber();
+							
+							aggregator = getIterationRuntimeContext().getIterationAggregator(AGGREGATOR_NAME);
+
+							if (currentIteration == 1) {
+								danglingRankFactor = BETA * (double) numDanglingVertices / 
+										((double) numVertices * (double) numVertices);
+							} else {
+								PageRankStats previousAggregate = getIterationRuntimeContext()
+										.getPreviousIterationAggregate(AGGREGATOR_NAME);
+								danglingRankFactor = BETA * previousAggregate.danglingRank() / (double) numVertices;
+							}
+						}
+						
+						@Override
+						public void coGroup(Iterable<PageWithRankAndDangling> currentPages,
+											Iterable<PageWithRank> partialRanks,
+											Collector<PageWithRankAndDangling> out) {
+							
+							// compute the next rank
+							long edges = 0;
+							double summedRank = 0;
+							for (PageWithRank partial : partialRanks) {
+								summedRank += partial.rank;
+								edges++;
+							}
+							double rank = BETA * summedRank + randomJump + danglingRankFactor;
+							
+							// current rank, for stats and convergence
+							PageWithRankAndDangling currentPage = currentPages.iterator().next();
+							double currentRank = currentPage.rank;
+							boolean isDangling = currentPage.dangling;
+
+							// maintain statistics to compensate for probability loss on dangling nodes
+							double danglingRankToAggregate = isDangling ? rank : 0;
+							long danglingVerticesToAggregate = isDangling ? 1 : 0;
+							double diff = Math.abs(currentRank - rank);
+							aggregator.aggregate(diff, rank, danglingRankToAggregate, danglingVerticesToAggregate, 1, edges);
+
+							currentPage.rank = rank;
+							out.collect(currentPage);
+						}
+					});
+			
+			List<PageWithRankAndDangling> result = iteration.closeWith(newRanks).collect();
+			
+			double totalRank = 0.0;
+			for (PageWithRankAndDangling r : result) {
+				totalRank += r.rank;
+				assertTrue(r.pageId >= 1 && r.pageId <= 5);
+				assertTrue(r.pageId != 3 || r.dangling);
+			}
+			
+			assertEquals(1.0, totalRank, 0.001);
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	// ------------------------------------------------------------------------
+	//  custom types
+	// ------------------------------------------------------------------------
+	
+	public static class PageWithRank {
+		
+		public long pageId;
+		public double rank;
+
+		public PageWithRank() {}
+		
+		public PageWithRank(long pageId, double rank) {
+			this.pageId = pageId;
+			this.rank = rank;
+		}
+	}
+
+	public static class PageWithRankAndDangling {
+
+		public long pageId;
+		public double rank;
+		public boolean dangling; 
+
+		public PageWithRankAndDangling() {}
+
+		public PageWithRankAndDangling(long pageId, double rank, boolean dangling) {
+			this.pageId = pageId;
+			this.rank = rank;
+			this.dangling = dangling;
+		}
+
+		@Override
+		public String toString() {
+			return "PageWithRankAndDangling{" +
+					"pageId=" + pageId +
+					", rank=" + rank +
+					", dangling=" + dangling +
+					'}';
+		}
+	}
+
+	public static class PageWithLinks {
+
+		public long pageId;
+		public long[] targets;
+
+		public PageWithLinks() {}
+
+		public PageWithLinks(long pageId, long[] targets) {
+			this.pageId = pageId;
+			this.targets = targets;
+		}
+	}
+	
+	// ------------------------------------------------------------------------
+	//  statistics
+	// ------------------------------------------------------------------------
+
+	public static class PageRankStats implements Value {
+
+		private double diff;
+		private double rank;
+		private double danglingRank;
+		private long numDanglingVertices;
+		private long numVertices;
+		private long edges;
+
+		public PageRankStats() {}
+
+		public PageRankStats(
+					double diff, double rank, double danglingRank,
+					long numDanglingVertices, long numVertices, long edges) {
+			
+			this.diff = diff;
+			this.rank = rank;
+			this.danglingRank = danglingRank;
+			this.numDanglingVertices = numDanglingVertices;
+			this.numVertices = numVertices;
+			this.edges = edges;
+		}
+
+		public double diff() {
+			return diff;
+		}
+
+		public double rank() {
+			return rank;
+		}
+
+		public double danglingRank() {
+			return danglingRank;
+		}
+
+		public long numDanglingVertices() {
+			return numDanglingVertices;
+		}
+
+		public long numVertices() {
+			return numVertices;
+		}
+
+		public long edges() {
+			return edges;
+		}
+
+		@Override
+		public void write(DataOutputView out) throws IOException {
+			out.writeDouble(diff);
+			out.writeDouble(rank);
+			out.writeDouble(danglingRank);
+			out.writeLong(numDanglingVertices);
+			out.writeLong(numVertices);
+			out.writeLong(edges);
+		}
+
+		@Override
+		public void read(DataInputView in) throws IOException {
+			diff = in.readDouble();
+			rank = in.readDouble();
+			danglingRank = in.readDouble();
+			numDanglingVertices = in.readLong();
+			numVertices = in.readLong();
+			edges = in.readLong();
+		}
+
+		@Override
+		public String toString() {
+			return "PageRankStats: diff [" + diff + "], rank [" + rank + "], danglingRank [" + danglingRank +
+					"], numDanglingVertices [" + numDanglingVertices + "], numVertices [" + numVertices + "], edges [" + edges +
+					"]";
+		}
+	}
 	
-	@Override
-	protected void preSubmit() throws Exception {
-		pagesPath = createTempFile("pages.txt", DanglingPageRankNepheleITCase.TEST_VERTICES);
-		edgesPath = createTempFile("edges.txt", DanglingPageRankNepheleITCase.TEST_EDGES);
-		resultPath = getTempFilePath("results");
-	}
-
-	@Override
-	protected Plan getTestJob() {
-		DanglingPageRank pr = new DanglingPageRank();
-		Plan plan = pr.getPlan(
-			String.valueOf(parallelism),
-			pagesPath,
-			edgesPath,
-			resultPath,
-			"25",	// max iterations
-			"5",	// num vertices
-			"1");	// num dangling vertices
-		return plan;
+	public static class PageRankStatsAggregator implements Aggregator<PageRankStats> {
+
+		private double diff;
+		private double rank;
+		private double danglingRank;
+		private long numDanglingVertices;
+		private long numVertices;
+		private long edges;
+
+		@Override
+		public PageRankStats getAggregate() {
+			return new PageRankStats(diff, rank, danglingRank, numDanglingVertices, numVertices, edges);
+		}
+
+		public void aggregate(double diffDelta, double rankDelta, double danglingRankDelta, long danglingVerticesDelta,
+							  long verticesDelta, long edgesDelta) {
+			diff += diffDelta;
+			rank += rankDelta;
+			danglingRank += danglingRankDelta;
+			numDanglingVertices += danglingVerticesDelta;
+			numVertices += verticesDelta;
+			edges += edgesDelta;
+		}
+
+		@Override
+		public void aggregate(PageRankStats pageRankStats) {
+			diff += pageRankStats.diff();
+			rank += pageRankStats.rank();
+			danglingRank += pageRankStats.danglingRank();
+			numDanglingVertices += pageRankStats.numDanglingVertices();
+			numVertices += pageRankStats.numVertices();
+			edges += pageRankStats.edges();
+		}
+
+		@Override
+		public void reset() {
+			diff = 0;
+			rank = 0;
+			danglingRank = 0;
+			numDanglingVertices = 0;
+			numVertices = 0;
+			edges = 0;
+		}
+	}
+
+	public static class DiffL1NormConvergenceCriterion implements ConvergenceCriterion<PageRankStats> {
+
+		private static final double EPSILON = 0.00005;
+
+		@Override
+		public boolean isConverged(int iteration, PageRankStats pageRankStats) {
+			return pageRankStats.diff() < EPSILON;
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/PageRankITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/PageRankITCase.java
deleted file mode 100644
index 946d89b..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/PageRankITCase.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.test.recordJobs.graph.SimplePageRank;
-import org.apache.flink.test.util.RecordAPITestBase;
-
-public class PageRankITCase extends RecordAPITestBase {
-	
-	private static final String VERTICES = "1\n2\n3\n4\n5\n6\n7\n8\n9\n10\n";
-	
-	private static final String EDGES = "1 2\n2 3\n3 4\n4 5\n5 6\n6 7\n7 8\n8 9\n9 10\n10 1\n";
-
-	protected String pagesPath;
-	protected String edgesPath;
-	protected String resultPath;
-	
-	@Override
-	protected void preSubmit() throws Exception {
-		pagesPath = createTempFile("pages.txt", VERTICES);
-		edgesPath = createTempFile("edges.txt", EDGES);
-		resultPath = getTempFilePath("results");
-	}
-
-	@Override
-	protected Plan getTestJob() {
-		SimplePageRank pr = new SimplePageRank();
-		Plan plan = pr.getPlan(
-			String.valueOf(parallelism),
-			pagesPath,
-			edgesPath,
-			resultPath,
-			"5",	// max iterations
-			"10");	// num vertices
-		return plan;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConfigUtils.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConfigUtils.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConfigUtils.java
deleted file mode 100644
index c9eea3d..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConfigUtils.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-package org.apache.flink.test.iterative.nephele;
-
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.flink.configuration.Configuration;
-
-public class ConfigUtils {
-
-	private ConfigUtils() {
-	}
-
-	public static int asInteger(String key, Configuration parameters) {
-		int value = parameters.getInteger(key, -1);
-		if (value == -1) {
-			throw new IllegalStateException();
-		}
-		return value;
-	}
-
-	public static double asDouble(String key, Configuration parameters) {
-		double value = Double.parseDouble(parameters.getString(key, String.valueOf(Double.NaN)));
-		if (Double.isNaN(value)) {
-			throw new IllegalStateException();
-		}
-		return value;
-	}
-
-	public static long asLong(String key, Configuration parameters) {
-		long value = parameters.getLong(key, Long.MIN_VALUE);
-		if (value == Long.MIN_VALUE) {
-			throw new IllegalStateException();
-		}
-		return value;
-	}
-
-	public static Set<Integer> asIntSet(String key, Configuration parameters) {
-		String[] tokens = parameters.getString(key, "").split(",");
-		Set<Integer> failingWorkers = new HashSet<Integer>(tokens.length);
-		for (String token : tokens) {
-			failingWorkers.add(Integer.parseInt(token));
-		}
-		return failingWorkers;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
deleted file mode 100644
index 7a3639b..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/ConnectedComponentsNepheleITCase.java
+++ /dev/null
@@ -1,837 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele;
-
-import java.io.BufferedReader;
-import java.util.Collection;
-
-import org.apache.flink.api.common.aggregators.LongSumAggregator;
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypePairComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.common.typeutils.record.RecordComparatorFactory;
-import org.apache.flink.api.common.typeutils.record.RecordPairComparatorFactory;
-import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.io.CsvInputFormat;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.io.FileOutputFormat;
-import org.apache.flink.api.java.record.operators.ReduceOperator.WrappingReduceFunction;
-import org.apache.flink.api.java.record.operators.ReduceOperator.WrappingClassReduceFunction;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.iterative.convergence.WorksetEmptyConvergenceCriterion;
-import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
-import org.apache.flink.runtime.iterative.task.IterationIntermediatePactTask;
-import org.apache.flink.runtime.iterative.task.IterationTailPactTask;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.InputFormatVertex;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.operators.BuildSecondCachedJoinDriver;
-import org.apache.flink.runtime.operators.CollectorMapDriver;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.GroupReduceDriver;
-import org.apache.flink.runtime.operators.JoinWithSolutionSetSecondDriver;
-import org.apache.flink.runtime.operators.chaining.ChainedCollectorMapDriver;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.test.recordJobs.graph.WorksetConnectedComponents.MinimumComponentIDReduce;
-import org.apache.flink.test.recordJobs.graph.WorksetConnectedComponents.NeighborWithComponentIDJoin;
-import org.apache.flink.test.recordJobs.graph.WorksetConnectedComponents.UpdateComponentIdMatch;
-import org.apache.flink.test.testdata.ConnectedComponentsData;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-/**
- * Tests the various variants of iteration state updates for workset iterations:
- * - unified solution set and workset tail update
- * - separate solution set and workset tail updates
- * - intermediate workset update and solution set tail
- * - intermediate solution set update and workset tail
- */
-@SuppressWarnings("deprecation")
-@RunWith(Parameterized.class)
-public class ConnectedComponentsNepheleITCase extends RecordAPITestBase {
-
-	private static final long SEED = 0xBADC0FFEEBEEFL;
-
-	private static final int NUM_VERTICES = 1000;
-
-	private static final int NUM_EDGES = 10000;
-
-	private static final int ITERATION_ID = 1;
-
-	private static final long MEM_PER_CONSUMER = 3;
-
-	private static final int parallelism = 4;
-
-	private static final double MEM_FRAC_PER_CONSUMER = (double)MEM_PER_CONSUMER/TASK_MANAGER_MEMORY_SIZE*parallelism;
-
-	protected String verticesPath;
-
-	protected String edgesPath;
-
-	protected String resultPath;
-
-	public ConnectedComponentsNepheleITCase(Configuration config) {
-		super(config);
-		setTaskManagerNumSlots(parallelism);
-	}
-
-	@Parameters
-	public static Collection<Object[]> getConfigurations() {
-		Configuration config1 = new Configuration();
-		config1.setInteger("testcase", 1);
-
-		Configuration config2 = new Configuration();
-		config2.setInteger("testcase", 2);
-
-		Configuration config3 = new Configuration();
-		config3.setInteger("testcase", 3);
-
-		Configuration config4 = new Configuration();
-		config4.setInteger("testcase", 4);
-
-		return toParameterList(config1, config2, config3, config4);
-	}
-
-	@Override
-	protected void preSubmit() throws Exception {
-		verticesPath = createTempFile("vertices.txt", ConnectedComponentsData.getEnumeratingVertices(NUM_VERTICES));
-		edgesPath = createTempFile("edges.txt", ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED));
-		resultPath = getTempFilePath("results");
-	}
-
-	@Override
-	protected JobGraph getJobGraph() throws Exception {
-		int maxIterations = 100;
-
-		int type = config.getInteger("testcase", 0);
-		switch (type) {
-		case 1:
-			return createJobGraphUnifiedTails(verticesPath, edgesPath, resultPath, parallelism, maxIterations);
-		case 2:
-			return createJobGraphSeparateTails(verticesPath, edgesPath, resultPath, parallelism, maxIterations);
-		case 3:
-			return createJobGraphIntermediateWorksetUpdateAndSolutionSetTail(verticesPath, edgesPath, resultPath, parallelism,
-				maxIterations);
-		case 4:
-			return createJobGraphSolutionSetUpdateAndWorksetTail(verticesPath, edgesPath, resultPath, parallelism,
-				maxIterations);
-		default:
-			throw new RuntimeException("Broken test configuration");
-		}
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		for (BufferedReader reader : getResultReader(resultPath)) {
-			ConnectedComponentsData.checkOddEvenResult(reader);
-		}
-	}
-
-	public static final class IdDuplicator extends MapFunction {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void map(Record record, Collector<Record> out) throws Exception {
-			record.setField(1, record.getField(0, LongValue.class));
-			out.collect(record);
-		}
-
-	}
-
-	// -----------------------------------------------------------------------------------------------------------------
-	// Invariant vertices across all variants
-	// -----------------------------------------------------------------------------------------------------------------
-
-	private static InputFormatVertex createVerticesInput(JobGraph jobGraph, String verticesPath, int numSubTasks,
-			TypeSerializerFactory<?> serializer, TypeComparatorFactory<?> comparator)
-	{
-		@SuppressWarnings("unchecked")
-		CsvInputFormat verticesInFormat = new CsvInputFormat(' ', LongValue.class);
-		InputFormatVertex verticesInput = JobGraphUtils.createInput(verticesInFormat, verticesPath, "VerticesInput",
-			jobGraph, numSubTasks);
-		TaskConfig verticesInputConfig = new TaskConfig(verticesInput.getConfiguration());
-		{
-			verticesInputConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
-			verticesInputConfig.setOutputSerializer(serializer);
-
-			// chained mapper that duplicates the id
-			TaskConfig chainedMapperConfig = new TaskConfig(new Configuration());
-			chainedMapperConfig.setStubWrapper(new UserCodeClassWrapper<IdDuplicator>(IdDuplicator.class));
-			chainedMapperConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
-			chainedMapperConfig.setInputLocalStrategy(0, LocalStrategy.NONE);
-			chainedMapperConfig.setInputSerializer(serializer, 0);
-
-			chainedMapperConfig.setOutputSerializer(serializer);
-			chainedMapperConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
-			chainedMapperConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
-			chainedMapperConfig.setOutputComparator(comparator, 0);
-			chainedMapperConfig.setOutputComparator(comparator, 1);
-
-			verticesInputConfig.addChainedTask(ChainedCollectorMapDriver.class, chainedMapperConfig, "ID Duplicator");
-		}
-
-		return verticesInput;
-	}
-
-	private static InputFormatVertex createEdgesInput(JobGraph jobGraph, String edgesPath, int numSubTasks,
-			TypeSerializerFactory<?> serializer, TypeComparatorFactory<?> comparator)
-	{
-		// edges
-		@SuppressWarnings("unchecked")
-		CsvInputFormat edgesInFormat = new CsvInputFormat(' ', LongValue.class, LongValue.class);
-		InputFormatVertex edgesInput = JobGraphUtils.createInput(edgesInFormat, edgesPath, "EdgesInput", jobGraph,
-			numSubTasks);
-		TaskConfig edgesInputConfig = new TaskConfig(edgesInput.getConfiguration());
-		{
-			edgesInputConfig.setOutputSerializer(serializer);
-			edgesInputConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
-			edgesInputConfig.setOutputComparator(comparator, 0);
-		}
-
-		return edgesInput;
-	}
-
-	private static JobVertex createIterationHead(JobGraph jobGraph, int numSubTasks,
-			TypeSerializerFactory<?> serializer,
-			TypeComparatorFactory<?> comparator,
-			TypePairComparatorFactory<?, ?> pairComparator) {
-
-		JobVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "Join With Edges (Iteration Head)", jobGraph, numSubTasks);
-		TaskConfig headConfig = new TaskConfig(head.getConfiguration());
-		{
-			headConfig.setIterationId(ITERATION_ID);
-
-			// initial input / workset
-			headConfig.addInputToGroup(0);
-			headConfig.setInputSerializer(serializer, 0);
-			headConfig.setInputComparator(comparator, 0);
-			headConfig.setInputLocalStrategy(0, LocalStrategy.NONE);
-			headConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(0);
-
-			// regular plan input (second input to the join)
-			headConfig.addInputToGroup(1);
-			headConfig.setInputSerializer(serializer, 1);
-			headConfig.setInputComparator(comparator, 1);
-			headConfig.setInputLocalStrategy(1, LocalStrategy.NONE);
-			headConfig.setInputCached(1, true);
-			headConfig.setRelativeInputMaterializationMemory(1, MEM_FRAC_PER_CONSUMER);
-
-			// initial solution set input
-			headConfig.addInputToGroup(2);
-			headConfig.setInputSerializer(serializer, 2);
-			headConfig.setInputComparator(comparator, 2);
-			headConfig.setInputLocalStrategy(2, LocalStrategy.NONE);
-			headConfig.setIterationHeadSolutionSetInputIndex(2);
-
-			headConfig.setSolutionSetSerializer(serializer);
-			headConfig.setSolutionSetComparator(comparator);
-
-			// back channel / iterations
-			headConfig.setIsWorksetIteration();
-			headConfig.setRelativeBackChannelMemory(MEM_FRAC_PER_CONSUMER);
-			headConfig.setRelativeSolutionSetMemory(MEM_FRAC_PER_CONSUMER );
-
-			// output into iteration
-			headConfig.setOutputSerializer(serializer);
-			headConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
-			headConfig.setOutputComparator(comparator, 0);
-
-			// final output
-			TaskConfig headFinalOutConfig = new TaskConfig(new Configuration());
-			headFinalOutConfig.setOutputSerializer(serializer);
-			headFinalOutConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
-			headConfig.setIterationHeadFinalOutputConfig(headFinalOutConfig);
-
-			// the sync
-			headConfig.setIterationHeadIndexOfSyncOutput(2);
-
-			// the driver
-			headConfig.setDriver(BuildSecondCachedJoinDriver.class);
-			headConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
-			headConfig.setStubWrapper(
-				new UserCodeClassWrapper<NeighborWithComponentIDJoin>(NeighborWithComponentIDJoin.class));
-			headConfig.setDriverComparator(comparator, 0);
-			headConfig.setDriverComparator(comparator, 1);
-			headConfig.setDriverPairComparator(pairComparator);
-			headConfig.setRelativeMemoryDriver(MEM_FRAC_PER_CONSUMER);
-
-			headConfig.addIterationAggregator(
-				WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME, new LongSumAggregator());
-		}
-
-		return head;
-	}
-
-	private static JobVertex createIterationIntermediate(JobGraph jobGraph, int numSubTasks,
-			TypeSerializerFactory<?> serializer, TypeComparatorFactory<?> comparator)
-	{
-		// --------------- the intermediate (reduce to min id) ---------------
-		JobVertex intermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class,
-			"Find Min Component-ID", jobGraph, numSubTasks);
-		TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
-		{
-			intermediateConfig.setIterationId(ITERATION_ID);
-
-			intermediateConfig.addInputToGroup(0);
-			intermediateConfig.setInputSerializer(serializer, 0);
-			intermediateConfig.setInputComparator(comparator, 0);
-			intermediateConfig.setInputLocalStrategy(0, LocalStrategy.SORT);
-			intermediateConfig.setRelativeMemoryInput(0, MEM_FRAC_PER_CONSUMER);
-			intermediateConfig.setFilehandlesInput(0, 64);
-			intermediateConfig.setSpillingThresholdInput(0, 0.85f);
-
-			intermediateConfig.setOutputSerializer(serializer);
-			intermediateConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
-
-			intermediateConfig.setDriver(GroupReduceDriver.class);
-			intermediateConfig.setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
-			intermediateConfig.setDriverComparator(comparator, 0);
-			intermediateConfig.setStubWrapper(
-				new UserCodeObjectWrapper<WrappingReduceFunction>(new WrappingClassReduceFunction(MinimumComponentIDReduce.class)));
-		}
-
-		return intermediate;
-	}
-
-	private static OutputFormatVertex createOutput(JobGraph jobGraph, String resultPath, int numSubTasks,
-			TypeSerializerFactory<?> serializer) {
-		OutputFormatVertex output = JobGraphUtils.createFileOutput(jobGraph, "Final Output", numSubTasks);
-		TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
-		{
-
-			outputConfig.addInputToGroup(0);
-			outputConfig.setInputSerializer(serializer, 0);
-
-			outputConfig.setStubWrapper(new UserCodeClassWrapper<CsvOutputFormat>(CsvOutputFormat.class));
-			outputConfig.setStubParameter(FileOutputFormat.FILE_PARAMETER_KEY, resultPath);
-
-			Configuration outputUserConfig = outputConfig.getStubParameters();
-			outputUserConfig.setString(CsvOutputFormat.RECORD_DELIMITER_PARAMETER, "\n");
-			outputUserConfig.setString(CsvOutputFormat.FIELD_DELIMITER_PARAMETER, " ");
-			outputUserConfig.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 0, LongValue.class);
-			outputUserConfig.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 0, 0);
-			outputUserConfig.setClass(CsvOutputFormat.FIELD_TYPE_PARAMETER_PREFIX + 1, LongValue.class);
-			outputUserConfig.setInteger(CsvOutputFormat.RECORD_POSITION_PARAMETER_PREFIX + 1, 1);
-			outputUserConfig.setInteger(CsvOutputFormat.NUM_FIELDS_PARAMETER, 2);
-		}
-
-		return output;
-	}
-
-	private static JobVertex createSync(JobGraph jobGraph, int numSubTasks, int maxIterations) {
-		JobVertex sync = JobGraphUtils.createSync(jobGraph, numSubTasks);
-		TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
-		syncConfig.setNumberOfIterations(maxIterations);
-		syncConfig.setIterationId(ITERATION_ID);
-		syncConfig.addIterationAggregator(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME,
-			new LongSumAggregator());
-		syncConfig.setConvergenceCriterion(WorksetEmptyConvergenceCriterion.AGGREGATOR_NAME,
-			new WorksetEmptyConvergenceCriterion());
-
-		return sync;
-	}
-
-	// -----------------------------------------------------------------------------------------------------------------
-	// Unified solution set and workset tail update
-	// -----------------------------------------------------------------------------------------------------------------
-
-	public JobGraph createJobGraphUnifiedTails(
-			String verticesPath, String edgesPath, String resultPath, int numSubTasks, int maxIterations)
-	{
-		// -- init -------------------------------------------------------------------------------------------------
-		final TypeSerializerFactory<?> serializer = RecordSerializerFactory.get();
-		@SuppressWarnings("unchecked")
-		final TypeComparatorFactory<?> comparator =
-			new RecordComparatorFactory(new int[] { 0 }, new Class[] { LongValue.class }, new boolean[] { true });
-		final TypePairComparatorFactory<?, ?> pairComparator = RecordPairComparatorFactory.get();
-
-		JobGraph jobGraph = new JobGraph("Connected Components (Unified Tails)");
-
-		// -- invariant vertices -----------------------------------------------------------------------------------
-		InputFormatVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator);
-		InputFormatVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator);
-		JobVertex head = createIterationHead(jobGraph, numSubTasks, serializer, comparator, pairComparator);
-
-		JobVertex intermediate = createIterationIntermediate(jobGraph, numSubTasks, serializer, comparator);
-		TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
-
-		OutputFormatVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
-		JobVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
-
-		// --------------- the tail (solution set join) ---------------
-		JobVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationTail", jobGraph, numSubTasks);
-		TaskConfig tailConfig = new TaskConfig(tail.getConfiguration());
-		{
-			tailConfig.setIterationId(ITERATION_ID);
-
-			tailConfig.setIsWorksetIteration();
-			tailConfig.setIsWorksetUpdate();
-
-			tailConfig.setIsSolutionSetUpdate();
-			tailConfig.setIsSolutionSetUpdateWithoutReprobe();
-
-			// inputs and driver
-			tailConfig.addInputToGroup(0);
-			tailConfig.setInputSerializer(serializer, 0);
-
-			// output
-			tailConfig.setOutputSerializer(serializer);
-
-			// the driver
-			tailConfig.setDriver(JoinWithSolutionSetSecondDriver.class);
-			tailConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
-			tailConfig.setDriverComparator(comparator, 0);
-			tailConfig.setDriverPairComparator(pairComparator);
-			
-			tailConfig.setStubWrapper(new UserCodeClassWrapper<UpdateComponentIdMatch>(UpdateComponentIdMatch.class));
-		}
-
-		// -- edges ------------------------------------------------------------------------------------------------
-		JobGraphUtils.connect(vertices, head, DistributionPattern.ALL_TO_ALL);
-		JobGraphUtils.connect(edges, head, DistributionPattern.ALL_TO_ALL);
-		JobGraphUtils.connect(vertices, head, DistributionPattern.ALL_TO_ALL);
-
-		JobGraphUtils.connect(head, intermediate, DistributionPattern.ALL_TO_ALL);
-		intermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, numSubTasks);
-
-		JobGraphUtils.connect(intermediate, tail, DistributionPattern.POINTWISE);
-		tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
-
-		JobGraphUtils.connect(head, output, DistributionPattern.POINTWISE);
-
-		JobGraphUtils.connect(head, sync, DistributionPattern.POINTWISE);
-
-		SlotSharingGroup sharingGroup = new SlotSharingGroup();
-		vertices.setSlotSharingGroup(sharingGroup);
-		edges.setSlotSharingGroup(sharingGroup);
-		head.setSlotSharingGroup(sharingGroup);
-		intermediate.setSlotSharingGroup(sharingGroup);
-		tail.setSlotSharingGroup(sharingGroup);
-		output.setSlotSharingGroup(sharingGroup);
-		sync.setSlotSharingGroup(sharingGroup);
-		
-		intermediate.setStrictlyCoLocatedWith(head);
-		tail.setStrictlyCoLocatedWith(head);
-
-		return jobGraph;
-	}
-
-	public JobGraph createJobGraphSeparateTails(
-			String verticesPath, String edgesPath, String resultPath, int numSubTasks, int maxIterations)
-	{
-		// -- init -------------------------------------------------------------------------------------------------
-		final TypeSerializerFactory<?> serializer = RecordSerializerFactory.get();
-		@SuppressWarnings("unchecked")
-		final TypeComparatorFactory<?> comparator =
-			new RecordComparatorFactory(new int[] { 0 }, new Class[] { LongValue.class }, new boolean[] { true });
-		final TypePairComparatorFactory<?, ?> pairComparator = RecordPairComparatorFactory.get();
-
-		JobGraph jobGraph = new JobGraph("Connected Components (Unified Tails)");
-
-		// input
-		InputFormatVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator);
-		InputFormatVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator);
-
-		// head
-		JobVertex head = createIterationHead(jobGraph, numSubTasks, serializer, comparator, pairComparator);
-		TaskConfig headConfig = new TaskConfig(head.getConfiguration());
-		headConfig.setWaitForSolutionSetUpdate();
-
-		// intermediate
-		JobVertex intermediate = createIterationIntermediate(jobGraph, numSubTasks, serializer, comparator);
-		TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
-
-		// output and auxiliaries
-		OutputFormatVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
-		JobVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
-
-		// ------------------ the intermediate (ss join) ----------------------
-		JobVertex ssJoinIntermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class,
-			"Solution Set Join", jobGraph, numSubTasks);
-		TaskConfig ssJoinIntermediateConfig = new TaskConfig(ssJoinIntermediate.getConfiguration());
-		{
-			ssJoinIntermediateConfig.setIterationId(ITERATION_ID);
-
-			// inputs
-			ssJoinIntermediateConfig.addInputToGroup(0);
-			ssJoinIntermediateConfig.setInputSerializer(serializer, 0);
-
-			// output
-			ssJoinIntermediateConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
-			ssJoinIntermediateConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
-			ssJoinIntermediateConfig.setOutputComparator(comparator, 0);
-			ssJoinIntermediateConfig.setOutputComparator(comparator, 1);
-
-			ssJoinIntermediateConfig.setOutputSerializer(serializer);
-
-			// driver
-			ssJoinIntermediateConfig.setDriver(JoinWithSolutionSetSecondDriver.class);
-			ssJoinIntermediateConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
-			ssJoinIntermediateConfig.setDriverComparator(comparator, 0);
-			ssJoinIntermediateConfig.setDriverPairComparator(pairComparator);
-			
-			ssJoinIntermediateConfig.setStubWrapper(
-				new UserCodeClassWrapper<UpdateComponentIdMatch>(UpdateComponentIdMatch.class));
-		}
-
-		// -------------------------- ss tail --------------------------------
-		JobVertex ssTail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationSolutionSetTail",
-			jobGraph, numSubTasks);
-		TaskConfig ssTailConfig = new TaskConfig(ssTail.getConfiguration());
-		{
-			ssTailConfig.setIterationId(ITERATION_ID);
-			ssTailConfig.setIsSolutionSetUpdate();
-			ssTailConfig.setIsWorksetIteration();
-
-			// inputs and driver
-			ssTailConfig.addInputToGroup(0);
-			ssTailConfig.setInputSerializer(serializer, 0);
-			ssTailConfig.setInputAsynchronouslyMaterialized(0, true);
-			ssTailConfig.setRelativeInputMaterializationMemory(0, MEM_FRAC_PER_CONSUMER);
-
-			// output
-			ssTailConfig.setOutputSerializer(serializer);
-
-			// the driver
-			ssTailConfig.setDriver(CollectorMapDriver.class);
-			ssTailConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
-			ssTailConfig.setStubWrapper(new UserCodeClassWrapper<DummyMapper>(DummyMapper.class));
-		}
-
-		// -------------------------- ws tail --------------------------------
-		JobVertex wsTail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationWorksetTail",
-			jobGraph, numSubTasks);
-		TaskConfig wsTailConfig = new TaskConfig(wsTail.getConfiguration());
-		{
-			wsTailConfig.setIterationId(ITERATION_ID);
-			wsTailConfig.setIsWorksetIteration();
-			wsTailConfig.setIsWorksetUpdate();
-
-			// inputs and driver
-			wsTailConfig.addInputToGroup(0);
-			wsTailConfig.setInputSerializer(serializer, 0);
-
-			// output
-			wsTailConfig.setOutputSerializer(serializer);
-
-			// the driver
-			wsTailConfig.setDriver(CollectorMapDriver.class);
-			wsTailConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
-			wsTailConfig.setStubWrapper(new UserCodeClassWrapper<DummyMapper>(DummyMapper.class));
-		}
-
-		// --------------- the wiring ---------------------
-
-		JobGraphUtils.connect(vertices, head, DistributionPattern.ALL_TO_ALL);
-		JobGraphUtils.connect(edges, head, DistributionPattern.ALL_TO_ALL);
-		JobGraphUtils.connect(vertices, head, DistributionPattern.ALL_TO_ALL);
-
-		JobGraphUtils.connect(head, intermediate, DistributionPattern.ALL_TO_ALL);
-		intermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, numSubTasks);
-
-		JobGraphUtils.connect(intermediate, ssJoinIntermediate, DistributionPattern.POINTWISE);
-		ssJoinIntermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
-
-		JobGraphUtils.connect(ssJoinIntermediate, ssTail, DistributionPattern.POINTWISE);
-		ssTailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
-
-		JobGraphUtils.connect(ssJoinIntermediate, wsTail, DistributionPattern.POINTWISE);
-		wsTailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
-
-		JobGraphUtils.connect(head, output, DistributionPattern.POINTWISE);
-
-		JobGraphUtils.connect(head, sync, DistributionPattern.POINTWISE);
-
-		SlotSharingGroup sharingGroup = new SlotSharingGroup();
-		vertices.setSlotSharingGroup(sharingGroup);
-		edges.setSlotSharingGroup(sharingGroup);
-		head.setSlotSharingGroup(sharingGroup);
-		intermediate.setSlotSharingGroup(sharingGroup);
-		ssJoinIntermediate.setSlotSharingGroup(sharingGroup);
-		wsTail.setSlotSharingGroup(sharingGroup);
-		ssTail.setSlotSharingGroup(sharingGroup);
-		output.setSlotSharingGroup(sharingGroup);
-		sync.setSlotSharingGroup(sharingGroup);
-		
-		intermediate.setStrictlyCoLocatedWith(head);
-		ssJoinIntermediate.setStrictlyCoLocatedWith(head);
-		wsTail.setStrictlyCoLocatedWith(head);
-		ssTail.setStrictlyCoLocatedWith(head);
-
-		return jobGraph;
-	}
-
-	public JobGraph createJobGraphIntermediateWorksetUpdateAndSolutionSetTail(
-			String verticesPath, String edgesPath, String resultPath, int numSubTasks, int maxIterations)
-	{
-		// -- init -------------------------------------------------------------------------------------------------
-		final TypeSerializerFactory<?> serializer = RecordSerializerFactory.get();
-		@SuppressWarnings("unchecked")
-		final TypeComparatorFactory<?> comparator =
-			new RecordComparatorFactory(new int[] { 0 }, new Class[] { LongValue.class }, new boolean[] { true });
-		final TypePairComparatorFactory<?, ?> pairComparator = RecordPairComparatorFactory.get();
-
-		JobGraph jobGraph = new JobGraph("Connected Components (Intermediate Workset Update, Solution Set Tail)");
-
-		// input
-		InputFormatVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator);
-		InputFormatVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator);
-
-		// head
-		JobVertex head = createIterationHead(jobGraph, numSubTasks, serializer, comparator, pairComparator);
-		TaskConfig headConfig = new TaskConfig(head.getConfiguration());
-		headConfig.setWaitForSolutionSetUpdate();
-
-		// intermediate
-		JobVertex intermediate = createIterationIntermediate(jobGraph, numSubTasks, serializer, comparator);
-		TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
-
-		// output and auxiliaries
-		JobVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
-		JobVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
-
-		// ------------------ the intermediate (ws update) ----------------------
-		JobVertex wsUpdateIntermediate =
-			JobGraphUtils.createTask(IterationIntermediatePactTask.class, "WorksetUpdate", jobGraph, numSubTasks);
-		TaskConfig wsUpdateConfig = new TaskConfig(wsUpdateIntermediate.getConfiguration());
-		{
-			wsUpdateConfig.setIterationId(ITERATION_ID);
-			wsUpdateConfig.setIsWorksetIteration();
-			wsUpdateConfig.setIsWorksetUpdate();
-
-			// inputs
-			wsUpdateConfig.addInputToGroup(0);
-			wsUpdateConfig.setInputSerializer(serializer, 0);
-
-			// output
-			wsUpdateConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
-			wsUpdateConfig.setOutputComparator(comparator, 0);
-
-			wsUpdateConfig.setOutputSerializer(serializer);
-
-			// driver
-			wsUpdateConfig.setDriver(JoinWithSolutionSetSecondDriver.class);
-			wsUpdateConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
-			wsUpdateConfig.setDriverComparator(comparator, 0);
-			wsUpdateConfig.setDriverPairComparator(pairComparator);
-			
-			wsUpdateConfig.setStubWrapper(new UserCodeClassWrapper<UpdateComponentIdMatch>(
-				UpdateComponentIdMatch.class));
-		}
-
-		// -------------------------- ss tail --------------------------------
-		JobVertex ssTail =
-			JobGraphUtils.createTask(IterationTailPactTask.class, "IterationSolutionSetTail", jobGraph, numSubTasks);
-		TaskConfig ssTailConfig = new TaskConfig(ssTail.getConfiguration());
-		{
-			ssTailConfig.setIterationId(ITERATION_ID);
-			ssTailConfig.setIsSolutionSetUpdate();
-			ssTailConfig.setIsWorksetIteration();
-
-			// inputs and driver
-			ssTailConfig.addInputToGroup(0);
-			ssTailConfig.setInputSerializer(serializer, 0);
-
-			// output
-			ssTailConfig.setOutputSerializer(serializer);
-
-			// the driver
-			ssTailConfig.setDriver(CollectorMapDriver.class);
-			ssTailConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
-			ssTailConfig.setStubWrapper(new UserCodeClassWrapper<DummyMapper>(DummyMapper.class));
-		}
-
-		// edges
-
-		JobGraphUtils.connect(vertices, head, DistributionPattern.ALL_TO_ALL);
-		JobGraphUtils.connect(edges, head, DistributionPattern.ALL_TO_ALL);
-		JobGraphUtils.connect(vertices, head, DistributionPattern.ALL_TO_ALL);
-
-		JobGraphUtils.connect(head, intermediate, DistributionPattern.ALL_TO_ALL);
-		intermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, numSubTasks);
-
-		JobGraphUtils.connect(intermediate, wsUpdateIntermediate,
-				DistributionPattern.POINTWISE);
-		wsUpdateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
-
-		JobGraphUtils.connect(wsUpdateIntermediate, ssTail, DistributionPattern.POINTWISE);
-		ssTailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
-
-		JobGraphUtils.connect(head, output, DistributionPattern.POINTWISE);
-
-		JobGraphUtils.connect(head, sync, DistributionPattern.POINTWISE);
-
-		SlotSharingGroup sharingGroup = new SlotSharingGroup();
-		vertices.setSlotSharingGroup(sharingGroup);
-		edges.setSlotSharingGroup(sharingGroup);
-		head.setSlotSharingGroup(sharingGroup);
-		intermediate.setSlotSharingGroup(sharingGroup);
-		wsUpdateIntermediate.setSlotSharingGroup(sharingGroup);
-		ssTail.setSlotSharingGroup(sharingGroup);
-		output.setSlotSharingGroup(sharingGroup);
-		sync.setSlotSharingGroup(sharingGroup);
-
-		intermediate.setStrictlyCoLocatedWith(head);
-		wsUpdateIntermediate.setStrictlyCoLocatedWith(head);
-		ssTail.setStrictlyCoLocatedWith(head);
-
-		return jobGraph;
-	}
-
-	// -----------------------------------------------------------------------------------------------------------------
-	// Intermediate solution set update and workset tail
-	// -----------------------------------------------------------------------------------------------------------------
-
-	public JobGraph createJobGraphSolutionSetUpdateAndWorksetTail(
-			String verticesPath, String edgesPath, String resultPath, int numSubTasks, int maxIterations)
-	{
-		// -- init -------------------------------------------------------------------------------------------------
-		final TypeSerializerFactory<?> serializer = RecordSerializerFactory.get();
-		@SuppressWarnings("unchecked")
-		final TypeComparatorFactory<?> comparator =
-			new RecordComparatorFactory(new int[] { 0 }, new Class[] { LongValue.class }, new boolean[] { true });
-		final TypePairComparatorFactory<?, ?> pairComparator = RecordPairComparatorFactory.get();
-
-		JobGraph jobGraph = new JobGraph("Connected Components (Intermediate Solution Set Update, Workset Tail)");
-
-		// input
-		InputFormatVertex vertices = createVerticesInput(jobGraph, verticesPath, numSubTasks, serializer, comparator);
-		InputFormatVertex edges = createEdgesInput(jobGraph, edgesPath, numSubTasks, serializer, comparator);
-
-		// head
-		JobVertex head = createIterationHead(jobGraph, numSubTasks, serializer, comparator, pairComparator);
-
-		// intermediate
-		JobVertex intermediate = createIterationIntermediate(jobGraph, numSubTasks, serializer, comparator);
-		TaskConfig intermediateConfig = new TaskConfig(intermediate.getConfiguration());
-
-		// output and auxiliaries
-		JobVertex output = createOutput(jobGraph, resultPath, numSubTasks, serializer);
-		JobVertex sync = createSync(jobGraph, numSubTasks, maxIterations);
-
-		// ------------------ the intermediate (ss update) ----------------------
-		JobVertex ssJoinIntermediate = JobGraphUtils.createTask(IterationIntermediatePactTask.class,
-			"Solution Set Update", jobGraph, numSubTasks);
-		TaskConfig ssJoinIntermediateConfig = new TaskConfig(ssJoinIntermediate.getConfiguration());
-		{
-			ssJoinIntermediateConfig.setIterationId(ITERATION_ID);
-			ssJoinIntermediateConfig.setIsSolutionSetUpdate();
-			ssJoinIntermediateConfig.setIsSolutionSetUpdateWithoutReprobe();
-
-			// inputs
-			ssJoinIntermediateConfig.addInputToGroup(0);
-			ssJoinIntermediateConfig.setInputSerializer(serializer, 0);
-
-			// output
-			ssJoinIntermediateConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
-			ssJoinIntermediateConfig.setOutputComparator(comparator, 0);
-
-			ssJoinIntermediateConfig.setOutputSerializer(serializer);
-
-			// driver
-			ssJoinIntermediateConfig.setDriver(JoinWithSolutionSetSecondDriver.class);
-			ssJoinIntermediateConfig.setDriverStrategy(DriverStrategy.HYBRIDHASH_BUILD_SECOND);
-			ssJoinIntermediateConfig.setDriverComparator(comparator, 0);
-			ssJoinIntermediateConfig.setDriverPairComparator(pairComparator);
-			
-			ssJoinIntermediateConfig.setStubWrapper(new UserCodeClassWrapper<UpdateComponentIdMatch>(UpdateComponentIdMatch.class));
-		}
-
-		// -------------------------- ws tail --------------------------------
-		JobVertex wsTail = JobGraphUtils.createTask(IterationTailPactTask.class, "IterationWorksetTail", jobGraph, numSubTasks);
-		TaskConfig wsTailConfig = new TaskConfig(wsTail.getConfiguration());
-		{
-			wsTailConfig.setIterationId(ITERATION_ID);
-			wsTailConfig.setIsWorksetIteration();
-			wsTailConfig.setIsWorksetUpdate();
-
-			// inputs and driver
-			wsTailConfig.addInputToGroup(0);
-			wsTailConfig.setInputSerializer(serializer, 0);
-
-			// output
-			wsTailConfig.setOutputSerializer(serializer);
-
-			// the driver
-			wsTailConfig.setDriver(CollectorMapDriver.class);
-			wsTailConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
-			wsTailConfig.setStubWrapper(new UserCodeClassWrapper<DummyMapper>(DummyMapper.class));
-		}
-
-		// --------------- the wiring ---------------------
-
-		JobGraphUtils.connect(vertices, head, DistributionPattern.ALL_TO_ALL);
-		JobGraphUtils.connect(edges, head, DistributionPattern.ALL_TO_ALL);
-		JobGraphUtils.connect(vertices, head, DistributionPattern.ALL_TO_ALL);
-
-		JobGraphUtils.connect(head, intermediate, DistributionPattern.ALL_TO_ALL);
-		intermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, numSubTasks);
-
-		JobGraphUtils.connect(intermediate, ssJoinIntermediate, DistributionPattern.POINTWISE);
-		ssJoinIntermediateConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
-
-		JobGraphUtils.connect(ssJoinIntermediate, wsTail, DistributionPattern.POINTWISE);
-		wsTailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, 1);
-
-		JobGraphUtils.connect(head, output, DistributionPattern.POINTWISE);
-
-		JobGraphUtils.connect(head, sync, DistributionPattern.POINTWISE);
-
-		
-		SlotSharingGroup sharingGroup = new SlotSharingGroup();
-		vertices.setSlotSharingGroup(sharingGroup);
-		edges.setSlotSharingGroup(sharingGroup);
-		head.setSlotSharingGroup(sharingGroup);
-		intermediate.setSlotSharingGroup(sharingGroup);
-		ssJoinIntermediate.setSlotSharingGroup(sharingGroup);
-		wsTail.setSlotSharingGroup(sharingGroup);
-		output.setSlotSharingGroup(sharingGroup);
-		sync.setSlotSharingGroup(sharingGroup);
-
-		intermediate.setStrictlyCoLocatedWith(head);
-		ssJoinIntermediate.setStrictlyCoLocatedWith(head);
-		wsTail.setStrictlyCoLocatedWith(head);
-
-		return jobGraph;
-	}
-
-	public static final class DummyMapper extends MapFunction {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void map(Record rec, Collector<Record> out) {
-			out.collect(rec);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankNepheleITCase.java
deleted file mode 100644
index 516309c..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankNepheleITCase.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele;
-
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.CustomCompensatableDanglingPageRank;
-import org.apache.flink.test.util.RecordAPITestBase;
-
-public class DanglingPageRankNepheleITCase extends RecordAPITestBase {
-	
-	public static final String TEST_VERTICES = "1\n" +
-	                                           "2\n" +
-	                                           "5\n" +
-	                                           "3 1\n" +
-	                                           "4";
-
-	public static final String TEST_EDGES = "2 1\n" +
-	                                        "5 2 4\n" +
-	                                        "4 3 2\n" +
-	                                        "1 4 2 3";
-	
-	protected String pagesWithRankPath;
-	protected String edgesPath;
-	protected String resultPath;
-
-	public DanglingPageRankNepheleITCase(){
-		setTaskManagerNumSlots(parallelism);
-	}
-
-	
-	@Override
-	protected void preSubmit() throws Exception {
-		this.pagesWithRankPath = createTempFile("pagesWithRank", TEST_VERTICES);
-		this.edgesPath = createTempFile("edges", TEST_EDGES);
-		this.resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected JobGraph getJobGraph() throws Exception {
-		String[] parameters = new String[] {
-			Integer.valueOf(parallelism).toString(),
-			pagesWithRankPath,
-			edgesPath,
-			resultPath,
-			"<none>",
-			"2",
-			"5",
-			"5",
-			"30",
-			"5",
-			"1",
-			"0",
-			"100",
-			"0"
-		};
-		
-		return CustomCompensatableDanglingPageRank.getJobGraph(parameters);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankWithCombinerNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankWithCombinerNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankWithCombinerNepheleITCase.java
deleted file mode 100644
index ba22ce5..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/DanglingPageRankWithCombinerNepheleITCase.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele;
-
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.test.iterative.nephele.customdanglingpagerank.CustomCompensatableDanglingPageRankWithCombiner;
-import org.apache.flink.test.util.RecordAPITestBase;
-
-public class DanglingPageRankWithCombinerNepheleITCase extends RecordAPITestBase {
-	
-	protected String pagesWithRankPath;
-	protected String edgesPath;
-	protected String resultPath;
-
-	public DanglingPageRankWithCombinerNepheleITCase(){
-		setTaskManagerNumSlots(parallelism);
-	}
-	
-	@Override
-	protected void preSubmit() throws Exception {
-		this.pagesWithRankPath = createTempFile("pagesWithRank", DanglingPageRankNepheleITCase.TEST_VERTICES);
-		this.edgesPath = createTempFile("edges", DanglingPageRankNepheleITCase.TEST_EDGES);
-		this.resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected JobGraph getJobGraph() throws Exception {
-		String[] parameters = new String[] {
-			Integer.valueOf(parallelism).toString(),
-			pagesWithRankPath,
-			edgesPath,
-			resultPath,
-			"<none>",
-			"2",
-			"5",
-			"3",
-			"30",
-			"5",
-			"1",
-			"0",
-			"100",
-			"0"
-		};
-		
-		return CustomCompensatableDanglingPageRankWithCombiner.getJobGraph(parameters);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
deleted file mode 100644
index 7a3135c..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.java
+++ /dev/null
@@ -1,296 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele;
-
-import java.util.Collection;
-
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
-import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
-import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
-import org.apache.flink.api.common.typeutils.record.RecordComparatorFactory;
-import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.io.FileOutputFormat;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
-import org.apache.flink.runtime.iterative.task.IterationTailPactTask;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.InputFormatVertex;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.operators.CollectorMapDriver;
-import org.apache.flink.runtime.operators.DriverStrategy;
-import org.apache.flink.runtime.operators.GroupReduceDriver;
-import org.apache.flink.runtime.operators.chaining.ChainedCollectorMapDriver;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.util.LocalStrategy;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.test.iterative.IterationWithChainingITCase;
-import org.apache.flink.test.recordJobs.kmeans.udfs.CoordVector;
-import org.apache.flink.test.recordJobs.kmeans.udfs.PointInFormat;
-import org.apache.flink.test.recordJobs.kmeans.udfs.PointOutFormat;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.Collector;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-/**
- * Tests chained iteration tails.
- * <p>
- * GitHub issue #123 reports a problem with chaining of tasks to iteration tails. The initial fix worked around the
- * issue by having the compiler *not* chain tasks to an iteration tail. The existing IterationWithChainingITCase only
- * tests this compiler behavior. This test builds the JobGraph directly and bypasses the compiler to test the original chaining problem.
- * <p>
- * A chained mapper after the iteration tail (dummy reduce) increments the given input points in each iteration. The
- * final result will only be correct, if the chained mapper is successfully executed.
- * 
- * {@link IterationWithChainingITCase}
- */
-@SuppressWarnings("deprecation")
-@RunWith(Parameterized.class)
-public class IterationWithChainingNepheleITCase extends RecordAPITestBase {
-
-	private static final String INPUT_STRING = "0|%d.25|\n" + "1|%d.25|\n";
-
-	private String dataPath;
-
-	private String resultPath;
-
-	public IterationWithChainingNepheleITCase(Configuration config) {
-		super(config);
-		setTaskManagerNumSlots(parallelism);
-	}
-
-	@Override
-	protected void preSubmit() throws Exception {
-		String initialInput = String.format(INPUT_STRING, 1, 2);
-		dataPath = createTempFile("data_points.txt", initialInput);
-		resultPath = getTempFilePath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		int maxIterations = config.getInteger("ChainedMapperNepheleITCase#MaxIterations", 1);
-		String result = String.format(INPUT_STRING, 1 + maxIterations, 2 + maxIterations);
-		compareResultsByLinesInMemory(result, resultPath);
-	}
-
-	@Parameterized.Parameters
-	public static Collection<Object[]> getConfigurations() {
-		Configuration config = new Configuration();
-		config.setInteger("ChainedMapperNepheleITCase#NoSubtasks", parallelism);
-		config.setInteger("ChainedMapperNepheleITCase#MaxIterations", 2);
-		return toParameterList(config);
-	}
-
-	@Override
-	protected JobGraph getJobGraph() throws Exception {
-		int numSubTasks = config.getInteger("ChainedMapperNepheleITCase#NoSubtasks", 1);
-		int maxIterations = config.getInteger("ChainedMapperNepheleITCase#MaxIterations", 1);
-
-		return getTestJobGraph(dataPath, resultPath, numSubTasks, maxIterations);
-	}
-
-	private JobGraph getTestJobGraph(String inputPath, String outputPath, int numSubTasks, int maxIterations) {
-
-		final JobGraph jobGraph = new JobGraph("Iteration Tail with Chaining");
-
-		final TypeSerializerFactory<Record> serializer = RecordSerializerFactory.get();
-
-		@SuppressWarnings("unchecked")
-		final TypeComparatorFactory<Record> comparator =
-			new RecordComparatorFactory(new int[] { 0 }, new Class[] { IntValue.class });
-
-		final int ITERATION_ID = 1;
-
-		// --------------------------------------------------------------------------------------------------------------
-		// 1. VERTICES
-		// --------------------------------------------------------------------------------------------------------------
-
-		// - input -----------------------------------------------------------------------------------------------------
-		InputFormatVertex input = JobGraphUtils.createInput(
-			new PointInFormat(), inputPath, "Input", jobGraph, numSubTasks);
-		TaskConfig inputConfig = new TaskConfig(input.getConfiguration());
-		{
-			inputConfig.setOutputSerializer(serializer);
-			inputConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
-		}
-
-		// - head ------------------------------------------------------------------------------------------------------
-		JobVertex head = JobGraphUtils.createTask(IterationHeadPactTask.class, "Iteration Head", jobGraph, numSubTasks);
-		TaskConfig headConfig = new TaskConfig(head.getConfiguration());
-		{
-			headConfig.setIterationId(ITERATION_ID);
-
-			// input to iteration head
-			headConfig.addInputToGroup(0);
-			headConfig.setInputSerializer(serializer, 0);
-			headConfig.setInputLocalStrategy(0, LocalStrategy.NONE);
-			headConfig.setIterationHeadPartialSolutionOrWorksetInputIndex(0);
-
-			// output into iteration
-			headConfig.setOutputSerializer(serializer);
-			headConfig.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
-			headConfig.setOutputComparator(comparator, 0);
-
-			// final output
-			TaskConfig headFinalOutConfig = new TaskConfig(new Configuration());
-			headFinalOutConfig.setOutputSerializer(serializer);
-			headFinalOutConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
-			headConfig.setIterationHeadFinalOutputConfig(headFinalOutConfig);
-
-			// the sync
-			headConfig.setIterationHeadIndexOfSyncOutput(2);
-
-			// driver
-			headConfig.setDriver(CollectorMapDriver.class);
-			headConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
-			headConfig.setStubWrapper(new UserCodeClassWrapper<DummyMapper>(DummyMapper.class));
-
-			// back channel
-			headConfig.setRelativeBackChannelMemory(1.0);
-		}
-
-		// - tail ------------------------------------------------------------------------------------------------------
-		JobVertex tail = JobGraphUtils.createTask(IterationTailPactTask.class, "Chained Iteration Tail", jobGraph, numSubTasks);
-		TaskConfig tailConfig = new TaskConfig(tail.getConfiguration());
-		{
-			tailConfig.setIterationId(ITERATION_ID);
-
-			// inputs and driver
-			tailConfig.addInputToGroup(0);
-			tailConfig.setInputSerializer(serializer, 0);
-
-			// output
-			tailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
-			tailConfig.setOutputSerializer(serializer);
-
-			// the driver
-			tailConfig.setDriver(GroupReduceDriver.class);
-			tailConfig.setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
-			tailConfig.setDriverComparator(comparator, 0);
-			tailConfig.setStubWrapper(new UserCodeClassWrapper<DummyReducer>(DummyReducer.class));
-
-			// chained mapper
-			TaskConfig chainedMapperConfig = new TaskConfig(new Configuration());
-			chainedMapperConfig.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
-			chainedMapperConfig.setStubWrapper(new UserCodeClassWrapper<IncrementCoordinatesMapper>(
-				IncrementCoordinatesMapper.class));
-
-			chainedMapperConfig.setInputLocalStrategy(0, LocalStrategy.NONE);
-			chainedMapperConfig.setInputSerializer(serializer, 0);
-
-			chainedMapperConfig.setOutputSerializer(serializer);
-
-			chainedMapperConfig.setIsWorksetUpdate();
-
-			tailConfig.addChainedTask(ChainedCollectorMapDriver.class, chainedMapperConfig, "Chained ID Mapper");
-		}
-
-		// - output ----------------------------------------------------------------------------------------------------
-		OutputFormatVertex output = JobGraphUtils.createFileOutput(jobGraph, "Output", numSubTasks);
-		TaskConfig outputConfig = new TaskConfig(output.getConfiguration());
-		{
-			outputConfig.addInputToGroup(0);
-			outputConfig.setInputSerializer(serializer, 0);
-
-			outputConfig.setStubWrapper(new UserCodeClassWrapper<PointOutFormat>(PointOutFormat.class));
-			outputConfig.setStubParameter(FileOutputFormat.FILE_PARAMETER_KEY, outputPath);
-		}
-
-		// - sync ------------------------------------------------------------------------------------------------------
-		JobVertex sync = JobGraphUtils.createSync(jobGraph, numSubTasks);
-		TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
-		syncConfig.setNumberOfIterations(maxIterations);
-		syncConfig.setIterationId(ITERATION_ID);
-
-		// --------------------------------------------------------------------------------------------------------------
-		// 2. EDGES
-		// --------------------------------------------------------------------------------------------------------------
-		JobGraphUtils.connect(input, head, DistributionPattern.POINTWISE);
-
-		JobGraphUtils.connect(head, tail, DistributionPattern.ALL_TO_ALL);
-		tailConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, numSubTasks);
-
-		JobGraphUtils.connect(head, output, DistributionPattern.POINTWISE);
-
-		JobGraphUtils.connect(head, sync, DistributionPattern.POINTWISE);
-
-		// --------------------------------------------------------------------------------------------------------------
-		// 3. INSTANCE SHARING
-		// --------------------------------------------------------------------------------------------------------------
-		
-		SlotSharingGroup sharingGroup = new SlotSharingGroup();
-		
-		input.setSlotSharingGroup(sharingGroup);
-		head.setSlotSharingGroup(sharingGroup);
-		tail.setSlotSharingGroup(sharingGroup);
-		output.setSlotSharingGroup(sharingGroup);
-		sync.setSlotSharingGroup(sharingGroup);
-		
-		tail.setStrictlyCoLocatedWith(head);
-
-		return jobGraph;
-	}
-
-	public static final class DummyMapper extends MapFunction {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void map(Record rec, Collector<Record> out) {
-			out.collect(rec);
-		}
-	}
-
-	public static final class DummyReducer implements GroupReduceFunction<Record, Record> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void reduce(Iterable<Record> it, Collector<Record> out) {
-			for (Record r :it) {
-				out.collect(r);
-			}
-		}
-	}
-
-	public static final class IncrementCoordinatesMapper extends MapFunction {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void map(Record rec, Collector<Record> out) {
-			CoordVector coord = rec.getField(1, CoordVector.class);
-
-			double[] vector = coord.getCoordinates();
-			for (int i = 0; i < vector.length; i++) {
-				vector[i]++;
-			}
-
-			rec.setField(1, coord);
-			out.collect(rec);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a7a57ebe/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
deleted file mode 100644
index 4edc83e..0000000
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/JobGraphUtils.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.test.iterative.nephele;
-
-import org.apache.flink.api.common.io.FileInputFormat;
-import org.apache.flink.api.common.io.InputFormat;
-import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
-import org.apache.flink.api.common.operators.util.UserCodeWrapper;
-import org.apache.flink.runtime.iterative.task.IterationSynchronizationSinkTask;
-import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.InputFormatVertex;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
-import org.apache.flink.runtime.operators.DataSinkTask;
-import org.apache.flink.runtime.operators.DataSourceTask;
-import org.apache.flink.runtime.operators.RegularPactTask;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-
-public class JobGraphUtils {
-
-	public static final long MEGABYTE = 1024l * 1024l;
-
-	private JobGraphUtils() {}
-	
-	public static <T extends FileInputFormat<?>> InputFormatVertex createInput(T stub, String path, String name, JobGraph graph,
-			int parallelism)
-	{
-		stub.setFilePath(path);
-		return createInput(new UserCodeObjectWrapper<T>(stub), name, graph, parallelism);
-	}
-
-	private static <T extends InputFormat<?,?>> InputFormatVertex createInput(UserCodeWrapper<T> stub, String name, JobGraph graph,
-			int parallelism)
-	{
-		InputFormatVertex inputVertex = new InputFormatVertex(name);
-		graph.addVertex(inputVertex);
-		
-		inputVertex.setInvokableClass(DataSourceTask.class);
-		inputVertex.setParallelism(parallelism);
-
-		TaskConfig inputConfig = new TaskConfig(inputVertex.getConfiguration());
-		inputConfig.setStubWrapper(stub);
-		
-		return inputVertex;
-	}
-
-//	public static void connect(AbstractJobVertex source, AbstractJobVertex target, ChannelType channelType,
-//			DistributionPattern distributionPattern, ShipStrategyType shipStrategy) throws JobGraphDefinitionException
-//	{
-//		source.connectTo(target, channelType, CompressionLevel.NO_COMPRESSION, distributionPattern);
-//		new TaskConfig(source.getConfiguration()).addOutputShipStrategy(shipStrategy);
-//	}
-	
-	public static void connect(JobVertex source, JobVertex target, DistributionPattern distributionPattern) {
-		target.connectNewDataSetAsInput(source, distributionPattern);
-	}
-
-	@SuppressWarnings("rawtypes") 
-	public static JobVertex createTask(Class<? extends RegularPactTask> task, String name, JobGraph graph, int parallelism)
-	{
-		JobVertex taskVertex = new JobVertex(name);
-		graph.addVertex(taskVertex);
-		
-		taskVertex.setInvokableClass(task);
-		taskVertex.setParallelism(parallelism);
-		return taskVertex;
-	}
-
-	public static JobVertex createSync(JobGraph jobGraph, int parallelism) {
-		JobVertex sync = new JobVertex("BulkIterationSync");
-		jobGraph.addVertex(sync);
-		
-		sync.setInvokableClass(IterationSynchronizationSinkTask.class);
-		sync.setParallelism(1);
-		
-		TaskConfig syncConfig = new TaskConfig(sync.getConfiguration());
-		syncConfig.setGateIterativeWithNumberOfEventsUntilInterrupt(0, parallelism);
-		return sync;
-	}
-
-	public static OutputFormatVertex createFileOutput(JobGraph jobGraph, String name, int parallelism) {
-		OutputFormatVertex sinkVertex = new OutputFormatVertex(name);
-		jobGraph.addVertex(sinkVertex);
-		
-		sinkVertex.setInvokableClass(DataSinkTask.class);
-		sinkVertex.setParallelism(parallelism);
-		return sinkVertex;
-	}
-}


Mime
View raw message