flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [05/25] flink git commit: [FLINK-3511] [gelly] Introduce flink-gelly-examples module
Date Fri, 26 Feb 2016 19:58:51 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
index 866f334..12047e7 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
@@ -41,7 +41,7 @@ import org.apache.flink.types.NullValue;
  * 
  * The result is a DataSet of vertices, where the vertex value corresponds to the assigned component ID.
  * 
- * @see org.apache.flink.graph.library.GSAConnectedComponents
+ * @see GSAConnectedComponents
  */
 @SuppressWarnings("serial")
 public class ConnectedComponents<K, EV> implements GraphAlgorithm<K, Long, EV, DataSet<Vertex<K, Long>>> {

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
index a44ba14..0354da4 100755
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
@@ -34,7 +34,7 @@ import org.apache.flink.types.NullValue;
  * This implementation assumes that the vertices of the input Graph are initialized with unique, Long component IDs.
  * The result is a DataSet of vertices, where the vertex value corresponds to the assigned component ID.
  * 
- * @see org.apache.flink.graph.library.ConnectedComponents
+ * @see ConnectedComponents
  */
 public class GSAConnectedComponents<K, EV> implements GraphAlgorithm<K, Long, EV, DataSet<Vertex<K, Long>>> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
index 0c5080d..29183e9 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
@@ -116,4 +116,4 @@ public class SingleSourceShortestPaths<K> implements GraphAlgorithm<K, Double, D
 			}
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
index c6bba4c..3842e6c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
@@ -7,7 +7,7 @@
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
  *
- *	 http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -353,4 +353,4 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements
 			this.setField(vertex3, V3);
 		}
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/ConnectedComponentsWithRandomisedEdgesITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/ConnectedComponentsWithRandomisedEdgesITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/ConnectedComponentsWithRandomisedEdgesITCase.java
new file mode 100644
index 0000000..2f619a6
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/ConnectedComponentsWithRandomisedEdgesITCase.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.test.testdata.ConnectedComponentsData;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.apache.flink.types.NullValue;
+
+import java.io.BufferedReader;
+
+@SuppressWarnings("serial")
+public class ConnectedComponentsWithRandomisedEdgesITCase extends JavaProgramTestBase {
+
+	private static final long SEED = 9487520347802987L;
+
+	private static final int NUM_VERTICES = 1000;
+
+	private static final int NUM_EDGES = 10000;
+
+	private String resultPath;
+
+	@Override
+	protected void preSubmit() throws Exception {
+		resultPath = getTempFilePath("results");
+	}
+
+	@Override
+	protected void testProgram() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Long> vertexIds = env.generateSequence(1, NUM_VERTICES);
+		DataSet<String> edgeString = env.fromElements(ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED).split("\n"));
+
+		DataSet<Edge<Long, NullValue>> edges = edgeString.map(new EdgeParser());
+
+		DataSet<Vertex<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
+
+		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
+
+		DataSet<Vertex<Long, Long>> result = graph.run(new ConnectedComponents<Long, NullValue>(100));
+
+		result.writeAsCsv(resultPath, "\n", " ");
+		env.execute();
+	}
+
+	/**
+	 * A map function that takes a Long value and creates a 2-tuple out of it:
+	 * <pre>(Long value) -> (value, value)</pre>
+	 */
+	public static final class IdAssigner implements MapFunction<Long, Vertex<Long, Long>> {
+		@Override
+		public Vertex<Long, Long> map(Long value) {
+			return new Vertex<>(value, value);
+		}
+	}
+
+	@Override
+	protected void postSubmit() throws Exception {
+		for (BufferedReader reader : getResultReader(resultPath)) {
+			ConnectedComponentsData.checkOddEvenResult(reader);
+		}
+	}
+
+	public static final class EdgeParser extends RichMapFunction<String, Edge<Long, NullValue>> {
+		public Edge<Long, NullValue> map(String value) {
+			String[] nums = value.split(" ");
+			return new Edge<>(Long.parseLong(nums[0]), Long.parseLong(nums[1]),
+					NullValue.getInstance());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
deleted file mode 100755
index 039a05c..0000000
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData;
-import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
-import org.apache.flink.graph.library.GSAConnectedComponents;
-import org.apache.flink.graph.library.GSASingleSourceShortestPaths;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.types.NullValue;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class GatherSumApplyITCase extends MultipleProgramsTestBase {
-
-	public GatherSumApplyITCase(TestExecutionMode mode){
-		super(mode);
-	}
-
-	private String expectedResult;
-
-	// --------------------------------------------------------------------------------------------
-	//  Connected Components Test
-	// --------------------------------------------------------------------------------------------
-
-	@Test
-	public void testConnectedComponents() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Long, NullValue> inputGraph = Graph.fromDataSet(
-				ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env),
-				new InitMapperCC(), env);
-
-        List<Vertex<Long, Long>> result = inputGraph.run(
-        		new GSAConnectedComponents<Long, NullValue>(16)).collect();
-
-		expectedResult = "1,1\n" +
-				"2,1\n" +
-				"3,1\n" +
-				"4,1\n";
-
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Single Source Shortest Path Test
-	// --------------------------------------------------------------------------------------------
-
-	@Test
-	public void testSingleSourceShortestPaths() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
-				SingleSourceShortestPathsData.getDefaultEdgeDataSet(env),
-				new InitMapperSSSP(), env);
-
-        List<Vertex<Long, Double>> result = inputGraph.run(
-        		new GSASingleSourceShortestPaths<>(1L, 16)).collect();
-
-		expectedResult = "1,0.0\n" +
-				"2,12.0\n" +
-				"3,13.0\n" +
-				"4,47.0\n" +
-				"5,48.0\n";
-
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@SuppressWarnings("serial")
-	private static final class InitMapperCC implements MapFunction<Long, Long> {
-		public Long map(Long value) {
-			return value;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class InitMapperSSSP implements MapFunction<Long, Double> {
-		public Double map(Long value) {
-			return 0.0;
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsITCase.java
deleted file mode 100644
index b0bacc4..0000000
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsITCase.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.example;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import org.apache.flink.graph.example.ConnectedComponents;
-import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.File;
-
-@RunWith(Parameterized.class)
-public class ConnectedComponentsITCase extends MultipleProgramsTestBase {
-
-	private String edgesPath;
-
-	private String resultPath;
-
-	private String expected;
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	public ConnectedComponentsITCase(TestExecutionMode mode) {
-		super(mode);
-	}
-
-	@Before
-	public void before() throws Exception {
-		resultPath = tempFolder.newFile().toURI().toString();
-
-		File edgesFile = tempFolder.newFile();
-		Files.write(ConnectedComponentsDefaultData.EDGES, edgesFile, Charsets.UTF_8);
-		edgesPath = edgesFile.toURI().toString();
-	}
-
-	@Test
-	public void testConnectedComponentsExample() throws Exception {
-		ConnectedComponents.main(new String[]{edgesPath, resultPath, ConnectedComponentsDefaultData.MAX_ITERATIONS + ""});
-		expected = ConnectedComponentsDefaultData.VERTICES_WITH_MIN_ID;
-	}
-
-	@After
-	public void after() throws Exception {
-		compareResultsByLinesInMemory(expected, resultPath);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/EuclideanGraphWeighingITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/EuclideanGraphWeighingITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/EuclideanGraphWeighingITCase.java
deleted file mode 100644
index 183c429..0000000
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/EuclideanGraphWeighingITCase.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.example;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import org.apache.flink.graph.example.EuclideanGraphWeighing;
-import org.apache.flink.graph.example.utils.EuclideanGraphData;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.File;
-
-@RunWith(Parameterized.class)
-public class EuclideanGraphWeighingITCase extends MultipleProgramsTestBase {
-
-	private String verticesPath;
-
-	private String edgesPath;
-
-	private String resultPath;
-
-	private String expected;
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	public EuclideanGraphWeighingITCase(TestExecutionMode mode) {
-		super(mode);
-	}
-
-	@Before
-	public void before() throws Exception {
-		resultPath = tempFolder.newFile().toURI().toString();
-		File verticesFile = tempFolder.newFile();
-		Files.write(EuclideanGraphData.VERTICES, verticesFile, Charsets.UTF_8);
-
-		File edgesFile = tempFolder.newFile();
-		Files.write(EuclideanGraphData.EDGES, edgesFile, Charsets.UTF_8);
-
-		verticesPath = verticesFile.toURI().toString();
-		edgesPath = edgesFile.toURI().toString();
-	}
-
-	@Test
-	public void testGraphWeightingWeighing() throws Exception {
-		EuclideanGraphWeighing.main(new String[]{verticesPath, edgesPath, resultPath});
-		expected = EuclideanGraphData.RESULTED_WEIGHTED_EDGES;
-	}
-
-	@After
-	public void after() throws Exception {
-		compareResultsByLinesInMemory(expected, resultPath);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java
deleted file mode 100644
index b4cdfd5..0000000
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.example;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.example.IncrementalSSSP;
-import org.apache.flink.graph.example.utils.IncrementalSSSPData;
-import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.File;
-
-@RunWith(Parameterized.class)
-public class IncrementalSSSPITCase extends MultipleProgramsTestBase {
-
-	private String verticesPath;
-
-	private String edgesPath;
-
-	private String edgesInSSSPPath;
-
-	private String resultPath;
-
-	private String expected;
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	public IncrementalSSSPITCase(TestExecutionMode mode) {
-		super(mode);
-	}
-
-	@Before
-	public void before() throws Exception {
-		resultPath = tempFolder.newFile().toURI().toString();
-		File verticesFile = tempFolder.newFile();
-		Files.write(IncrementalSSSPData.VERTICES, verticesFile, Charsets.UTF_8);
-
-		File edgesFile = tempFolder.newFile();
-		Files.write(IncrementalSSSPData.EDGES, edgesFile, Charsets.UTF_8);
-
-		File edgesInSSSPFile = tempFolder.newFile();
-		Files.write(IncrementalSSSPData.EDGES_IN_SSSP, edgesInSSSPFile, Charsets.UTF_8);
-
-		verticesPath = verticesFile.toURI().toString();
-		edgesPath = edgesFile.toURI().toString();
-		edgesInSSSPPath = edgesInSSSPFile.toURI().toString();
-	}
-
-	@Test
-	public void testIncrementalSSSP() throws Exception {
-		IncrementalSSSP.main(new String[]{verticesPath, edgesPath, edgesInSSSPPath,
-				IncrementalSSSPData.SRC_EDGE_TO_BE_REMOVED, IncrementalSSSPData.TRG_EDGE_TO_BE_REMOVED,
-				IncrementalSSSPData.VAL_EDGE_TO_BE_REMOVED,resultPath, IncrementalSSSPData.NUM_VERTICES + ""});
-		expected = IncrementalSSSPData.RESULTED_VERTICES;
-	}
-
-	@Test
-	public void testIncrementalSSSPNonSPEdge() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Vertex<Long, Double>> vertices = IncrementalSSSPData.getDefaultVertexDataSet(env);
-		DataSet<Edge<Long, Double>> edges = IncrementalSSSPData.getDefaultEdgeDataSet(env);
-		DataSet<Edge<Long, Double>> edgesInSSSP = IncrementalSSSPData.getDefaultEdgesInSSSP(env);
-		// the edge to be removed is a non-SP edge
-		Edge<Long, Double> edgeToBeRemoved = new Edge<>(3L, 5L, 5.0);
-
-		Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
-		// Assumption: all minimum weight paths are kept
-		Graph<Long, Double, Double> ssspGraph = Graph.fromDataSet(vertices, edgesInSSSP, env);
-		// remove the edge
-		graph.removeEdge(edgeToBeRemoved);
-
-		// configure the iteration
-		ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
-
-		if(IncrementalSSSP.isInSSSP(edgeToBeRemoved, edgesInSSSP)) {
-
-			parameters.setDirection(EdgeDirection.IN);
-			parameters.setOptDegrees(true);
-
-			// run the scatter gather iteration to propagate info
-			Graph<Long, Double, Double> result = ssspGraph.runScatterGatherIteration(
-					new IncrementalSSSP.VertexDistanceUpdater(),
-					new IncrementalSSSP.InvalidateMessenger(edgeToBeRemoved),
-					IncrementalSSSPData.NUM_VERTICES, parameters);
-
-			DataSet<Vertex<Long, Double>> resultedVertices = result.getVertices();
-
-			resultedVertices.writeAsCsv(resultPath, "\n", ",");
-			env.execute();
-		} else {
-			vertices.writeAsCsv(resultPath, "\n", ",");
-			env.execute();
-		}
-
-		expected = IncrementalSSSPData.VERTICES;
-	}
-
-	@After
-	public void after() throws Exception {
-		compareResultsByLinesInMemory(expected, resultPath);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureITCase.java
deleted file mode 100644
index 294a756..0000000
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureITCase.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.example;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-import org.apache.flink.graph.example.JaccardSimilarityMeasure;
-import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.File;
-
-@RunWith(Parameterized.class)
-public class JaccardSimilarityMeasureITCase extends MultipleProgramsTestBase {
-
-	private String edgesPath;
-
-	private String resultPath;
-
-	private String expected;
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	public JaccardSimilarityMeasureITCase(TestExecutionMode mode) {
-		super(mode);
-	}
-
-	@Before
-	public void before() throws Exception {
-		resultPath = tempFolder.newFile().toURI().toString();
-
-		File edgesFile = tempFolder.newFile();
-		Files.write(JaccardSimilarityMeasureData.EDGES, edgesFile, Charsets.UTF_8);
-
-		edgesPath = edgesFile.toURI().toString();
-	}
-
-	@Test
-	public void testJaccardSimilarityMeasureExample() throws Exception {
-		JaccardSimilarityMeasure.main(new String[]{edgesPath, resultPath});
-		expected = JaccardSimilarityMeasureData.JACCARD_EDGES;
-	}
-
-	@After
-	public void after() throws Exception {
-		compareResultsByLinesInMemory(expected, resultPath);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java
deleted file mode 100644
index 8152885..0000000
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/MusicProfilesITCase.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.example;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-
-import org.apache.flink.graph.example.MusicProfiles;
-import org.apache.flink.graph.example.utils.MusicProfilesData;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.File;
-import java.util.ArrayList;
-import java.util.Arrays;
-
-@RunWith(Parameterized.class)
-public class MusicProfilesITCase extends MultipleProgramsTestBase {
-
-	private String tripletsPath;
-
-	private String mismatchesPath;
-
-	private String topSongsResultPath;
-
-	private String communitiesResultPath;
-
-	private String expectedTopSongs;
-
-	@Rule
-	public TemporaryFolder tempFolder = new TemporaryFolder();
-
-	public MusicProfilesITCase(TestExecutionMode mode) {
-		super(mode);
-	}
-
-	@Before
-	public void before() throws Exception {
-		topSongsResultPath = tempFolder.newFile().toURI().toString();
-		communitiesResultPath = tempFolder.newFile().toURI().toString();
-
-		File tripletsFile = tempFolder.newFile();
-		Files.write(MusicProfilesData.USER_SONG_TRIPLETS, tripletsFile, Charsets.UTF_8);
-		tripletsPath = tripletsFile.toURI().toString();
-
-		File mismatchesFile = tempFolder.newFile();
-		Files.write(MusicProfilesData.MISMATCHES, mismatchesFile, Charsets.UTF_8);
-		mismatchesPath = mismatchesFile.toURI().toString();
-	}
-
-	@Test
-	public void testMusicProfilesExample() throws Exception {
-		MusicProfiles.main(new String[]{tripletsPath, mismatchesPath, topSongsResultPath, "0", communitiesResultPath,
-				MusicProfilesData.MAX_ITERATIONS + ""});
-		expectedTopSongs = MusicProfilesData.TOP_SONGS_RESULT;
-	}
-
-	@After
-	public void after() throws Exception {
-		compareResultsByLinesInMemory(expectedTopSongs, topSongsResultPath);
-
-		ArrayList<String> list = new ArrayList<>();
-		readAllResultLines(list, communitiesResultPath, new String[]{}, false);
-
-		String[] result = list.toArray(new String[list.size()]);
-		Arrays.sort(result);
-
-		// check that user_1 and user_2 are in the same community
-		Assert.assertEquals("users 1 and 2 are not in the same community",
-				result[0].substring(7), result[1].substring(7));
-
-		// check that user_3, user_4 and user_5 are in the same community
-		Assert.assertEquals("users 3 and 4 are not in the same community",
-				result[2].substring(7), result[3].substring(7));
-		Assert.assertEquals("users 4 and 5 are not in the same community",
-				result[3].substring(7), result[4].substring(7));
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java
deleted file mode 100644
index d8f8c8f..0000000
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SingleSourceShortestPathsITCase.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.example;
-
-import com.google.common.base.Charsets;
-import com.google.common.io.Files;
-
-import org.apache.flink.graph.example.GSASingleSourceShortestPaths;
-import org.apache.flink.graph.example.SingleSourceShortestPaths;
-import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.File;
-
-@RunWith(Parameterized.class)
-public class SingleSourceShortestPathsITCase extends MultipleProgramsTestBase {
-
-    private String edgesPath;
-
-    private String resultPath;
-
-    private String expected;
-
-    @Rule
-    public TemporaryFolder tempFolder = new TemporaryFolder();
-
-    public SingleSourceShortestPathsITCase(TestExecutionMode mode) {
-        super(mode);
-    }
-
-    @Before
-    public void before() throws Exception {
-        resultPath = tempFolder.newFile().toURI().toString();
-
-        File edgesFile = tempFolder.newFile();
-        Files.write(SingleSourceShortestPathsData.EDGES, edgesFile, Charsets.UTF_8);
-        edgesPath = edgesFile.toURI().toString();
-    }
-
-    @Test
-    public void testSSSPExample() throws Exception {
-        SingleSourceShortestPaths.main(new String[]{SingleSourceShortestPathsData.SRC_VERTEX_ID + "",
-                edgesPath, resultPath, 10 + ""});
-        expected = SingleSourceShortestPathsData.RESULTED_SINGLE_SOURCE_SHORTEST_PATHS;
-    }
-
-    @Test
-    public void testGSASSSPExample() throws Exception {
-        GSASingleSourceShortestPaths.main(new String[]{SingleSourceShortestPathsData.SRC_VERTEX_ID + "",
-                edgesPath, resultPath, 10 + ""});
-        expected = SingleSourceShortestPathsData.RESULTED_SINGLE_SOURCE_SHORTEST_PATHS;
-    }
-
-    @After
-    public void after() throws Exception {
-        compareResultsByLinesInMemory(expected, resultPath);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java
deleted file mode 100644
index 421eaa9..0000000
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.library;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.example.utils.CommunityDetectionData;
-import org.apache.flink.graph.library.CommunityDetection;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class CommunityDetectionITCase extends MultipleProgramsTestBase {
-
-	public CommunityDetectionITCase(TestExecutionMode mode) {
-		super(mode);
-	}
-
-	private String expected;
-
-	@Test
-	public void testSingleIteration() throws Exception {
-		/*
-		 * Test one iteration of the Simple Community Detection Example
-		 */
-
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Double> inputGraph = Graph.fromDataSet(
-				CommunityDetectionData.getSimpleEdgeDataSet(env), new InitLabels(), env);
-
-        List<Vertex<Long, Long>> result = inputGraph.run(new CommunityDetection<Long>(1, CommunityDetectionData.DELTA))
-        		.getVertices().collect();
-
-		expected = CommunityDetectionData.COMMUNITIES_SINGLE_ITERATION;
-		compareResultAsTuples(result, expected);
-	}
-
-	@Test
-	public void testTieBreaker() throws Exception {
-		/*
-		 * Test one iteration of the Simple Community Detection Example where a tie must be broken
-		 */
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, Long, Double> inputGraph = Graph.fromDataSet(
-				CommunityDetectionData.getTieEdgeDataSet(env), new InitLabels(), env);
-
-        List<Vertex<Long, Long>> result = inputGraph.run(new CommunityDetection<Long>(1, CommunityDetectionData.DELTA))
-        		.getVertices().collect();
-		expected = CommunityDetectionData.COMMUNITIES_WITH_TIE;
-		compareResultAsTuples(result, expected);
-	}
-
-	@SuppressWarnings("serial")
-	private static final class InitLabels implements MapFunction<Long, Long>{
-
-		public Long map(Long id) {
-			return id;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java
deleted file mode 100644
index c8d85f0..0000000
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.library;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.library.ConnectedComponents;
-import org.apache.flink.test.testdata.ConnectedComponentsData;
-import org.apache.flink.test.util.JavaProgramTestBase;
-import org.apache.flink.types.NullValue;
-
-import java.io.BufferedReader;
-
-@SuppressWarnings("serial")
-public class ConnectedComponentsWithRandomisedEdgesITCase extends JavaProgramTestBase {
-
-	private static final long SEED = 9487520347802987L;
-
-	private static final int NUM_VERTICES = 1000;
-
-	private static final int NUM_EDGES = 10000;
-
-	private String resultPath;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		resultPath = getTempFilePath("results");
-	}
-
-	@Override
-	protected void testProgram() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		DataSet<Long> vertexIds = env.generateSequence(1, NUM_VERTICES);
-		DataSet<String> edgeString = env.fromElements(ConnectedComponentsData.getRandomOddEvenEdges(NUM_EDGES, NUM_VERTICES, SEED).split("\n"));
-
-		DataSet<Edge<Long, NullValue>> edges = edgeString.map(new EdgeParser());
-
-		DataSet<Vertex<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
-
-		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
-
-		DataSet<Vertex<Long, Long>> result = graph.run(new ConnectedComponents<Long, NullValue>(100));
-
-		result.writeAsCsv(resultPath, "\n", " ");
-		env.execute();
-	}
-
-	/**
-	 * A map function that takes a Long value and creates a 2-tuple out of it:
-	 * <pre>(Long value) -> (value, value)</pre>
-	 */
-	public static final class IdAssigner implements MapFunction<Long, Vertex<Long, Long>> {
-		@Override
-		public Vertex<Long, Long> map(Long value) {
-			return new Vertex<>(value, value);
-		}
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		for (BufferedReader reader : getResultReader(resultPath)) {
-			ConnectedComponentsData.checkOddEvenResult(reader);
-		}
-	}
-
-	public static final class EdgeParser extends RichMapFunction<String, Edge<Long, NullValue>> {
-		public Edge<Long, NullValue> map(String value) {
-			String[] nums = value.split(" ");
-			return new Edge<>(Long.parseLong(nums[0]), Long.parseLong(nums[1]),
-					NullValue.getInstance());
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java
deleted file mode 100644
index 520269b..0000000
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.library;
-
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.example.utils.LabelPropagationData;
-import org.apache.flink.graph.library.LabelPropagation;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.types.NullValue;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class LabelPropagationITCase extends MultipleProgramsTestBase {
-
-	public LabelPropagationITCase(TestExecutionMode mode){
-		super(mode);
-	}
-
-    private String expectedResult;
-
-	@Test
-	public void testSingleIteration() throws Exception {
-		/*
-		 * Test one iteration of label propagation example with a simple graph
-		 */
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Long, NullValue> inputGraph = Graph.fromDataSet(
-				LabelPropagationData.getDefaultVertexSet(env),
-				LabelPropagationData.getDefaultEdgeDataSet(env), env);
-
-        List<Vertex<Long, Long>> result = inputGraph
-			.run(new LabelPropagation<Long, Long, NullValue>(1))
-			.collect();
-
-		expectedResult = LabelPropagationData.LABELS_AFTER_1_ITERATION;
-		compareResultAsTuples(result, expectedResult);
-	}
-
-	@Test
-	public void testTieBreaker() throws Exception {
-		/*
-		 * Test the label propagation example where a tie must be broken
-		 */
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Long, NullValue> inputGraph = Graph.fromDataSet(
-				LabelPropagationData.getTieVertexSet(env),
-				LabelPropagationData.getTieEdgeDataSet(env), env);
-
-        List<Vertex<Long, Long>> result = inputGraph
-			.run(new LabelPropagation<Long, Long, NullValue>(1))
-			.collect();
-
-		expectedResult = LabelPropagationData.LABELS_WITH_TIE;
-		compareResultAsTuples(result, expectedResult);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
deleted file mode 100644
index 431ab70..0000000
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.library;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.example.utils.PageRankData;
-import org.apache.flink.graph.library.GSAPageRank;
-import org.apache.flink.graph.library.PageRank;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class PageRankITCase extends MultipleProgramsTestBase {
-
-	public PageRankITCase(TestExecutionMode mode){
-		super(mode);
-	}
-
-	@Test
-	public void testPageRankWithThreeIterations() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
-				PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
-
-        List<Vertex<Long, Double>> result = inputGraph.run(new PageRank<Long>(0.85, 3))
-        		.collect();
-        
-        compareWithDelta(result, 0.01);
-	}
-
-	@Test
-	public void testGSAPageRankWithThreeIterations() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
-				PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
-
-        List<Vertex<Long, Double>> result = inputGraph.run(new GSAPageRank<Long>(0.85, 3))
-        		.collect();
-        
-        compareWithDelta(result, 0.01);
-	}
-
-	@Test
-	public void testPageRankWithThreeIterationsAndNumOfVertices() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
-				PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
-
-        List<Vertex<Long, Double>> result = inputGraph.run(new PageRank<Long>(0.85, 5, 3))
-        		.collect();
-        
-        compareWithDelta(result, 0.01);
-	}
-
-	@Test
-	public void testGSAPageRankWithThreeIterationsAndNumOfVertices() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
-				PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
-
-        List<Vertex<Long, Double>> result = inputGraph.run(new GSAPageRank<Long>(0.85, 5, 3))
-        		.collect();
-        
-        compareWithDelta(result, 0.01);
-	}
-
-	private void compareWithDelta(List<Vertex<Long, Double>> result,
-																double delta) {
-
-		String resultString = "";
-        for (Vertex<Long, Double> v : result) {
-        	resultString += v.f0.toString() + "," + v.f1.toString() +"\n";
-        }
-
-		String expectedResult = PageRankData.RANKS_AFTER_3_ITERATIONS;
-		String[] expected = expectedResult.isEmpty() ? new String[0] : expectedResult.split("\n");
-
-		String[] resultArray = resultString.isEmpty() ? new String[0] : resultString.split("\n");
-
-		Arrays.sort(expected);
-        Arrays.sort(resultArray);
-
-		for (int i = 0; i < expected.length; i++) {
-			String[] expectedFields = expected[i].split(",");
-			String[] resultFields = resultArray[i].split(",");
-
-			double expectedPayLoad = Double.parseDouble(expectedFields[1]);
-			double resultPayLoad = Double.parseDouble(resultFields[1]);
-
-			Assert.assertTrue("Values differ by more than the permissible delta",
-					Math.abs(expectedPayLoad - resultPayLoad) < delta);
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class InitMapper implements MapFunction<Long, Double> {
-		public Double map(Long value) {
-			return 1.0;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/SummarizationITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/SummarizationITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/SummarizationITCase.java
deleted file mode 100644
index abb4511..0000000
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/SummarizationITCase.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.library;
-
-import com.google.common.collect.Lists;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.example.utils.SummarizationData;
-import org.apache.flink.graph.library.Summarization;
-import org.apache.flink.graph.library.Summarization.EdgeValue;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.types.NullValue;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.List;
-import java.util.regex.Pattern;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-@RunWith(Parameterized.class)
-public class SummarizationITCase extends MultipleProgramsTestBase {
-
-	private static final Pattern TOKEN_SEPARATOR = Pattern.compile(";");
-
-	private static final Pattern ID_SEPARATOR = Pattern.compile(",");
-
-	public SummarizationITCase(TestExecutionMode mode) {
-		super(mode);
-	}
-
-	@Test
-	public void testWithVertexAndEdgeValues() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, String, String> input = Graph.fromDataSet(
-				SummarizationData.getVertices(env),
-				SummarizationData.getEdges(env),
-				env
-		);
-
-		List<Vertex<Long, Summarization.VertexValue<String>>> summarizedVertices = Lists.newArrayList();
-		List<Edge<Long, EdgeValue<String>>> summarizedEdges = Lists.newArrayList();
-
-		Graph<Long, Summarization.VertexValue<String>, EdgeValue<String>> output =
-				input.run(new Summarization<Long, String, String>());
-
-		output.getVertices().output(new LocalCollectionOutputFormat<>(summarizedVertices));
-		output.getEdges().output(new LocalCollectionOutputFormat<>(summarizedEdges));
-
-		env.execute();
-
-		validateVertices(SummarizationData.EXPECTED_VERTICES, summarizedVertices);
-		validateEdges(SummarizationData.EXPECTED_EDGES_WITH_VALUES, summarizedEdges);
-	}
-
-	@Test
-	public void testWithVertexAndAbsentEdgeValues() throws Exception {
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		Graph<Long, String, NullValue> input = Graph.fromDataSet(
-				SummarizationData.getVertices(env),
-				SummarizationData.getEdgesWithAbsentValues(env),
-				env
-		);
-
-		List<Vertex<Long, Summarization.VertexValue<String>>> summarizedVertices = Lists.newArrayList();
-		List<Edge<Long, EdgeValue<NullValue>>> summarizedEdges = Lists.newArrayList();
-
-		Graph<Long, Summarization.VertexValue<String>, EdgeValue<NullValue>> output =
-				input.run(new Summarization<Long, String, NullValue>());
-
-		output.getVertices().output(new LocalCollectionOutputFormat<>(summarizedVertices));
-		output.getEdges().output(new LocalCollectionOutputFormat<>(summarizedEdges));
-
-		env.execute();
-
-		validateVertices(SummarizationData.EXPECTED_VERTICES, summarizedVertices);
-		validateEdges(SummarizationData.EXPECTED_EDGES_ABSENT_VALUES, summarizedEdges);
-	}
-
-	private void validateVertices(String[] expectedVertices,
-																List<Vertex<Long, Summarization.VertexValue<String>>> actualVertices) {
-		Arrays.sort(expectedVertices);
-		Collections.sort(actualVertices, new Comparator<Vertex<Long, Summarization.VertexValue<String>>>() {
-			@Override
-			public int compare(Vertex<Long, Summarization.VertexValue<String>> o1,
-												 Vertex<Long, Summarization.VertexValue<String>> o2) {
-				int result = o1.getId().compareTo(o2.getId());
-				if (result == 0) {
-					result = o1.getValue().getVertexGroupValue().compareTo(o2.getValue().getVertexGroupValue());
-				}
-				if (result == 0) {
-					result = o1.getValue().getVertexGroupValue().compareTo(o2.getValue().getVertexGroupValue());
-				}
-				if (result == 0) {
-					result = o1.getValue().getVertexGroupValue().compareTo(o2.getValue().getVertexGroupValue());
-				}
-				return result;
-			}
-		});
-
-		for (int i = 0; i < expectedVertices.length; i++) {
-			validateVertex(expectedVertices[i], actualVertices.get(i));
-		}
-	}
-
-	private <EV extends Comparable<EV>> void validateEdges(String[] expectedEdges,
-														 List<Edge<Long, EdgeValue<EV>>> actualEdges) {
-		Arrays.sort(expectedEdges);
-		Collections.sort(actualEdges, new Comparator<Edge<Long, EdgeValue<EV>>> () {
-
-			@Override
-			public int compare(Edge<Long, EdgeValue<EV>> o1, Edge<Long, EdgeValue<EV>> o2) {
-				int result = o1.getSource().compareTo(o2.getSource());
-				if (result == 0) {
-					result = o1.getTarget().compareTo(o2.getTarget());
-				}
-				if (result == 0) {
-					result = o1.getTarget().compareTo(o2.getTarget());
-				}
-				if (result == 0) {
-					result = o1.getValue().getEdgeGroupValue().compareTo(o2.getValue().getEdgeGroupValue());
-				}
-				if (result == 0) {
-					result = o1.getValue().getEdgeGroupCount().compareTo(o2.getValue().getEdgeGroupCount());
-				}
-				return result;
-			}
-		});
-
-		for (int i = 0; i < expectedEdges.length; i++) {
-			validateEdge(expectedEdges[i], actualEdges.get(i));
-		}
-	}
-
-	private void validateVertex(String expected, Vertex<Long, Summarization.VertexValue<String>> actual) {
-		String[] tokens = TOKEN_SEPARATOR.split(expected);
-		assertTrue(getListFromIdRange(tokens[0]).contains(actual.getId()));
-		assertEquals(getGroupValue(tokens[1]), actual.getValue().getVertexGroupValue());
-		assertEquals(getGroupCount(tokens[1]), actual.getValue().getVertexGroupCount());
-	}
-
-	private <EV> void validateEdge(String expected, Edge<Long, EdgeValue<EV>> actual) {
-		String[] tokens = TOKEN_SEPARATOR.split(expected);
-		assertTrue(getListFromIdRange(tokens[0]).contains(actual.getSource()));
-		assertTrue(getListFromIdRange(tokens[1]).contains(actual.getTarget()));
-		assertEquals(getGroupValue(tokens[2]), actual.getValue().getEdgeGroupValue().toString());
-		assertEquals(getGroupCount(tokens[2]), actual.getValue().getEdgeGroupCount());
-	}
-
-	private List<Long> getListFromIdRange(String idRange) {
-		List<Long> result = Lists.newArrayList();
-		for (String id : ID_SEPARATOR.split(idRange)) {
-			result.add(Long.parseLong(id));
-		}
-		return result;
-	}
-
-	private String getGroupValue(String token) {
-		return ID_SEPARATOR.split(token)[0];
-	}
-
-	private Long getGroupCount(String token) {
-		return Long.valueOf(ID_SEPARATOR.split(token)[1]);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java
deleted file mode 100644
index 15f59fe..0000000
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.library;
-
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.example.utils.TriangleCountData;
-import org.apache.flink.graph.library.GSATriangleCount;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.types.NullValue;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class TriangleCountITCase extends MultipleProgramsTestBase {
-
-	public TriangleCountITCase(TestExecutionMode mode) {
-		super(mode);
-	}
-
-	@Test
-	public void testGSATriangleCount() throws Exception {
-
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(TriangleCountData.getDefaultEdgeDataSet(env),
-				env).getUndirected();
-
-		List<Integer> numberOfTriangles = graph.run(new GSATriangleCount<Long, NullValue, NullValue>()).collect();
-		String expectedResult = TriangleCountData.RESULTED_NUMBER_OF_TRIANGLES;
-
-		Assert.assertEquals(numberOfTriangles.get(0).intValue(), Integer.parseInt(expectedResult));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleEnumeratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleEnumeratorITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleEnumeratorITCase.java
deleted file mode 100644
index d06ba30..0000000
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleEnumeratorITCase.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test.library;
-
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.example.utils.TriangleCountData;
-import org.apache.flink.graph.library.TriangleEnumerator;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.types.NullValue;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class TriangleEnumeratorITCase extends MultipleProgramsTestBase {
-
-	public TriangleEnumeratorITCase(TestExecutionMode mode) {
-		super(mode);
-	}
-
-	@Test
-	public void testTriangleEnumerator() throws Exception	{
-
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(TriangleCountData.getDefaultEdgeDataSet(env),
-				env);
-
-		List<Tuple3<Long,Long,Long>> actualOutput = graph.run(new TriangleEnumerator<Long, NullValue, NullValue>()).collect();
-		List<Tuple3<Long,Long,Long>>  expectedResult = TriangleCountData.getListOfTriangles();
-
-		Assert.assertEquals(actualOutput.size(), expectedResult.size());
-		for(Tuple3<Long,Long,Long> resultTriangle:actualOutput)	{
-			Assert.assertTrue(expectedResult.indexOf(resultTriangle)>=0);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/pom.xml b/flink-libraries/pom.xml
index 34153c9..64ad7e4 100644
--- a/flink-libraries/pom.xml
+++ b/flink-libraries/pom.xml
@@ -36,6 +36,7 @@ under the License.
 	<modules>
 		<module>flink-gelly</module>
 		<module>flink-gelly-scala</module>
+		<module>flink-gelly-examples</module>
 		<module>flink-python</module>
 		<module>flink-table</module>
 		<module>flink-ml</module>


Mime
View raw message