flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [47/50] [abbrv] flink git commit: [FLINK-1201] [gelly] added licence headers; added gelly to addons pom file; fixed checkstyle errors
Date Wed, 11 Feb 2015 10:49:49 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java
index 50c9ae5..c490bb3 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/LabelPropagationExample.java
@@ -1,71 +1,94 @@
-package flink.graphs.example;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
-import flink.graphs.*;
-import flink.graphs.library.LabelPropagation;
+package org.apache.flink.graph.example;
 
 import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.*;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.library.LabelPropagation;
 import org.apache.flink.types.NullValue;
 import org.apache.flink.util.Collector;
 
 /**
- * This example uses the label propagation algorithm to detect communities by propagating labels.
- * Initially, each vertex is assigned its id as its label.
- * The vertices iteratively propagate their labels to their neighbors and adopt the most frequent label
- * among their neighbors.
- * The algorithm converges when no vertex changes value or the maximum number of iterations have been reached.
+ * This example uses the label propagation algorithm to detect communities by
+ * propagating labels. Initially, each vertex is assigned its id as its label.
+ * The vertices iteratively propagate their labels to their neighbors and adopt
+ * the most frequent label among their neighbors. The algorithm converges when
+ * no vertex changes value or the maximum number of iterations have been
+ * reached.
  */
 public class LabelPropagationExample implements ProgramDescription {
 
-    public static void main (String [] args) throws Exception {
+	public static void main(String[] args) throws Exception {
 
-        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-        DataSet<Vertex<Long, Long>> vertices = getVertexDataSet(env);
-        DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env);
+		DataSet<Vertex<Long, Long>> vertices = getVertexDataSet(env);
+		DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env);
 
-        Graph<Long, Long, NullValue> graph = Graph.fromDataSet(vertices, edges, env);
+		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(vertices, edges,	env);
 
-        DataSet<Vertex<Long, Long>> verticesWithCommunity =
-                graph.run(new LabelPropagation<Long>(maxIterations)).getVertices();
+		DataSet<Vertex<Long, Long>> verticesWithCommunity = graph.run(
+				new LabelPropagation<Long>(maxIterations)).getVertices();
 
-        verticesWithCommunity.print();
+		verticesWithCommunity.print();
 
-        env.execute();
-    }
+		env.execute();
+	}
 
-    @Override
-    public String getDescription() {
-        return "Label Propagation Example";
-    }
+	@Override
+	public String getDescription() {
+		return "Label Propagation Example";
+	}
 
-    private static long numVertices = 100;
-    private static int maxIterations = 20;
+	private static long numVertices = 100;
+	private static int maxIterations = 20;
 
 	@SuppressWarnings("serial")
 	private static DataSet<Vertex<Long, Long>> getVertexDataSet(ExecutionEnvironment env) {
-            return env.generateSequence(1, numVertices)
-                    .map(new MapFunction<Long, Vertex<Long, Long>>() {
-                        public Vertex<Long, Long> map(Long l) throws Exception {
-                            return new Vertex<Long, Long>(l, l);
-                        }
-                    });
-    }
+		return env.generateSequence(1, numVertices).map(
+				new MapFunction<Long, Vertex<Long, Long>>() {
+					public Vertex<Long, Long> map(Long l) throws Exception {
+						return new Vertex<Long, Long>(l, l);
+					}
+				});
+	}
 
-    @SuppressWarnings("serial")
+	@SuppressWarnings("serial")
 	private static DataSet<Edge<Long, NullValue>> getEdgeDataSet(ExecutionEnvironment env) {
-            return env.generateSequence(1, numVertices)
-                    .flatMap(new FlatMapFunction<Long, Edge<Long, NullValue>>() {
-                        @Override
-                        public void flatMap(Long key, Collector<Edge<Long, NullValue>> out) {
-                            int numOutEdges = (int) (Math.random() * (numVertices / 2));
-                            for (int i = 0; i < numOutEdges; i++) {
-                                long target = (long) (Math.random() * numVertices) + 1;
-                                out.collect(new Edge<Long, NullValue>(key, target, NullValue.getInstance()));
-                            }
-                        }
-                    });
-    }
+		return env.generateSequence(1, numVertices).flatMap(
+				new FlatMapFunction<Long, Edge<Long, NullValue>>() {
+					@Override
+					public void flatMap(Long key,
+							Collector<Edge<Long, NullValue>> out) {
+						int numOutEdges = (int) (Math.random() * (numVertices / 2));
+						for (int i = 0; i < numOutEdges; i++) {
+							long target = (long) (Math.random() * numVertices) + 1;
+							out.collect(new Edge<Long, NullValue>(key, target,
+									NullValue.getInstance()));
+						}
+					}
+				});
+	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
index 668c765..948ac5b 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
@@ -1,4 +1,22 @@
-package flink.graphs.example;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -13,176 +31,186 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.EdgesFunctionWithVertexValue;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.MusicProfilesData;
+import org.apache.flink.graph.library.LabelPropagation;
 import org.apache.flink.types.NullValue;
 import org.apache.flink.util.Collector;
 
-import flink.graphs.Edge;
-import flink.graphs.EdgeDirection;
-import flink.graphs.EdgesFunctionWithVertexValue;
-import flink.graphs.Graph;
-import flink.graphs.Vertex;
-import flink.graphs.example.utils.MusicProfilesData;
-import flink.graphs.library.LabelPropagation;
-
 @SuppressWarnings("serial")
 public class MusicProfiles implements ProgramDescription {
 
 	/**
-	 * This example demonstrates how to mix the "record" Flink API with the graph API.
-	 * The input is a set <userId - songId - playCount> triplets and a set of
-	 * bad records,i.e. song ids that should not be trusted.
-	 * Initially, we use the record API to filter out the bad records.
-	 * Then, we use the graph API to create a user -> song weighted bipartite graph
-	 * and compute the top song (most listened) per user.
-	 * Then, we use the record API again, to create a user-user similarity graph, 
-	 * based on common songs, where two users that listen to the same song are connected.
-	 * Finally, we use the graph API to run the label propagation community detection algorithm
-	 * on the similarity graph.
+	 * This example demonstrates how to mix the "record" Flink API with the
+	 * graph API. The input is a set <userId - songId - playCount> triplets and
+	 * a set of bad records,i.e. song ids that should not be trusted. Initially,
+	 * we use the record API to filter out the bad records. Then, we use the
+	 * graph API to create a user -> song weighted bipartite graph and compute
+	 * the top song (most listened) per user. Then, we use the record API again,
+	 * to create a user-user similarity graph, based on common songs, where two
+	 * users that listen to the same song are connected. Finally, we use the
+	 * graph API to run the label propagation community detection algorithm on
+	 * the similarity graph.
 	 */
-	public static void main (String [] args) throws Exception {
-    	
-    	ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-    	final int numIterations = 10;
-
-    	/** 
-    	 *  Read the user-song-play triplets
-    	 *  The format is <userID>\t<songID>\t<playcount>
-    	 */
-    	DataSet<Tuple3<String, String, Integer>> triplets = MusicProfilesData.getUserSongTriplets(env);
-
-    	/**
-    	 *  Read the mismatches dataset and extract the songIDs
-    	 *  The format is "ERROR: <songID trackID> song_title"
-    	 */
-    	DataSet<Tuple1<String>> mismatches = MusicProfilesData.getMismatches(env).map(new ExtractMismatchSongIds());
-
-    	/**
-    	 *  Filter out the mismatches from the triplets dataset
-    	 */
-    	DataSet<Tuple3<String, String, Integer>> validTriplets = triplets.coGroup(mismatches)
-    			.where(1).equalTo(0).with(new FilterOutMismatches());
-
-    	/**
-    	 *  Create a user -> song weighted bipartite graph
-    	 *  where the edge weights correspond to play counts
-    	 */
-    	Graph<String, NullValue, Integer> userSongGraph = Graph.fromTupleDataSet(validTriplets, env);
-
-    	/**
-    	 *  Get the top track (most listened) for each user
-    	 */
-    	DataSet<Tuple2<String, String>> usersWithTopTrack = userSongGraph.reduceOnEdges(new GetTopSongPerUser(), 
-    			EdgeDirection.OUT).filter(new FilterSongNodes());
-
-    	usersWithTopTrack.print();
-
-    	/**
-    	 * Create a user-user similarity graph, based on common songs, 
-    	 * i.e. two users that listen to the same song are connected.
-    	 * For each song, we create an edge between each pair of its in-neighbors.
-    	 */
-    	DataSet<Edge<String, NullValue>> similarUsers = userSongGraph.getEdges().groupBy(1)
-    			.reduceGroup(new CreateSimilarUserEdges()).distinct();
-
-    	Graph<String, Long, NullValue> similarUsersGraph = Graph.fromDataSet(similarUsers,
-
-    			new MapFunction<String, Long>() {
-					public Long map(String value) { return 1l; }
-
-    	}, env).getUndirected();
-
-    	/**
-    	 * Detect user communities using the label propagation library method
-    	 */
-
-    	// Initialize each vertex with a unique numeric label
-    	DataSet<Tuple2<String, Long>> idsWithInitialLabels = similarUsersGraph.getVertices()
-    			.reduceGroup(new AssignInitialLabelReducer());
-
-    	// update the vertex values and run the label propagation algorithm
-    	DataSet<Vertex<String, Long>> verticesWithCommunity = similarUsersGraph.joinWithVertices(idsWithInitialLabels,
-    					new MapFunction<Tuple2<Long, Long>, Long>() {
-							public Long map(Tuple2<Long, Long> value) {	return value.f1; }
-						})
-    			.run(new LabelPropagation<String>(numIterations)).getVertices();
-
-    	verticesWithCommunity.print();
-
-    	env.execute();
-    }
+	public static void main(String[] args) throws Exception {
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		final int numIterations = 10;
+
+		/**
+		 * Read the user-song-play triplets The format is
+		 * <userID>\t<songID>\t<playcount>
+		 */
+		DataSet<Tuple3<String, String, Integer>> triplets = MusicProfilesData.getUserSongTriplets(env);
+
+		/**
+		 * Read the mismatches dataset and extract the songIDs The format is
+		 * "ERROR: <songID trackID> song_title"
+		 */
+		DataSet<Tuple1<String>> mismatches = MusicProfilesData.getMismatches(env).map(new ExtractMismatchSongIds());
+
+		/**
+		 * Filter out the mismatches from the triplets dataset
+		 */
+		DataSet<Tuple3<String, String, Integer>> validTriplets = triplets
+				.coGroup(mismatches).where(1).equalTo(0)
+				.with(new FilterOutMismatches());
+
+		/**
+		 * Create a user -> song weighted bipartite graph where the edge weights
+		 * correspond to play counts
+		 */
+		Graph<String, NullValue, Integer> userSongGraph = Graph.fromTupleDataSet(validTriplets, env);
+
+		/**
+		 * Get the top track (most listened) for each user
+		 */
+		DataSet<Tuple2<String, String>> usersWithTopTrack = userSongGraph
+				.reduceOnEdges(new GetTopSongPerUser(), EdgeDirection.OUT)
+				.filter(new FilterSongNodes());
+
+		usersWithTopTrack.print();
+
+		/**
+		 * Create a user-user similarity graph, based on common songs, i.e. two
+		 * users that listen to the same song are connected. For each song, we
+		 * create an edge between each pair of its in-neighbors.
+		 */
+		DataSet<Edge<String, NullValue>> similarUsers = userSongGraph
+				.getEdges().groupBy(1)
+				.reduceGroup(new CreateSimilarUserEdges()).distinct();
+
+		Graph<String, Long, NullValue> similarUsersGraph = Graph.fromDataSet(similarUsers,
+				new MapFunction<String, Long>() {
+					public Long map(String value) {
+						return 1l;
+					}
+				}, env).getUndirected();
+
+		/**
+		 * Detect user communities using the label propagation library method
+		 */
+
+		// Initialize each vertex with a unique numeric label
+		DataSet<Tuple2<String, Long>> idsWithInitialLabels = similarUsersGraph
+				.getVertices().reduceGroup(new AssignInitialLabelReducer());
+
+		// update the vertex values and run the label propagation algorithm
+		DataSet<Vertex<String, Long>> verticesWithCommunity = similarUsersGraph
+				.joinWithVertices(idsWithInitialLabels,
+						new MapFunction<Tuple2<Long, Long>, Long>() {
+							public Long map(Tuple2<Long, Long> value) {
+								return value.f1;
+							}
+						}).run(new LabelPropagation<String>(numIterations))
+				.getVertices();
+
+		verticesWithCommunity.print();
+
+		env.execute();
+	}
 
 	public static final class ExtractMismatchSongIds implements MapFunction<String, Tuple1<String>> {
+
 		public Tuple1<String> map(String value) {
-			String[] tokens = value.split("\\s+"); 
+			String[] tokens = value.split("\\s+");
 			String songId = tokens[1].substring(1);
 			return new Tuple1<String>(songId);
 		}
-    }
-
-	public static final class FilterOutMismatches implements CoGroupFunction<Tuple3<String, String, Integer>, 
-    	Tuple1<String>, Tuple3<String, String, Integer>> {
-		public void coGroup(
-				Iterable<Tuple3<String, String, Integer>> triplets,
-				Iterable<Tuple1<String>> invalidSongs,
-				Collector<Tuple3<String, String, Integer>> out) {
+	}
+
+	public static final class FilterOutMismatches implements CoGroupFunction<Tuple3<String, String, Integer>,
+		Tuple1<String>, Tuple3<String, String, Integer>> {
+
+		public void coGroup(Iterable<Tuple3<String, String, Integer>> triplets,
+				Iterable<Tuple1<String>> invalidSongs, Collector<Tuple3<String, String, Integer>> out) {
+
 			if (!invalidSongs.iterator().hasNext()) {
 				// this is a valid triplet
 				for (Tuple3<String, String, Integer> triplet : triplets) {
-					out.collect(triplet);					
+					out.collect(triplet);
 				}
 			}
 		}
-    }
+	}
 
- 	public static final class FilterSongNodes implements FilterFunction<Tuple2<String, String>> {
+	public static final class FilterSongNodes implements FilterFunction<Tuple2<String, String>> {
 		public boolean filter(Tuple2<String, String> value) throws Exception {
 			return !value.f1.equals("");
 		}
-    }
+	}
 
-	public static final class GetTopSongPerUser implements EdgesFunctionWithVertexValue
-		<String, NullValue, Integer, Tuple2<String, String>> {
-		public Tuple2<String, String> iterateEdges(Vertex<String, NullValue> vertex,	
+	public static final class GetTopSongPerUser	implements EdgesFunctionWithVertexValue<String, NullValue, Integer,
+		Tuple2<String, String>> {
+
+		public Tuple2<String, String> iterateEdges(Vertex<String, NullValue> vertex, 
 				Iterable<Edge<String, Integer>> edges) {
+
 			int maxPlaycount = 0;
 			String topSong = "";
-			for (Edge<String, Integer> edge: edges) {
+			for (Edge<String, Integer> edge : edges) {
 				if (edge.getValue() > maxPlaycount) {
 					maxPlaycount = edge.getValue();
 					topSong = edge.getTarget();
 				}
 			}
-			return new Tuple2<String, String> (vertex.getId(), topSong);
+			return new Tuple2<String, String>(vertex.getId(), topSong);
 		}
-    }
+	}
+
+	public static final class CreateSimilarUserEdges implements GroupReduceFunction<Edge<String, Integer>,
+		Edge<String, NullValue>> {
 
- 	public static final class CreateSimilarUserEdges implements GroupReduceFunction<Edge<String, Integer>,
- 		Edge<String, NullValue>> {
 		public void reduce(Iterable<Edge<String, Integer>> edges, Collector<Edge<String, NullValue>> out) {
 			List<String> listeners = new ArrayList<String>();
 			for (Edge<String, Integer> edge : edges) {
 				listeners.add(edge.getSource());
 			}
-			for (int i=0; i < listeners.size()-1; i++) {
-				out.collect(new Edge<String, NullValue>(listeners.get(i), listeners.get(i+1),
-						NullValue.getInstance()));
+			for (int i = 0; i < listeners.size() - 1; i++) {
+				out.collect(new Edge<String, NullValue>(listeners.get(i),
+						listeners.get(i + 1), NullValue.getInstance()));
 			}
 		}
-    }
+	}
 
- 	public static final class AssignInitialLabelReducer implements GroupReduceFunction<Vertex<String, Long>,
- 		Tuple2<String, Long>> {
- 		public void reduce(Iterable<Vertex<String, Long>> vertices, Collector<Tuple2<String, Long>> out) {
+	public static final class AssignInitialLabelReducer implements GroupReduceFunction<Vertex<String, Long>,
+		Tuple2<String, Long>> {
+
+		public void reduce(Iterable<Vertex<String, Long>> vertices,	Collector<Tuple2<String, Long>> out) {
 			long label = 0;
 			for (Vertex<String, Long> vertex : vertices) {
 				out.collect(new Tuple2<String, Long>(vertex.getId(), label));
 				label++;
 			}
 		}
- 	}
+	}
 
 	@Override
 	public String getDescription() {
 		return "Music Profiles Example";
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java
index e3f815a..400508c 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/PageRankExample.java
@@ -1,80 +1,102 @@
-package flink.graphs.example;
-
-
-import flink.graphs.*;
-import flink.graphs.library.PageRank;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
 
 import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.*;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.library.PageRank;
 import org.apache.flink.util.Collector;
 
 public class PageRankExample implements ProgramDescription {
 
-    @SuppressWarnings("serial")
-	public static void main (String [] args) throws Exception {
+	@SuppressWarnings("serial")
+	public static void main(String[] args) throws Exception {
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Vertex<Long, Double>> pages = getPagesDataSet(env);
+
+		DataSet<Edge<Long, Double>> links = getLinksDataSet(env);
+
+		Graph<Long, Double, Double> network = Graph.fromDataSet(pages, links, env);
 
-        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		DataSet<Tuple2<Long, Long>> vertexOutDegrees = network.outDegrees();
 
-        DataSet<Vertex<Long,Double>> pages = getPagesDataSet(env);
+		// assign the transition probabilities as the edge weights
+		Graph<Long, Double, Double> networkWithWeights = network
+				.joinWithEdgesOnSource(vertexOutDegrees,
+						new MapFunction<Tuple2<Double, Long>, Double>() {
+							public Double map(Tuple2<Double, Long> value) {
+								return value.f0 / value.f1;
+							}
+						});
 
-        DataSet<Edge<Long,Double>> links = getLinksDataSet(env);
+		DataSet<Vertex<Long, Double>> pageRanks = networkWithWeights.run(
+				new PageRank<Long>(numPages, DAMPENING_FACTOR, maxIterations))
+				.getVertices();
 
-        Graph<Long, Double, Double> network = new Graph<Long, Double, Double>(pages, links, env);
-        
-        DataSet<Tuple2<Long, Long>> vertexOutDegrees = network.outDegrees();
-        
-        // assign the transition probabilities as the edge weights
-        Graph<Long, Double, Double> networkWithWeights = network.joinWithEdgesOnSource(vertexOutDegrees, 
-        		new MapFunction<Tuple2<Double, Long>, Double>() {
-					public Double map(Tuple2<Double, Long> value) {
-						return value.f0 / value.f1;
+		pageRanks.print();
+
+		env.execute();
+	}
+
+	@Override
+	public String getDescription() {
+		return "PageRank";
+	}
+
+	private static final double DAMPENING_FACTOR = 0.85;
+	private static long numPages = 10;
+	private static int maxIterations = 10;
+
+	@SuppressWarnings("serial")
+	private static DataSet<Vertex<Long, Double>> getPagesDataSet(ExecutionEnvironment env) {
+		return env.generateSequence(1, numPages).map(
+				new MapFunction<Long, Vertex<Long, Double>>() {
+					@Override
+					public Vertex<Long, Double> map(Long l) throws Exception {
+						return new Vertex<Long, Double>(l, 1.0 / numPages);
 					}
 				});
 
-        DataSet<Vertex<Long,Double>> pageRanks =
-        		networkWithWeights.run(new PageRank<Long>(numPages, DAMPENING_FACTOR, maxIterations)).getVertices();
-
-        pageRanks.print();
-
-        env.execute();
-    }
-
-    @Override
-    public String getDescription() {
-        return "PageRank";
-    }
-
-    private static final double DAMPENING_FACTOR = 0.85;
-    private static long numPages = 10;
-    private static int maxIterations = 10;
-
-    @SuppressWarnings("serial")
-	private static DataSet<Vertex<Long,Double>> getPagesDataSet(ExecutionEnvironment env) {
-            return env.generateSequence(1, numPages)
-                    .map(new MapFunction<Long, Vertex<Long, Double>>() {
-                        @Override
-                        public Vertex<Long, Double> map(Long l) throws Exception {
-                            return new Vertex<Long, Double>(l, 1.0 / numPages);
-                        }
-                    });
-
-    }
-
-    @SuppressWarnings("serial")
-    private static DataSet<Edge<Long, Double>> getLinksDataSet(ExecutionEnvironment env) {
-            return env.generateSequence(1, numPages)
-                    .flatMap(new FlatMapFunction<Long, Edge<Long, Double>>() {
-                        @Override
-                        public void flatMap(Long key, Collector<Edge<Long, Double>> out) throws Exception {
-                            int numOutEdges = (int) (Math.random() * (numPages / 2));
-                            for (int i = 0; i < numOutEdges; i++) {
-                                long target = (long) (Math.random() * numPages) + 1;
-                                out.collect(new Edge<Long, Double>(key, target, 1.0));
-                            }
-                        }
-                    });
-    }
+	}
+
+	@SuppressWarnings("serial")
+	private static DataSet<Edge<Long, Double>> getLinksDataSet(ExecutionEnvironment env) {
+		return env.generateSequence(1, numPages).flatMap(
+				new FlatMapFunction<Long, Edge<Long, Double>>() {
+					@Override
+					public void flatMap(Long key,
+							Collector<Edge<Long, Double>> out) throws Exception {
+						int numOutEdges = (int) (Math.random() * (numPages / 2));
+						for (int i = 0; i < numOutEdges; i++) {
+							long target = (long) (Math.random() * numPages) + 1;
+							out.collect(new Edge<Long, Double>(key, target, 1.0));
+						}
+					}
+				});
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
index 75e33dc..7f31525 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
@@ -1,41 +1,59 @@
-package flink.graphs.example;
-
-import flink.graphs.Edge;
-import flink.graphs.Graph;
-import flink.graphs.Vertex;
-import flink.graphs.example.utils.ExampleUtils;
-import flink.graphs.library.SingleSourceShortestPaths;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
 
 import org.apache.flink.api.common.ProgramDescription;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.ExampleUtils;
+import org.apache.flink.graph.library.SingleSourceShortestPaths;
 
 public class SingleSourceShortestPathsExample implements ProgramDescription {
 
-    private static int maxIterations = 5;
+	private static int maxIterations = 5;
 
-    public static void main (String [] args) throws Exception {
+	public static void main(String[] args) throws Exception {
 
-        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-        DataSet<Vertex<Long, Double>> vertices = ExampleUtils.getLongDoubleVertexData(env);
+		DataSet<Vertex<Long, Double>> vertices = ExampleUtils.getLongDoubleVertexData(env);
 
-        DataSet<Edge<Long, Double>> edges = ExampleUtils.getLongDoubleEdgeData(env);
+		DataSet<Edge<Long, Double>> edges = ExampleUtils.getLongDoubleEdgeData(env);
 
-        Long srcVertexId = 1L;
+		Long srcVertexId = 1L;
 
-        Graph<Long, Double, Double> graph = Graph.create(vertices, edges, env);
+		Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
 
-        DataSet<Vertex<Long,Double>> singleSourceShortestPaths =
-                graph.run(new SingleSourceShortestPaths<Long>(srcVertexId, maxIterations)).getVertices();
+		DataSet<Vertex<Long, Double>> singleSourceShortestPaths = graph
+				.run(new SingleSourceShortestPaths<Long>(srcVertexId,
+						maxIterations)).getVertices();
 
-        singleSourceShortestPaths.print();
+		singleSourceShortestPaths.print();
 
-        env.execute();
-    }
+		env.execute();
+	}
 
-    @Override
-    public String getDescription() {
-        return "Single Source Shortest Paths";
-    }
+	@Override
+	public String getDescription() {
+		return "Single Source Shortest Paths";
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EdgeWithLongIdNullValueParser.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EdgeWithLongIdNullValueParser.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EdgeWithLongIdNullValueParser.java
index 8c131e4..3f1c7bb 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EdgeWithLongIdNullValueParser.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/EdgeWithLongIdNullValueParser.java
@@ -1,10 +1,27 @@
-package flink.graphs.example.utils;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example.utils;
 
 import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.graph.Edge;
 import org.apache.flink.types.NullValue;
 
-import flink.graphs.Edge;
-
 public class EdgeWithLongIdNullValueParser extends RichMapFunction<String, Edge<Long, NullValue>> {
 	private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ExampleUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ExampleUtils.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ExampleUtils.java
index 4588230..d986478 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ExampleUtils.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/ExampleUtils.java
@@ -1,4 +1,22 @@
-package flink.graphs.example.utils;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example.utils;
 
 import java.io.PrintStream;
 import java.util.ArrayList;
@@ -10,46 +28,47 @@ import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
 import org.apache.flink.types.NullValue;
 import org.apache.flink.util.Collector;
 
-import flink.graphs.Edge;
-import flink.graphs.Vertex;
-
 public class ExampleUtils {
 
 	@SuppressWarnings({ "serial", "unchecked", "rawtypes" })
-	public static void printResult(DataSet set, String msg, ExecutionEnvironment env) {
+	public static void printResult(DataSet set, String msg) {
 		set.output(new PrintingOutputFormatWithMessage(msg) {
 		});
 	}
-	
-	public static class PrintingOutputFormatWithMessage<T> implements OutputFormat<T> {
+
+	public static class PrintingOutputFormatWithMessage<T> implements
+			OutputFormat<T> {
 
 		private static final long serialVersionUID = 1L;
-		
+
 		private transient PrintStream stream;
-		
+
 		private transient String prefix;
-		
+
 		private String message;
-		
+
 		// --------------------------------------------------------------------------------------------
-		
+
 		/**
 		 * Instantiates a printing output format that prints to standard out.
 		 */
-		public PrintingOutputFormatWithMessage() {}
-		
+		public PrintingOutputFormatWithMessage() {
+		}
+
 		public PrintingOutputFormatWithMessage(String msg) {
-			this.message = msg;	
+			this.message = msg;
 		}
 
 		@Override
 		public void open(int taskNumber, int numTasks) {
 			// get the target stream
 			this.stream = System.out;
-			
+
 			// set the prefix to message
 			this.prefix = message + ": ";
 		}
@@ -58,8 +77,7 @@ public class ExampleUtils {
 		public void writeRecord(T record) {
 			if (this.prefix != null) {
 				this.stream.println(this.prefix + record.toString());
-			}
-			else {
+			} else {
 				this.stream.println(record.toString());
 			}
 		}
@@ -69,41 +87,46 @@ public class ExampleUtils {
 			this.stream = null;
 			this.prefix = null;
 		}
-		
+
 		@Override
 		public String toString() {
 			return "Print to System.out";
 		}
 
 		@Override
-		public void configure(Configuration parameters) {}
+		public void configure(Configuration parameters) {
+		}
 	}
 
 	@SuppressWarnings("serial")
-	public static DataSet<Vertex<Long, NullValue>> getVertexIds(ExecutionEnvironment env,
-			final long numVertices) {
-        return env.generateSequence(1, numVertices)
-                .map(new MapFunction<Long, Vertex<Long, NullValue>>() {
-                    public Vertex<Long, NullValue> map(Long l) {
-                        return new Vertex<Long, NullValue>(l, NullValue.getInstance());
-                    }
-                });
+	public static DataSet<Vertex<Long, NullValue>> getVertexIds(
+			ExecutionEnvironment env, final long numVertices) {
+		return env.generateSequence(1, numVertices).map(
+				new MapFunction<Long, Vertex<Long, NullValue>>() {
+					public Vertex<Long, NullValue> map(Long l) {
+						return new Vertex<Long, NullValue>(l, NullValue
+								.getInstance());
+					}
+				});
 	}
 
 	@SuppressWarnings("serial")
-	public static DataSet<Edge<Long, NullValue>> getRandomEdges(ExecutionEnvironment env,
-			final long numVertices) {
-	        return env.generateSequence(1, numVertices)
-	                .flatMap(new FlatMapFunction<Long, Edge<Long, NullValue>>() {
-	                    @Override
-	                    public void flatMap(Long key, Collector<Edge<Long, NullValue>> out) throws Exception {
-	                        int numOutEdges = (int) (Math.random() * (numVertices / 2));
-	                        for (int i = 0; i < numOutEdges; i++) {
-	                            long target = (long) (Math.random() * numVertices) + 1;
-	                            out.collect(new Edge<Long, NullValue>(key, target, NullValue.getInstance()));
-	                        }
-	                    }
-	                });
+	public static DataSet<Edge<Long, NullValue>> getRandomEdges(
+			ExecutionEnvironment env, final long numVertices) {
+		return env.generateSequence(1, numVertices).flatMap(
+				new FlatMapFunction<Long, Edge<Long, NullValue>>() {
+					@Override
+					public void flatMap(Long key,
+							Collector<Edge<Long, NullValue>> out)
+							throws Exception {
+						int numOutEdges = (int) (Math.random() * (numVertices / 2));
+						for (int i = 0; i < numOutEdges; i++) {
+							long target = (long) (Math.random() * numVertices) + 1;
+							out.collect(new Edge<Long, NullValue>(key, target,
+									NullValue.getInstance()));
+						}
+					}
+				});
 	}
 
 	public static final DataSet<Vertex<Long, Double>> getLongDoubleVertexData(
@@ -117,7 +140,7 @@ public class ExampleUtils {
 
 		return env.fromCollection(vertices);
 	}
-	
+
 	public static final DataSet<Edge<Long, Double>> getLongDoubleEdgeData(
 			ExecutionEnvironment env) {
 		List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
@@ -128,8 +151,7 @@ public class ExampleUtils {
 		edges.add(new Edge<Long, Double>(3L, 5L, 35.0));
 		edges.add(new Edge<Long, Double>(4L, 5L, 45.0));
 		edges.add(new Edge<Long, Double>(5L, 1L, 51.0));
-		
+
 		return env.fromCollection(edges);
 	}
 }
-

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java
index cfe9c88..0a7162d 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java
@@ -1,4 +1,22 @@
-package flink.graphs.example.utils;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example.utils;
 
 import java.util.ArrayList;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
index fc77d0b..69d7713 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
@@ -1,10 +1,28 @@
-package flink.graphs.library;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
-import flink.graphs.*;
-import flink.graphs.spargel.MessageIterator;
-import flink.graphs.spargel.MessagingFunction;
-import flink.graphs.spargel.VertexUpdateFunction;
+package org.apache.flink.graph.library;
 
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
 import org.apache.flink.types.NullValue;
 
 import java.io.Serializable;
@@ -13,43 +31,43 @@ import java.util.Map;
 import java.util.Map.Entry;
 
 /**
- * An implementation of the label propagation algorithm.
- * The iterative algorithm detects communities by propagating labels.
- * In each iteration, a vertex adopts the label that is most frequent among its neighbors' labels.
- * Labels are represented by Longs and we assume a total ordering among them, in order to break ties.
- * The algorithm converges when no vertex changes its value or the maximum number of iterations have been reached.
- * Note that different initializations might lead to different results.
- *
+ * An implementation of the label propagation algorithm. The iterative algorithm
+ * detects communities by propagating labels. In each iteration, a vertex adopts
+ * the label that is most frequent among its neighbors' labels. Labels are
+ * represented by Longs and we assume a total ordering among them, in order to
+ * break ties. The algorithm converges when no vertex changes its value or the
+ * maximum number of iterations have been reached. Note that different
+ * initializations might lead to different results.
+ * 
  */
 @SuppressWarnings("serial")
-public class LabelPropagation<K extends Comparable<K> & Serializable> implements GraphAlgorithm<K, Long, NullValue> {
+public class LabelPropagation<K extends Comparable<K> & Serializable>
+		implements GraphAlgorithm<K, Long, NullValue> {
 
-    private final int maxIterations;
+	private final int maxIterations;
 
-    public LabelPropagation(int maxIterations) {
-        this.maxIterations = maxIterations;
-    }
+	public LabelPropagation(int maxIterations) {
+		this.maxIterations = maxIterations;
+	}
 
-    @Override
-    public Graph<K, Long, NullValue> run(Graph<K, Long, NullValue> input) {
+	@Override
+	public Graph<K, Long, NullValue> run(Graph<K, Long, NullValue> input) {
 
-    	// iteratively adopt the most frequent label among the neighbors
-    	// of each vertex
-    	return input.runVertexCentricIteration(
-                new UpdateVertexLabel<K>(),
-                new SendNewLabelToNeighbors<K>(),
-                maxIterations
-        );
-    }
+		// iteratively adopt the most frequent label among the neighbors
+		// of each vertex
+		return input.runVertexCentricIteration(new UpdateVertexLabel<K>(),
+				new SendNewLabelToNeighbors<K>(), maxIterations);
+	}
 
-    /**
-     * Function that updates the value of a vertex by adopting the most frequent label
-     * among its in-neighbors
-     */
-    public static final class UpdateVertexLabel<K extends Comparable<K> & Serializable>
-            extends VertexUpdateFunction<K, Long, Long> {
+	/**
+	 * Function that updates the value of a vertex by adopting the most frequent
+	 * label among its in-neighbors
+	 */
+	public static final class UpdateVertexLabel<K extends Comparable<K> & Serializable>
+			extends VertexUpdateFunction<K, Long, Long> {
 
-		public void updateVertex(K vertexKey, Long vertexValue, MessageIterator<Long> inMessages) {
+		public void updateVertex(K vertexKey, Long vertexValue,
+				MessageIterator<Long> inMessages) {
 			Map<Long, Long> labelsWithFrequencies = new HashMap<Long, Long>();
 
 			long maxFrequency = 1;
@@ -60,12 +78,12 @@ public class LabelPropagation<K extends Comparable<K> & Serializable> implements
 				if (labelsWithFrequencies.containsKey(msg)) {
 					long currentFreq = labelsWithFrequencies.get(msg);
 					labelsWithFrequencies.put(msg, currentFreq + 1);
-				}
-				else {
+				} else {
 					labelsWithFrequencies.put(msg, 1L);
 				}
 			}
-			// select the most frequent label: if two or more labels have the same frequency,
+			// select the most frequent label: if two or more labels have the
+			// same frequency,
 			// the node adopts the label with the highest value
 			for (Entry<Long, Long> entry : labelsWithFrequencies.entrySet()) {
 				if (entry.getValue() == maxFrequency) {
@@ -73,8 +91,7 @@ public class LabelPropagation<K extends Comparable<K> & Serializable> implements
 					if (entry.getKey() > mostFrequentLabel) {
 						mostFrequentLabel = entry.getKey();
 					}
-				}
-				else if (entry.getValue() > maxFrequency) {
+				} else if (entry.getValue() > maxFrequency) {
 					maxFrequency = entry.getValue();
 					mostFrequentLabel = entry.getKey();
 				}
@@ -83,16 +100,16 @@ public class LabelPropagation<K extends Comparable<K> & Serializable> implements
 			// set the new vertex value
 			setNewVertexValue(mostFrequentLabel);
 		}
-    }
+	}
 
-    /**
-     * Sends the vertex label to all out-neighbors
-     */
-    public static final class SendNewLabelToNeighbors<K extends Comparable<K> & Serializable>
-            extends MessagingFunction<K, Long, Long, NullValue> {
+	/**
+	 * Sends the vertex label to all out-neighbors
+	 */
+	public static final class SendNewLabelToNeighbors<K extends Comparable<K> & Serializable>
+			extends MessagingFunction<K, Long, Long, NullValue> {
 
-    	public void sendMessages(K vertexKey, Long newLabel) {
-            sendMessageToAllNeighbors(newLabel);
-        }
-    }
+		public void sendMessages(K vertexKey, Long newLabel) {
+			sendMessageToAllNeighbors(newLabel);
+		}
+	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
index d29a9dc..39b8ef1 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
@@ -1,76 +1,95 @@
-package flink.graphs.library;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
-import flink.graphs.Edge;
-import flink.graphs.Graph;
-import flink.graphs.GraphAlgorithm;
-import flink.graphs.spargel.MessageIterator;
-import flink.graphs.spargel.MessagingFunction;
-import flink.graphs.spargel.VertexUpdateFunction;
+package org.apache.flink.graph.library;
 
 import java.io.Serializable;
 
-public class PageRank<K extends Comparable<K> & Serializable> implements GraphAlgorithm<K, Double, Double> {
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
 
-    private long numVertices;
-    private double beta;
-    private int maxIterations;
+public class PageRank<K extends Comparable<K> & Serializable> implements
+		GraphAlgorithm<K, Double, Double> {
 
-    public PageRank(long numVertices, double beta, int maxIterations) {
-        this.numVertices = numVertices;
-        this.beta = beta;
-        this.maxIterations = maxIterations;
-    }
+	private long numVertices;
+	private double beta;
+	private int maxIterations;
 
-    @Override
-    public Graph<K, Double, Double> run(Graph<K, Double, Double> network) {
-        return network.runVertexCentricIteration(
-                new VertexRankUpdater<K>(numVertices, beta),
-                new RankMessenger<K>(),
-                maxIterations
-        );
-    }
+	public PageRank(long numVertices, double beta, int maxIterations) {
+		this.numVertices = numVertices;
+		this.beta = beta;
+		this.maxIterations = maxIterations;
+	}
 
+	@Override
+	public Graph<K, Double, Double> run(Graph<K, Double, Double> network) {
+		return network.runVertexCentricIteration(new VertexRankUpdater<K>(
+				numVertices, beta), new RankMessenger<K>(), maxIterations);
+	}
 
-    /**
-     * Function that updates the rank of a vertex by summing up the partial ranks from all incoming messages
-     * and then applying the dampening formula.
-     */
-    @SuppressWarnings("serial")
-	public static final class VertexRankUpdater<K extends Comparable<K> & Serializable> extends VertexUpdateFunction<K, Double, Double> {
+	/**
+	 * Function that updates the rank of a vertex by summing up the partial
+	 * ranks from all incoming messages and then applying the dampening formula.
+	 */
+	@SuppressWarnings("serial")
+	public static final class VertexRankUpdater<K extends Comparable<K> & Serializable>
+			extends VertexUpdateFunction<K, Double, Double> {
 
-        private final long numVertices;
-        private final double beta;
+		private final long numVertices;
+		private final double beta;
 
-        public VertexRankUpdater(long numVertices, double beta) {
-            this.numVertices = numVertices;
-            this.beta = beta;
-        }
+		public VertexRankUpdater(long numVertices, double beta) {
+			this.numVertices = numVertices;
+			this.beta = beta;
+		}
 
-        @Override
-        public void updateVertex(K vertexKey, Double vertexValue, MessageIterator<Double> inMessages) {
-            double rankSum = 0.0;
-            for (double msg : inMessages) {
-                rankSum += msg;
-            }
+		@Override
+		public void updateVertex(K vertexKey, Double vertexValue,
+				MessageIterator<Double> inMessages) {
+			double rankSum = 0.0;
+			for (double msg : inMessages) {
+				rankSum += msg;
+			}
 
-            // apply the dampening factor / random jump
-            double newRank = (beta * rankSum) + (1-beta)/numVertices;
-            setNewVertexValue(newRank);
-        }
-    }
+			// apply the dampening factor / random jump
+			double newRank = (beta * rankSum) + (1 - beta) / numVertices;
+			setNewVertexValue(newRank);
+		}
+	}
 
-    /**
-     * Distributes the rank of a vertex among all target vertices according to the transition probability,
-     * which is associated with an edge as the edge value.
-     */
-    @SuppressWarnings("serial")
-	public static final class RankMessenger<K extends Comparable<K> & Serializable> extends MessagingFunction<K, Double, Double, Double> {
+	/**
+	 * Distributes the rank of a vertex among all target vertices according to
+	 * the transition probability, which is associated with an edge as the edge
+	 * value.
+	 */
+	@SuppressWarnings("serial")
+	public static final class RankMessenger<K extends Comparable<K> & Serializable>
+			extends MessagingFunction<K, Double, Double, Double> {
 
-        @Override
-        public void sendMessages(K vertexId, Double newRank) {
-            for (Edge<K, Double> edge : getOutgoingEdges()) {
-                sendMessageTo(edge.getTarget(), newRank * edge.getValue());
-            }
-        }
-    }
+		@Override
+		public void sendMessages(K vertexId, Double newRank) {
+			for (Edge<K, Double> edge : getOutgoingEdges()) {
+				sendMessageTo(edge.getTarget(), newRank * edge.getValue());
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
index 0da8a90..2f575e7 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
@@ -1,94 +1,114 @@
-package flink.graphs.library;
-
-import flink.graphs.*;
-import flink.graphs.spargel.MessageIterator;
-import flink.graphs.spargel.MessagingFunction;
-import flink.graphs.spargel.VertexUpdateFunction;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
 
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
 
 import java.io.Serializable;
 
 @SuppressWarnings("serial")
-public class SingleSourceShortestPaths<K extends Comparable<K> & Serializable> implements GraphAlgorithm<K, Double, Double> {
+public class SingleSourceShortestPaths<K extends Comparable<K> & Serializable>
+		implements GraphAlgorithm<K, Double, Double> {
+
+	private final K srcVertexId;
+	private final Integer maxIterations;
 
-    private final K srcVertexId;
-    private final Integer maxIterations;
+	public SingleSourceShortestPaths(K srcVertexId, Integer maxIterations) {
+		this.srcVertexId = srcVertexId;
+		this.maxIterations = maxIterations;
+	}
 
-    public SingleSourceShortestPaths(K srcVertexId, Integer maxIterations) {
-        this.srcVertexId = srcVertexId;
-        this.maxIterations = maxIterations;
-    }
+	@Override
+	public Graph<K, Double, Double> run(Graph<K, Double, Double> input) {
 
-    @Override
-    public Graph<K, Double, Double> run(Graph<K, Double, Double> input) {
+		return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
+				.runVertexCentricIteration(new VertexDistanceUpdater<K>(),
+						new MinDistanceMessenger<K>(), maxIterations);
+	}
 
-    	return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
-    			.runVertexCentricIteration(
-                new VertexDistanceUpdater<K>(),
-                new MinDistanceMessenger<K>(),
-                maxIterations
-        );
-    }
+	public static final class InitVerticesMapper<K extends Comparable<K> & Serializable>
+			implements MapFunction<Vertex<K, Double>, Double> {
 
-    public static final class InitVerticesMapper<K extends Comparable<K> & Serializable> 
-    	implements MapFunction<Vertex<K,Double>, Double> {
+		private K srcVertexId;
 
-    	private K srcVertexId;
+		public InitVerticesMapper(K srcId) {
+			this.srcVertexId = srcId;
+		}
 
-    	public InitVerticesMapper(K srcId) {
-    		this.srcVertexId = srcId;
-    	}
-    		
 		public Double map(Vertex<K, Double> value) {
 			if (value.f0.equals(srcVertexId)) {
 				return 0.0;
-			}
-			else {
+			} else {
 				return Double.MAX_VALUE;
 			}
 		}
-    }
-
-    /**
-     * Function that updates the value of a vertex by picking the minimum distance from all incoming messages.
-     *
-     * @param <K>
-     */
-    public static final class VertexDistanceUpdater<K extends Comparable<K> & Serializable>
-            extends VertexUpdateFunction<K, Double, Double> {
-
-        @Override
-        public void updateVertex(K vertexKey, Double vertexValue, MessageIterator<Double> inMessages) {
-
-            Double minDistance = Double.MAX_VALUE;
-
-            for (double msg : inMessages) {
-                if (msg < minDistance) {
-                    minDistance = msg;
-                }
-            }
-
-            if (vertexValue > minDistance) {
-                setNewVertexValue(minDistance);
-            }
-        }
-    }
-
-    /**
-     * Distributes the minimum distance associated with a given vertex among all the target vertices
-     * summed up with the edge's value.
-     *
-     * @param <K>
-     */
-    public static final class MinDistanceMessenger<K extends Comparable<K> & Serializable>
-            extends MessagingFunction<K, Double, Double, Double> {
-
-        @Override
-        public void sendMessages(K vertexKey, Double newDistance) throws Exception {
-            for (Edge<K, Double> edge : getOutgoingEdges()) {
-                sendMessageTo(edge.getTarget(), newDistance + edge.getValue());
-            }
-        }
-    }
+	}
+
+	/**
+	 * Function that updates the value of a vertex by picking the minimum
+	 * distance from all incoming messages.
+	 * 
+	 * @param <K>
+	 */
+	public static final class VertexDistanceUpdater<K extends Comparable<K> & Serializable>
+			extends VertexUpdateFunction<K, Double, Double> {
+
+		@Override
+		public void updateVertex(K vertexKey, Double vertexValue,
+				MessageIterator<Double> inMessages) {
+
+			Double minDistance = Double.MAX_VALUE;
+
+			for (double msg : inMessages) {
+				if (msg < minDistance) {
+					minDistance = msg;
+				}
+			}
+
+			if (vertexValue > minDistance) {
+				setNewVertexValue(minDistance);
+			}
+		}
+	}
+
+	/**
+	 * Distributes the minimum distance associated with a given vertex among all
+	 * the target vertices summed up with the edge's value.
+	 * 
+	 * @param <K>
+	 */
+	public static final class MinDistanceMessenger<K extends Comparable<K> & Serializable>
+			extends MessagingFunction<K, Double, Double, Double> {
+
+		@Override
+		public void sendMessages(K vertexKey, Double newDistance)
+				throws Exception {
+			for (Edge<K, Double> edge : getOutgoingEdges()) {
+				sendMessageTo(edge.getTarget(), newDistance + edge.getValue());
+			}
+		}
+	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/package-info.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/package-info.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/package-info.java
deleted file mode 100644
index 695b2b8..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/package-info.java
+++ /dev/null
@@ -1,4 +0,0 @@
-/**
- * 
- */
-package flink.graphs;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessageIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessageIterator.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessageIterator.java
index 08ba2c0..d6fdc8a 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessageIterator.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessageIterator.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package flink.graphs.spargel;
+package org.apache.flink.graph.spargel;
 
 import java.util.Iterator;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
index ab451bb..e8a297f 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package flink.graphs.spargel;
+package org.apache.flink.graph.spargel;
 
 import java.io.Serializable;
 import java.util.Collection;
@@ -26,11 +26,10 @@ import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.Collector;
 
-import flink.graphs.Edge;
-
 /**
  * The base class for functions that produce messages between vertices as a part of a {@link VertexCentricIteration}.
  * 

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
index 5f89e90..9c72485 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package flink.graphs.spargel;
+package org.apache.flink.graph.spargel;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -38,11 +38,10 @@ import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
 import org.apache.flink.util.Collector;
 
-import flink.graphs.Edge;
-import flink.graphs.Vertex;
-
 /**
  * This class represents iterative graph computations, programmed in a vertex-centric perspective.
  * It is a special case of <i>Bulk Synchronous Parallel<i> computation. The paradigm has also been
@@ -106,7 +105,7 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Se
 	private VertexCentricIteration(VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
 			MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf,
 			DataSet<Edge<VertexKey, EdgeValue>> edgesWithValue, 
-			int maximumNumberOfIterations, boolean edgeHasValueMarker)
+			int maximumNumberOfIterations)
 	{
 		Validate.notNull(uf);
 		Validate.notNull(mf);
@@ -319,7 +318,7 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Se
 					MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf,
 					int maximumNumberOfIterations)
 	{
-		return new VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>(uf, mf, edgesWithValue, maximumNumberOfIterations, true);
+		return new VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>(uf, mf, edgesWithValue, maximumNumberOfIterations);
 	}
 	
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
index e30451c..1157a18 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
@@ -16,18 +16,17 @@
  * limitations under the License.
  */
 
-package flink.graphs.spargel;
+package org.apache.flink.graph.spargel;
 
 import java.io.Serializable;
 import java.util.Collection;
 
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.graph.Vertex;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.Collector;
 
-import flink.graphs.Vertex;
-
 /**
  * This class must be extended by functions that compute the state of the vertex depending on the old state and the
  * incoming messages. The central method is {@link #updateVertex(Comparable, Object, MessageIterator)}, which is

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java
index 86103a6..a7b7b62 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java
@@ -1,11 +1,28 @@
-package flink.graphs.utils;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils;
 
 import java.io.Serializable;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
-
-import flink.graphs.Edge;
+import org.apache.flink.graph.Edge;
 
 public class EdgeToTuple3Map<K extends Comparable<K> & Serializable, 
 	EV extends Serializable> implements MapFunction<Edge<K, EV>, Tuple3<K, K, EV>> {

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
index 0b0dc18..aba1c14 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
@@ -1,4 +1,22 @@
-package flink.graphs.utils;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -14,27 +32,26 @@ public class GraphUtils {
 
 	@SuppressWarnings({ "unchecked", "rawtypes" })
 	public static DataSet<Integer> count(DataSet set, ExecutionEnvironment env) {
-		List<Integer> list = new ArrayList<>();
+		List<Integer> list = new ArrayList<Integer>();
 		list.add(0);
 		DataSet<Integer> initialCount = env.fromCollection(list);
-        return set
-                .map(new OneMapper())
-                .union(initialCount)
-                .reduce(new AddOnesReducer())
-                .first(1);
-    }
+		return set.map(new OneMapper()).union(initialCount)
+				.reduce(new AddOnesReducer()).first(1);
+	}
+
+	private static final class OneMapper<T extends Tuple> implements
+			MapFunction<T, Integer> {
+		@Override
+		public Integer map(T o) throws Exception {
+			return 1;
+		}
+	}
 
-	private static final class OneMapper<T extends Tuple> implements MapFunction<T, Integer> {
-            @Override
-            public Integer map(T o) throws Exception {
-                return 1;
-            }
-    }
-    
-    private static final class AddOnesReducer implements ReduceFunction<Integer> {
-            @Override
-            public Integer reduce(Integer one, Integer two) throws Exception {
-                return one + two;
-            }
-    } 
+	private static final class AddOnesReducer implements
+			ReduceFunction<Integer> {
+		@Override
+		public Integer reduce(Integer one, Integer two) throws Exception {
+			return one + two;
+		}
+	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java
index 893ae95..d58e4ff 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java
@@ -1,11 +1,28 @@
-package flink.graphs.utils;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils;
 
 import java.io.Serializable;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
-
-import flink.graphs.Vertex;
+import org.apache.flink.graph.Vertex;
 
 public class Tuple2ToVertexMap<K extends Comparable<K> & Serializable, 
 	VV extends Serializable> implements MapFunction<Tuple2<K, VV>, Vertex<K, VV>> {

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java
index 2a6cb23..3668dd2 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java
@@ -1,11 +1,28 @@
-package flink.graphs.utils;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils;
 
 import java.io.Serializable;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple3;
-
-import flink.graphs.Edge;
+import org.apache.flink.graph.Edge;
 
 /**
  * create an Edge DataSetfrom a Tuple3 dataset

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java
index 30f867d..318e1ed 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java
@@ -1,11 +1,28 @@
-package flink.graphs.utils;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils;
 
 import java.io.Serializable;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple2;
-
-import flink.graphs.Vertex;
+import org.apache.flink.graph.Vertex;
 
 public class VertexToTuple2Map<K extends Comparable<K> & Serializable, 
 	VV extends Serializable> implements MapFunction<Vertex<K, VV>, Tuple2<K, VV>> {

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
index 88dfcde..47209b3 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
@@ -1,21 +1,39 @@
-package flink.graphs.validation;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.validation;
+
 import java.io.Serializable;
 
 import org.apache.flink.api.java.DataSet;
-
-import flink.graphs.Graph;
+import org.apache.flink.graph.Graph;
 
 /**
  * A validation method for different types of Graphs
- *
+ * 
  * @param <K>
  * @param <VV>
  * @param <EV>
  */
 @SuppressWarnings("serial")
-public abstract class GraphValidator<K extends Comparable<K> & Serializable, VV extends Serializable,
-        EV extends Serializable> implements Serializable{
+public abstract class GraphValidator<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
+		implements Serializable {
 
-    public abstract DataSet<Boolean> validate(Graph<K, VV, EV> graph);
+	public abstract DataSet<Boolean> validate(Graph<K, VV, EV> graph);
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/081de39b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
index 6b7a619..b043f3c 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
@@ -1,77 +1,87 @@
-package flink.graphs.validation;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.validation;
 
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.utils.GraphUtils;
 import org.apache.flink.util.Collector;
 
-import flink.graphs.Edge;
-import flink.graphs.Graph;
-import flink.graphs.Vertex;
-import flink.graphs.utils.GraphUtils;
-
 import java.io.Serializable;
 
 @SuppressWarnings("serial")
-public class InvalidVertexIdsValidator<K extends Comparable<K> & Serializable, VV extends Serializable,
-        EV extends Serializable> extends  GraphValidator<K, VV, EV> {
-
-    /**
-     * Checks that the edge set input contains valid vertex Ids,
-     * i.e. that they also exist in the vertex input set.
-     * @return a singleton DataSet<Boolean> stating whether a graph is valid
-     * with respect to its vertex ids.
-     */
-    @Override
-    public DataSet<Boolean> validate(Graph<K, VV, EV> graph) {
-        DataSet<Tuple1<K>> edgeIds = graph.getEdges().flatMap(new MapEdgeIds<K, EV>()).distinct();
-        DataSet<K> invalidIds = graph.getVertices().coGroup(edgeIds).where(0).equalTo(0)
-                .with(new GroupInvalidIds<K, VV>()).first(1);
-
-        return GraphUtils.count(invalidIds.map(new KToTupleMap<K>()), graph.getContext())
-                .map(new InvalidIdsMap());
-    }
-
-    private static final class MapEdgeIds<K extends Comparable<K> & Serializable,
-            EV extends Serializable> implements FlatMapFunction<Edge<K, EV>,
-            Tuple1<K>> {
-
-        @Override
-        public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) {
-            out.collect(new Tuple1<K>(edge.f0));
-            out.collect(new Tuple1<K>(edge.f1));
-        }
-    }
+public class InvalidVertexIdsValidator<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
+		extends GraphValidator<K, VV, EV> {
 
-    private static final class GroupInvalidIds<K extends Comparable<K> & Serializable,
-            VV extends Serializable> implements CoGroupFunction<Vertex<K, VV>, Tuple1<K>, K> {
+	/**
+	 * Checks that the edge set input contains valid vertex Ids, i.e. that they
+	 * also exist in the vertex input set.
+	 * 
+	 * @return a singleton DataSet<Boolean> stating whether a graph is valid
+	 *         with respect to its vertex ids.
+	 */
+	@Override
+	public DataSet<Boolean> validate(Graph<K, VV, EV> graph) {
+		DataSet<Tuple1<K>> edgeIds = graph.getEdges()
+				.flatMap(new MapEdgeIds<K, EV>()).distinct();
+		DataSet<K> invalidIds = graph.getVertices().coGroup(edgeIds).where(0)
+				.equalTo(0).with(new GroupInvalidIds<K, VV>()).first(1);
 
-        @Override
-        public void coGroup(Iterable<Vertex<K, VV>> vertexId,
-                            Iterable<Tuple1<K>> edgeId, Collector<K> out) {
-            if (!(vertexId.iterator().hasNext())) {
-                // found an id that doesn't exist in the vertex set
-                out.collect(edgeId.iterator().next().f0);
-            }
-        }
-    }
+		return GraphUtils.count(invalidIds.map(new KToTupleMap<K>()),
+				graph.getContext()).map(new InvalidIdsMap());
+	}
 
-    private static final class KToTupleMap<K> implements MapFunction<K, Tuple1<K>> {
+	private static final class MapEdgeIds<K extends Comparable<K> & Serializable, EV extends Serializable>
+			implements FlatMapFunction<Edge<K, EV>, Tuple1<K>> {
+		public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) {
+			out.collect(new Tuple1<K>(edge.f0));
+			out.collect(new Tuple1<K>(edge.f1));
+		}
+	}
 
-        @Override
-        public Tuple1<K> map (K key)throws Exception {
-            return new Tuple1<>(key);
-        }
-    }
+	private static final class GroupInvalidIds<K extends Comparable<K> & Serializable, VV extends Serializable>
+			implements CoGroupFunction<Vertex<K, VV>, Tuple1<K>, K> {
+		public void coGroup(Iterable<Vertex<K, VV>> vertexId,
+				Iterable<Tuple1<K>> edgeId, Collector<K> out) {
+			if (!(vertexId.iterator().hasNext())) {
+				// found an id that doesn't exist in the vertex set
+				out.collect(edgeId.iterator().next().f0);
+			}
+		}
+	}
 
-    private static final class InvalidIdsMap implements MapFunction<Integer, Boolean> {
+	private static final class KToTupleMap<K> implements MapFunction<K, Tuple1<K>> {
+		public Tuple1<K> map(K key) throws Exception {
+			return new Tuple1<K>(key);
+		}
+	}
 
-        @Override
-        public Boolean map (Integer numberOfInvalidIds)throws Exception {
-            return numberOfInvalidIds == 0;
-        }
-    }
+	private static final class InvalidIdsMap implements	MapFunction<Integer, Boolean> {
+		public Boolean map(Integer numberOfInvalidIds) throws Exception {
+			return numberOfInvalidIds == 0;
+		}
+	}
 
 }
\ No newline at end of file


Mime
View raw message