flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject [03/10] flink git commit: [gelly] [refactoring] Removed Example end string from all gelly examples
Date Tue, 19 May 2015 21:03:32 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java
deleted file mode 100644
index e3d3e1c..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * Simple Community Detection Algorithm.
- *
- * Initially, each vertex is assigned a tuple formed of its own id along with a score equal to 1.0, as value.
- * The vertices propagate their labels and max scores in iterations, each time adopting the label with the
- * highest score from the list of received messages. The chosen label is afterwards re-scored using the fraction
- * delta/the superstep number. Delta is passed as a parameter and has 0.5 as a default value.
- *
- * The algorithm converges when vertices no longer update their value or when the maximum number of iterations
- * is reached.
- *
- * @see <a href="http://arxiv.org/pdf/0808.2633.pdf">article explaining the algorithm in detail</a>
- */
-public class SimpleCommunityDetection implements GraphAlgorithm<Long, Long, Double> {
-
-	private Integer maxIterations;
-
-	private Double delta;
-
-	public SimpleCommunityDetection(Integer maxIterations, Double delta) {
-
-		this.maxIterations = maxIterations;
-		this.delta = delta;
-	}
-
-	@Override
-	public Graph<Long, Long, Double> run(Graph<Long, Long, Double> graph) {
-
-		Graph<Long, Long, Double> undirectedGraph = graph.getUndirected();
-
-		Graph<Long, Tuple2<Long, Double>, Double> graphWithScoredVertices = undirectedGraph
-				.mapVertices(new AddScoreToVertexValuesMapper());
-
-		return graphWithScoredVertices.runVertexCentricIteration(new VertexLabelUpdater(delta),
-				new LabelMessenger(), maxIterations)
-				.mapVertices(new RemoveScoreFromVertexValuesMapper());
-	}
-
-	@SuppressWarnings("serial")
-	public static final class VertexLabelUpdater extends VertexUpdateFunction<Long, Tuple2<Long, Double>, Tuple2<Long, Double>> {
-
-		private Double delta;
-
-		public VertexLabelUpdater(Double delta) {
-			this.delta = delta;
-		}
-
-		@Override
-		public void updateVertex(Long vertexKey, Tuple2<Long, Double> labelScore,
-								MessageIterator<Tuple2<Long, Double>> inMessages) throws Exception {
-
-			// we would like these two maps to be ordered
-			Map<Long, Double> receivedLabelsWithScores = new TreeMap<Long, Double>();
-			Map<Long, Double> labelsWithHighestScore = new TreeMap<Long, Double>();
-
-			for (Tuple2<Long, Double> message : inMessages) {
-				// split the message into received label and score
-				Long receivedLabel = message.f0;
-				Double receivedScore = message.f1;
-
-				// if the label was received before
-				if (receivedLabelsWithScores.containsKey(receivedLabel)) {
-					Double newScore = receivedScore + receivedLabelsWithScores.get(receivedLabel);
-					receivedLabelsWithScores.put(receivedLabel, newScore);
-				} else {
-					// first time we see the label
-					receivedLabelsWithScores.put(receivedLabel, receivedScore);
-				}
-
-				// store the labels with the highest scores
-				if (labelsWithHighestScore.containsKey(receivedLabel)) {
-					Double currentScore = labelsWithHighestScore.get(receivedLabel);
-					if (currentScore < receivedScore) {
-						// record the highest score
-						labelsWithHighestScore.put(receivedLabel, receivedScore);
-					}
-				} else {
-					// first time we see this label
-					labelsWithHighestScore.put(receivedLabel, receivedScore);
-				}
-			}
-
-			if(receivedLabelsWithScores.size() > 0) {
-				// find the label with the highest score from the ones received
-				Double maxScore = -Double.MAX_VALUE;
-				Long maxScoreLabel = labelScore.f0;
-				for (Long curLabel : receivedLabelsWithScores.keySet()) {
-
-					if (receivedLabelsWithScores.get(curLabel) > maxScore) {
-						maxScore = receivedLabelsWithScores.get(curLabel);
-						maxScoreLabel = curLabel;
-					}
-				}
-
-				// find the highest score of maxScoreLabel
-				Double highestScore = labelsWithHighestScore.get(maxScoreLabel);
-				// re-score the new label
-				if (maxScoreLabel != labelScore.f0) {
-					highestScore -= delta / getSuperstepNumber();
-				}
-				// else delta = 0
-				// update own label
-				setNewVertexValue(new Tuple2<Long, Double>(maxScoreLabel, highestScore));
-			}
-		}
-	}
-
-	@SuppressWarnings("serial")
-	public static final class LabelMessenger extends MessagingFunction<Long, Tuple2<Long, Double>,
-			Tuple2<Long, Double>, Double> {
-
-		@Override
-		public void sendMessages(Long vertexKey, Tuple2<Long, Double> vertexValue) throws Exception {
-
-			for(Edge<Long, Double> edge : getOutgoingEdges()) {
-				sendMessageTo(edge.getTarget(), new Tuple2<Long, Double>(vertexValue.f0, vertexValue.f1 * edge.getValue()));
-			}
-
-		}
-	}
-
-	@SuppressWarnings("serial")
-	public static final class AddScoreToVertexValuesMapper implements MapFunction<Vertex<Long, Long>, Tuple2<Long, Double>> {
-
-		@Override
-		public Tuple2<Long, Double> map(Vertex<Long, Long> vertex) throws Exception {
-			return new Tuple2<Long, Double>(vertex.getValue(), 1.0);
-		}
-	}
-
-	@SuppressWarnings("serial")
-	public static final class RemoveScoreFromVertexValuesMapper implements MapFunction<Vertex<Long, Tuple2<Long, Double>>, Long> {
-
-		@Override
-		public Long map(Vertex<Long, Tuple2<Long, Double>> vertex) throws Exception {
-			return vertex.getValue().f0;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
deleted file mode 100644
index f4f8b27..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
+++ /dev/null
@@ -1,108 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
-
-@SuppressWarnings("serial")
-public class SingleSourceShortestPaths<K> implements GraphAlgorithm<K, Double, Double> {
-
-	private final K srcVertexId;
-	private final Integer maxIterations;
-
-	public SingleSourceShortestPaths(K srcVertexId, Integer maxIterations) {
-		this.srcVertexId = srcVertexId;
-		this.maxIterations = maxIterations;
-	}
-
-	@Override
-	public Graph<K, Double, Double> run(Graph<K, Double, Double> input) {
-
-		return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
-				.runVertexCentricIteration(new VertexDistanceUpdater<K>(), new MinDistanceMessenger<K>(),
-				maxIterations);
-	}
-
-	public static final class InitVerticesMapper<K>	implements MapFunction<Vertex<K, Double>, Double> {
-
-		private K srcVertexId;
-
-		public InitVerticesMapper(K srcId) {
-			this.srcVertexId = srcId;
-		}
-
-		public Double map(Vertex<K, Double> value) {
-			if (value.f0.equals(srcVertexId)) {
-				return 0.0;
-			} else {
-				return Double.MAX_VALUE;
-			}
-		}
-	}
-
-	/**
-	 * Function that updates the value of a vertex by picking the minimum
-	 * distance from all incoming messages.
-	 * 
-	 * @param <K>
-	 */
-	public static final class VertexDistanceUpdater<K> extends VertexUpdateFunction<K, Double, Double> {
-
-		@Override
-		public void updateVertex(K vertexKey, Double vertexValue,
-				MessageIterator<Double> inMessages) {
-
-			Double minDistance = Double.MAX_VALUE;
-
-			for (double msg : inMessages) {
-				if (msg < minDistance) {
-					minDistance = msg;
-				}
-			}
-
-			if (vertexValue > minDistance) {
-				setNewVertexValue(minDistance);
-			}
-		}
-	}
-
-	/**
-	 * Distributes the minimum distance associated with a given vertex among all
-	 * the target vertices summed up with the edge's value.
-	 * 
-	 * @param <K>
-	 */
-	public static final class MinDistanceMessenger<K> extends MessagingFunction<K, Double, Double, Double> {
-
-		@Override
-		public void sendMessages(K vertexKey, Double newDistance)
-				throws Exception {
-			for (Edge<K, Double> edge : getOutgoingEdges()) {
-				sendMessageTo(edge.getTarget(), newDistance + edge.getValue());
-			}
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPathsAlgorithm.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPathsAlgorithm.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPathsAlgorithm.java
new file mode 100644
index 0000000..41180cd
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPathsAlgorithm.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+
+import java.io.Serializable;
+
+@SuppressWarnings("serial")
+public class SingleSourceShortestPathsAlgorithm<K extends Comparable<K> & Serializable>
+		implements GraphAlgorithm<K, Double, Double> {
+
+	private final K srcVertexId;
+	private final Integer maxIterations;
+
+	public SingleSourceShortestPathsAlgorithm(K srcVertexId, Integer maxIterations) {
+		this.srcVertexId = srcVertexId;
+		this.maxIterations = maxIterations;
+	}
+
+	@Override
+	public Graph<K, Double, Double> run(Graph<K, Double, Double> input) {
+
+		return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
+				.runVertexCentricIteration(new VertexDistanceUpdater<K>(), new MinDistanceMessenger<K>(),
+				maxIterations);
+	}
+
+	public static final class InitVerticesMapper<K>	implements MapFunction<Vertex<K, Double>, Double> {
+
+		private K srcVertexId;
+
+		public InitVerticesMapper(K srcId) {
+			this.srcVertexId = srcId;
+		}
+
+		public Double map(Vertex<K, Double> value) {
+			if (value.f0.equals(srcVertexId)) {
+				return 0.0;
+			} else {
+				return Double.MAX_VALUE;
+			}
+		}
+	}
+
+	/**
+	 * Function that updates the value of a vertex by picking the minimum
+	 * distance from all incoming messages.
+	 * 
+	 * @param <K>
+	 */
+	public static final class VertexDistanceUpdater<K> extends VertexUpdateFunction<K, Double, Double> {
+
+		@Override
+		public void updateVertex(K vertexKey, Double vertexValue,
+				MessageIterator<Double> inMessages) {
+
+			Double minDistance = Double.MAX_VALUE;
+
+			for (double msg : inMessages) {
+				if (msg < minDistance) {
+					minDistance = msg;
+				}
+			}
+
+			if (vertexValue > minDistance) {
+				setNewVertexValue(minDistance);
+			}
+		}
+	}
+
+	/**
+	 * Distributes the minimum distance associated with a given vertex among all
+	 * the target vertices summed up with the edge's value.
+	 * 
+	 * @param <K>
+	 */
+	public static final class MinDistanceMessenger<K> extends MessagingFunction<K, Double, Double, Double> {
+
+		@Override
+		public void sendMessages(K vertexKey, Double newDistance)
+				throws Exception {
+			for (Edge<K, Double> edge : getOutgoingEdges()) {
+				sendMessageTo(edge.getTarget(), newDistance + edge.getValue());
+			}
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
index a0c1de4..ca40323 100755
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
@@ -20,9 +20,9 @@ package org.apache.flink.graph.test;
 
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
-import org.apache.flink.graph.example.GSAConnectedComponentsExample;
 import org.apache.flink.graph.example.GSAPageRank;
-import org.apache.flink.graph.example.GSASingleSourceShortestPathsExample;
+import org.apache.flink.graph.example.GSAConnectedComponents;
+import org.apache.flink.graph.example.GSASingleSourceShortestPaths;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.junit.After;
 import org.junit.Before;
@@ -70,7 +70,7 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
 
 	@Test
 	public void testConnectedComponents() throws Exception {
-		GSAConnectedComponentsExample.main(new String[]{edgesPath, resultPath, "16"});
+		GSAConnectedComponents.main(new String[]{edgesPath, resultPath, "16"});
 		expectedResult = "1 1\n" +
 				"2 1\n" +
 				"3 1\n" +
@@ -86,8 +86,8 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
 	// --------------------------------------------------------------------------------------------
 
 	@Test
-	public void testSingleSourceShortestPath() throws Exception {
-		GSASingleSourceShortestPathsExample.main(new String[]{"1", edgesPath, resultPath, "16"});
+	public void testSingleSourceShortestPaths() throws Exception {
+		GSASingleSourceShortestPaths.main(new String[]{"1", edgesPath, resultPath, "16"});
 		expectedResult = "1 0.0\n" +
 				"2 12.0\n" +
 				"3 13.0\n" +

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/CommunityDetectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/CommunityDetectionITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/CommunityDetectionITCase.java
new file mode 100644
index 0000000..1302424
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/CommunityDetectionITCase.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.example;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.graph.example.CommunityDetection;
+import org.apache.flink.graph.example.utils.CommunityDetectionData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+@RunWith(Parameterized.class)
+public class CommunityDetectionITCase extends MultipleProgramsTestBase {
+
+	private String edgesPath;
+
+	private String resultPath;
+
+	private String expected;
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	public CommunityDetectionITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
+	}
+	@After
+	public void after() throws Exception{
+		compareResultsByLinesInMemory(expected, resultPath);
+	}
+
+	@Test
+	public void testSingleIteration() throws Exception {
+		/*
+		 * Test one iteration of the Simple Community Detection Example
+		 */
+		final String edges = "1	2	1.0\n" + "1	3	2.0\n" + "1	4	3.0\n" + "1	5	4.0\n" + "2	6	5.0\n" +
+				"6	7	6.0\n" + "6	8	7.0\n" + "7	8	8.0";
+		edgesPath = createTempFile(edges);
+
+		CommunityDetection.main(new String[]{edgesPath, resultPath, "1",
+				CommunityDetectionData.DELTA + ""});
+
+		expected = "1,5\n" + "2,6\n" + "3,1\n" + "4,1\n" + "5,1\n" + "6,8\n" + "7,8\n" + "8,7";
+	}
+
+	@Test
+	public void testTieBreaker() throws Exception {
+		/*
+		 * Test one iteration of the Simple Community Detection Example where a tie must be broken
+		 */
+
+		final String edges = "1	2	1.0\n" + "1	3	1.0\n" + "1	4	1.0\n" + "1	5	1.0";
+		edgesPath = createTempFile(edges);
+
+		CommunityDetection.main(new String[]{edgesPath, resultPath, "1",
+				CommunityDetectionData.DELTA + ""});
+
+		expected = "1,2\n" + "2,1\n" + "3,1\n" + "4,1\n" + "5,1";
+	}
+
+
+	// -------------------------------------------------------------------------
+	// Util methods
+	// -------------------------------------------------------------------------
+	private String createTempFile(final String rows) throws Exception {
+		File tempFile = tempFolder.newFile();
+		Files.write(rows, tempFile, Charsets.UTF_8);
+		return tempFile.toURI().toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsITCase.java
index f07f407..b0bacc4 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsITCase.java
@@ -20,8 +20,8 @@ package org.apache.flink.graph.test.example;
 
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
-import org.apache.flink.graph.example.ConnectedComponentsExample;
-import org.apache.flink.graph.example.utils.ConnectedComponentsExampleData;
+import org.apache.flink.graph.example.ConnectedComponents;
+import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.junit.After;
 import org.junit.Before;
@@ -54,14 +54,14 @@ public class ConnectedComponentsITCase extends MultipleProgramsTestBase {
 		resultPath = tempFolder.newFile().toURI().toString();
 
 		File edgesFile = tempFolder.newFile();
-		Files.write(ConnectedComponentsExampleData.EDGES, edgesFile, Charsets.UTF_8);
+		Files.write(ConnectedComponentsDefaultData.EDGES, edgesFile, Charsets.UTF_8);
 		edgesPath = edgesFile.toURI().toString();
 	}
 
 	@Test
 	public void testConnectedComponentsExample() throws Exception {
-		ConnectedComponentsExample.main(new String[]{edgesPath, resultPath, ConnectedComponentsExampleData.MAX_ITERATIONS + ""});
-		expected = ConnectedComponentsExampleData.VERTICES_WITH_MIN_ID;
+		ConnectedComponents.main(new String[]{edgesPath, resultPath, ConnectedComponentsDefaultData.MAX_ITERATIONS + ""});
+		expected = ConnectedComponentsDefaultData.VERTICES_WITH_MIN_ID;
 	}
 
 	@After

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsWithRandomisedEdgesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsWithRandomisedEdgesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsWithRandomisedEdgesITCase.java
index 407c4a4..f2f3d8c 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsWithRandomisedEdgesITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsWithRandomisedEdgesITCase.java
@@ -25,7 +25,7 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.library.ConnectedComponents;
+import org.apache.flink.graph.library.ConnectedComponentsAlgorithm;
 import org.apache.flink.test.testdata.ConnectedComponentsData;
 import org.apache.flink.test.util.JavaProgramTestBase;
 import org.apache.flink.types.NullValue;
@@ -61,7 +61,7 @@ public class ConnectedComponentsWithRandomisedEdgesITCase extends JavaProgramTes
 		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
 
 		DataSet<Vertex<Long, Long>> result = graph
-				.run(new ConnectedComponents(100)).getVertices();
+				.run(new ConnectedComponentsAlgorithm(100)).getVertices();
 
 		result.writeAsCsv(resultPath, "\n", " ");
 		env.execute();

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/EuclideanGraphExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/EuclideanGraphExampleITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/EuclideanGraphExampleITCase.java
deleted file mode 100644
index fa1c246..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/EuclideanGraphExampleITCase.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.example;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import org.apache.flink.graph.example.EuclideanGraphExample;
-import org.apache.flink.graph.example.utils.EuclideanGraphData;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.File;
-
-@RunWith(Parameterized.class)
-public class EuclideanGraphExampleITCase extends MultipleProgramsTestBase {
-
-	private String verticesPath;
-
-	private String edgesPath;
-
-	private String resultPath;
-
-	private String expected;
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	public EuclideanGraphExampleITCase(TestExecutionMode mode) {
-		super(mode);
-	}
-
-	@Before
-	public void before() throws Exception {
-		resultPath = tempFolder.newFile().toURI().toString();
-		File verticesFile = tempFolder.newFile();
-		Files.write(EuclideanGraphData.VERTICES, verticesFile, Charsets.UTF_8);
-
-		File edgesFile = tempFolder.newFile();
-		Files.write(EuclideanGraphData.EDGES, edgesFile, Charsets.UTF_8);
-
-		verticesPath = verticesFile.toURI().toString();
-		edgesPath = edgesFile.toURI().toString();
-	}
-
-	@Test
-	public void testGraphWeightingExanple() throws Exception {
-		EuclideanGraphExample.main(new String[]{verticesPath, edgesPath, resultPath});
-		expected = EuclideanGraphData.RESULTED_WEIGHTED_EDGES;
-	}
-
-	@After
-	public void after() throws Exception {
-		compareResultsByLinesInMemory(expected, resultPath);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/EuclideanGraphWeighingITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/EuclideanGraphWeighingITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/EuclideanGraphWeighingITCase.java
new file mode 100644
index 0000000..183c429
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/EuclideanGraphWeighingITCase.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.example;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.graph.example.EuclideanGraphWeighing;
+import org.apache.flink.graph.example.utils.EuclideanGraphData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+@RunWith(Parameterized.class)
+public class EuclideanGraphWeighingITCase extends MultipleProgramsTestBase {
+
+	private String verticesPath;
+
+	private String edgesPath;
+
+	private String resultPath;
+
+	private String expected;
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	public EuclideanGraphWeighingITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Before
+	public void before() throws Exception {
+		resultPath = tempFolder.newFile().toURI().toString();
+		File verticesFile = tempFolder.newFile();
+		Files.write(EuclideanGraphData.VERTICES, verticesFile, Charsets.UTF_8);
+
+		File edgesFile = tempFolder.newFile();
+		Files.write(EuclideanGraphData.EDGES, edgesFile, Charsets.UTF_8);
+
+		verticesPath = verticesFile.toURI().toString();
+		edgesPath = edgesFile.toURI().toString();
+	}
+
+	@Test
+	public void testGraphWeightingWeighing() throws Exception {
+		EuclideanGraphWeighing.main(new String[]{verticesPath, edgesPath, resultPath});
+		expected = EuclideanGraphData.RESULTED_WEIGHTED_EDGES;
+	}
+
+	@After
+	public void after() throws Exception {
+		compareResultsByLinesInMemory(expected, resultPath);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureExampleITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureExampleITCase.java
deleted file mode 100644
index bc224a3..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureExampleITCase.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.example;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import org.apache.flink.graph.example.JaccardSimilarityMeasureExample;
-import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.File;
-
-@RunWith(Parameterized.class)
-public class JaccardSimilarityMeasureExampleITCase extends MultipleProgramsTestBase {
-
-	private String edgesPath;
-
-	private String resultPath;
-
-	private String expected;
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	public JaccardSimilarityMeasureExampleITCase(TestExecutionMode mode) {
-		super(mode);
-	}
-
-	@Before
-	public void before() throws Exception {
-		resultPath = tempFolder.newFile().toURI().toString();
-
-		File edgesFile = tempFolder.newFile();
-		Files.write(JaccardSimilarityMeasureData.EDGES, edgesFile, Charsets.UTF_8);
-
-		edgesPath = edgesFile.toURI().toString();
-	}
-
-	@Test
-	public void testJaccardSimilarityMeasureExample() throws Exception {
-		JaccardSimilarityMeasureExample.main(new String[]{edgesPath, resultPath});
-		expected = JaccardSimilarityMeasureData.JACCARD_EDGES;
-	}
-
-	@After
-	public void after() throws Exception {
-		compareResultsByLinesInMemory(expected, resultPath);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureITCase.java
new file mode 100644
index 0000000..294a756
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureITCase.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.example;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.graph.example.JaccardSimilarityMeasure;
+import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+@RunWith(Parameterized.class)
+public class JaccardSimilarityMeasureITCase extends MultipleProgramsTestBase {
+
+	private String edgesPath;
+
+	private String resultPath;
+
+	private String expected;
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	public JaccardSimilarityMeasureITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Before
+	public void before() throws Exception {
+		resultPath = tempFolder.newFile().toURI().toString();
+
+		File edgesFile = tempFolder.newFile();
+		Files.write(JaccardSimilarityMeasureData.EDGES, edgesFile, Charsets.UTF_8);
+
+		edgesPath = edgesFile.toURI().toString();
+	}
+
+	@Test
+	public void testJaccardSimilarityMeasureExample() throws Exception {
+		JaccardSimilarityMeasure.main(new String[]{edgesPath, resultPath});
+		expected = JaccardSimilarityMeasureData.JACCARD_EDGES;
+	}
+
+	@After
+	public void after() throws Exception {
+		compareResultsByLinesInMemory(expected, resultPath);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationExampleITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationExampleITCase.java
deleted file mode 100644
index 3298b7f..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationExampleITCase.java
+++ /dev/null
@@ -1,143 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.example;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import org.apache.flink.graph.example.LabelPropagationExample;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.File;
-
-@RunWith(Parameterized.class)
-public class LabelPropagationExampleITCase extends MultipleProgramsTestBase {
-
-	public LabelPropagationExampleITCase(TestExecutionMode mode){
-		super(mode);
-	}
-
-    private String resultPath;
-    private String expectedResult;
-
-    @Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expectedResult, resultPath);
-	}
-
-	@Test
-	public void testSingleIteration() throws Exception {
-		/*
-		 * Test one iteration of label propagation example with a simple graph
-		 */
-
-		final String vertices = "1	10\n" +
-				"2	10\n" +
-				"3	30\n" +
-				"4	40\n" +
-				"5	40\n" +
-				"6	40\n" +
-				"7	70\n";
-
-		final String edges = "1	3\n" +
-				"2	3\n" +
-				"4	7\n" +
-				"5	7\n" +
-				"6	7\n" +
-				"7	3\n";
-
-		String verticesPath = createTempFile(vertices);
-		String edgesPath = createTempFile(edges);
-
-		LabelPropagationExample.main(new String[] {verticesPath, edgesPath, resultPath, "1"});
-
-		expectedResult = "1,10\n" +
-			"2,10\n" +
-			"3,10\n" +
-			"4,40\n" +
-			"5,40\n" +
-			"6,40\n" +
-			"7,40\n";
-	}
-
-	@Test
-	public void testTieBreaker() throws Exception {
-		/*
-		 * Test the label propagation example where a tie must be broken
-		 */
-
-		final String vertices = "1	10\n" +
-				"2	10\n" +
-				"3	10\n" +
-				"4	10\n" +
-				"5	0\n" +
-				"6	20\n" +
-				"7	20\n" +
-				"8	20\n" +
-				"9	20\n";
-
-		final String edges = "1	5\n" +
-				"2	5\n" +
-				"3	5\n" +
-				"4	5\n" +
-				"6	5\n" +
-				"7	5\n" +
-				"8	5\n" +
-				"9	5\n";
-
-		String verticesPath = createTempFile(vertices);
-		String edgesPath = createTempFile(edges);
-
-		LabelPropagationExample.main(new String[] {verticesPath, edgesPath, resultPath, "1"});
-
-		expectedResult = "1,10\n" +
-				"2,10\n" +
-				"3,10\n" +
-				"4,10\n" +
-				"5,20\n" +
-				"6,20\n" +
-				"7,20\n" +
-				"8,20\n" +
-				"9,20\n";
-	}
-
-	// -------------------------------------------------------------------------
-	//  Util methods
-	// -------------------------------------------------------------------------
-
-	private String createTempFile(final String rows) throws Exception {
-		File tempFile = tempFolder.newFile();
-		Files.write(rows, tempFile, Charsets.UTF_8);
-		return tempFile.toURI().toString();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationITCase.java
new file mode 100644
index 0000000..858d06c
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationITCase.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.example;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.graph.example.LabelPropagation;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+@RunWith(Parameterized.class)
+public class LabelPropagationITCase extends MultipleProgramsTestBase {
+
+	public LabelPropagationITCase(TestExecutionMode mode){
+		super(mode);
+	}
+
+    private String resultPath;
+    private String expectedResult;
+
+    @Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
+	}
+
+	@After
+	public void after() throws Exception{
+		compareResultsByLinesInMemory(expectedResult, resultPath);
+	}
+
+	@Test
+	public void testSingleIteration() throws Exception {
+		/*
+		 * Test one iteration of label propagation example with a simple graph
+		 */
+
+		final String vertices = "1	10\n" +
+				"2	10\n" +
+				"3	30\n" +
+				"4	40\n" +
+				"5	40\n" +
+				"6	40\n" +
+				"7	70\n";
+
+		final String edges = "1	3\n" +
+				"2	3\n" +
+				"4	7\n" +
+				"5	7\n" +
+				"6	7\n" +
+				"7	3\n";
+
+		String verticesPath = createTempFile(vertices);
+		String edgesPath = createTempFile(edges);
+
+		LabelPropagation.main(new String[]{verticesPath, edgesPath, resultPath, "1"});
+
+		expectedResult = "1,10\n" +
+			"2,10\n" +
+			"3,10\n" +
+			"4,40\n" +
+			"5,40\n" +
+			"6,40\n" +
+			"7,40\n";
+	}
+
+	@Test
+	public void testTieBreaker() throws Exception {
+		/*
+		 * Test the label propagation example where a tie must be broken
+		 */
+
+		final String vertices = "1	10\n" +
+				"2	10\n" +
+				"3	10\n" +
+				"4	10\n" +
+				"5	0\n" +
+				"6	20\n" +
+				"7	20\n" +
+				"8	20\n" +
+				"9	20\n";
+
+		final String edges = "1	5\n" +
+				"2	5\n" +
+				"3	5\n" +
+				"4	5\n" +
+				"6	5\n" +
+				"7	5\n" +
+				"8	5\n" +
+				"9	5\n";
+
+		String verticesPath = createTempFile(vertices);
+		String edgesPath = createTempFile(edges);
+
+		LabelPropagation.main(new String[]{verticesPath, edgesPath, resultPath, "1"});
+
+		expectedResult = "1,10\n" +
+				"2,10\n" +
+				"3,10\n" +
+				"4,10\n" +
+				"5,20\n" +
+				"6,20\n" +
+				"7,20\n" +
+				"8,20\n" +
+				"9,20\n";
+	}
+
+	// -------------------------------------------------------------------------
+	//  Util methods
+	// -------------------------------------------------------------------------
+
+	private String createTempFile(final String rows) throws Exception {
+		File tempFile = tempFolder.newFile();
+		Files.write(rows, tempFile, Charsets.UTF_8);
+		return tempFile.toURI().toString();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SimpleCommunityDetectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SimpleCommunityDetectionITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SimpleCommunityDetectionITCase.java
deleted file mode 100644
index def5006..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SimpleCommunityDetectionITCase.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.example;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import org.apache.flink.graph.example.SimpleCommunityDetectionExample;
-import org.apache.flink.graph.example.utils.SimpleCommunityDetectionData;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.File;
-
-@RunWith(Parameterized.class)
-public class SimpleCommunityDetectionITCase extends MultipleProgramsTestBase {
-
-	private String edgesPath;
-
-	private String resultPath;
-
-	private String expected;
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	public SimpleCommunityDetectionITCase(TestExecutionMode mode) {
-		super(mode);
-	}
-
-	@Before
-	public void before() throws Exception{
-		resultPath = tempFolder.newFile().toURI().toString();
-	}
-	@After
-	public void after() throws Exception{
-		compareResultsByLinesInMemory(expected, resultPath);
-	}
-
-	@Test
-	public void testSingleIteration() throws Exception {
-		/*
-		 * Test one iteration of the Simple Community Detection Example
-		 */
-		final String edges = "1	2	1.0\n" + "1	3	2.0\n" + "1	4	3.0\n" + "1	5	4.0\n" + "2	6	5.0\n" +
-				"6	7	6.0\n" + "6	8	7.0\n" + "7	8	8.0";
-		edgesPath = createTempFile(edges);
-
-		SimpleCommunityDetectionExample.main(new String[]{edgesPath, resultPath, "1",
-				SimpleCommunityDetectionData.DELTA + ""});
-
-		expected = "1,5\n" + "2,6\n" + "3,1\n" + "4,1\n" + "5,1\n" + "6,8\n" + "7,8\n" + "8,7";
-	}
-
-	@Test
-	public void testTieBreaker() throws Exception {
-		/*
-		 * Test one iteration of the Simple Community Detection Example where a tie must be broken
-		 */
-
-		final String edges = "1	2	1.0\n" + "1	3	1.0\n" + "1	4	1.0\n" + "1	5	1.0";
-		edgesPath = createTempFile(edges);
-
-		SimpleCommunityDetectionExample.main(new String[] {edgesPath, resultPath, "1",
-				SimpleCommunityDetectionData.DELTA + ""});
-
-		expected = "1,2\n" + "2,1\n" + "3,1\n" + "4,1\n" + "5,1";
-	}
-
-
-	// -------------------------------------------------------------------------
-	// Util methods
-	// -------------------------------------------------------------------------
-	private String createTempFile(final String rows) throws Exception {
-		File tempFile = tempFolder.newFile();
-		Files.write(rows, tempFile, Charsets.UTF_8);
-		return tempFile.toURI().toString();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8f4039dc/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java
index aa2d6f0..2e68b0a 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java
@@ -20,7 +20,7 @@ package org.apache.flink.graph.test.example;
 
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
-import org.apache.flink.graph.example.SingleSourceShortestPathsExample;
+import org.apache.flink.graph.example.SingleSourceShortestPaths;
 import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.junit.After;
@@ -60,7 +60,7 @@ public class SingleSourceShortestPathsITCase extends MultipleProgramsTestBase {
 
     @Test
     public void testSSSPExample() throws Exception {
-        SingleSourceShortestPathsExample.main(new String[]{SingleSourceShortestPathsData.SRC_VERTEX_ID + "",
+        SingleSourceShortestPaths.main(new String[]{SingleSourceShortestPathsData.SRC_VERTEX_ID + "",
                 edgesPath, resultPath, 10 + ""});
         expected = SingleSourceShortestPathsData.RESULTED_SINGLE_SOURCE_SHORTEST_PATHS;
     }


Mime
View raw message