flink-commits mailing list archives

From: fhue...@apache.org
Subject: [08/16] flink git commit: Remove Record API dependencies from CC iteration tests
Date: Tue, 24 Nov 2015 17:17:51 GMT
Remove Record API dependencies from CC iteration tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b640c017
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b640c017
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b640c017

Branch: refs/heads/master
Commit: b640c017970df59c0de2bf7084da307062e84713
Parents: 8543dd9
Author: Fabian Hueske <fhueske@apache.org>
Authored: Thu Oct 22 21:18:04 2015 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Tue Nov 24 18:16:51 2015 +0100

----------------------------------------------------------------------
 .../iterative/ConnectedComponentsITCase.java    |  62 +++++--
 ...ectedComponentsWithDeferredUpdateITCase.java | 165 ++++++++-----------
 .../ConnectedComponentsWithObjectMapITCase.java |   9 +-
 ...tedComponentsWithSolutionSetFirstITCase.java | 152 +++++++----------
 4 files changed, 177 insertions(+), 211 deletions(-)
----------------------------------------------------------------------
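
For reference, all four tests move from assembling a Record-API Plan to building the same program with the DataSet API's delta iteration. The sketch below condenses that shape into a standalone driver, using the example classes referenced in the diffs; the input/output paths and the driver class name are illustrative only, not part of this commit.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.examples.java.graph.ConnectedComponents;

public class ConnectedComponentsSketch {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// illustrative input paths
		DataSet<Tuple1<Long>> vertices = env.readCsvFile("/tmp/vertices.txt").types(Long.class);
		DataSet<Tuple2<Long, Long>> edges = env.readCsvFile("/tmp/edges.txt")
				.fieldDelimiter(" ").types(Long.class, Long.class)
				.flatMap(new ConnectedComponents.UndirectEdge());

		// the initial component of a vertex is its own id
		DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new DuplicateValue<Long>());

		// solution set: current component per vertex; workset: vertices whose component changed
		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
				verticesWithInitialId.iterateDelta(verticesWithInitialId, 100, 0);

		// propagate component ids to neighbors, keep the minimum candidate,
		// and emit an update only if the candidate is smaller than the current id
		DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset()
				.join(edges).where(0).equalTo(0).with(new ConnectedComponents.NeighborWithComponentIDJoin())
				.groupBy(0).aggregate(Aggregations.MIN, 1)
				.join(iteration.getSolutionSet()).where(0).equalTo(0)
				.with(new ConnectedComponents.ComponentIdFilter());

		// delta and new workset are identical
		iteration.closeWith(changes, changes).writeAsCsv("/tmp/results", "\n", " ");

		env.execute("Connected Components Sketch");
	}

	// the helper the migrated tests share (defined in ConnectedComponentsITCase below)
	public static final class DuplicateValue<T> implements MapFunction<Tuple1<T>, Tuple2<T, T>> {
		@Override
		public Tuple2<T, T> map(Tuple1<T> vertex) {
			return new Tuple2<>(vertex.f0, vertex.f0);
		}
	}
}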


http://git-wip-us.apache.org/repos/asf/flink/blob/b640c017/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsITCase.java
index df3c00d..61e08d4 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsITCase.java
@@ -21,13 +21,19 @@ package org.apache.flink.test.iterative;
 
 import java.io.BufferedReader;
 
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.test.recordJobs.graph.WorksetConnectedComponents;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.examples.java.graph.ConnectedComponents;
 import org.apache.flink.test.testdata.ConnectedComponentsData;
-import org.apache.flink.test.util.RecordAPITestBase;
+import org.apache.flink.test.util.JavaProgramTestBase;
 
 
-public class ConnectedComponentsITCase extends RecordAPITestBase {
+public class ConnectedComponentsITCase extends JavaProgramTestBase {
 	
 	private static final long SEED = 0xBADC0FFEEBEEFL;
 	
@@ -40,22 +46,44 @@ public class ConnectedComponentsITCase extends RecordAPITestBase {
 	protected String edgesPath;
 	protected String resultPath;
 
-	public ConnectedComponentsITCase(){
-		setTaskManagerNumSlots(parallelism);
-	}
-	
-	
 	@Override
 	protected void preSubmit() throws Exception {
 		verticesPath = createTempFile("vertices.txt", ConnectedComponentsData.getEnumeratingVertices(NUM_VERTICES));
 		edgesPath = createTempFile("edges.txt", ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED));
 		resultPath = getTempFilePath("results");
 	}
-	
+
 	@Override
-	protected Plan getTestJob() {
-		WorksetConnectedComponents cc = new WorksetConnectedComponents();
-		return cc.getPlan(Integer.valueOf(parallelism).toString(),  verticesPath, edgesPath, resultPath, "100");
+	protected void testProgram() throws Exception {
+		// set up execution environment
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// read vertex and edge data
+		DataSet<Tuple1<Long>> vertices = env.readCsvFile(verticesPath).types(Long.class);
+
+		DataSet<Tuple2<Long, Long>> edges = env.readCsvFile(edgesPath).fieldDelimiter(" ").types(Long.class, Long.class)
+				.flatMap(new ConnectedComponents.UndirectEdge());
+
+		// assign the initial components (equal to the vertex id)
+		DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new DuplicateValue<Long>());
+
+		// open a delta iteration
+		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
+				verticesWithInitialId.iterateDelta(verticesWithInitialId, 100, 0);
+
+		// apply the step logic: join with the edges, select the minimum neighbor, update if the component of the candidate is smaller
+		DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset().join(edges).where(0).equalTo(0).with(new ConnectedComponents.NeighborWithComponentIDJoin())
+				.groupBy(0).aggregate(Aggregations.MIN, 1)
+				.join(iteration.getSolutionSet()).where(0).equalTo(0)
+				.with(new ConnectedComponents.ComponentIdFilter());
+
+		// close the delta iteration (delta and new workset are identical)
+		DataSet<Tuple2<Long, Long>> result = iteration.closeWith(changes, changes);
+
+		result.writeAsCsv(resultPath, "\n", " ");
+
+		// execute program
+		env.execute("Connected Components Example");
 	}
 
 	@Override
@@ -64,4 +92,12 @@ public class ConnectedComponentsITCase extends RecordAPITestBase {
 			ConnectedComponentsData.checkOddEvenResult(reader);
 		}
 	}
+
+	public static final class DuplicateValue<T> implements MapFunction<Tuple1<T>, Tuple2<T, T>> {
+
+		@Override
+		public Tuple2<T, T> map(Tuple1<T> vertex) {
+			return new Tuple2<>(vertex.f0, vertex.f0);
+		}
+	}
 }
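
The base-class switch above replaces RecordAPITestBase's getTestJob()/Plan contract with JavaProgramTestBase's three-phase lifecycle: preSubmit() stages the inputs, testProgram() builds and executes a regular DataSet program, and postSubmit() checks the output. A bare-bones skeleton of that lifecycle follows, with illustrative names and a trivial job standing in for the real program.

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.test.util.JavaProgramTestBase;

public class ExampleMigratedITCase extends JavaProgramTestBase {

	private String resultPath;

	@Override
	protected void preSubmit() throws Exception {
		// stage inputs and reserve the output location before the job runs
		resultPath = getTempFilePath("results");
	}

	@Override
	protected void testProgram() throws Exception {
		// build and run an ordinary DataSet program instead of returning a Plan
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.fromElements(1L, 2L, 3L).writeAsText(resultPath);
		env.execute();
	}

	@Override
	protected void postSubmit() throws Exception {
		// assert on the files written to resultPath, e.g. via getResultReader(resultPath)
	}
}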

http://git-wip-us.apache.org/repos/asf/flink/blob/b640c017/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java
index d5d150d..33dd989 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithDeferredUpdateITCase.java
@@ -20,28 +20,20 @@
 package org.apache.flink.test.iterative;
 
 import java.io.BufferedReader;
-import java.io.Serializable;
 import java.util.Collection;
 
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.api.java.record.functions.MapFunction;
-import org.apache.flink.api.java.record.io.CsvInputFormat;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.operators.DeltaIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.test.recordJobs.graph.WorksetConnectedComponents.DuplicateLongMap;
-import org.apache.flink.test.recordJobs.graph.WorksetConnectedComponents.MinimumComponentIDReduce;
-import org.apache.flink.test.recordJobs.graph.WorksetConnectedComponents.NeighborWithComponentIDJoin;
+import org.apache.flink.examples.java.graph.ConnectedComponents;
 import org.apache.flink.test.testdata.ConnectedComponentsData;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
+import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.util.Collector;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -49,7 +41,7 @@ import org.junit.runners.Parameterized.Parameters;
 
 @SuppressWarnings("deprecation")
 @RunWith(Parameterized.class)
-public class ConnectedComponentsWithDeferredUpdateITCase extends RecordAPITestBase {
+public class ConnectedComponentsWithDeferredUpdateITCase extends JavaProgramTestBase {
 	
 	private static final long SEED = 0xBADC0FFEEBEEFL;
 	
@@ -65,20 +57,64 @@ public class ConnectedComponentsWithDeferredUpdateITCase extends RecordAPITestBase {
 	
 	public ConnectedComponentsWithDeferredUpdateITCase(Configuration config) {
 		super(config);
-		setTaskManagerNumSlots(parallelism);
 	}
-	
+
 	@Override
 	protected void preSubmit() throws Exception {
 		verticesPath = createTempFile("vertices.txt", ConnectedComponentsData.getEnumeratingVertices(NUM_VERTICES));
 		edgesPath = createTempFile("edges.txt", ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED));
 		resultPath = getTempFilePath("results");
 	}
-	
+
 	@Override
-	protected Plan getTestJob() {
+	protected void testProgram() throws Exception {
 		boolean extraMapper = config.getBoolean("ExtraMapper", false);
-		return getPlan(parallelism, verticesPath, edgesPath, resultPath, 100, extraMapper);
+
+		// set up execution environment
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// read vertex and edge data
+		DataSet<Tuple1<Long>> vertices = env.readCsvFile(verticesPath).types(Long.class);
+
+		DataSet<Tuple2<Long, Long>> edges = env.readCsvFile(edgesPath).fieldDelimiter(" ").types(Long.class, Long.class)
+				.flatMap(new ConnectedComponents.UndirectEdge());
+
+		// assign the initial components (equal to the vertex id)
+		DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new ConnectedComponentsITCase.DuplicateValue<Long>());
+
+		// open a delta iteration
+		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
+				verticesWithInitialId.iterateDelta(verticesWithInitialId, 100, 0);
+
+		// apply the step logic: join with the edges, select the minimum neighbor, update if the component of the candidate is smaller
+		DataSet<Tuple2<Long, Long>> changes = iteration.getWorkset()
+				.join(edges).where(0).equalTo(0).with(new ConnectedComponents.NeighborWithComponentIDJoin())
+				.groupBy(0).aggregate(Aggregations.MIN, 1)
+				.join(iteration.getSolutionSet()).where(0).equalTo(0)
+				.with(new UpdateComponentIdMatchNonPreserving());
+
+		DataSet<Tuple2<Long,Long>> delta;
+		if(extraMapper) {
+			delta = changes.map(
+					// ID Mapper
+					new MapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>>() {
+						@Override
+						public Tuple2<Long, Long> map(Tuple2<Long, Long> v) throws Exception {
+							return v;
+						}
+					});
+		}
+		else {
+			delta = changes;
+		}
+
+		// close the delta iteration (delta and new workset are identical)
+		DataSet<Tuple2<Long, Long>> result = iteration.closeWith(delta, changes);
+
+		result.writeAsCsv(resultPath, "\n", " ");
+
+		// execute program
+		env.execute("Connected Components Example");
 	}
 
 	@Override
@@ -98,84 +134,21 @@ public class ConnectedComponentsWithDeferredUpdateITCase extends RecordAPITestBase {
 		
 		return toParameterList(config1, config2);
 	}
-	
-	@SuppressWarnings("unchecked")
-	public static Plan getPlan(int numSubTasks, String verticesInput, String edgeInput, String output, int maxIterations, boolean extraMap) {
-
-		// data source for initial vertices
-		FileDataSource initialVertices = new FileDataSource(new CsvInputFormat(' ', LongValue.class), verticesInput, "Vertices");
-		
-		MapOperator verticesWithId = MapOperator.builder(DuplicateLongMap.class).input(initialVertices).name("Assign Vertex Ids").build();
-		
-		// the loop takes the vertices as the solution set and changed vertices as the workset
-		// initially, all vertices are changed
-		DeltaIteration iteration = new DeltaIteration(0, "Connected Components Iteration");
-		iteration.setInitialSolutionSet(verticesWithId);
-		iteration.setInitialWorkset(verticesWithId);
-		iteration.setMaximumNumberOfIterations(maxIterations);
-		
-		// data source for the edges
-		FileDataSource edges = new FileDataSource(new CsvInputFormat(' ', LongValue.class, LongValue.class), edgeInput, "Edges");
-
-		// join workset (changed vertices) with the edges to propagate changes to neighbors
-		JoinOperator joinWithNeighbors = JoinOperator.builder(new NeighborWithComponentIDJoin(), LongValue.class, 0, 0)
-				.input1(iteration.getWorkset())
-				.input2(edges)
-				.name("Join Candidate Id With Neighbor")
-				.build();
-
-		// find for each neighbor the smallest of all candidates
-		ReduceOperator minCandidateId = ReduceOperator.builder(new MinimumComponentIDReduce(), LongValue.class, 0)
-				.input(joinWithNeighbors)
-				.name("Find Minimum Candidate Id")
-				.build();
-		
-		// join candidates with the solution set and update if the candidate component-id is smaller
-		JoinOperator updateComponentId = JoinOperator.builder(new UpdateComponentIdMatchNonPreserving(), LongValue.class, 0, 0)
-				.input1(minCandidateId)
-				.input2(iteration.getSolutionSet())
-				.name("Update Component Id")
-				.build();
-		
-		if (extraMap) {
-			MapOperator mapper = MapOperator.builder(IdentityMap.class).input(updateComponentId).name("idmap").build();
-			iteration.setSolutionSetDelta(mapper);
-		} else {
-			iteration.setSolutionSetDelta(updateComponentId);
-		}
-		
-		iteration.setNextWorkset(updateComponentId);
 
-		// sink is the iteration result
-		FileDataSink result = new FileDataSink(new CsvOutputFormat("\n", " ", LongValue.class, LongValue.class), output, iteration, "Result");
-
-		// return the PACT plan
-		Plan plan = new Plan(result, "Workset Connected Components");
-		plan.setDefaultParallelism(numSubTasks);
-		return plan;
-	}
-	
-	public static final class UpdateComponentIdMatchNonPreserving extends JoinFunction implements Serializable {
+	public static final class UpdateComponentIdMatchNonPreserving
+			implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void join(Record newVertexWithComponent, Record currentVertexWithComponent, Collector<Record> out){
-	
-			long candidateComponentID = newVertexWithComponent.getField(1, LongValue.class).getValue();
-			long currentComponentID = currentVertexWithComponent.getField(1, LongValue.class).getValue();
-	
-			if (candidateComponentID < currentComponentID) {
-				out.collect(newVertexWithComponent);
+		public void join(
+				Tuple2<Long, Long> candidate,
+				Tuple2<Long, Long> current,
+				Collector<Tuple2<Long, Long>> out) throws Exception {
+
+			if(candidate.f1 < current.f1) {
+				out.collect(candidate);
 			}
 		}
 	}
-	
-	public static final class IdentityMap extends MapFunction {
-		private static final long serialVersionUID = 1L;
 
-		@Override
-		public void map(Record record, Collector<Record> out) throws Exception {
-			out.collect(record);
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b640c017/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithObjectMapITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithObjectMapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithObjectMapITCase.java
index f0a1dd7..a8a28f1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithObjectMapITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithObjectMapITCase.java
@@ -76,7 +76,7 @@ public class ConnectedComponentsWithObjectMapITCase extends JavaProgramTestBase {
 												.flatMap(new UndirectEdge());
 				
 		// assign the initial components (equal to the vertex id)
-		DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new DuplicateValue<Long>());
+		DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new ConnectedComponentsITCase.DuplicateValue<Long>());
 						
 		// open a delta iteration
 		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
@@ -98,11 +98,4 @@ public class ConnectedComponentsWithObjectMapITCase extends JavaProgramTestBase {
 		env.execute("Connected Components Example");
 	}
 	
-	public static final class DuplicateValue<T> implements MapFunction<Tuple1<T>, Tuple2<T, T>> {
-		
-		@Override
-		public Tuple2<T, T> map(Tuple1<T> vertex) {
-			return new Tuple2<T, T>(vertex.f0, vertex.f0);
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b640c017/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithSolutionSetFirstITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithSolutionSetFirstITCase.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithSolutionSetFirstITCase.java
index b97d0ad..fbe1402 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithSolutionSetFirstITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/ConnectedComponentsWithSolutionSetFirstITCase.java
@@ -20,33 +20,25 @@
 package org.apache.flink.test.iterative;
 
 import java.io.BufferedReader;
-import java.io.Serializable;
-
-import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.record.functions.JoinFunction;
-import org.apache.flink.api.java.record.functions.FunctionAnnotation.ConstantFieldsSecondExcept;
-import org.apache.flink.api.java.record.io.CsvInputFormat;
-import org.apache.flink.api.java.record.io.CsvOutputFormat;
-import org.apache.flink.api.java.record.operators.DeltaIteration;
-import org.apache.flink.api.java.record.operators.FileDataSink;
-import org.apache.flink.api.java.record.operators.FileDataSource;
-import org.apache.flink.api.java.record.operators.JoinOperator;
-import org.apache.flink.api.java.record.operators.MapOperator;
-import org.apache.flink.api.java.record.operators.ReduceOperator;
-import org.apache.flink.test.recordJobs.graph.WorksetConnectedComponents.DuplicateLongMap;
-import org.apache.flink.test.recordJobs.graph.WorksetConnectedComponents.MinimumComponentIDReduce;
-import org.apache.flink.test.recordJobs.graph.WorksetConnectedComponents.NeighborWithComponentIDJoin;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.examples.java.graph.ConnectedComponents;
 import org.apache.flink.test.testdata.ConnectedComponentsData;
-import org.apache.flink.test.util.RecordAPITestBase;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.Record;
+import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.util.Collector;
 
 /**
  * Tests a bug that prevented that the solution set can be on both sides of the match/cogroup function.
  */
 @SuppressWarnings("deprecation")
-public class ConnectedComponentsWithSolutionSetFirstITCase extends RecordAPITestBase {
+public class ConnectedComponentsWithSolutionSetFirstITCase extends JavaProgramTestBase {
 	
 	private static final long SEED = 0xBADC0FFEEBEEFL;
 	
@@ -59,11 +51,6 @@ public class ConnectedComponentsWithSolutionSetFirstITCase extends RecordAPITestBase {
 	protected String edgesPath;
 	protected String resultPath;
 
-	public ConnectedComponentsWithSolutionSetFirstITCase(){
-		setTaskManagerNumSlots(parallelism);
-	}
-	
-	
 	@Override
 	protected void preSubmit() throws Exception {
 		verticesPath = createTempFile("vertices.txt", ConnectedComponentsData.getEnumeratingVertices(NUM_VERTICES));
@@ -72,90 +59,67 @@ public class ConnectedComponentsWithSolutionSetFirstITCase extends RecordAPITestBase {
 	}
 	
 	@Override
-	protected Plan getTestJob() {
-		return getPlanForWorksetConnectedComponentsWithSolutionSetAsFirstInput(parallelism, verticesPath, edgesPath,
-				resultPath, 100);
+	protected void testProgram() throws Exception {
+		// set up execution environment
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// read vertex and edge data
+		DataSet<Tuple1<Long>> vertices = env.readCsvFile(verticesPath).types(Long.class);
+
+		DataSet<Tuple2<Long, Long>> edges = env.readCsvFile(edgesPath).fieldDelimiter(" ").types(Long.class, Long.class)
+				.flatMap(new ConnectedComponents.UndirectEdge());
+
+		// assign the initial components (equal to the vertex id)
+		DataSet<Tuple2<Long, Long>> verticesWithInitialId = vertices.map(new ConnectedComponentsITCase.DuplicateValue<Long>());
+
+		// open a delta iteration
+		DeltaIteration<Tuple2<Long, Long>, Tuple2<Long, Long>> iteration =
+				verticesWithInitialId.iterateDelta(verticesWithInitialId, 100, 0);
+
+		// apply the step logic: join with the edges, select the minimum neighbor, update if the component of the candidate is smaller
+		DataSet<Tuple2<Long, Long>> minNeighbor = iteration.getWorkset()
+				.join(edges).where(0).equalTo(0).with(new ConnectedComponents.NeighborWithComponentIDJoin())
+				.groupBy(0).aggregate(Aggregations.MIN, 1);
+
+		DataSet<Tuple2<Long, Long>> updatedIds = iteration.getSolutionSet()
+				.join(minNeighbor).where(0).equalTo(0).with(new UpdateComponentIdMatchMirrored());
+
+		// close the delta iteration (delta and new workset are identical)
+		DataSet<Tuple2<Long, Long>> result = iteration.closeWith(updatedIds, updatedIds);
+
+		result.writeAsCsv(resultPath, "\n", " ");
+
+		// execute program
+		env.execute("Connected Components Example");
 	}
 
+
 	@Override
 	protected void postSubmit() throws Exception {
 		for (BufferedReader reader : getResultReader(resultPath)) {
 			ConnectedComponentsData.checkOddEvenResult(reader);
 		}
 	}
-	
+
 	// --------------------------------------------------------------------------------------------
 	//  Classes and methods for the test program
 	// --------------------------------------------------------------------------------------------
-	
-	@ConstantFieldsSecondExcept({})
-	public static final class UpdateComponentIdMatchMirrored extends JoinFunction implements Serializable {
-		
+
+	@FunctionAnnotation.ForwardedFieldsSecond("*")
+	public static final class UpdateComponentIdMatchMirrored
+			implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
-		public void join(Record currentVertexWithComponent, Record newVertexWithComponent, Collector<Record> out){
-	
-			long candidateComponentID = newVertexWithComponent.getField(1, LongValue.class).getValue();
-			long currentComponentID = currentVertexWithComponent.getField(1, LongValue.class).getValue();
-	
-			if (candidateComponentID < currentComponentID) {
-				out.collect(newVertexWithComponent);
+		public void join(
+				Tuple2<Long, Long> current,
+				Tuple2<Long, Long> candidate,
+				Collector<Tuple2<Long, Long>> out) throws Exception {
+
+			if(candidate.f1 < current.f1) {
+				out.collect(candidate);
 			}
+
 		}
 	}
-	
-	@SuppressWarnings("unchecked")
-	private static Plan getPlanForWorksetConnectedComponentsWithSolutionSetAsFirstInput(
-			int numSubTasks, String verticesInput, String edgeInput, String output, int maxIterations)
-	{
-		// data source for initial vertices
-		FileDataSource initialVertices = new FileDataSource(new CsvInputFormat(' ', LongValue.class), verticesInput, "Vertices");
-		
-		MapOperator verticesWithId = MapOperator.builder(DuplicateLongMap.class).input(initialVertices).name("Assign Vertex Ids").build();
-		
-		DeltaIteration iteration = new DeltaIteration(0, "Connected Components Iteration");
-		iteration.setInitialSolutionSet(verticesWithId);
-		iteration.setInitialWorkset(verticesWithId);
-		iteration.setMaximumNumberOfIterations(maxIterations);
-		
-		// create DataSourceContract for the edges
-		FileDataSource edges = new FileDataSource(new CsvInputFormat(' ', LongValue.class, LongValue.class), edgeInput, "Edges");
-
-		// create CrossOperator for distance computation
-		JoinOperator joinWithNeighbors = JoinOperator.builder(new NeighborWithComponentIDJoin(), LongValue.class, 0, 0)
-				.input1(iteration.getWorkset())
-				.input2(edges)
-				.name("Join Candidate Id With Neighbor")
-				.build();
-
-		// create ReduceOperator for finding the nearest cluster centers
-		ReduceOperator minCandidateId = ReduceOperator.builder(new MinimumComponentIDReduce(), LongValue.class, 0)
-				.input(joinWithNeighbors)
-				.name("Find Minimum Candidate Id")
-				.build();
-		
-		// create CrossOperator for distance computation
-		JoinOperator updateComponentId = JoinOperator.builder(new UpdateComponentIdMatchMirrored(), LongValue.class, 0, 0)
-				.input1(iteration.getSolutionSet())
-				.input2(minCandidateId)
-				.name("Update Component Id")
-				.build();
-		
-		iteration.setNextWorkset(updateComponentId);
-		iteration.setSolutionSetDelta(updateComponentId);
-
-		// create DataSinkContract for writing the new cluster positions
-		FileDataSink result = new FileDataSink(new CsvOutputFormat(), output, iteration, "Result");
-		CsvOutputFormat.configureRecordFormat(result)
-			.recordDelimiter('\n')
-			.fieldDelimiter(' ')
-			.field(LongValue.class, 0)
-			.field(LongValue.class, 1);
-
-		// return the PACT plan
-		Plan plan = new Plan(result, "Workset Connected Components");
-		plan.setDefaultParallelism(numSubTasks);
-		return plan;
-	}
 }
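
One detail worth noting in this last file: the Record-API annotation @ConstantFieldsSecondExcept({}), which declared that no field of the second input is modified, becomes @FunctionAnnotation.ForwardedFieldsSecond("*") in the DataSet API. Both tell the optimizer that every record the join emits is an unchanged copy of its second input. A minimal illustration with an assumed class name:

import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

// "*" forwards all fields of the second input unchanged, which holds here
// because only 'second' is ever collected.
@FunctionAnnotation.ForwardedFieldsSecond("*")
public class EmitSecondIfSmaller
		implements FlatJoinFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Tuple2<Long, Long>> {

	@Override
	public void join(Tuple2<Long, Long> first, Tuple2<Long, Long> second,
			Collector<Tuple2<Long, Long>> out) {
		if (second.f1 < first.f1) {
			out.collect(second);
		}
	}
}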

