flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject [07/25] flink git commit: [FLINK-3511] [gelly] Introduce flink-gelly-examples module
Date Fri, 26 Feb 2016 19:58:53 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleCountITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleCountITCase.java
new file mode 100644
index 0000000..aaada8f
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleCountITCase.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.examples.data.TriangleCountData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class TriangleCountITCase extends MultipleProgramsTestBase {
+	// Integration test: runs GSATriangleCount on the default TriangleCountData edge set.
+	public TriangleCountITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Test
+	public void testGSATriangleCount() throws Exception {
+		// The example edges are turned into an undirected graph before the algorithm runs.
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(TriangleCountData.getDefaultEdgeDataSet(env),
+				env).getUndirected();
+
+		List<Integer> numberOfTriangles = graph.run(new GSATriangleCount<Long, NullValue, NullValue>()).collect();
+		int expectedTriangles = Integer.parseInt(TriangleCountData.RESULTED_NUMBER_OF_TRIANGLES);
+		// JUnit's contract is assertEquals(expected, actual); the reversed order gave misleading failure messages.
+		Assert.assertEquals(expectedTriangles, numberOfTriangles.get(0).intValue());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java
new file mode 100644
index 0000000..56b3289
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleEnumeratorITCase.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.examples.data.TriangleCountData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class TriangleEnumeratorITCase extends MultipleProgramsTestBase {
+	// Integration test: runs TriangleEnumerator on the default TriangleCountData edge set.
+	public TriangleEnumeratorITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Test
+	public void testTriangleEnumerator() throws Exception {
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(TriangleCountData.getDefaultEdgeDataSet(env),
+				env);
+
+		List<Tuple3<Long, Long, Long>> actualOutput = graph.run(new TriangleEnumerator<Long, NullValue, NullValue>()).collect();
+		List<Tuple3<Long, Long, Long>> expectedResult = TriangleCountData.getListOfTriangles();
+		// assertEquals takes (expected, actual); the original call had the two arguments reversed.
+		Assert.assertEquals(expectedResult.size(), actualOutput.size());
+		for (Tuple3<Long, Long, Long> resultTriangle : actualOutput) {
+			Assert.assertTrue("unexpected triangle: " + resultTriangle, expectedResult.contains(resultTriangle));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
new file mode 100755
index 0000000..7a3d550
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.examples.data.ConnectedComponentsDefaultData;
+import org.apache.flink.graph.examples.data.SingleSourceShortestPathsData;
+import org.apache.flink.graph.library.GSAConnectedComponents;
+import org.apache.flink.graph.library.GSASingleSourceShortestPaths;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class GatherSumApplyITCase extends MultipleProgramsTestBase {
+	// Integration tests for the library algorithms GSAConnectedComponents and GSASingleSourceShortestPaths.
+	public GatherSumApplyITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Connected Components Test
+	// --------------------------------------------------------------------------------------------
+
+	@Test
+	public void testConnectedComponents() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, NullValue> inputGraph = Graph.fromDataSet(
+				ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env),
+				new InitMapperCC(), env);
+
+		List<Vertex<Long, Long>> result = inputGraph.run(
+				new GSAConnectedComponents<Long, NullValue>(16)).collect();
+		// Expected "id,value" lines; a local variable, so nothing is shared between test methods.
+		String expectedResult = "1,1\n" +
+				"2,1\n" +
+				"3,1\n" +
+				"4,1\n";
+
+		compareResultAsTuples(result, expectedResult);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Single Source Shortest Path Test
+	// --------------------------------------------------------------------------------------------
+
+	@Test
+	public void testSingleSourceShortestPaths() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
+				SingleSourceShortestPathsData.getDefaultEdgeDataSet(env),
+				new InitMapperSSSP(), env);
+
+		List<Vertex<Long, Double>> result = inputGraph.run(
+				new GSASingleSourceShortestPaths<>(1L, 16)).collect();
+		// Expected "id,value" lines; 1L is presumably the source vertex id (it maps to 0.0) - TODO confirm.
+		String expectedResult = "1,0.0\n" +
+				"2,12.0\n" +
+				"3,13.0\n" +
+				"4,47.0\n" +
+				"5,48.0\n";
+
+		compareResultAsTuples(result, expectedResult);
+	}
+
+	@SuppressWarnings("serial")
+	private static final class InitMapperCC implements MapFunction<Long, Long> {
+		@Override
+		public Long map(Long value) {
+			return value; // the mapper's input is returned unchanged as the initial vertex value
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class InitMapperSSSP implements MapFunction<Long, Double> {
+		@Override
+		public Double map(Long value) {
+			return 0.0; // every vertex starts with the value 0.0, regardless of the input
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/ConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/ConnectedComponentsITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/ConnectedComponentsITCase.java
new file mode 100644
index 0000000..d0de8dc
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/ConnectedComponentsITCase.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.examples;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.graph.examples.ConnectedComponents;
+import org.apache.flink.graph.examples.data.ConnectedComponentsDefaultData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+@RunWith(Parameterized.class)
+public class ConnectedComponentsITCase extends MultipleProgramsTestBase {
+	// Runs the ConnectedComponents example via its main() and compares the content at resultPath with the expected string.
+	private String edgesPath; // URI string of the temp file holding ConnectedComponentsDefaultData.EDGES
+
+	private String resultPath; // URI string of the temp file passed to the example as its second argument
+
+	private String expected; // assigned by the test method, compared against the result in after()
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	public ConnectedComponentsITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Before
+	public void before() throws Exception {
+		resultPath = tempFolder.newFile().toURI().toString();
+		// Write the default edge data to a UTF-8 temp file so the example can be given a path.
+		File edgesFile = tempFolder.newFile();
+		Files.write(ConnectedComponentsDefaultData.EDGES, edgesFile, Charsets.UTF_8);
+		edgesPath = edgesFile.toURI().toString();
+	}
+
+	@Test
+	public void testConnectedComponentsExample() throws Exception {
+		ConnectedComponents.main(new String[]{edgesPath, resultPath, ConnectedComponentsDefaultData.MAX_ITERATIONS + ""});
+		expected = ConnectedComponentsDefaultData.VERTICES_WITH_MIN_ID;
+	}
+
+	@After
+	public void after() throws Exception {
+		TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath); // the only verification step of this test class
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/EuclideanGraphWeighingITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/EuclideanGraphWeighingITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/EuclideanGraphWeighingITCase.java
new file mode 100644
index 0000000..922c4b2
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/EuclideanGraphWeighingITCase.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.examples;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.graph.examples.EuclideanGraphWeighing;
+import org.apache.flink.graph.examples.data.EuclideanGraphData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+@RunWith(Parameterized.class)
+public class EuclideanGraphWeighingITCase extends MultipleProgramsTestBase {
+	// Runs the EuclideanGraphWeighing example via its main() and compares the content at resultPath with the expected string.
+	private String verticesPath; // URI string of the temp file holding EuclideanGraphData.VERTICES
+
+	private String edgesPath; // URI string of the temp file holding EuclideanGraphData.EDGES
+
+	private String resultPath; // URI string of the temp file passed to the example as its third argument
+
+	private String expected; // assigned by the test method, compared against the result in after()
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	public EuclideanGraphWeighingITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Before
+	public void before() throws Exception {
+		resultPath = tempFolder.newFile().toURI().toString();
+		File verticesFile = tempFolder.newFile();
+		Files.write(EuclideanGraphData.VERTICES, verticesFile, Charsets.UTF_8);
+
+		File edgesFile = tempFolder.newFile();
+		Files.write(EuclideanGraphData.EDGES, edgesFile, Charsets.UTF_8);
+		// The example receives its inputs as URI strings, not as File objects.
+		verticesPath = verticesFile.toURI().toString();
+		edgesPath = edgesFile.toURI().toString();
+	}
+
+	@Test
+	public void testGraphWeightingWeighing() throws Exception {
+		EuclideanGraphWeighing.main(new String[]{verticesPath, edgesPath, resultPath});
+		expected = EuclideanGraphData.RESULTED_WEIGHTED_EDGES;
+	}
+
+	@After
+	public void after() throws Exception {
+		TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath); // the only verification step of this test class
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/IncrementalSSSPITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/IncrementalSSSPITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/IncrementalSSSPITCase.java
new file mode 100644
index 0000000..d27dcd8
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/IncrementalSSSPITCase.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.examples;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.examples.IncrementalSSSP;
+import org.apache.flink.graph.examples.data.IncrementalSSSPData;
+import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+@RunWith(Parameterized.class)
+public class IncrementalSSSPITCase extends MultipleProgramsTestBase {
+	// Tests IncrementalSSSP: once through its main() and once by building the graphs and the iteration inside the test.
+	private String verticesPath; // URI string of the temp file holding IncrementalSSSPData.VERTICES
+
+	private String edgesPath; // URI string of the temp file holding IncrementalSSSPData.EDGES
+
+	private String edgesInSSSPPath; // URI string of the temp file holding IncrementalSSSPData.EDGES_IN_SSSP
+
+	private String resultPath; // URI string of the temp file the result is written to
+
+	private String expected; // assigned by each test method, compared against the result in after()
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	public IncrementalSSSPITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Before
+	public void before() throws Exception {
+		resultPath = tempFolder.newFile().toURI().toString();
+		File verticesFile = tempFolder.newFile();
+		Files.write(IncrementalSSSPData.VERTICES, verticesFile, Charsets.UTF_8);
+
+		File edgesFile = tempFolder.newFile();
+		Files.write(IncrementalSSSPData.EDGES, edgesFile, Charsets.UTF_8);
+
+		File edgesInSSSPFile = tempFolder.newFile();
+		Files.write(IncrementalSSSPData.EDGES_IN_SSSP, edgesInSSSPFile, Charsets.UTF_8);
+		// The example receives its inputs as URI strings, not as File objects.
+		verticesPath = verticesFile.toURI().toString();
+		edgesPath = edgesFile.toURI().toString();
+		edgesInSSSPPath = edgesInSSSPFile.toURI().toString();
+	}
+
+	@Test
+	public void testIncrementalSSSP() throws Exception {
+		IncrementalSSSP.main(new String[]{verticesPath, edgesPath, edgesInSSSPPath,
+				IncrementalSSSPData.SRC_EDGE_TO_BE_REMOVED, IncrementalSSSPData.TRG_EDGE_TO_BE_REMOVED,
+				IncrementalSSSPData.VAL_EDGE_TO_BE_REMOVED, resultPath, IncrementalSSSPData.NUM_VERTICES + ""});
+		expected = IncrementalSSSPData.RESULTED_VERTICES;
+	}
+
+	@Test
+	public void testIncrementalSSSPNonSPEdge() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Vertex<Long, Double>> vertices = IncrementalSSSPData.getDefaultVertexDataSet(env);
+		DataSet<Edge<Long, Double>> edges = IncrementalSSSPData.getDefaultEdgeDataSet(env);
+		DataSet<Edge<Long, Double>> edgesInSSSP = IncrementalSSSPData.getDefaultEdgesInSSSP(env);
+		// the edge to be removed is a non-SP edge
+		Edge<Long, Double> edgeToBeRemoved = new Edge<>(3L, 5L, 5.0);
+
+		Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
+		// Assumption: all minimum weight paths are kept
+		Graph<Long, Double, Double> ssspGraph = Graph.fromDataSet(vertices, edgesInSSSP, env);
+		// remove the edge; removeEdge() returns a new Graph instead of mutating, so the result has to be kept
+		graph = graph.removeEdge(edgeToBeRemoved);
+
+		// configure the iteration
+		ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
+
+		if (IncrementalSSSP.isInSSSP(edgeToBeRemoved, edgesInSSSP)) {
+
+			parameters.setDirection(EdgeDirection.IN);
+			parameters.setOptDegrees(true);
+
+			// run the scatter gather iteration to propagate info
+			Graph<Long, Double, Double> result = ssspGraph.runScatterGatherIteration(
+					new IncrementalSSSP.VertexDistanceUpdater(),
+					new IncrementalSSSP.InvalidateMessenger(edgeToBeRemoved),
+					IncrementalSSSPData.NUM_VERTICES, parameters);
+
+			DataSet<Vertex<Long, Double>> resultedVertices = result.getVertices();
+
+			resultedVertices.writeAsCsv(resultPath, "\n", ",");
+			env.execute();
+		} else {
+			vertices.writeAsCsv(resultPath, "\n", ","); // edge not in the SSSP set: the input vertices are written as-is
+			env.execute();
+		}
+
+		expected = IncrementalSSSPData.VERTICES;
+	}
+
+	@After
+	public void after() throws Exception {
+		TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath); // shared verification step for both tests
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/JaccardSimilarityMeasureITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/JaccardSimilarityMeasureITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/JaccardSimilarityMeasureITCase.java
new file mode 100644
index 0000000..92cca86
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/JaccardSimilarityMeasureITCase.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.examples;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.graph.examples.JaccardSimilarityMeasure;
+import org.apache.flink.graph.examples.data.JaccardSimilarityMeasureData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+@RunWith(Parameterized.class)
+public class JaccardSimilarityMeasureITCase extends MultipleProgramsTestBase {
+	// Runs the JaccardSimilarityMeasure example via its main() and compares the content at resultPath with the expected string.
+	private String edgesPath; // URI string of the temp file holding JaccardSimilarityMeasureData.EDGES
+
+	private String resultPath; // URI string of the temp file passed to the example as its second argument
+
+	private String expected; // assigned by the test method, compared against the result in after()
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	public JaccardSimilarityMeasureITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Before
+	public void before() throws Exception {
+		resultPath = tempFolder.newFile().toURI().toString();
+		// Write the edge data to a UTF-8 temp file so the example can be given a path.
+		File edgesFile = tempFolder.newFile();
+		Files.write(JaccardSimilarityMeasureData.EDGES, edgesFile, Charsets.UTF_8);
+
+		edgesPath = edgesFile.toURI().toString();
+	}
+
+	@Test
+	public void testJaccardSimilarityMeasureExample() throws Exception {
+		JaccardSimilarityMeasure.main(new String[]{edgesPath, resultPath});
+		expected = JaccardSimilarityMeasureData.JACCARD_EDGES;
+	}
+
+	@After
+	public void after() throws Exception {
+		TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath); // the only verification step of this test class
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/MusicProfilesITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/MusicProfilesITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/MusicProfilesITCase.java
new file mode 100644
index 0000000..d76a3ec
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/MusicProfilesITCase.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.examples;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+import org.apache.flink.graph.examples.MusicProfiles;
+import org.apache.flink.graph.examples.data.MusicProfilesData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Arrays;
+
+@RunWith(Parameterized.class)
+public class MusicProfilesITCase extends MultipleProgramsTestBase {
+	// Runs the MusicProfiles example via its main() and verifies both of its outputs (top songs and communities).
+	private String tripletsPath; // URI string of the temp file holding MusicProfilesData.USER_SONG_TRIPLETS
+
+	private String mismatchesPath; // URI string of the temp file holding MusicProfilesData.MISMATCHES
+
+	private String topSongsResultPath; // URI string of the temp file passed to the example as its third argument
+
+	private String communitiesResultPath; // URI string of the temp file passed to the example as its fifth argument
+
+	private String expectedTopSongs; // assigned by the test method, compared against the result in after()
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	public MusicProfilesITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Before
+	public void before() throws Exception {
+		topSongsResultPath = tempFolder.newFile().toURI().toString();
+		communitiesResultPath = tempFolder.newFile().toURI().toString();
+
+		File tripletsFile = tempFolder.newFile();
+		Files.write(MusicProfilesData.USER_SONG_TRIPLETS, tripletsFile, Charsets.UTF_8);
+		tripletsPath = tripletsFile.toURI().toString();
+
+		File mismatchesFile = tempFolder.newFile();
+		Files.write(MusicProfilesData.MISMATCHES, mismatchesFile, Charsets.UTF_8);
+		mismatchesPath = mismatchesFile.toURI().toString();
+	}
+
+	@Test
+	public void testMusicProfilesExample() throws Exception {
+		MusicProfiles.main(new String[]{tripletsPath, mismatchesPath, topSongsResultPath, "0", communitiesResultPath,
+				MusicProfilesData.MAX_ITERATIONS + ""});
+		expectedTopSongs = MusicProfilesData.TOP_SONGS_RESULT;
+	}
+
+	@After
+	public void after() throws Exception {
+		TestBaseUtils.compareResultsByLinesInMemory(expectedTopSongs, topSongsResultPath);
+
+		ArrayList<String> list = new ArrayList<>();
+		TestBaseUtils.readAllResultLines(list, communitiesResultPath, new String[]{}, false);
+		// Sorted lines are indexed below as users 1..5; substring(7) presumably skips a "user_N," prefix - TODO confirm.
+		String[] result = list.toArray(new String[list.size()]);
+		Arrays.sort(result);
+		Assert.assertTrue("expected at least 5 community lines but found " + result.length, result.length >= 5);
+		// check that user_1 and user_2 are in the same community
+		Assert.assertEquals("users 1 and 2 are not in the same community",
+				result[0].substring(7), result[1].substring(7));
+
+		// check that user_3, user_4 and user_5 are in the same community
+		Assert.assertEquals("users 3 and 4 are not in the same community",
+				result[2].substring(7), result[3].substring(7));
+		Assert.assertEquals("users 4 and 5 are not in the same community",
+				result[3].substring(7), result[4].substring(7));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/SingleSourceShortestPathsITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/SingleSourceShortestPathsITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/SingleSourceShortestPathsITCase.java
new file mode 100644
index 0000000..faf92c0
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/SingleSourceShortestPathsITCase.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.examples;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+
+import org.apache.flink.graph.examples.GSASingleSourceShortestPaths;
+import org.apache.flink.graph.examples.SingleSourceShortestPaths;
+import org.apache.flink.graph.examples.data.SingleSourceShortestPathsData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+@RunWith(Parameterized.class)
+public class SingleSourceShortestPathsITCase extends MultipleProgramsTestBase {
+
+    private String edgesPath;
+
+    private String resultPath;
+
+    private String expected;
+
+    @Rule
+    public TemporaryFolder tempFolder = new TemporaryFolder();
+
+    public SingleSourceShortestPathsITCase(TestExecutionMode mode) {
+        super(mode);
+    }
+
+    @Before
+    public void before() throws Exception {
+        resultPath = tempFolder.newFile().toURI().toString();
+
+        File edgesFile = tempFolder.newFile();
+        Files.write(SingleSourceShortestPathsData.EDGES, edgesFile, Charsets.UTF_8);
+        edgesPath = edgesFile.toURI().toString();
+    }
+
+    @Test
+    public void testSSSPExample() throws Exception {
+        SingleSourceShortestPaths.main(new String[]{SingleSourceShortestPathsData.SRC_VERTEX_ID + "",
+                edgesPath, resultPath, 10 + ""});
+        expected = SingleSourceShortestPathsData.RESULTED_SINGLE_SOURCE_SHORTEST_PATHS;
+    }
+
+    @Test
+    public void testGSASSSPExample() throws Exception {
+        GSASingleSourceShortestPaths.main(new String[]{SingleSourceShortestPathsData.SRC_VERTEX_ID + "",
+                edgesPath, resultPath, 10 + ""});
+        expected = SingleSourceShortestPathsData.RESULTED_SINGLE_SOURCE_SHORTEST_PATHS;
+    }
+
+    @After
+    public void after() throws Exception {
+        TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath);
+    }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala
deleted file mode 100644
index 75b793e..0000000
--- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/ConnectedComponents.scala
+++ /dev/null
@@ -1,120 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala.example
-
-import org.apache.flink.api.scala._
-import org.apache.flink.graph.scala._
-import org.apache.flink.graph.Edge
-import org.apache.flink.types.NullValue
-import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.graph.library.GSAConnectedComponents
-import java.lang.Long
-
-/**
- * This example shows how to use Gelly's library methods.
- * You can find all available library methods in [[org.apache.flink.graph.library]]. 
- * 
- * In particular, this example uses the
- * [[org.apache.flink.graph.library.GSAConnectedComponents]]
- * library method to compute the connected components of the input graph.
- *
- * The input file is a plain text file and must be formatted as follows:
- * Edges are represented by tuples of srcVertexId, trgVertexId which are
- * separated by tabs. Edges themselves are separated by newlines.
- * For example: <code>1\t2\n1\t3\n</code> defines two edges,
- * 1-2 and 1-3.
- *
- * Usage {{{
- *   ConnectedComponents <edge path> <result path> <number of iterations>
- *   }}}
- * If no parameters are provided, the program is run with default data from
- * [[org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData]]
- */
-object ConnectedComponents {
-  def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val edges: DataSet[Edge[Long, NullValue]] = getEdgesDataSet(env)
-    val graph = Graph.fromDataSet[Long, Long, NullValue](edges, new InitVertices, env)
-
-    val components = graph.run(new GSAConnectedComponents[Long, NullValue](maxIterations))
-
-
-    // emit result
-    if (fileOutput) {
-      components.writeAsCsv(outputPath, "\n", ",")
-      env.execute("Connected Components Example")
-    } else {
-      components.print()
-    }
-  }
-
-  private final class InitVertices extends MapFunction[Long, Long] {
-    override def map(id: Long) = id
-  }
-
-  // ***********************************************************************
-  // UTIL METHODS
-  // ***********************************************************************
-
-    private var fileOutput = false
-    private var edgesInputPath: String = null
-    private var outputPath: String = null
-    private var maxIterations = ConnectedComponentsDefaultData.MAX_ITERATIONS
-
-    private def parseParameters(args: Array[String]): Boolean = {
-      if(args.length > 0) {
-        if(args.length != 3) {
-          System.err.println("Usage ConnectedComponents <edge path> <output path> " +
-            "<num iterations>")
-        }
-        fileOutput = true
-        edgesInputPath = args(0)
-        outputPath = args(1)
-        maxIterations = 2
-      } else {
-        System.out.println("Executing ConnectedComponents example with default parameters" +
-          " and built-in default data.")
-        System.out.println("  Provide parameters to read input data from files.")
-        System.out.println("  See the documentation for the correct format of input files.")
-        System.out.println("Usage ConnectedComponents <edge path> <output path> " +
-          "<num iterations>")
-      }
-      true
-    }
-
-    private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, NullValue]] = {
-      if (fileOutput) {
-        env.readCsvFile[(Long, Long)](edgesInputPath,
-          lineDelimiter = "\n",
-          fieldDelimiter = "\t")
-          .map(edge => new Edge[Long, NullValue](edge._1, edge._2, NullValue.getInstance))
-      } else {
-        val edgeData = ConnectedComponentsDefaultData.DEFAULT_EDGES map {
-          case Array(x, y) => (x.asInstanceOf[Long], y.asInstanceOf[Long])
-        }
-        env.fromCollection(edgeData).map(
-        edge => new Edge[Long, NullValue](edge._1, edge._2, NullValue.getInstance))
-      }
-    }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala
deleted file mode 100644
index 68435ba..0000000
--- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GSASingleSourceShortestPaths.scala
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala.example
-
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.scala._
-import org.apache.flink.graph.Edge
-import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData
-import org.apache.flink.graph.gsa.{ApplyFunction, GatherFunction, Neighbor, SumFunction}
-import org.apache.flink.graph.scala._
-import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap
-
-/**
- * This example shows how to use Gelly's gather-sum-apply iterations.
- * 
- * It is an implementation of the Single-Source-Shortest-Paths algorithm. 
- *
- * The input file is a plain text file and must be formatted as follows:
- * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are
- * separated by tabs. Edges themselves are separated by newlines.
- * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
- * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
- *
- * If no parameters are provided, the program is run with default data from
- * [[org.apache.flink.graph.example.utils.SingleSourceShortestPathsData]]
- */
-object GSASingleSourceShortestPaths {
-  def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val edges: DataSet[Edge[Long, Double]] = getEdgesDataSet(env)
-    val graph = Graph.fromDataSet[Long, Double, Double](edges, new InitVertices(srcVertexId), env)
-
-    // Execute the gather-sum-apply iteration
-    val result = graph.runGatherSumApplyIteration(new CalculateDistances, new ChooseMinDistance,
-      new UpdateDistance, maxIterations)
-
-    // Extract the vertices as the result
-    val singleSourceShortestPaths = result.getVertices
-
-    // emit result
-    if (fileOutput) {
-      singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",")
-      env.execute("GSA Single Source Shortest Paths Example")
-    } else {
-      singleSourceShortestPaths.print()
-    }
-  }
-
-  // --------------------------------------------------------------------------------------------
-  //  Single Source Shortest Path UDFs
-  // --------------------------------------------------------------------------------------------
-
-  private final class InitVertices(srcId: Long) extends MapFunction[Long, Double] {
-
-    override def map(id: Long) = {
-      if (id.equals(srcId)) {
-        0.0
-      } else {
-        Double.PositiveInfinity
-      }
-    }
-  }
-
-  private final class CalculateDistances extends GatherFunction[Double, Double, Double] {
-    override def gather(neighbor: Neighbor[Double, Double]) = {
-      neighbor.getNeighborValue + neighbor.getEdgeValue
-    }
-  }
-
-  private final class ChooseMinDistance extends SumFunction[Double, Double, Double] {
-    override def sum(newValue: Double, currentValue: Double) = {
-      Math.min(newValue, currentValue)
-    }
-  }
-
-  private final class UpdateDistance extends ApplyFunction[Long, Double, Double] {
-    override def apply(newDistance: Double, oldDistance: Double) = {
-      if (newDistance < oldDistance) {
-        setResult(newDistance)
-      }
-    }
-  }
-
-  // **************************************************************************
-  // UTIL METHODS
-  // **************************************************************************
-
-  private var fileOutput = false
-  private var srcVertexId = 1L
-  private var edgesInputPath: String = null
-  private var outputPath: String = null
-  private var maxIterations = 5
-
-  private def parseParameters(args: Array[String]): Boolean = {
-    if(args.length > 0) {
-      if(args.length != 4) {
-        System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
-          " <input edges path> <output path> <num iterations>")
-      }
-      fileOutput = true
-      srcVertexId = args(0).toLong
-      edgesInputPath = args(1)
-      outputPath = args(2)
-      maxIterations = 3
-    } else {
-      System.out.println("Executing Single Source Shortest Paths example "
-        + "with default parameters and built-in default data.")
-      System.out.println("  Provide parameters to read input data from files.")
-      System.out.println("  See the documentation for the correct format of input files.")
-      System.out.println("Usage: SingleSourceShortestPaths <source vertex id>" +
-        " <input edges path> <output path> <num iterations>")
-    }
-    true
-  }
-
-  private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, Double]] = {
-    if (fileOutput) {
-      env.readCsvFile[(Long, Long, Double)](edgesInputPath,
-        lineDelimiter = "\n",
-        fieldDelimiter = "\t")
-        .map(new Tuple3ToEdgeMap[Long, Double]())
-    } else {
-      val edgeData = SingleSourceShortestPathsData.DEFAULT_EDGES map {
-        case Array(x, y, z) => (x.asInstanceOf[Long], y.asInstanceOf[Long],
-          z.asInstanceOf[Double])
-      }
-      env.fromCollection(edgeData).map(new Tuple3ToEdgeMap[Long, Double]())
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
deleted file mode 100644
index 1c3fcdd..0000000
--- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/GraphMetrics.scala
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.graph.scala.example
-
-import org.apache.flink.api.scala._
-import org.apache.flink.graph.scala._
-import org.apache.flink.types.NullValue
-import org.apache.flink.graph.Edge
-import org.apache.flink.util.Collector
-
-/**
- * This example illustrates how to use Gelly metrics methods and get simple statistics
- * from the input graph.  
- * 
- * The program creates a random graph and computes and prints
- * the following metrics:
- * - number of vertices
- * - number of edges
- * - average node degree
- * - the vertex ids with the max/min in- and out-degrees
- *
- * The input file is expected to contain one edge per line,
- * with long IDs and no values, in the following format:
- * {{{
- *   <sourceVertexID>\t<targetVertexID>
- * }}}
- * If no arguments are provided, the example runs with a random graph of 100 vertices.
- *
- */
-object GraphMetrics {
-  def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    /** create the graph **/
-    val graph: Graph[Long, NullValue, NullValue] = Graph.fromDataSet(getEdgeDataSet(env), env)
-
-    /** get the number of vertices **/
-    val numVertices = graph.numberOfVertices
-
-    /** get the number of edges **/
-    val numEdges = graph.numberOfEdges
-
-    /** compute the average node degree **/
-    val verticesWithDegrees = graph.getDegrees
-    val avgDegree = verticesWithDegrees.sum(1).map(in => (in._2 / numVertices).toDouble)
-
-    /** find the vertex with the maximum in-degree **/
-    val maxInDegreeVertex = graph.inDegrees.max(1).map(in => in._1)
-
-    /** find the vertex with the minimum in-degree **/
-    val minInDegreeVertex = graph.inDegrees.min(1).map(in => in._1)
-
-    /** find the vertex with the maximum out-degree **/
-    val maxOutDegreeVertex = graph.outDegrees.max(1).map(in => in._1)
-
-    /** find the vertex with the minimum out-degree **/
-    val minOutDegreeVertex = graph.outDegrees.min(1).map(in => in._1)
-
-    /** print the results **/
-    env.fromElements(numVertices).printOnTaskManager("Total number of vertices")
-    env.fromElements(numEdges).printOnTaskManager("Total number of edges")
-    avgDegree.printOnTaskManager("Average node degree")
-    maxInDegreeVertex.printOnTaskManager("Vertex with Max in-degree")
-    minInDegreeVertex.printOnTaskManager("Vertex with Max in-degree")
-    maxOutDegreeVertex.printOnTaskManager("Vertex with Max out-degree")
-    minOutDegreeVertex.printOnTaskManager("Vertex with Max out-degree")
-
-  }
-
-  private def parseParameters(args: Array[String]): Boolean = {
-    if (args.length > 0) {
-      fileOutput = true
-      if (args.length == 1) {
-        edgesPath = args(0)
-        true
-      } else {
-        System.err.println("Usage: GraphMetrics <edges path>")
-        false
-      }
-    } else {
-      System.out.println("Executing GraphMetrics example with built-in default data.")
-      System.out.println("  Provide parameters to read input data from a file.")
-      System.out.println("  Usage: GraphMetrics <edges path>")
-      true
-    }
-  }
-
-  private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, NullValue]] = {
-    if (fileOutput) {
-      env.readCsvFile[(Long, Long)](
-        edgesPath,
-        fieldDelimiter = "\t").map(
-        in => new Edge[Long, NullValue](in._1, in._2, NullValue.getInstance()))
-    } else {
-      env.generateSequence(1, numVertices).flatMap[Edge[Long, NullValue]](
-        (key: Long, out: Collector[Edge[Long, NullValue]]) => {
-          val numOutEdges: Int = (Math.random() * (numVertices / 2)).toInt
-          for ( i <- 0 to numOutEdges ) {
-            val target: Long = ((Math.random() * numVertices) + 1).toLong
-            new Edge[Long, NullValue](key, target, NullValue.getInstance())
-          }
-      })
-    }
-  }
-
-  private var fileOutput: Boolean = false
-  private var edgesPath: String = null
-  private var outputPath: String = null
-  private val numVertices = 100
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
deleted file mode 100644
index 827f1a3..0000000
--- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/example/SingleSourceShortestPaths.scala
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala.example
-
-import org.apache.flink.api.scala._
-import org.apache.flink.graph.scala._
-import org.apache.flink.graph.Edge
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.graph.spargel.VertexUpdateFunction
-import org.apache.flink.graph.spargel.MessageIterator
-import org.apache.flink.graph.Vertex
-import org.apache.flink.graph.spargel.MessagingFunction
-import scala.collection.JavaConversions._
-import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap
-import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData
-
-/**
- * This example shows how to use Gelly's scatter-gather iterations.
- * 
- * It is an implementation of the Single-Source-Shortest-Paths algorithm. 
- *
- * The input file is a plain text file and must be formatted as follows:
- * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are
- * separated by tabs. Edges themselves are separated by newlines.
- * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
- * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
- *
- * If no parameters are provided, the program is run with default data from
- * [[org.apache.flink.graph.example.utils.SingleSourceShortestPathsData]]
- */
-object SingleSourceShortestPaths {
-  def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    val edges: DataSet[Edge[Long, Double]] = getEdgesDataSet(env)
-    val graph = Graph.fromDataSet[Long, Double, Double](edges, new InitVertices(srcVertexId), env)
-
-    // Execute the scatter-gather iteration
-    val result = graph.runScatterGatherIteration(new VertexDistanceUpdater,
-      new MinDistanceMessenger, maxIterations)
-
-    // Extract the vertices as the result
-    val singleSourceShortestPaths = result.getVertices
-
-    // emit result
-    if (fileOutput) {
-      singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",")
-      env.execute("Single Source Shortest Paths Example")
-    } else {
-      singleSourceShortestPaths.print()
-    }
-  }
-
-  // --------------------------------------------------------------------------------------------
-  //  Single Source Shortest Path UDFs
-  // --------------------------------------------------------------------------------------------
-
-  private final class InitVertices(srcId: Long) extends MapFunction[Long, Double] {
-
-    override def map(id: Long) = {
-      if (id.equals(srcId)) {
-        0.0
-      } else {
-        Double.PositiveInfinity
-      }
-    }
-  }
-
-  /**
-   * Function that updates the value of a vertex by picking the minimum
-   * distance from all incoming messages.
-   */
-  private final class VertexDistanceUpdater extends VertexUpdateFunction[Long, Double, Double] {
-
-    override def updateVertex(vertex: Vertex[Long, Double], inMessages: MessageIterator[Double]) {
-      var minDistance = Double.MaxValue
-      while (inMessages.hasNext) {
-        val msg = inMessages.next
-        if (msg < minDistance) {
-          minDistance = msg
-        }
-      }
-      if (vertex.getValue > minDistance) {
-        setNewVertexValue(minDistance)
-      }
-    }
-  }
-
-  /**
-   * Distributes the minimum distance associated with a given vertex among all
-   * the target vertices summed up with the edge's value.
-   */
-  private final class MinDistanceMessenger extends
-    MessagingFunction[Long, Double, Double, Double] {
-
-    override def sendMessages(vertex: Vertex[Long, Double]) {
-      if (vertex.getValue < Double.PositiveInfinity) {
-        for (edge: Edge[Long, Double] <- getEdges) {
-          sendMessageTo(edge.getTarget, vertex.getValue + edge.getValue)
-        }
-      }
-    }
-  }
-
-  // ****************************************************************************
-  // UTIL METHODS
-  // ****************************************************************************
-
-  private var fileOutput = false
-  private var srcVertexId = 1L
-  private var edgesInputPath: String = null
-  private var outputPath: String = null
-  private var maxIterations = 5
-
-  private def parseParameters(args: Array[String]): Boolean = {
-    if(args.length > 0) {
-      if(args.length != 4) {
-        System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
-          " <input edges path> <output path> <num iterations>")
-      }
-      fileOutput = true
-      srcVertexId = args(0).toLong
-      edgesInputPath = args(1)
-      outputPath = args(2)
-      maxIterations = 3
-    } else {
-      System.out.println("Executing Single Source Shortest Paths example "
-        + "with default parameters and built-in default data.")
-      System.out.println("  Provide parameters to read input data from files.")
-      System.out.println("  See the documentation for the correct format of input files.")
-      System.out.println("Usage: SingleSourceShortestPaths <source vertex id>" +
-        " <input edges path> <output path> <num iterations>")
-    }
-    true
-  }
-
-  private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, Double]] = {
-    if (fileOutput) {
-      env.readCsvFile[(Long, Long, Double)](edgesInputPath,
-        lineDelimiter = "\n",
-        fieldDelimiter = "\t")
-        .map(new Tuple3ToEdgeMap[Long, Double]())
-    } else {
-      val edgeData = SingleSourceShortestPathsData.DEFAULT_EDGES map {
-        case Array(x, y, z) => (x.asInstanceOf[Long], y.asInstanceOf[Long],
-          z.asInstanceOf[Double])
-      }
-      env.fromCollection(edgeData).map(new Tuple3ToEdgeMap[Long, Double]())
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
deleted file mode 100644
index cd52e04..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData;
-import org.apache.flink.graph.library.GSAConnectedComponents;
-import org.apache.flink.types.NullValue;
-
-/**
- * This example shows how to use Gelly's library methods.
- * You can find all available library methods in {@link org.apache.flink.graph.library}. 
- * 
- * In particular, this example uses the {@link org.apache.flink.graph.library.GSAConnectedComponents}
- * library method to compute the connected components of the input graph.
- *
- * The input file is a plain text file and must be formatted as follows:
- * Edges are represented by tuples of srcVertexId, trgVertexId which are
- * separated by tabs. Edges themselves are separated by newlines.
- * For example: <code>1\t2\n1\t3\n</code> defines two edges,
- * 1-2 and 1-3.
- *
- * Usage <code>ConnectedComponents &lt;edge path&gt; &lt;result path&gt;
- * &lt;number of iterations&gt; </code><br>
- * If no parameters are provided, the program is run with default data from
- * {@link org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData}
- */
-public class ConnectedComponents implements ProgramDescription {
-
-	@SuppressWarnings("serial")
-	public static void main(String [] args) throws Exception {
-
-		if(!parseParameters(args)) {
-			return;
-		}
-
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Edge<Long, NullValue>> edges = getEdgesDataSet(env);
-
-		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new MapFunction<Long, Long>() {
-			@Override
-			public Long map(Long value) throws Exception {
-				return value;
-			}
-		}, env);
-
-		DataSet<Vertex<Long, Long>> verticesWithMinIds = graph
-				.run(new GSAConnectedComponents<Long, NullValue>(maxIterations));
-
-		// emit result
-		if (fileOutput) {
-			verticesWithMinIds.writeAsCsv(outputPath, "\n", ",");
-
-			// since file sinks are lazy, we trigger the execution explicitly
-			env.execute("Connected Components Example");
-		} else {
-			verticesWithMinIds.print();
-		}
-	}
-
-	@Override
-	public String getDescription() {
-		return "Connected Components Example";
-	}
-
-	// *************************************************************************
-	// UTIL METHODS
-	// *************************************************************************
-
-	private static boolean fileOutput = false;
-	private static String edgeInputPath = null;
-	private static String outputPath = null;
-	private static Integer maxIterations = ConnectedComponentsDefaultData.MAX_ITERATIONS;
-
-	private static boolean parseParameters(String [] args) {
-		if(args.length > 0) {
-			if(args.length != 3) {
-				System.err.println("Usage ConnectedComponents <edge path> <output path> " +
-						"<num iterations>");
-				return false;
-			}
-
-			fileOutput = true;
-			edgeInputPath = args[0];
-			outputPath = args[1];
-			maxIterations = Integer.parseInt(args[2]);
-
-		} else {
-			System.out.println("Executing ConnectedComponents example with default parameters and built-in default data.");
-			System.out.println("Provide parameters to read input data from files.");
-			System.out.println("Usage ConnectedComponents <edge path> <output path> " +
-					"<num iterations>");
-		}
-
-		return true;
-	}
-
-	@SuppressWarnings("serial")
-	private static DataSet<Edge<Long, NullValue>> getEdgesDataSet(ExecutionEnvironment env) {
-
-		if(fileOutput) {
-			return env.readCsvFile(edgeInputPath)
-					.ignoreComments("#")
-					.fieldDelimiter("\t")
-					.lineDelimiter("\n")
-					.types(Long.class, Long.class)
-					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
-						@Override
-						public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception {
-							return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance());
-						}
-					});
-		} else {
-			return ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env);
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java
deleted file mode 100644
index 712be3e..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java
+++ /dev/null
@@ -1,214 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.EdgeJoinFunction;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Triplet;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.example.utils.EuclideanGraphData;
-
-import java.io.Serializable;
-
-/**
- * This example shows how to use Gelly's {@link Graph#getTriplets()} and
- * {@link Graph#joinWithEdges(DataSet, EdgeJoinFunction)} methods.
- * 
- * Given a directed, unweighted graph, with vertex values representing points in a plan,
- * return a weighted graph where the edge weights are equal to the Euclidean distance between the
- * src and the trg vertex values.
- *
- * <p>
- * Input files are plain text files and must be formatted as follows:
- * <ul>
- * 	<li> Vertices are represented by their vertexIds and vertex values and are separated by newlines,
- * 	the value being formed of two doubles separated by a comma.
- * 	For example: <code>1,1.0,1.0\n2,2.0,2.0\n3,3.0,3.0\n</code> defines a data set of three vertices
- * 	<li> Edges are represented by pairs of srcVertexId, trgVertexId separated by commas.
- * 	Edges themselves are separated by newlines.
- * 	For example: <code>1,2\n1,3\n</code> defines two edges 1-2 and 1-3.
- * </ul>
- *
- * Usage <code>EuclideanGraphWeighing &lt;vertex path&gt; &lt;edge path&gt; &lt;result path&gt;</code><br>
- * If no parameters are provided, the program is run with default data from
- * {@link org.apache.flink.graph.example.utils.EuclideanGraphData}
- */
-@SuppressWarnings("serial")
-public class EuclideanGraphWeighing implements ProgramDescription {
-
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Vertex<Long, Point>> vertices = getVerticesDataSet(env);
-
-		DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
-
-		Graph<Long, Point, Double> graph = Graph.fromDataSet(vertices, edges, env);
-
-		// the edge value will be the Euclidean distance between its src and trg vertex
-		DataSet<Tuple3<Long, Long, Double>> edgesWithEuclideanWeight = graph.getTriplets()
-				.map(new MapFunction<Triplet<Long, Point, Double>, Tuple3<Long, Long, Double>>() {
-
-					@Override
-					public Tuple3<Long, Long, Double> map(Triplet<Long, Point, Double> triplet)
-							throws Exception {
-
-						Vertex<Long, Point> srcVertex = triplet.getSrcVertex();
-						Vertex<Long, Point> trgVertex = triplet.getTrgVertex();
-
-						return new Tuple3<Long, Long, Double>(srcVertex.getId(), trgVertex.getId(),
-								srcVertex.getValue().euclideanDistance(trgVertex.getValue()));
-					}
-				});
-
-		Graph<Long, Point, Double> resultedGraph = graph.joinWithEdges(edgesWithEuclideanWeight,
-				new EdgeJoinFunction<Double, Double>() {
-
-					public Double edgeJoin(Double edgeValue, Double inputValue) {
-						return inputValue;
-					}
-				});
-
-		// retrieve the edges from the final result
-		DataSet<Edge<Long, Double>> result = resultedGraph.getEdges();
-
-		// emit result
-		if (fileOutput) {
-			result.writeAsCsv(outputPath, "\n", ",");
-
-			// since file sinks are lazy, we trigger the execution explicitly
-			env.execute("Euclidean Graph Weighing Example");
-		} else {
-			result.print();
-		}
-
-	}
-
-	@Override
-	public String getDescription() {
-		return "Weighing a graph by computing the Euclidean distance " +
-				"between its vertices";
-	}
-
-	// *************************************************************************
-	//     DATA TYPES
-	// *************************************************************************
-
-	/**
-	 * A simple two-dimensional point.
-	 */
-	public static class Point implements Serializable {
-
-		public double x, y;
-
-		public Point() {}
-
-		public Point(double x, double y) {
-			this.x = x;
-			this.y = y;
-		}
-
-		public double euclideanDistance(Point other) {
-			return Math.sqrt((x-other.x)*(x-other.x) + (y-other.y)*(y-other.y));
-		}
-
-		@Override
-		public String toString() {
-			return x + " " + y;
-		}
-	}
-
-	// ******************************************************************************************************************
-	// UTIL METHODS
-	// ******************************************************************************************************************
-
-	private static boolean fileOutput = false;
-
-	private static String verticesInputPath = null;
-
-	private static String edgesInputPath = null;
-
-	private static String outputPath = null;
-
-	private static boolean parseParameters(String[] args) {
-
-		if (args.length > 0) {
-			if (args.length == 3) {
-				fileOutput = true;
-				verticesInputPath = args[0];
-				edgesInputPath = args[1];
-				outputPath = args[2];
-			} else {
-				System.out.println("Executing Euclidean Graph Weighing example with default parameters and built-in default data.");
-				System.out.println("Provide parameters to read input data from files.");
-				System.out.println("See the documentation for the correct format of input files.");
-				System.err.println("Usage: EuclideanGraphWeighing <input vertices path> <input edges path>" +
-						" <output path>");
-				return false;
-			}
-		}
-		return true;
-	}
-
-	private static DataSet<Vertex<Long, Point>> getVerticesDataSet(ExecutionEnvironment env) {
-		if (fileOutput) {
-			return env.readCsvFile(verticesInputPath)
-					.lineDelimiter("\n")
-					.types(Long.class, Double.class, Double.class)
-					.map(new MapFunction<Tuple3<Long, Double, Double>, Vertex<Long, Point>>() {
-
-						@Override
-						public Vertex<Long, Point> map(Tuple3<Long, Double, Double> value) throws Exception {
-							return new Vertex<Long, Point>(value.f0, new Point(value.f1, value.f2));
-						}
-					});
-		} else {
-			return EuclideanGraphData.getDefaultVertexDataSet(env);
-		}
-	}
-
-	private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
-		if (fileOutput) {
-			return env.readCsvFile(edgesInputPath)
-					.lineDelimiter("\n")
-					.types(Long.class, Long.class)
-					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() {
-
-						@Override
-						public Edge<Long, Double> map(Tuple2<Long, Long> tuple2) throws Exception {
-							return new Edge<Long, Double>(tuple2.f0, tuple2.f1, 0.0);
-						}
-					});
-		} else {
-			return EuclideanGraphData.getDefaultEdgeDataSet(env);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
deleted file mode 100755
index 635a099..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
+++ /dev/null
@@ -1,191 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
-import org.apache.flink.graph.gsa.ApplyFunction;
-import org.apache.flink.graph.gsa.GatherFunction;
-import org.apache.flink.graph.gsa.SumFunction;
-import org.apache.flink.graph.gsa.Neighbor;
-import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
-
-/**
- * This example shows how to use Gelly's Gather-Sum-Apply iterations.
- * 
- * It is an implementation of the Single-Source-Shortest-Paths algorithm.
- * For a vertex-centric implementation of the same algorithm, please refer to {@link SingleSourceShortestPaths}. 
- *
- * The input file is a plain text file and must be formatted as follows:
- * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are
- * separated by tabs. Edges themselves are separated by newlines.
- * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
- * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
- *
- * If no parameters are provided, the program is run with default data from
- * {@link org.apache.flink.graph.example.utils.SingleSourceShortestPathsData}
- */
-public class GSASingleSourceShortestPaths implements ProgramDescription {
-
-	// --------------------------------------------------------------------------------------------
-	//  Program
-	// --------------------------------------------------------------------------------------------
-
-	public static void main(String[] args) throws Exception {
-
-		if(!parseParameters(args)) {
-			return;
-		}
-
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
-
-		Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, new InitVertices(srcVertexId), env);
-
-		// Execute the GSA iteration
-		Graph<Long, Double, Double> result = graph.runGatherSumApplyIteration(
-				new CalculateDistances(), new ChooseMinDistance(), new UpdateDistance(), maxIterations);
-
-		// Extract the vertices as the result
-		DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
-
-		// emit result
-		if(fileOutput) {
-			singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",");
-
-			// since file sinks are lazy, we trigger the execution explicitly
-			env.execute("GSA Single Source Shortest Paths");
-		} else {
-			singleSourceShortestPaths.print();
-		}
-
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Single Source Shortest Path UDFs
-	// --------------------------------------------------------------------------------------------
-
-	@SuppressWarnings("serial")
-	private static final class InitVertices implements MapFunction<Long, Double>{
-
-		private long srcId;
-
-		public InitVertices(long srcId) {
-			this.srcId = srcId;
-		}
-
-		public Double map(Long id) {
-			if (id.equals(srcId)) {
-				return 0.0;
-			}
-			else {
-				return Double.POSITIVE_INFINITY;
-			}
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class CalculateDistances extends GatherFunction<Double, Double, Double> {
-
-		public Double gather(Neighbor<Double, Double> neighbor) {
-			return neighbor.getNeighborValue() + neighbor.getEdgeValue();
-		}
-	};
-
-	@SuppressWarnings("serial")
-	private static final class ChooseMinDistance extends SumFunction<Double, Double, Double> {
-
-		public Double sum(Double newValue, Double currentValue) {
-			return Math.min(newValue, currentValue);
-		}
-	};
-
-	@SuppressWarnings("serial")
-	private static final class UpdateDistance extends ApplyFunction<Long, Double, Double> {
-
-		public void apply(Double newDistance, Double oldDistance) {
-			if (newDistance < oldDistance) {
-				setResult(newDistance);
-			}
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Util methods
-	// --------------------------------------------------------------------------------------------
-
-	private static boolean fileOutput = false;
-
-	private static Long srcVertexId = 1l;
-
-	private static String edgesInputPath = null;
-
-	private static String outputPath = null;
-
-	private static int maxIterations = 5;
-
-	private static boolean parseParameters(String[] args) {
-
-		if (args.length > 0) {
-			if(args.length != 4) {
-				System.err.println("Usage: GSASingleSourceShortestPaths <source vertex id>" +
-						" <input edges path> <output path> <num iterations>");
-				return false;
-			}
-
-			fileOutput = true;
-			srcVertexId = Long.parseLong(args[0]);
-			edgesInputPath = args[1];
-			outputPath = args[2];
-			maxIterations = Integer.parseInt(args[3]);
-		} else {
-				System.out.println("Executing GSASingle Source Shortest Paths example "
-						+ "with default parameters and built-in default data.");
-				System.out.println("  Provide parameters to read input data from files.");
-				System.out.println("  See the documentation for the correct format of input files.");
-				System.out.println("Usage: GSASingleSourceShortestPaths <source vertex id>" +
-						" <input edges path> <output path> <num iterations>");
-		}
-		return true;
-	}
-
-	private static DataSet<Edge<Long, Double>> getEdgeDataSet(ExecutionEnvironment env) {
-		if (fileOutput) {
-			return env.readCsvFile(edgesInputPath)
-					.fieldDelimiter("\t")
-					.lineDelimiter("\n")
-					.types(Long.class, Long.class, Double.class)
-					.map(new Tuple3ToEdgeMap<Long, Double>());
-		} else {
-			return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
-		}
-	}
-
-	@Override
-	public String getDescription() {
-		return "GSA Single Source Shortest Paths";
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
deleted file mode 100644
index 117f7d1..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.example.utils.ExampleUtils;
-import org.apache.flink.types.NullValue;
-
-/**
- * This example illustrates how to use Gelly metrics methods and get simple statistics
- * from the input graph.  
- * 
- * The program creates a random graph and computes and prints
- * the following metrics:
- * - number of vertices
- * - number of edges
- * - average node degree
- * - the vertex ids with the max/min in- and out-degrees
- *
- * The input file is expected to contain one edge per line,
- * with long IDs and no values, in the following format:
- * "&lt;sourceVertexID&gt;\t&lt;targetVertexID&gt;".
- * If no arguments are provided, the example runs with a random graph of 100 vertices.
- *
- */
-public class GraphMetrics implements ProgramDescription {
-
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		/** create the graph **/
-		Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(getEdgesDataSet(env), env);
-		
-		/** get the number of vertices **/
-		long numVertices = graph.numberOfVertices();
-		
-		/** get the number of edges **/
-		long numEdges = graph.numberOfEdges();
-		
-		/** compute the average node degree **/
-		DataSet<Tuple2<Long, Long>> verticesWithDegrees = graph.getDegrees();
-
-		DataSet<Double> avgNodeDegree = verticesWithDegrees
-				.aggregate(Aggregations.SUM, 1).map(new AvgNodeDegreeMapper(numVertices));
-		
-		/** find the vertex with the maximum in-degree **/
-		DataSet<Long> maxInDegreeVertex = graph.inDegrees().maxBy(1).map(new ProjectVertexId());
-
-		/** find the vertex with the minimum in-degree **/
-		DataSet<Long> minInDegreeVertex = graph.inDegrees().minBy(1).map(new ProjectVertexId());
-
-		/** find the vertex with the maximum out-degree **/
-		DataSet<Long> maxOutDegreeVertex = graph.outDegrees().maxBy(1).map(new ProjectVertexId());
-
-		/** find the vertex with the minimum out-degree **/
-		DataSet<Long> minOutDegreeVertex = graph.outDegrees().minBy(1).map(new ProjectVertexId());
-		
-		/** print the results **/
-		ExampleUtils.printResult(env.fromElements(numVertices), "Total number of vertices");
-		ExampleUtils.printResult(env.fromElements(numEdges), "Total number of edges");
-		ExampleUtils.printResult(avgNodeDegree, "Average node degree");
-		ExampleUtils.printResult(maxInDegreeVertex, "Vertex with Max in-degree");
-		ExampleUtils.printResult(minInDegreeVertex, "Vertex with Min in-degree");
-		ExampleUtils.printResult(maxOutDegreeVertex, "Vertex with Max out-degree");
-		ExampleUtils.printResult(minOutDegreeVertex, "Vertex with Min out-degree");
-
-		env.execute();
-	}
-
-	@SuppressWarnings("serial")
-	private static final class AvgNodeDegreeMapper implements MapFunction<Tuple2<Long, Long>, Double> {
-
-		private long numberOfVertices;
-
-		public AvgNodeDegreeMapper(long numberOfVertices) {
-			this.numberOfVertices = numberOfVertices;
-		}
-
-		public Double map(Tuple2<Long, Long> sumTuple) {
-			return (double) (sumTuple.f1 / numberOfVertices) ;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class ProjectVertexId implements MapFunction<Tuple2<Long,Long>, Long> {
-		public Long map(Tuple2<Long, Long> value) { return value.f0; }
-	}
-
-	@Override
-	public String getDescription() {
-		return "Graph Metrics Example";
-	}
-
-	// ******************************************************************************************************************
-	// UTIL METHODS
-	// ******************************************************************************************************************
-
-	private static boolean fileOutput = false;
-
-	private static String edgesInputPath = null;
-
-	static final int NUM_VERTICES = 100;
-
-	static final long SEED = 9876;
-
-	private static boolean parseParameters(String[] args) {
-
-		if(args.length > 0) {
-			if(args.length != 1) {
-				System.err.println("Usage: GraphMetrics <input edges>");
-				return false;
-			}
-
-			fileOutput = true;
-			edgesInputPath = args[0];
-		} else {
-			System.out.println("Executing Graph Metrics example with default parameters and built-in default data.");
-			System.out.println("  Provide parameters to read input data from files.");
-			System.out.println("  See the documentation for the correct format of input files.");
-			System.out.println("Usage: GraphMetrics <input edges>");
-		}
-		return true;
-	}
-
-	@SuppressWarnings("serial")
-	private static DataSet<Edge<Long, NullValue>> getEdgesDataSet(ExecutionEnvironment env) {
-		if (fileOutput) {
-			return env.readCsvFile(edgesInputPath)
-					.lineDelimiter("\n").fieldDelimiter("\t")
-					.types(Long.class, Long.class).map(
-							new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
-
-								public Edge<Long, NullValue> map(Tuple2<Long, Long> value) {
-									return new Edge<Long, NullValue>(value.f0, value.f1, 
-											NullValue.getInstance());
-								}
-					});
-		} else {
-			return ExampleUtils.getRandomEdges(env, NUM_VERTICES);
-		}
-	}
-}
\ No newline at end of file


Mime
View raw message