flink-commits mailing list archives

From rmetz...@apache.org
Subject [08/25] flink git commit: [FLINK-3511] [gelly] Introduce flink-gelly-examples module
Date Fri, 26 Feb 2016 19:58:54 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/EuclideanGraphData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/EuclideanGraphData.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/EuclideanGraphData.java
new file mode 100644
index 0000000..2b4277d
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/EuclideanGraphData.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples.data;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.examples.EuclideanGraphWeighing;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Provides the default data sets used for the Euclidean Graph example program.
+ * If no parameters are given to the program, the default data sets are used.
+ */
+public class EuclideanGraphData {
+
+	public static final int NUM_VERTICES = 9;
+
+	public static final String VERTICES = "1,1.0,1.0\n" + "2,2.0,2.0\n" + "3,3.0,3.0\n" + "4,4.0,4.0\n" + "5,5.0,5.0\n" +
+			"6,6.0,6.0\n" + "7,7.0,7.0\n" + "8,8.0,8.0\n" + "9,9.0,9.0";
+
+	public static DataSet<Vertex<Long, EuclideanGraphWeighing.Point>> getDefaultVertexDataSet(ExecutionEnvironment env) {
+
+		List<Vertex<Long, EuclideanGraphWeighing.Point>> vertices = new ArrayList<Vertex<Long, EuclideanGraphWeighing.Point>>();
+		for(int i=1; i<=NUM_VERTICES; i++) {
+			vertices.add(new Vertex<Long, EuclideanGraphWeighing.Point>(new Long(i),
+					new EuclideanGraphWeighing.Point(new Double(i), new Double(i))));
+		}
+
+		return env.fromCollection(vertices);
+	}
+
+	public static final String EDGES = "1,2\n" + "1,4\n" + "2,3\n" + "2,4\n" + "2,5\n" +
+			"3,5\n" + "4,5\n" + "4,6\n" + "5,7\n" + "5,9\n" + "6,7\n" + "6,8\n" +
+			"7,8\n" + "7,9\n" +  "8,9";
+
+	public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+		List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
+		edges.add(new Edge<Long, Double>(1L, 2L, 0.0));
+		edges.add(new Edge<Long, Double>(1L, 4L, 0.0));
+		edges.add(new Edge<Long, Double>(2L, 3L, 0.0));
+		edges.add(new Edge<Long, Double>(2L, 4L, 0.0));
+		edges.add(new Edge<Long, Double>(2L, 5L, 0.0));
+		edges.add(new Edge<Long, Double>(3L, 5L, 0.0));
+		edges.add(new Edge<Long, Double>(4L, 5L, 0.0));
+		edges.add(new Edge<Long, Double>(4L, 6L, 0.0));
+		edges.add(new Edge<Long, Double>(5L, 7L, 0.0));
+		edges.add(new Edge<Long, Double>(5L, 9L, 0.0));
+		edges.add(new Edge<Long, Double>(6L, 7L, 0.0));
+		edges.add(new Edge<Long, Double>(6L, 8L, 0.0));
+		edges.add(new Edge<Long, Double>(7L, 8L, 0.0));
+		edges.add(new Edge<Long, Double>(7L, 9L, 0.0));
+		edges.add(new Edge<Long, Double>(8L, 9L, 0.0));
+
+		return env.fromCollection(edges);
+	}
+
+	public static final String RESULTED_WEIGHTED_EDGES = "1,2,1.4142135623730951\n" + "1,4,4.242640687119285\n" +
+			"2,3,1.4142135623730951\n" + "2,4,2.8284271247461903\n" + "2,5,4.242640687119285\n" + "3,5,2.8284271247461903\n" +
+			"4,5,1.4142135623730951\n" + "4,6,2.8284271247461903\n" + "5,7,2.8284271247461903\n" + "5,9,5.656854249492381\n" +
+			"6,7,1.4142135623730951\n" + "6,8,2.8284271247461903\n" + "7,8,1.4142135623730951\n" + "7,9,2.8284271247461903\n" +
+			"8,9,1.4142135623730951";
+
+	private EuclideanGraphData() {}
+}
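
A side note on the expected values (illustrative only, not part of this commit): with the default vertices placed at (i, i), every weight in RESULTED_WEIGHTED_EDGES is |src - trg| * sqrt(2). A minimal, self-contained sketch that reproduces two of the entries:

    public class EuclideanWeightCheck {

        // Distance between the default points (src, src) and (trg, trg):
        // sqrt(2 * (src - trg)^2) = |src - trg| * sqrt(2).
        static double weight(long src, long trg) {
            double d = src - trg;
            return Math.sqrt(2 * d * d);
        }

        public static void main(String[] args) {
            // Matches "1,2,1.4142135623730951" and "1,4,4.242640687119285" above.
            System.out.println("1,2," + weight(1, 2));
            System.out.println("1,4," + weight(1, 4));
        }
    }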

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/IncrementalSSSPData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/IncrementalSSSPData.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/IncrementalSSSPData.java
new file mode 100644
index 0000000..99e363a
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/IncrementalSSSPData.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples.data;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Provides the default data sets used for the IncrementalSSSP example program.
+ * If no parameters are given to the program, the default data sets are used.
+ */
+public class IncrementalSSSPData {
+
+	public static final int NUM_VERTICES = 5;
+
+	public static final String VERTICES = "1,6.0\n" + "2,2.0\n" + "3,3.0\n" + "4,1.0\n" + "5,0.0";
+
+	public static DataSet<Vertex<Long, Double>> getDefaultVertexDataSet(ExecutionEnvironment env) {
+
+		List<Vertex<Long, Double>> vertices = new ArrayList<Vertex<Long, Double>>();
+		vertices.add(new Vertex<Long, Double>(1L, 6.0));
+		vertices.add(new Vertex<Long, Double>(2L, 2.0));
+		vertices.add(new Vertex<Long, Double>(3L, 3.0));
+		vertices.add(new Vertex<Long, Double>(4L, 1.0));
+		vertices.add(new Vertex<Long, Double>(5L, 0.0));
+
+		return env.fromCollection(vertices);
+	}
+
+	public static final String EDGES = "1,3,3.0\n" + "2,4,3.0\n" + "2,5,2.0\n" + "3,2,1.0\n" + "3,5,5.0\n" +
+			"4,5,1.0";
+
+	public static final DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+		List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
+		edges.add(new Edge<Long, Double>(1L, 3L, 3.0));
+		edges.add(new Edge<Long, Double>(2L, 4L, 3.0));
+		edges.add(new Edge<Long, Double>(2L, 5L, 2.0));
+		edges.add(new Edge<Long, Double>(3L, 2L, 1.0));
+		edges.add(new Edge<Long, Double>(3L, 5L, 5.0));
+		edges.add(new Edge<Long, Double>(4L, 5L, 1.0));
+
+		return env.fromCollection(edges);
+	}
+
+	public static final String EDGES_IN_SSSP = "1,3,3.0\n" + "2,5,2.0\n" + "3,2,1.0\n" + "4,5,1.0";
+
+	public static final DataSet<Edge<Long, Double>> getDefaultEdgesInSSSP(ExecutionEnvironment env) {
+
+		List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
+		edges.add(new Edge<Long, Double>(1L, 3L, 3.0));
+		edges.add(new Edge<Long, Double>(2L, 5L, 2.0));
+		edges.add(new Edge<Long, Double>(3L, 2L, 1.0));
+		edges.add(new Edge<Long, Double>(4L, 5L, 1.0));
+
+		return env.fromCollection(edges);
+	}
+
+	public static final String SRC_EDGE_TO_BE_REMOVED = "2";
+
+	public static final String TRG_EDGE_TO_BE_REMOVED = "5";
+
+	public static final String VAL_EDGE_TO_BE_REMOVED = "2.0";
+
+	public static final Edge<Long, Double> getDefaultEdgeToBeRemoved() {
+
+		return new Edge<Long, Double>(2L, 5L, 2.0);
+	}
+
+	public static final String RESULTED_VERTICES = "1," + Double.MAX_VALUE + "\n" + "2," + Double.MAX_VALUE+ "\n"
+			+ "3," + Double.MAX_VALUE + "\n" + "4,1.0\n" + "5,0.0";
+
+	private IncrementalSSSPData() {}
+}
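
For illustration only (not part of this commit), a sketch of how these defaults might be wired together; Graph.fromDataSet and removeEdge are standard Gelly calls, everything else is an assumption:

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.graph.Graph;
    import org.apache.flink.graph.examples.data.IncrementalSSSPData;

    public class IncrementalSSSPDataUsage {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Vertex values are the precomputed distances to vertex 5, edge values are lengths.
            Graph<Long, Double, Double> graph = Graph.fromDataSet(
                    IncrementalSSSPData.getDefaultVertexDataSet(env),
                    IncrementalSSSPData.getDefaultEdgeDataSet(env), env);

            // Removing the default edge 2->5 invalidates the shortest paths of vertices
            // 1, 2 and 3, which is why RESULTED_VERTICES assigns them Double.MAX_VALUE.
            Graph<Long, Double, Double> updated =
                    graph.removeEdge(IncrementalSSSPData.getDefaultEdgeToBeRemoved());

            updated.getEdges().print();
        }
    }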

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/JaccardSimilarityMeasureData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/JaccardSimilarityMeasureData.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/JaccardSimilarityMeasureData.java
new file mode 100644
index 0000000..054f041
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/JaccardSimilarityMeasureData.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples.data;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Provides the default data sets used for the Jaccard Similarity Measure example program.
+ * If no parameters are given to the program, the default data sets are used.
+ */
+public class JaccardSimilarityMeasureData {
+
+	public static final String EDGES = "1	2\n" + "1	3\n" + "1	4\n" + "1	5\n" + "2	3\n" + "2	4\n" +
+			"2	5\n" + "3	4\n" + "3	5\n" + "4	5";
+
+	public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+		List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
+		edges.add(new Edge<Long, Double>(1L, 2L, new Double(0)));
+		edges.add(new Edge<Long, Double>(1L, 3L, new Double(0)));
+		edges.add(new Edge<Long, Double>(1L, 4L, new Double(0)));
+		edges.add(new Edge<Long, Double>(1L, 5L, new Double(0)));
+		edges.add(new Edge<Long, Double>(2L, 3L, new Double(0)));
+		edges.add(new Edge<Long, Double>(2L, 4L, new Double(0)));
+		edges.add(new Edge<Long, Double>(2L, 5L, new Double(0)));
+		edges.add(new Edge<Long, Double>(3L, 4L, new Double(0)));
+		edges.add(new Edge<Long, Double>(3L, 5L, new Double(0)));
+		edges.add(new Edge<Long, Double>(4L, 5L, new Double(0)));
+
+		return env.fromCollection(edges);
+	}
+
+	public static final String JACCARD_EDGES = "1,2,0.6\n" + "1,3,0.6\n" + "1,4,0.6\n" + "1,5,0.6\n" +
+			"2,3,0.6\n" + "2,4,0.6\n" + "2,5,0.6\n" + "3,4,0.6\n" + "3,5,0.6\n" + "4,5,0.6";
+
+	private JaccardSimilarityMeasureData() {}
+}
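
The uniform 0.6 values follow from the edge list describing the complete graph on {1..5}: under the common definition where a vertex is not its own neighbor, any two vertices share 3 neighbors out of a 5-vertex neighborhood union. A small self-contained check (illustrative only; the library method may define neighborhoods slightly differently):

    import java.util.HashSet;
    import java.util.Set;

    public class JaccardCheck {

        public static void main(String[] args) {
            // Neighbor sets of vertices 1 and 2 in the complete graph on {1..5}.
            Set<Long> n1 = new HashSet<>();
            Set<Long> n2 = new HashSet<>();
            for (long v = 1; v <= 5; v++) {
                if (v != 1) { n1.add(v); }
                if (v != 2) { n2.add(v); }
            }

            Set<Long> intersection = new HashSet<>(n1);
            intersection.retainAll(n2);          // {3, 4, 5}
            Set<Long> union = new HashSet<>(n1);
            union.addAll(n2);                    // {1, 2, 3, 4, 5}

            // 3 / 5 = 0.6, matching every entry of JACCARD_EDGES.
            System.out.println((double) intersection.size() / union.size());
        }
    }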

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/LabelPropagationData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/LabelPropagationData.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/LabelPropagationData.java
new file mode 100644
index 0000000..8decb24
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/LabelPropagationData.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples.data;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.NullValue;
+
+/**
+ * Provides the default data set used for the Label Propagation test program.
+ * If no parameters are given to the program, the default edge data set is used.
+ */
+public class LabelPropagationData {
+	
+	public static final String LABELS_AFTER_1_ITERATION = "1,10\n" +
+			"2,10\n" +
+			"3,10\n" +
+			"4,40\n" +
+			"5,40\n" +
+			"6,40\n" +
+			"7,40\n";
+
+	public static final String LABELS_WITH_TIE ="1,10\n" +
+			"2,10\n" +
+			"3,10\n" +
+			"4,10\n" +
+			"5,20\n" +
+			"6,20\n" +
+			"7,20\n" +
+			"8,20\n" +
+			"9,20\n";
+
+	private LabelPropagationData() {}
+
+	public static final DataSet<Vertex<Long, Long>> getDefaultVertexSet(ExecutionEnvironment env) {
+
+		List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>();
+		vertices.add(new Vertex<Long, Long>(1l, 10l));
+		vertices.add(new Vertex<Long, Long>(2l, 10l));
+		vertices.add(new Vertex<Long, Long>(3l, 30l));
+		vertices.add(new Vertex<Long, Long>(4l, 40l));
+		vertices.add(new Vertex<Long, Long>(5l, 40l));
+		vertices.add(new Vertex<Long, Long>(6l, 40l));
+		vertices.add(new Vertex<Long, Long>(7l, 40l));
+
+		return env.fromCollection(vertices);
+	}
+
+	public static final DataSet<Edge<Long, NullValue>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+		List<Edge<Long, NullValue>> edges = new ArrayList<Edge<Long, NullValue>>();
+		edges.add(new Edge<Long, NullValue>(1L, 3L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(2L, 3L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(4L, 7L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(5L, 7L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(6L, 7L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(7L, 3L, NullValue.getInstance()));
+
+		return env.fromCollection(edges);
+	}
+
+	public static final DataSet<Vertex<Long, Long>> getTieVertexSet(ExecutionEnvironment env) {
+
+		List<Vertex<Long, Long>> vertices = new ArrayList<Vertex<Long, Long>>();
+		vertices.add(new Vertex<Long, Long>(1l, 10l));
+		vertices.add(new Vertex<Long, Long>(2l, 10l));
+		vertices.add(new Vertex<Long, Long>(3l, 10l));
+		vertices.add(new Vertex<Long, Long>(4l, 10l));
+		vertices.add(new Vertex<Long, Long>(5l, 0l));
+		vertices.add(new Vertex<Long, Long>(6l, 20l));
+		vertices.add(new Vertex<Long, Long>(7l, 20l));
+		vertices.add(new Vertex<Long, Long>(8l, 20l));
+		vertices.add(new Vertex<Long, Long>(9l, 20l));
+
+		return env.fromCollection(vertices);
+	}
+
+	public static final DataSet<Edge<Long, NullValue>> getTieEdgeDataSet(ExecutionEnvironment env) {
+
+		List<Edge<Long, NullValue>> edges = new ArrayList<Edge<Long, NullValue>>();
+		edges.add(new Edge<Long, NullValue>(1L, 5L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(2L, 5L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(4L, 5L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(5L, 5L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(6L, 5L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(7L, 5L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(8L, 5L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(9L, 5L, NullValue.getInstance()));
+
+		return env.fromCollection(edges);
+	}
+}
+
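
Illustrative usage (not part of this commit): running Gelly's LabelPropagation library method for one iteration on the default data should reproduce LABELS_AFTER_1_ITERATION; the exact generic parameters of LabelPropagation are an assumption and may differ between Flink versions:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.graph.Graph;
    import org.apache.flink.graph.Vertex;
    import org.apache.flink.graph.examples.data.LabelPropagationData;
    import org.apache.flink.graph.library.LabelPropagation;
    import org.apache.flink.types.NullValue;

    public class LabelPropagationDataUsage {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            Graph<Long, Long, NullValue> graph = Graph.fromDataSet(
                    LabelPropagationData.getDefaultVertexSet(env),
                    LabelPropagationData.getDefaultEdgeDataSet(env), env);

            // One iteration of label propagation over the default graph.
            DataSet<Vertex<Long, Long>> labels =
                    graph.run(new LabelPropagation<Long, Long, NullValue>(1));

            labels.print();
        }
    }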

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/MusicProfilesData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/MusicProfilesData.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/MusicProfilesData.java
new file mode 100644
index 0000000..e4c98fe
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/MusicProfilesData.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples.data;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+/**
+ * Provides the default data sets used for the Music Profiles example program.
+ * If no parameters are given to the program, the default data sets are used.
+ */
+public class MusicProfilesData {
+
+	public static DataSet<Tuple3<String, String, Integer>> getUserSongTriplets(ExecutionEnvironment env) {
+		List<Tuple3<String, String, Integer>> triplets = new ArrayList<Tuple3<String, String, Integer>>();
+		
+		triplets.add(new Tuple3<String, String, Integer>("user_1", "song_1", 100));
+		triplets.add(new Tuple3<String, String, Integer>("user_1", "song_2", 10));
+		triplets.add(new Tuple3<String, String, Integer>("user_1", "song_3", 20));
+		triplets.add(new Tuple3<String, String, Integer>("user_1", "song_4", 30));
+		triplets.add(new Tuple3<String, String, Integer>("user_1", "song_5", 1));
+		
+		triplets.add(new Tuple3<String, String, Integer>("user_2", "song_6", 40));
+		triplets.add(new Tuple3<String, String, Integer>("user_2", "song_7", 10));
+		triplets.add(new Tuple3<String, String, Integer>("user_2", "song_8", 3));
+		
+		triplets.add(new Tuple3<String, String, Integer>("user_3", "song_1", 100));
+		triplets.add(new Tuple3<String, String, Integer>("user_3", "song_2", 10));
+		triplets.add(new Tuple3<String, String, Integer>("user_3", "song_3", 20));
+		triplets.add(new Tuple3<String, String, Integer>("user_3", "song_8", 30));
+		triplets.add(new Tuple3<String, String, Integer>("user_3", "song_9", 1));
+		triplets.add(new Tuple3<String, String, Integer>("user_3", "song_10", 8));
+		triplets.add(new Tuple3<String, String, Integer>("user_3", "song_11", 90));
+		triplets.add(new Tuple3<String, String, Integer>("user_3", "song_12", 30));
+		triplets.add(new Tuple3<String, String, Integer>("user_3", "song_13", 34));
+		triplets.add(new Tuple3<String, String, Integer>("user_3", "song_14", 17));
+		
+		triplets.add(new Tuple3<String, String, Integer>("user_4", "song_1", 100));
+		triplets.add(new Tuple3<String, String, Integer>("user_4", "song_6", 10));
+		triplets.add(new Tuple3<String, String, Integer>("user_4", "song_8", 20));
+		triplets.add(new Tuple3<String, String, Integer>("user_4", "song_12", 30));
+		triplets.add(new Tuple3<String, String, Integer>("user_4", "song_13", 1));
+		triplets.add(new Tuple3<String, String, Integer>("user_4", "song_15", 1));
+		
+		triplets.add(new Tuple3<String, String, Integer>("user_5", "song_3", 300));
+		triplets.add(new Tuple3<String, String, Integer>("user_5", "song_4", 4));
+		triplets.add(new Tuple3<String, String, Integer>("user_5", "song_5", 5));
+		triplets.add(new Tuple3<String, String, Integer>("user_5", "song_8", 8));
+		triplets.add(new Tuple3<String, String, Integer>("user_5", "song_9", 9));
+		triplets.add(new Tuple3<String, String, Integer>("user_5", "song_10", 10));
+		triplets.add(new Tuple3<String, String, Integer>("user_5", "song_12", 12));
+		triplets.add(new Tuple3<String, String, Integer>("user_5", "song_13", 13));
+		triplets.add(new Tuple3<String, String, Integer>("user_5", "song_15", 15));
+
+		triplets.add(new Tuple3<String, String, Integer>("user_6", "song_6", 30));
+
+		return env.fromCollection(triplets);
+	}
+	
+	public static DataSet<String> getMismatches(ExecutionEnvironment env) {
+		List<String> errors = new ArrayList<String>();
+		errors.add("ERROR: <song_8 track_8> Sever");
+		errors.add("ERROR: <song_15 track_15> Black Trees");
+		return env.fromCollection(errors);
+	}
+
+	public static final String USER_SONG_TRIPLETS = "user_1	song_1	100\n" + "user_1	song_5	200\n"
+			+ "user_2	song_1	10\n" + "user_2	song_4	20\n"
+			+ "user_3	song_2	3\n"
+			+ "user_4	song_2	1\n" + "user_4	song_3	2\n"
+			+ "user_5	song_3	30";
+
+	public static final String MISMATCHES = "ERROR: <song_5 track_8> Angie";
+
+	public static final String MAX_ITERATIONS = "2";
+
+	public static final String TOP_SONGS_RESULT = "user_1	song_1\n" +
+								"user_2	song_4\n" +
+								"user_3	song_2\n" +
+								"user_4	song_3\n" +
+								"user_5	song_3";
+
+	public static final String COMMUNITIES_RESULT = "user_1	1\n" +
+								"user_2	1\n" +
+								"user_3	3\n" +
+								"user_4	3\n" +
+								"user_5	4";
+}
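
How USER_SONG_TRIPLETS, MISMATCHES and TOP_SONGS_RESULT relate: the mismatched song (song_5 here) is filtered out before the top song per user is chosen, which is why user_1 ends up with song_1 rather than the higher-count song_5. A plain-Java sketch of that selection, illustrative only:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.graph.examples.data.MusicProfilesData;

    public class TopSongsCheck {

        public static void main(String[] args) {
            String mismatchedSong = "song_5"; // flagged by MISMATCHES above

            Map<String, String> topSong = new HashMap<>();
            Map<String, Integer> topCount = new HashMap<>();

            for (String line : MusicProfilesData.USER_SONG_TRIPLETS.split("\n")) {
                String[] fields = line.split("\t");
                String user = fields[0];
                String song = fields[1];
                int count = Integer.parseInt(fields[2]);

                if (song.equals(mismatchedSong)) {
                    continue; // skip songs reported as mismatched
                }
                if (!topCount.containsKey(user) || count > topCount.get(user)) {
                    topCount.put(user, count);
                    topSong.put(user, song);
                }
            }
            System.out.println(topSong); // matches TOP_SONGS_RESULT
        }
    }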

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/PageRankData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/PageRankData.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/PageRankData.java
new file mode 100644
index 0000000..a45de88
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/PageRankData.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples.data;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+
+/**
+ * Provides the default data set used for the PageRank test program.
+ * If no parameters are given to the program, the default edge data set is used.
+ */
+public class PageRankData {
+	
+	public static final String EDGES = "2	1\n" +
+										"5	2\n" + 
+										"5	4\n" +
+										"4	3\n" +
+										"4	2\n" +
+										"1	4\n" +
+										"1	2\n" +
+										"1	3\n" +
+										"3	5\n";
+
+	
+	public static final String RANKS_AFTER_3_ITERATIONS = "1,0.237\n" +
+														"2,0.248\n" + 
+														"3,0.173\n" +
+														"4,0.175\n" +
+														"5,0.165\n";
+
+	private PageRankData() {}
+
+	public static final DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+		List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
+		edges.add(new Edge<Long, Double>(2L, 1L, 1.0));
+		edges.add(new Edge<Long, Double>(5L, 2L, 1.0));
+		edges.add(new Edge<Long, Double>(5L, 4L, 1.0));
+		edges.add(new Edge<Long, Double>(4L, 3L, 1.0));
+		edges.add(new Edge<Long, Double>(4L, 2L, 1.0));
+		edges.add(new Edge<Long, Double>(1L, 4L, 1.0));
+		edges.add(new Edge<Long, Double>(1L, 2L, 1.0));
+		edges.add(new Edge<Long, Double>(1L, 3L, 1.0));
+		edges.add(new Edge<Long, Double>(3L, 5L, 1.0));
+
+		return env.fromCollection(edges);
+	}
+}
+
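
Note that RANKS_AFTER_3_ITERATIONS sums to roughly 1.0 (0.998 after three-decimal rounding), i.e. the expected ranks form a probability distribution. For illustration only (not part of this commit), one way to turn the default edge data set into a graph is Graph.fromDataSet with a vertex value initializer; the uniform 1.0 starting rank is an assumption, not taken from the diff:

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.graph.Graph;
    import org.apache.flink.graph.examples.data.PageRankData;

    public class PageRankDataUsage {

        @SuppressWarnings("serial")
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // Every vertex referenced by the default edges starts with rank 1.0.
            Graph<Long, Double, Double> graph = Graph.fromDataSet(
                    PageRankData.getDefaultEdgeDataSet(env),
                    new MapFunction<Long, Double>() {
                        @Override
                        public Double map(Long id) {
                            return 1.0;
                        }
                    }, env);

            graph.getVertices().print();
        }
    }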

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/SingleSourceShortestPathsData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/SingleSourceShortestPathsData.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/SingleSourceShortestPathsData.java
new file mode 100644
index 0000000..75b4484
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/SingleSourceShortestPathsData.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples.data;
+
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+
+/**
+ * Provides the default data set used for the Single Source Shortest Paths example program.
+ * If no parameters are given to the program, the default edge data set is used.
+ */
+public class SingleSourceShortestPathsData {
+
+	public static final Long SRC_VERTEX_ID = 1L;
+
+	public static final String EDGES = "1\t2\t12.0\n" + "1\t3\t13.0\n" + "2\t3\t23.0\n" + "3\t4\t34.0\n" + "3\t5\t35.0\n" +
+					"4\t5\t45.0\n" + "5\t1\t51.0";
+
+	public static final Object[][] DEFAULT_EDGES = new Object[][] {
+		new Object[]{1L, 2L, 12.0},
+		new Object[]{1L, 3L, 13.0},
+		new Object[]{2L, 3L, 23.0},
+		new Object[]{3L, 4L, 34.0},
+		new Object[]{3L, 5L, 35.0},
+		new Object[]{4L, 5L, 45.0},
+		new Object[]{5L, 1L, 51.0}
+	};
+
+	public static final String RESULTED_SINGLE_SOURCE_SHORTEST_PATHS =  "1,0.0\n" + "2,12.0\n" + "3,13.0\n" + 
+								"4,47.0\n" + "5,48.0";
+
+	public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+		
+		List<Edge<Long, Double>> edgeList = new LinkedList<Edge<Long, Double>>();
+		for (Object[] edge : DEFAULT_EDGES) {
+			edgeList.add(new Edge<Long, Double>((Long) edge[0], (Long) edge[1], (Double) edge[2]));
+		}
+		return env.fromCollection(edgeList);
+	}
+
+	private SingleSourceShortestPathsData() {}
+}
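
The expected result can be re-derived from DEFAULT_EDGES with a plain Bellman-Ford relaxation from vertex 1 (for instance, vertex 4 is reached via 1-3-4 at cost 13.0 + 34.0 = 47.0). A self-contained sketch, illustrative only:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.graph.examples.data.SingleSourceShortestPathsData;

    public class ShortestPathsCheck {

        public static void main(String[] args) {
            Map<Long, Double> dist = new HashMap<>();
            for (long v = 1; v <= 5; v++) {
                dist.put(v, Double.POSITIVE_INFINITY);
            }
            dist.put(SingleSourceShortestPathsData.SRC_VERTEX_ID, 0.0);

            // Relax all edges |V| - 1 times, enough for the 5-vertex default graph.
            for (int i = 0; i < 4; i++) {
                for (Object[] edge : SingleSourceShortestPathsData.DEFAULT_EDGES) {
                    long src = (Long) edge[0];
                    long trg = (Long) edge[1];
                    double weight = (Double) edge[2];
                    if (dist.get(src) + weight < dist.get(trg)) {
                        dist.put(trg, dist.get(src) + weight);
                    }
                }
            }
            // {1=0.0, 2=12.0, 3=13.0, 4=47.0, 5=48.0}, matching the expected string.
            System.out.println(dist);
        }
    }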

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/SummarizationData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/SummarizationData.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/SummarizationData.java
new file mode 100644
index 0000000..c14d5de
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/SummarizationData.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples.data;
+
+import com.google.common.collect.Lists;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.NullValue;
+
+import java.util.List;
+
+/**
+ * Provides the default data set used for Summarization tests.
+ */
+public class SummarizationData {
+
+	private SummarizationData() {}
+
+	/**
+	 * The id of a summarized vertex can be any one of the ids of the vertices it represents.
+	 *
+	 * Format:
+	 *
+	 * "possible-id[,possible-id];group-value,group-count"
+	 */
+	public static final String[] EXPECTED_VERTICES = new String[] {
+			"0,1;A,2",
+			"2,3,4;B,3",
+			"5;C,1"
+	};
+
+	/**
+	 * Format:
+	 *
+	 * "possible-source-id[,possible-source-id];possible-target-id[,possible-target-id];group-value,group-count"
+	 */
+	public static final String[] EXPECTED_EDGES_WITH_VALUES = new String[] {
+			"0,1;0,1;A,2",
+			"0,1;2,3,4;A,1",
+			"2,3,4;0,1;A,1",
+			"2,3,4;0,1;C,2",
+			"2,3,4;2,3,4;B,2",
+			"5;2,3,4;D,2"
+	};
+
+	/**
+	 * Format:
+	 *
+	 * "possible-source-id[,possible-source-id];possible-target-id[,possible-target-id];group-value,group-count"
+	 */
+	public static final String[] EXPECTED_EDGES_ABSENT_VALUES = new String[] {
+			"0,1;0,1;(null),2",
+			"0,1;2,3,4;(null),1",
+			"2,3,4;0,1;(null),3",
+			"2,3,4;2,3,4;(null),2",
+			"5;2,3,4;(null),2"
+	};
+
+	/**
+	 * Creates a set of vertices with attached {@link String} values.
+	 *
+	 * @param env execution environment
+	 * @return vertex data set with string values
+	 */
+	public static DataSet<Vertex<Long, String>> getVertices(ExecutionEnvironment env) {
+		List<Vertex<Long, String>> vertices = Lists.newArrayListWithExpectedSize(6);
+		vertices.add(new Vertex<>(0L, "A"));
+		vertices.add(new Vertex<>(1L, "A"));
+		vertices.add(new Vertex<>(2L, "B"));
+		vertices.add(new Vertex<>(3L, "B"));
+		vertices.add(new Vertex<>(4L, "B"));
+		vertices.add(new Vertex<>(5L, "C"));
+
+		return env.fromCollection(vertices);
+	}
+
+	/**
+	 * Creates a set of edges with attached {@link String} values.
+	 *
+	 * @param env execution environment
+	 * @return edge data set with string values
+	 */
+	public static DataSet<Edge<Long, String>> getEdges(ExecutionEnvironment env) {
+		List<Edge<Long, String>> edges = Lists.newArrayListWithExpectedSize(10);
+		edges.add(new Edge<>(0L, 1L, "A"));
+		edges.add(new Edge<>(1L, 0L, "A"));
+		edges.add(new Edge<>(1L, 2L, "A"));
+		edges.add(new Edge<>(2L, 1L, "A"));
+		edges.add(new Edge<>(2L, 3L, "B"));
+		edges.add(new Edge<>(3L, 2L, "B"));
+		edges.add(new Edge<>(4L, 0L, "C"));
+		edges.add(new Edge<>(4L, 1L, "C"));
+		edges.add(new Edge<>(5L, 2L, "D"));
+		edges.add(new Edge<>(5L, 3L, "D"));
+
+		return env.fromCollection(edges);
+	}
+
+	/**
+	 * Creates a set of edges with {@link NullValue} as edge value.
+	 *
+	 * @param env execution environment
+	 * @return edge data set with null values
+	 */
+	@SuppressWarnings("serial")
+	public static DataSet<Edge<Long, NullValue>> getEdgesWithAbsentValues(ExecutionEnvironment env) {
+		return getEdges(env).map(new MapFunction<Edge<Long, String>, Edge<Long, NullValue>>() {
+			@Override
+			public Edge<Long, NullValue> map(Edge<Long, String> value) throws Exception {
+				return new Edge<>(value.getSource(), value.getTarget(), NullValue.getInstance());
+			}
+		});
+	}
+}
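
To make the expected-output format above concrete, a tiny parser for the vertex strings (illustrative only): each entry lists the ids the summarized vertex may carry, followed by its group value and group count.

    import java.util.Arrays;

    import org.apache.flink.graph.examples.data.SummarizationData;

    public class ExpectedVertexParser {

        public static void main(String[] args) {
            // e.g. "0,1;A,2": the summarized vertex may have id 0 or 1, carries the
            // group value "A" and stands for 2 original vertices.
            for (String expected : SummarizationData.EXPECTED_VERTICES) {
                String[] parts = expected.split(";");
                String[] possibleIds = parts[0].split(",");
                String[] valueAndCount = parts[1].split(",");
                System.out.println("ids=" + Arrays.toString(possibleIds)
                        + " value=" + valueAndCount[0]
                        + " count=" + valueAndCount[1]);
            }
        }
    }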

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/TriangleCountData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/TriangleCountData.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/TriangleCountData.java
new file mode 100644
index 0000000..71b874c
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/TriangleCountData.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples.data;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.types.NullValue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Provides the default data sets used for the Triangle Count test program.
+ * If no parameters are given to the program, the default data sets are used.
+ */
+public class TriangleCountData {
+
+	public static final String EDGES = "1	2\n"+"1	3\n"+"2	3\n"+"2	6\n"+"3	4\n"+"3	5\n"+"3	6\n"+"4	5\n"+"6	7\n";
+
+	public static DataSet<Edge<Long, NullValue>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+		List<Edge<Long, NullValue>> edges = new ArrayList<Edge<Long, NullValue>>();
+		edges.add(new Edge<Long, NullValue>(1L, 2L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(1L, 3L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(2L, 3L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(2L, 6L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(3L, 4L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(3L, 5L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(3L, 6L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(4L, 5L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(6L, 7L, NullValue.getInstance()));
+
+		return env.fromCollection(edges);
+	}
+
+	public static final String RESULTED_NUMBER_OF_TRIANGLES = "3";
+
+	public static List<Tuple3<Long,Long,Long>> getListOfTriangles()	{
+		ArrayList<Tuple3<Long,Long,Long>> ret = new ArrayList<>(3);
+		ret.add(new Tuple3<>(1L,2L,3L));
+		ret.add(new Tuple3<>(2L,3L,6L));
+		ret.add(new Tuple3<>(3L,4L,5L));
+		return ret;
+	}
+
+	private TriangleCountData () {}
+}
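
The expected count of 3 can be cross-checked by treating the default edges as undirected and enumerating vertex triples by brute force; the matches are exactly the triples returned by getListOfTriangles(). A self-contained sketch, illustrative only:

    import java.util.HashSet;
    import java.util.Set;

    public class TriangleCheck {

        public static void main(String[] args) {
            long[][] edges = {{1, 2}, {1, 3}, {2, 3}, {2, 6}, {3, 4}, {3, 5}, {3, 6}, {4, 5}, {6, 7}};

            // Store both directions so lookups are symmetric.
            Set<String> undirected = new HashSet<>();
            for (long[] e : edges) {
                undirected.add(e[0] + "-" + e[1]);
                undirected.add(e[1] + "-" + e[0]);
            }

            int triangles = 0;
            for (long i = 1; i <= 7; i++) {
                for (long j = i + 1; j <= 7; j++) {
                    for (long k = j + 1; k <= 7; k++) {
                        if (undirected.contains(i + "-" + j)
                                && undirected.contains(j + "-" + k)
                                && undirected.contains(i + "-" + k)) {
                            triangles++; // (1,2,3), (2,3,6), (3,4,5)
                        }
                    }
                }
            }
            System.out.println(triangles); // 3
        }
    }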

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/utils/ExampleUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/utils/ExampleUtils.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/utils/ExampleUtils.java
new file mode 100644
index 0000000..b1bc831
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/utils/ExampleUtils.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples.utils;
+
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+
+public class ExampleUtils {
+
+	@SuppressWarnings({ "serial", "unchecked", "rawtypes" })
+	public static void printResult(DataSet set, String msg) {
+		set.output(new PrintingOutputFormatWithMessage(msg) {
+		});
+	}
+
+	public static class PrintingOutputFormatWithMessage<T> implements
+			OutputFormat<T> {
+
+		private static final long serialVersionUID = 1L;
+
+		private transient PrintStream stream;
+
+		private transient String prefix;
+
+		private String message;
+
+		// --------------------------------------------------------------------------------------------
+
+		/**
+		 * Instantiates a printing output format that prints to standard out.
+		 */
+		public PrintingOutputFormatWithMessage() {
+		}
+
+		public PrintingOutputFormatWithMessage(String msg) {
+			this.message = msg;
+		}
+
+		@Override
+		public void open(int taskNumber, int numTasks) {
+			// get the target stream
+			this.stream = System.out;
+
+			// set the prefix to message
+			this.prefix = message + ": ";
+		}
+
+		@Override
+		public void writeRecord(T record) {
+			if (this.prefix != null) {
+				this.stream.println(this.prefix + record.toString());
+			} else {
+				this.stream.println(record.toString());
+			}
+		}
+
+		@Override
+		public void close() {
+			this.stream = null;
+			this.prefix = null;
+		}
+
+		@Override
+		public String toString() {
+			return "Print to System.out";
+		}
+
+		@Override
+		public void configure(Configuration parameters) {
+		}
+	}
+
+	@SuppressWarnings("serial")
+	public static DataSet<Vertex<Long, NullValue>> getVertexIds(
+			ExecutionEnvironment env, final long numVertices) {
+		return env.generateSequence(1, numVertices).map(
+				new MapFunction<Long, Vertex<Long, NullValue>>() {
+					public Vertex<Long, NullValue> map(Long l) {
+						return new Vertex<Long, NullValue>(l, NullValue
+								.getInstance());
+					}
+				});
+	}
+
+	@SuppressWarnings("serial")
+	public static DataSet<Edge<Long, NullValue>> getRandomEdges(
+			ExecutionEnvironment env, final long numVertices) {
+		return env.generateSequence(1, numVertices).flatMap(
+				new FlatMapFunction<Long, Edge<Long, NullValue>>() {
+					@Override
+					public void flatMap(Long key, Collector<Edge<Long, NullValue>> out) throws Exception {
+						int numOutEdges = (int) (Math.random() * (numVertices / 2));
+						for (int i = 0; i < numOutEdges; i++) {
+							long target = (long) (Math.random() * numVertices) + 1;
+							out.collect(new Edge<Long, NullValue>(key, target,
+									NullValue.getInstance()));
+						}
+					}
+				});
+	}
+
+	public static DataSet<Vertex<Long, Double>> getLongDoubleVertexData(
+			ExecutionEnvironment env) {
+		List<Vertex<Long, Double>> vertices = new ArrayList<Vertex<Long, Double>>();
+		vertices.add(new Vertex<Long, Double>(1L, 1.0));
+		vertices.add(new Vertex<Long, Double>(2L, 2.0));
+		vertices.add(new Vertex<Long, Double>(3L, 3.0));
+		vertices.add(new Vertex<Long, Double>(4L, 4.0));
+		vertices.add(new Vertex<Long, Double>(5L, 5.0));
+
+		return env.fromCollection(vertices);
+	}
+
+	public static DataSet<Edge<Long, Double>> getLongDoubleEdgeData(
+			ExecutionEnvironment env) {
+		List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
+		edges.add(new Edge<Long, Double>(1L, 2L, 12.0));
+		edges.add(new Edge<Long, Double>(1L, 3L, 13.0));
+		edges.add(new Edge<Long, Double>(2L, 3L, 23.0));
+		edges.add(new Edge<Long, Double>(3L, 4L, 34.0));
+		edges.add(new Edge<Long, Double>(3L, 5L, 35.0));
+		edges.add(new Edge<Long, Double>(4L, 5L, 45.0));
+		edges.add(new Edge<Long, Double>(5L, 1L, 51.0));
+
+		return env.fromCollection(edges);
+	}
+
+	/**
+	 * Private constructor to prevent instantiation.
+	 */
+	private ExampleUtils() {
+		throw new RuntimeException();
+	}
+}
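
Illustrative usage of the helpers above (not part of this commit): generating a small random graph and printing its vertices with a message prefix; the graph size and message are arbitrary choices.

    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.graph.Graph;
    import org.apache.flink.graph.examples.utils.ExampleUtils;
    import org.apache.flink.types.NullValue;

    public class ExampleUtilsUsage {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            // A random graph with 10 vertices, built from the generators above.
            Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(
                    ExampleUtils.getVertexIds(env, 10),
                    ExampleUtils.getRandomEdges(env, 10), env);

            // printResult registers an output format, so an explicit execute() is needed.
            ExampleUtils.printResult(graph.getVertices(), "vertex");
            env.execute("ExampleUtils usage");
        }
    }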

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/ConnectedComponents.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/ConnectedComponents.scala b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/ConnectedComponents.scala
new file mode 100644
index 0000000..704d476
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/ConnectedComponents.scala
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.examples
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.library.GSAConnectedComponents
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.Edge
+import org.apache.flink.graph.examples.data.ConnectedComponentsDefaultData
+import org.apache.flink.types.NullValue
+import org.apache.flink.api.common.functions.MapFunction
+import java.lang.Long
+
+/**
+ * This example shows how to use Gelly's library methods.
+ * You can find all available library methods in [[org.apache.flink.graph.library]]. 
+ * 
+ * In particular, this example uses the
+ * [[GSAConnectedComponents]]
+ * library method to compute the connected components of the input graph.
+ *
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId which are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\n1\t3\n</code> defines two edges,
+ * 1-2 and 1-3.
+ *
+ * Usage {{{
+ *   ConnectedComponents <edge path> <result path> <number of iterations>
+ * }}}
+ * If no parameters are provided, the program is run with default data from
+ * [[ConnectedComponentsDefaultData]]
+ */
+object ConnectedComponents {
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val edges: DataSet[Edge[Long, NullValue]] = getEdgesDataSet(env)
+    val graph = Graph.fromDataSet[Long, Long, NullValue](edges, new InitVertices, env)
+
+    val components = graph.run(new GSAConnectedComponents[Long, NullValue](maxIterations))
+
+
+    // emit result
+    if (fileOutput) {
+      components.writeAsCsv(outputPath, "\n", ",")
+      env.execute("Connected Components Example")
+    } else {
+      components.print()
+    }
+  }
+
+  private final class InitVertices extends MapFunction[Long, Long] {
+    override def map(id: Long) = id
+  }
+
+  // ***********************************************************************
+  // UTIL METHODS
+  // ***********************************************************************
+
+    private var fileOutput = false
+    private var edgesInputPath: String = null
+    private var outputPath: String = null
+    private var maxIterations = ConnectedComponentsDefaultData.MAX_ITERATIONS
+
+    private def parseParameters(args: Array[String]): Boolean = {
+      if(args.length > 0) {
+        if(args.length != 3) {
+          System.err.println("Usage ConnectedComponents <edge path> <output path> " +
+            "<num iterations>")
+          return false
+        }
+        fileOutput = true
+        edgesInputPath = args(0)
+        outputPath = args(1)
+        maxIterations = args(2).toInt
+      } else {
+        System.out.println("Executing ConnectedComponents example with default parameters" +
+          " and built-in default data.")
+        System.out.println("  Provide parameters to read input data from files.")
+        System.out.println("  See the documentation for the correct format of input files.")
+        System.out.println("Usage ConnectedComponents <edge path> <output path> " +
+          "<num iterations>")
+      }
+      true
+    }
+
+    private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, NullValue]] = {
+      if (fileOutput) {
+        env.readCsvFile[(Long, Long)](edgesInputPath,
+          lineDelimiter = "\n",
+          fieldDelimiter = "\t")
+          .map(edge => new Edge[Long, NullValue](edge._1, edge._2, NullValue.getInstance))
+      } else {
+        val edgeData = ConnectedComponentsDefaultData.DEFAULT_EDGES map {
+          case Array(x, y) => (x.asInstanceOf[Long], y.asInstanceOf[Long])
+        }
+        env.fromCollection(edgeData).map(
+        edge => new Edge[Long, NullValue](edge._1, edge._2, NullValue.getInstance))
+      }
+    }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GSASingleSourceShortestPaths.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GSASingleSourceShortestPaths.scala b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GSASingleSourceShortestPaths.scala
new file mode 100644
index 0000000..0a10ad7
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GSASingleSourceShortestPaths.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.examples
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.Edge
+import org.apache.flink.graph.gsa.{ApplyFunction, GatherFunction, Neighbor, SumFunction}
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap
+import org.apache.flink.graph.examples.data.SingleSourceShortestPathsData
+
+/**
+ * This example shows how to use Gelly's gather-sum-apply iterations.
+ * 
+ * It is an implementation of the Single-Source-Shortest-Paths algorithm. 
+ *
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
+ * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
+ *
+ * If no parameters are provided, the program is run with default data from
+ * [[SingleSourceShortestPathsData]]
+ */
+object GSASingleSourceShortestPaths {
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val edges: DataSet[Edge[Long, Double]] = getEdgesDataSet(env)
+    val graph = Graph.fromDataSet[Long, Double, Double](edges, new InitVertices(srcVertexId), env)
+
+    // Execute the gather-sum-apply iteration
+    val result = graph.runGatherSumApplyIteration(new CalculateDistances, new ChooseMinDistance,
+      new UpdateDistance, maxIterations)
+
+    // Extract the vertices as the result
+    val singleSourceShortestPaths = result.getVertices
+
+    // emit result
+    if (fileOutput) {
+      singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",")
+      env.execute("GSA Single Source Shortest Paths Example")
+    } else {
+      singleSourceShortestPaths.print()
+    }
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //  Single Source Shortest Path UDFs
+  // --------------------------------------------------------------------------------------------
+
+  private final class InitVertices(srcId: Long) extends MapFunction[Long, Double] {
+
+    override def map(id: Long) = {
+      if (id.equals(srcId)) {
+        0.0
+      } else {
+        Double.PositiveInfinity
+      }
+    }
+  }
+
+  private final class CalculateDistances extends GatherFunction[Double, Double, Double] {
+    override def gather(neighbor: Neighbor[Double, Double]) = {
+      neighbor.getNeighborValue + neighbor.getEdgeValue
+    }
+  }
+
+  private final class ChooseMinDistance extends SumFunction[Double, Double, Double] {
+    override def sum(newValue: Double, currentValue: Double) = {
+      Math.min(newValue, currentValue)
+    }
+  }
+
+  private final class UpdateDistance extends ApplyFunction[Long, Double, Double] {
+    override def apply(newDistance: Double, oldDistance: Double) = {
+      if (newDistance < oldDistance) {
+        setResult(newDistance)
+      }
+    }
+  }
+
+  // **************************************************************************
+  // UTIL METHODS
+  // **************************************************************************
+
+  private var fileOutput = false
+  private var srcVertexId = 1L
+  private var edgesInputPath: String = null
+  private var outputPath: String = null
+  private var maxIterations = 5
+
+  private def parseParameters(args: Array[String]): Boolean = {
+    if(args.length > 0) {
+      if(args.length != 4) {
+        System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
+          " <input edges path> <output path> <num iterations>")
+        return false
+      }
+      fileOutput = true
+      srcVertexId = args(0).toLong
+      edgesInputPath = args(1)
+      outputPath = args(2)
+      maxIterations = args(3).toInt
+    } else {
+      System.out.println("Executing Single Source Shortest Paths example "
+        + "with default parameters and built-in default data.")
+      System.out.println("  Provide parameters to read input data from files.")
+      System.out.println("  See the documentation for the correct format of input files.")
+      System.out.println("Usage: SingleSourceShortestPaths <source vertex id>" +
+        " <input edges path> <output path> <num iterations>")
+    }
+    true
+  }
+
+  private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, Double]] = {
+    if (fileOutput) {
+      env.readCsvFile[(Long, Long, Double)](edgesInputPath,
+        lineDelimiter = "\n",
+        fieldDelimiter = "\t")
+        .map(new Tuple3ToEdgeMap[Long, Double]())
+    } else {
+      val edgeData = SingleSourceShortestPathsData.DEFAULT_EDGES map {
+        case Array(x, y, z) => (x.asInstanceOf[Long], y.asInstanceOf[Long],
+          z.asInstanceOf[Double])
+      }
+      env.fromCollection(edgeData).map(new Tuple3ToEdgeMap[Long, Double]())
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala
new file mode 100644
index 0000000..f9fa82d
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.examples
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.types.NullValue
+import org.apache.flink.graph.Edge
+import org.apache.flink.util.Collector
+
+/**
+ * This example illustrates how to use Gelly's metrics methods to get simple statistics
+ * from the input graph.
+ *
+ * The program reads a graph from the given edge file (or generates a random graph
+ * if no input is provided) and computes and prints the following metrics:
+ * - number of vertices
+ * - number of edges
+ * - average node degree
+ * - the vertex ids with the max/min in- and out-degrees
+ *
+ * The input file is expected to contain one edge per line,
+ * with long IDs and no values, in the following format:
+ * {{{
+ *   <sourceVertexID>\t<targetVertexID>
+ * }}}
+ * If no arguments are provided, the example runs with a random graph of 100 vertices.
+ *
+ */
+object GraphMetrics {
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    /** create the graph **/
+    val graph: Graph[Long, NullValue, NullValue] = Graph.fromDataSet(getEdgeDataSet(env), env)
+
+    /** get the number of vertices **/
+    val numVertices = graph.numberOfVertices
+
+    /** get the number of edges **/
+    val numEdges = graph.numberOfEdges
+
+    /** compute the average node degree **/
+    val verticesWithDegrees = graph.getDegrees
+    val avgDegree = verticesWithDegrees.sum(1).map(in => in._2.toDouble / numVertices)
+
+    /** find the vertex with the maximum in-degree **/
+    val maxInDegreeVertex = graph.inDegrees.max(1).map(in => in._1)
+
+    /** find the vertex with the minimum in-degree **/
+    val minInDegreeVertex = graph.inDegrees.min(1).map(in => in._1)
+
+    /** find the vertex with the maximum out-degree **/
+    val maxOutDegreeVertex = graph.outDegrees.max(1).map(in => in._1)
+
+    /** find the vertex with the minimum out-degree **/
+    val minOutDegreeVertex = graph.outDegrees.min(1).map(in => in._1)
+
+    /** print the results **/
+    env.fromElements(numVertices).printOnTaskManager("Total number of vertices")
+    env.fromElements(numEdges).printOnTaskManager("Total number of edges")
+    avgDegree.printOnTaskManager("Average node degree")
+    maxInDegreeVertex.printOnTaskManager("Vertex with Max in-degree")
+    minInDegreeVertex.printOnTaskManager("Vertex with Min in-degree")
+    maxOutDegreeVertex.printOnTaskManager("Vertex with Max out-degree")
+    minOutDegreeVertex.printOnTaskManager("Vertex with Min out-degree")
+
+  }
+
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      fileOutput = true
+      if (args.length == 1) {
+        edgesPath = args(0)
+        true
+      } else {
+        System.err.println("Usage: GraphMetrics <edges path>")
+        false
+      }
+    } else {
+      System.out.println("Executing GraphMetrics example with built-in default data.")
+      System.out.println("  Provide parameters to read input data from a file.")
+      System.out.println("  Usage: GraphMetrics <edges path>")
+      true
+    }
+  }
+
+  private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, NullValue]] = {
+    if (fileOutput) {
+      env.readCsvFile[(Long, Long)](
+        edgesPath,
+        fieldDelimiter = "\t").map(
+        in => new Edge[Long, NullValue](in._1, in._2, NullValue.getInstance()))
+    } else {
+      env.generateSequence(1, numVertices).flatMap[Edge[Long, NullValue]](
+        (key: Long, out: Collector[Edge[Long, NullValue]]) => {
+          val numOutEdges: Int = (Math.random() * (numVertices / 2)).toInt
+          for ( i <- 0 to numOutEdges ) {
+            val target: Long = ((Math.random() * numVertices) + 1).toLong
+            out.collect(new Edge[Long, NullValue](key, target, NullValue.getInstance()))
+          }
+      })
+    }
+  }
+
+  private var fileOutput: Boolean = false
+  private var edgesPath: String = null
+  private var outputPath: String = null
+  private val numVertices = 100
+}
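
The random-edge generator in getEdgeDataSet uses the flatMap variant that hands each input element a Collector, so a single vertex id can emit any number of outgoing edges; only records passed to out.collect(...) reach the output. A tiny stand-alone sketch of the same pattern (the pair-emitting logic below is purely illustrative):

    import org.apache.flink.api.scala._
    import org.apache.flink.util.Collector

    object FlatMapCollectorSketch {
      def main(args: Array[String]): Unit = {
        val env = ExecutionEnvironment.getExecutionEnvironment
        // For every n in 1..3 emit the pairs (n, 1) .. (n, n): a single input
        // element can produce several output records via out.collect(...).
        env.generateSequence(1, 3)
          .flatMap[(Long, Long)]((n: Long, out: Collector[(Long, Long)]) => {
            for (i <- 1L to n) {
              out.collect((n, i))
            }
          })
          .print()
      }
    }

Records that are merely constructed inside the lambda but never handed to out.collect(...) are silently dropped, which is why the generator above emits each Edge through the collector.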

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/SingleSourceShortestPaths.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/SingleSourceShortestPaths.scala b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/SingleSourceShortestPaths.scala
new file mode 100644
index 0000000..4f84bb0
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/SingleSourceShortestPaths.scala
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.examples
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.Edge
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.graph.spargel.VertexUpdateFunction
+import org.apache.flink.graph.spargel.MessageIterator
+import org.apache.flink.graph.Vertex
+import org.apache.flink.graph.spargel.MessagingFunction
+import org.apache.flink.graph.examples.data.SingleSourceShortestPathsData
+import scala.collection.JavaConversions._
+import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap
+
+/**
+ * This example shows how to use Gelly's scatter-gather iterations.
+ * 
+ * It is an implementation of the Single-Source-Shortest-Paths algorithm. 
+ *
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented as triples of srcVertexId, trgVertexId and distance,
+ * separated by tabs; edges themselves are separated by newlines.
+ * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
+ * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
+ *
+ * If no parameters are provided, the program is run with default data from
+ * [[SingleSourceShortestPathsData]]
+ */
+object SingleSourceShortestPaths {
+  def main(args: Array[String]) {
+    if (!parseParameters(args)) {
+      return
+    }
+
+    val env = ExecutionEnvironment.getExecutionEnvironment
+    val edges: DataSet[Edge[Long, Double]] = getEdgesDataSet(env)
+    val graph = Graph.fromDataSet[Long, Double, Double](edges, new InitVertices(srcVertexId), env)
+
+    // Execute the scatter-gather iteration
+    val result = graph.runScatterGatherIteration(new VertexDistanceUpdater,
+      new MinDistanceMessenger, maxIterations)
+
+    // Extract the vertices as the result
+    val singleSourceShortestPaths = result.getVertices
+
+    // emit result
+    if (fileOutput) {
+      singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",")
+      env.execute("Single Source Shortest Paths Example")
+    } else {
+      singleSourceShortestPaths.print()
+    }
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //  Single Source Shortest Path UDFs
+  // --------------------------------------------------------------------------------------------
+
+  private final class InitVertices(srcId: Long) extends MapFunction[Long, Double] {
+
+    override def map(id: Long) = {
+      if (id.equals(srcId)) {
+        0.0
+      } else {
+        Double.PositiveInfinity
+      }
+    }
+  }
+
+  /**
+   * Function that updates the value of a vertex by picking the minimum
+   * distance from all incoming messages.
+   */
+  private final class VertexDistanceUpdater extends VertexUpdateFunction[Long, Double, Double] {
+
+    override def updateVertex(vertex: Vertex[Long, Double], inMessages: MessageIterator[Double]) {
+      var minDistance = Double.MaxValue
+      while (inMessages.hasNext) {
+        val msg = inMessages.next
+        if (msg < minDistance) {
+          minDistance = msg
+        }
+      }
+      if (vertex.getValue > minDistance) {
+        setNewVertexValue(minDistance)
+      }
+    }
+  }
+
+  /**
+   * Sends to each target vertex the sender's current distance
+   * plus the value of the connecting edge.
+   */
+  private final class MinDistanceMessenger extends
+    MessagingFunction[Long, Double, Double, Double] {
+
+    override def sendMessages(vertex: Vertex[Long, Double]) {
+      if (vertex.getValue < Double.PositiveInfinity) {
+        for (edge: Edge[Long, Double] <- getEdges) {
+          sendMessageTo(edge.getTarget, vertex.getValue + edge.getValue)
+        }
+      }
+    }
+  }
+
+  // ****************************************************************************
+  // UTIL METHODS
+  // ****************************************************************************
+
+  private var fileOutput = false
+  private var srcVertexId = 1L
+  private var edgesInputPath: String = null
+  private var outputPath: String = null
+  private var maxIterations = 5
+
+  private def parseParameters(args: Array[String]): Boolean = {
+    if (args.length > 0) {
+      if (args.length != 4) {
+        System.err.println("Usage: SingleSourceShortestPaths <source vertex id>" +
+          " <input edges path> <output path> <num iterations>")
+        return false
+      }
+      fileOutput = true
+      srcVertexId = args(0).toLong
+      edgesInputPath = args(1)
+      outputPath = args(2)
+      maxIterations = args(3).toInt
+    } else {
+      System.out.println("Executing Single Source Shortest Paths example "
+        + "with default parameters and built-in default data.")
+      System.out.println("  Provide parameters to read input data from files.")
+      System.out.println("  See the documentation for the correct format of input files.")
+      System.out.println("Usage: SingleSourceShortestPaths <source vertex id>" +
+        " <input edges path> <output path> <num iterations>")
+    }
+    true
+  }
+
+  private def getEdgesDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, Double]] = {
+    if (fileOutput) {
+      env.readCsvFile[(Long, Long, Double)](edgesInputPath,
+        lineDelimiter = "\n",
+        fieldDelimiter = "\t")
+        .map(new Tuple3ToEdgeMap[Long, Double]())
+    } else {
+      val edgeData = SingleSourceShortestPathsData.DEFAULT_EDGES map {
+        case Array(x, y, z) => (x.asInstanceOf[Long], y.asInstanceOf[Long],
+          z.asInstanceOf[Double])
+      }
+      env.fromCollection(edgeData).map(new Tuple3ToEdgeMap[Long, Double]())
+    }
+  }
+}
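
As the class comment notes, file input for this example is one tab-separated (source, target, distance) triple per line. Purely for illustration, this is what such a file and the corresponding read call from getEdgesDataSet look like; the path and the three edges are made up:

    import org.apache.flink.api.scala._
    import org.apache.flink.graph.Edge
    import org.apache.flink.graph.scala.utils.Tuple3ToEdgeMap

    object ReadEdgesSketch {
      def main(args: Array[String]): Unit = {
        // Hypothetical contents of /tmp/sssp-edges (tabs between the fields):
        //   1\t2\t0.1
        //   1\t3\t1.4
        //   2\t3\t0.5
        val env = ExecutionEnvironment.getExecutionEnvironment
        val edges: DataSet[Edge[Long, Double]] =
          env.readCsvFile[(Long, Long, Double)]("/tmp/sssp-edges",
            lineDelimiter = "\n",
            fieldDelimiter = "\t")
            .map(new Tuple3ToEdgeMap[Long, Double]())
        edges.print()
      }
    }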

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/CommunityDetectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/CommunityDetectionITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/CommunityDetectionITCase.java
new file mode 100644
index 0000000..cd8af9b
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/CommunityDetectionITCase.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.examples.data.CommunityDetectionData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class CommunityDetectionITCase extends MultipleProgramsTestBase {
+
+	public CommunityDetectionITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	private String expected;
+
+	@Test
+	public void testSingleIteration() throws Exception {
+		/*
+		 * Test one iteration of the Simple Community Detection Example
+		 */
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Double> inputGraph = Graph.fromDataSet(
+				CommunityDetectionData.getSimpleEdgeDataSet(env), new InitLabels(), env);
+
+        List<Vertex<Long, Long>> result = inputGraph.run(new CommunityDetection<Long>(1, CommunityDetectionData.DELTA))
+        		.getVertices().collect();
+
+		expected = CommunityDetectionData.COMMUNITIES_SINGLE_ITERATION;
+		compareResultAsTuples(result, expected);
+	}
+
+	@Test
+	public void testTieBreaker() throws Exception {
+		/*
+		 * Test one iteration of the Simple Community Detection Example where a tie must be broken
+		 */
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Double> inputGraph = Graph.fromDataSet(
+				CommunityDetectionData.getTieEdgeDataSet(env), new InitLabels(), env);
+
+        List<Vertex<Long, Long>> result = inputGraph.run(new CommunityDetection<Long>(1, CommunityDetectionData.DELTA))
+        		.getVertices().collect();
+		expected = CommunityDetectionData.COMMUNITIES_WITH_TIE;
+		compareResultAsTuples(result, expected);
+	}
+
+	@SuppressWarnings("serial")
+	private static final class InitLabels implements MapFunction<Long, Long>{
+
+		public Long map(Long id) {
+			return id;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/LabelPropagationITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/LabelPropagationITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/LabelPropagationITCase.java
new file mode 100644
index 0000000..8b9234b
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/LabelPropagationITCase.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.examples.data.LabelPropagationData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class LabelPropagationITCase extends MultipleProgramsTestBase {
+
+	public LabelPropagationITCase(TestExecutionMode mode){
+		super(mode);
+	}
+
+    private String expectedResult;
+
+	@Test
+	public void testSingleIteration() throws Exception {
+		/*
+		 * Test one iteration of label propagation example with a simple graph
+		 */
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, NullValue> inputGraph = Graph.fromDataSet(
+				LabelPropagationData.getDefaultVertexSet(env),
+				LabelPropagationData.getDefaultEdgeDataSet(env), env);
+
+        List<Vertex<Long, Long>> result = inputGraph
+			.run(new LabelPropagation<Long, Long, NullValue>(1))
+			.collect();
+
+		expectedResult = LabelPropagationData.LABELS_AFTER_1_ITERATION;
+		compareResultAsTuples(result, expectedResult);
+	}
+
+	@Test
+	public void testTieBreaker() throws Exception {
+		/*
+		 * Test the label propagation example where a tie must be broken
+		 */
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, NullValue> inputGraph = Graph.fromDataSet(
+				LabelPropagationData.getTieVertexSet(env),
+				LabelPropagationData.getTieEdgeDataSet(env), env);
+
+        List<Vertex<Long, Long>> result = inputGraph
+			.run(new LabelPropagation<Long, Long, NullValue>(1))
+			.collect();
+
+		expectedResult = LabelPropagationData.LABELS_WITH_TIE;
+		compareResultAsTuples(result, expectedResult);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/PageRankITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/PageRankITCase.java
new file mode 100644
index 0000000..034bcd5
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/PageRankITCase.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.examples.data.PageRankData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class PageRankITCase extends MultipleProgramsTestBase {
+
+	public PageRankITCase(TestExecutionMode mode){
+		super(mode);
+	}
+
+	@Test
+	public void testPageRankWithThreeIterations() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
+				PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
+
+        List<Vertex<Long, Double>> result = inputGraph.run(new PageRank<Long>(0.85, 3))
+        		.collect();
+        
+        compareWithDelta(result, 0.01);
+	}
+
+	@Test
+	public void testGSAPageRankWithThreeIterations() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
+				PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
+
+        List<Vertex<Long, Double>> result = inputGraph.run(new GSAPageRank<Long>(0.85, 3))
+        		.collect();
+        
+        compareWithDelta(result, 0.01);
+	}
+
+	@Test
+	public void testPageRankWithThreeIterationsAndNumOfVertices() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
+				PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
+
+        List<Vertex<Long, Double>> result = inputGraph.run(new PageRank<Long>(0.85, 5, 3))
+        		.collect();
+        
+        compareWithDelta(result, 0.01);
+	}
+
+	@Test
+	public void testGSAPageRankWithThreeIterationsAndNumOfVertices() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
+				PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
+
+        List<Vertex<Long, Double>> result = inputGraph.run(new GSAPageRank<Long>(0.85, 5, 3))
+        		.collect();
+        
+        compareWithDelta(result, 0.01);
+	}
+
+	private void compareWithDelta(List<Vertex<Long, Double>> result, double delta) {
+
+		String resultString = "";
+        for (Vertex<Long, Double> v : result) {
+        	resultString += v.f0.toString() + "," + v.f1.toString() +"\n";
+        }
+
+		String expectedResult = PageRankData.RANKS_AFTER_3_ITERATIONS;
+		String[] expected = expectedResult.isEmpty() ? new String[0] : expectedResult.split("\n");
+
+		String[] resultArray = resultString.isEmpty() ? new String[0] : resultString.split("\n");
+
+		Arrays.sort(expected);
+        Arrays.sort(resultArray);
+
+		for (int i = 0; i < expected.length; i++) {
+			String[] expectedFields = expected[i].split(",");
+			String[] resultFields = resultArray[i].split(",");
+
+			double expectedPayLoad = Double.parseDouble(expectedFields[1]);
+			double resultPayLoad = Double.parseDouble(resultFields[1]);
+
+			Assert.assertTrue("Values differ by more than the permissible delta",
+					Math.abs(expectedPayLoad - resultPayLoad) < delta);
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class InitMapper implements MapFunction<Long, Double> {
+		public Double map(Long value) {
+			return 1.0;
+		}
+	}
+}
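
The compareWithDelta helper above serialises the collected vertices back to CSV strings, sorts both sides and checks that the ranks agree within a tolerance. A smaller, self-contained way to express the same check, sketched on plain (id, rank) pairs with made-up numbers:

    object DeltaCompareSketch {

      // Sort both sides by vertex id and require each pair of ranks to agree
      // within the given tolerance.
      def ranksWithinDelta(
          expected: Seq[(Long, Double)],
          actual: Seq[(Long, Double)],
          delta: Double): Boolean = {
        val e = expected.sortBy(_._1)
        val a = actual.sortBy(_._1)
        e.size == a.size && e.zip(a).forall { case ((idE, rankE), (idA, rankA)) =>
          idE == idA && math.abs(rankE - rankA) < delta
        }
      }

      def main(args: Array[String]): Unit = {
        // Made-up ranks; every difference is below the 0.01 tolerance.
        val expected = Seq((1L, 0.20), (2L, 0.35), (3L, 0.45))
        val actual = Seq((2L, 0.352), (1L, 0.198), (3L, 0.45))
        println(ranksWithinDelta(expected, actual, 0.01)) // prints: true
      }
    }

Matching on the vertex id explicitly also avoids relying on both sides sorting into the same order.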

http://git-wip-us.apache.org/repos/asf/flink/blob/2c605d27/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/SummarizationITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/SummarizationITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/SummarizationITCase.java
new file mode 100644
index 0000000..17ddcfa
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/SummarizationITCase.java
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import com.google.common.collect.Lists;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.examples.data.SummarizationData;
+import org.apache.flink.graph.library.Summarization.EdgeValue;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.regex.Pattern;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+@RunWith(Parameterized.class)
+public class SummarizationITCase extends MultipleProgramsTestBase {
+
+	private static final Pattern TOKEN_SEPARATOR = Pattern.compile(";");
+
+	private static final Pattern ID_SEPARATOR = Pattern.compile(",");
+
+	public SummarizationITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Test
+	public void testWithVertexAndEdgeValues() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, String, String> input = Graph.fromDataSet(
+				SummarizationData.getVertices(env),
+				SummarizationData.getEdges(env),
+				env
+		);
+
+		List<Vertex<Long, Summarization.VertexValue<String>>> summarizedVertices = Lists.newArrayList();
+		List<Edge<Long, EdgeValue<String>>> summarizedEdges = Lists.newArrayList();
+
+		Graph<Long, Summarization.VertexValue<String>, EdgeValue<String>> output =
+				input.run(new Summarization<Long, String, String>());
+
+		output.getVertices().output(new LocalCollectionOutputFormat<>(summarizedVertices));
+		output.getEdges().output(new LocalCollectionOutputFormat<>(summarizedEdges));
+
+		env.execute();
+
+		validateVertices(SummarizationData.EXPECTED_VERTICES, summarizedVertices);
+		validateEdges(SummarizationData.EXPECTED_EDGES_WITH_VALUES, summarizedEdges);
+	}
+
+	@Test
+	public void testWithVertexAndAbsentEdgeValues() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, String, NullValue> input = Graph.fromDataSet(
+				SummarizationData.getVertices(env),
+				SummarizationData.getEdgesWithAbsentValues(env),
+				env
+		);
+
+		List<Vertex<Long, Summarization.VertexValue<String>>> summarizedVertices = Lists.newArrayList();
+		List<Edge<Long, EdgeValue<NullValue>>> summarizedEdges = Lists.newArrayList();
+
+		Graph<Long, Summarization.VertexValue<String>, EdgeValue<NullValue>> output =
+				input.run(new Summarization<Long, String, NullValue>());
+
+		output.getVertices().output(new LocalCollectionOutputFormat<>(summarizedVertices));
+		output.getEdges().output(new LocalCollectionOutputFormat<>(summarizedEdges));
+
+		env.execute();
+
+		validateVertices(SummarizationData.EXPECTED_VERTICES, summarizedVertices);
+		validateEdges(SummarizationData.EXPECTED_EDGES_ABSENT_VALUES, summarizedEdges);
+	}
+
+	private void validateVertices(String[] expectedVertices,
+			List<Vertex<Long, Summarization.VertexValue<String>>> actualVertices) {
+		Arrays.sort(expectedVertices);
+		Collections.sort(actualVertices, new Comparator<Vertex<Long, Summarization.VertexValue<String>>>() {
+			@Override
+			public int compare(Vertex<Long, Summarization.VertexValue<String>> o1,
+												 Vertex<Long, Summarization.VertexValue<String>> o2) {
+				int result = o1.getId().compareTo(o2.getId());
+				if (result == 0) {
+					result = o1.getValue().getVertexGroupValue().compareTo(o2.getValue().getVertexGroupValue());
+				}
+				if (result == 0) {
+					result = o1.getValue().getVertexGroupCount().compareTo(o2.getValue().getVertexGroupCount());
+				}
+				return result;
+			}
+		});
+
+		for (int i = 0; i < expectedVertices.length; i++) {
+			validateVertex(expectedVertices[i], actualVertices.get(i));
+		}
+	}
+
+	private <EV extends Comparable<EV>> void validateEdges(String[] expectedEdges,
+			List<Edge<Long, EdgeValue<EV>>> actualEdges) {
+		Arrays.sort(expectedEdges);
+		Collections.sort(actualEdges, new Comparator<Edge<Long, EdgeValue<EV>>> () {
+
+			@Override
+			public int compare(Edge<Long, EdgeValue<EV>> o1, Edge<Long, EdgeValue<EV>> o2) {
+				int result = o1.getSource().compareTo(o2.getSource());
+				if (result == 0) {
+					result = o1.getTarget().compareTo(o2.getTarget());
+				}
+				if (result == 0) {
+					result = o1.getValue().getEdgeGroupValue().compareTo(o2.getValue().getEdgeGroupValue());
+				}
+				if (result == 0) {
+					result = o1.getValue().getEdgeGroupCount().compareTo(o2.getValue().getEdgeGroupCount());
+				}
+				return result;
+			}
+		});
+
+		for (int i = 0; i < expectedEdges.length; i++) {
+			validateEdge(expectedEdges[i], actualEdges.get(i));
+		}
+	}
+
+	private void validateVertex(String expected, Vertex<Long, Summarization.VertexValue<String>> actual) {
+		String[] tokens = TOKEN_SEPARATOR.split(expected);
+		assertTrue(getListFromIdRange(tokens[0]).contains(actual.getId()));
+		assertEquals(getGroupValue(tokens[1]), actual.getValue().getVertexGroupValue());
+		assertEquals(getGroupCount(tokens[1]), actual.getValue().getVertexGroupCount());
+	}
+
+	private <EV> void validateEdge(String expected, Edge<Long, EdgeValue<EV>> actual) {
+		String[] tokens = TOKEN_SEPARATOR.split(expected);
+		assertTrue(getListFromIdRange(tokens[0]).contains(actual.getSource()));
+		assertTrue(getListFromIdRange(tokens[1]).contains(actual.getTarget()));
+		assertEquals(getGroupValue(tokens[2]), actual.getValue().getEdgeGroupValue().toString());
+		assertEquals(getGroupCount(tokens[2]), actual.getValue().getEdgeGroupCount());
+	}
+
+	private List<Long> getListFromIdRange(String idRange) {
+		List<Long> result = Lists.newArrayList();
+		for (String id : ID_SEPARATOR.split(idRange)) {
+			result.add(Long.parseLong(id));
+		}
+		return result;
+	}
+
+	private String getGroupValue(String token) {
+		return ID_SEPARATOR.split(token)[0];
+	}
+
+	private Long getGroupCount(String token) {
+		return Long.valueOf(ID_SEPARATOR.split(token)[1]);
+	}
+}
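
As inferred from the validate helpers above, the expected strings in SummarizationData encode a vertex as "<candidate ids>;<group value>,<group count>" and an edge as "<source ids>;<target ids>;<group value>,<group count>". A small sketch decoding a made-up vertex string the same way the helpers do:

    object SummarizationExpectedFormatSketch {
      def main(args: Array[String]): Unit = {
        // Made-up sample, not taken from SummarizationData.
        val expectedVertex = "1,2;A,2"
        val Array(idRange, group) = expectedVertex.split(";")
        val candidateIds = idRange.split(",").map(_.toLong).toSet // Set(1, 2)
        val Array(groupValue, groupCount) = group.split(",")
        // A summarized vertex is accepted if its id is one of the candidates and
        // its group value and group count match.
        println(s"ids=$candidateIds value=$groupValue count=${groupCount.toLong}")
      }
    }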

