flink-commits mailing list archives

From va...@apache.org
Subject [2/2] flink git commit: [FLINK-1520] [gelly] add types methods and make formatting changes to the graph csv reader
Date Thu, 24 Sep 2015 09:40:50 GMT
[FLINK-1520] [gelly] add types methods and make formatting changes to the graph csv reader

This squashes the following commits:

[FLINK-1520] [gelly] add named types methods for reading a Graph from CSV input,
with and without vertex/edge values. Change the examples and the tests accordingly.

[FLINK-1520] [gelly] corrections in Javadocs; updated documentation

This closes #1149
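
For reference, a minimal sketch of how the four new type-specification methods (types, edgeTypes, vertexTypes, keyType) are chained after Graph.fromCsvReader(). The input paths are the placeholders used in the gelly guide; the concrete key and value types are illustrative, and the wrapper class name is hypothetical.

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Graph;
import org.apache.flink.types.NullValue;

public class GraphCsvReaderTypesSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // both vertex and edge values: types(vertexKey, vertexValue, edgeValue)
        Graph<String, Long, Double> full = Graph
                .fromCsvReader("path/to/vertex/input", "path/to/edge/input", env)
                .types(String.class, Long.class, Double.class);

        // edge values only: edgeTypes(vertexKey, edgeValue)
        Graph<Long, NullValue, Double> weighted = Graph
                .fromCsvReader("path/to/edge/input", env)
                .edgeTypes(Long.class, Double.class);

        // vertex values only: vertexTypes(vertexKey, vertexValue)
        Graph<Long, Long, NullValue> labeled = Graph
                .fromCsvReader("path/to/vertex/input", "path/to/edge/input", env)
                .vertexTypes(Long.class, Long.class);

        // no vertex or edge values: keyType(vertexKey)
        Graph<Long, NullValue, NullValue> plain = Graph
                .fromCsvReader("path/to/edge/input", env)
                .keyType(Long.class);
    }
}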


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d01d3697
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d01d3697
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d01d3697

Branch: refs/heads/master
Commit: d01d369701c8f79b06d60ee0f731392d3da9b912
Parents: 702277f
Author: vasia <vasia@apache.org>
Authored: Tue Jul 14 20:46:33 2015 +0200
Committer: vasia <vasia@apache.org>
Committed: Thu Sep 24 11:08:57 2015 +0200

----------------------------------------------------------------------
 docs/libs/gelly_guide.md                        |  20 +-
 .../main/java/org/apache/flink/graph/Graph.java |  72 +++--
 .../org/apache/flink/graph/GraphCsvReader.java  | 310 ++++++++++---------
 .../graph/example/ConnectedComponents.java      |  51 +--
 .../example/GSASingleSourceShortestPaths.java   |  26 +-
 .../flink/graph/example/GraphMetrics.java       |  29 +-
 .../flink/graph/example/IncrementalSSSP.java    |  30 +-
 .../flink/graph/library/GSATriangleCount.java   |   1 +
 .../test/GatherSumApplyConfigurationITCase.java |   2 -
 .../operations/GraphCreationWithCsvITCase.java  |  30 +-
 10 files changed, 298 insertions(+), 273 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d01d3697/docs/libs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md
index b7af56b..b6a0533 100644
--- a/docs/libs/gelly_guide.md
+++ b/docs/libs/gelly_guide.md
@@ -104,20 +104,24 @@ DataSet<Tuple3<String, String, Double>> edgeTuples = env.readCsvFile("path/to/ed
 Graph<String, Long, Double> graph = Graph.fromTupleDataSet(vertexTuples, edgeTuples, env);
 {% endhighlight %}
 
-* from a CSV file with three fields and an optional CSV file with 2 fields. In this case, Gelly will convert each row from the CSV file containing edges data to an `Edge`, where the first field will be the source ID, the second field will be the target ID and the third field will be the edge value. Equivalently, each row from the optional CSV file containing vertices will be converted to a `Vertex`, where the first field will be the vertex ID and the second field will be the vertex value. A `typesEdges()` method is called on the GraphCsvReader object returned by `fromCsvReader()` to inform the CsvReader of the types of the  fields for Edges. If Edge doesn't have a value only type of Vertex Key is passed. `typesEdges()` method returns a GraphCsvReader on calling calling `typesVertices()` or `typesVerticesNullEdge()` returns the instance of Graph:
+* from a CSV file of Edge data and an optional CSV file of Vertex data. In this case, Gelly will convert each row from the Edge CSV file to an `Edge`, where the first field will be the source ID, the second field will be the target ID and the third field (if present) will be the edge value. Equivalently, each row from the optional Vertex CSV file will be converted to a `Vertex`, where the first field will be the vertex ID and the second field (if present) will be the vertex value. In order to get a `Graph` from a `GraphCsvReader` one has to specify the types, using one of the following methods:
 
-{% highlight java %}
-ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-Graph<String, Long, NullValue> graph = Graph.fromCsvReader("path/to/vertex/input", "path/to/edge/input", env).typesEdges(String.class).typesVerticesNullEdge(String.class, Long.class);
-{% endhighlight %}
+- `types(Class<K> vertexKey, Class<VV> vertexValue, Class<EV> edgeValue)`: both vertex and edge values are present.
+- `edgeTypes(Class<K> vertexKey, Class<EV> edgeValue)`: the Graph has edge values, but no vertex values.
+- `vertexTypes(Class<K> vertexKey, Class<VV> vertexValue)`: the Graph has vertex values, but no edge values.
+- `keyType(Class<K> vertexKey)`: the Graph has no vertex values and no edge values.
 
-If Vertices don't have a value, overloaded `typesVerticesNullEdge()` or `typesVertices()` Method should be used.
 
 {% highlight java %}
 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-Graph<String, NullValue, Long> graph = Graph.fromCsvReader("path/to/vertex/input", "path/to/edge/input", env).typesEdges(String.class, Long.class).typesVerticesNullEdge(String.class);
+// create a Graph with String Vertex IDs, Long Vertex values and Double Edge values
+Graph<String, Long, Double> graph = Graph.fromCsvReader("path/to/vertex/input", "path/to/edge/input", env)
+					.types(String.class, Long.class, Double.class);
+
+
+// create a Graph with no Vertex or Edge values
+Graph<Long, NullValue, NullValue> simpleGraph = Graph.fromCsvReader("path/to/edge/input", env).keyType(Long.class);
 {% endhighlight %}
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d01d3697/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 0153837..b24f749 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -137,7 +137,7 @@ public class Graph<K, VV, EV> {
 	 * @return the newly created graph.
 	 */
 	public static <K, VV, EV> Graph<K, VV, EV> fromCollection(Collection<Edge<K, EV>> edges,
-			final MapFunction<K, VV> mapper,ExecutionEnvironment context) {
+			final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
 
 		return fromDataSet(context.fromCollection(edges), mapper, context);
 	}
@@ -282,48 +282,57 @@ public class Graph<K, VV, EV> {
 	}
 
 	/**
-	* Creates a graph from CSV files.
-	*
-	* Vertices with value are created from a CSV file with 2 fields
-	* Edges with value are created from a CSV file with 3 fields
-	* @param verticesPath path to a CSV file with the Vertices data.
-	* @param edgesPath path to a CSV file with the Edges data
-	* @param context the flink execution environment.
-	* @return An instance of {@link org.apache.flink.graph.GraphCsvReader} , on which calling typesEdges() and typesVertices() methods to specify types of the
-	*Vertex ID, Vertex Value and Edge value returns a Graph
+	* Creates a Graph from a CSV file of vertices and a CSV file of edges.
+	* 
+	* @param verticesPath path to a CSV file with the Vertex data.
+	* @param edgesPath path to a CSV file with the Edge data
+	* @param context the Flink execution environment.
+	* @return An instance of {@link org.apache.flink.graph.GraphCsvReader}, 
+	* on which calling methods to specify types of the Vertex ID, Vertex value and Edge value returns a Graph.
+	* 
+	* @see {@link org.apache.flink.graph.GraphCsvReader#types(Class, Class, Class)},
+	* {@link org.apache.flink.graph.GraphCsvReader#vertexTypes(Class, Class)},
+	* {@link org.apache.flink.graph.GraphCsvReader#edgeTypes(Class, Class)} and
+	* {@link org.apache.flink.graph.GraphCsvReader#keyType(Class)}.
 	*/
-	public static  GraphCsvReader fromCsvReader(String verticesPath, String edgesPath, ExecutionEnvironment context) {
+	public static GraphCsvReader fromCsvReader(String verticesPath, String edgesPath, ExecutionEnvironment context) {
 		return new GraphCsvReader(verticesPath, edgesPath, context);
 	}
-	/** Creates a graph from a CSV file for Edges.Vertices are
-	* induced from the edges.
-	*
-	* Edges with value are created from a CSV file with 3 fields. Vertices are created
-	* automatically and their values are set to NullValue.
+
+	/** 
+	* Creates a graph from a CSV file of edges. Vertices will be created automatically.
 	*
 	* @param edgesPath a path to a CSV file with the Edges data
-	* @param context the flink execution environment.
-	* @return An instance of {@link org.apache.flink.graph.GraphCsvReader} , on which calling typesEdges() and typesVertices() methods to specify types of the
-	* Vertex ID, Vertex Value and Edge value returns a Graph
+	* @param context the execution environment.
+	* @return An instance of {@link org.apache.flink.graph.GraphCsvReader},
+	* on which calling methods to specify types of the Vertex ID, Vertex value and Edge value returns a Graph.
+	* 
+	* @see {@link org.apache.flink.graph.GraphCsvReader#types(Class, Class, Class)},
+	* {@link org.apache.flink.graph.GraphCsvReader#vertexTypes(Class, Class)},
+	* {@link org.apache.flink.graph.GraphCsvReader#edgeTypes(Class, Class)} and
+	* {@link org.apache.flink.graph.GraphCsvReader#keyType(Class)}.
 	*/
 	public static GraphCsvReader fromCsvReader(String edgesPath, ExecutionEnvironment context) {
 		return new GraphCsvReader(edgesPath, context);
 	}
 
-	/**
-	 *Creates a graph from a CSV file for Edges., Vertices are
-	 * induced from the edges and vertex values are calculated by a mapper
-	 * function.  Edges with value are created from a CSV file with 3 fields.
-	 * Vertices are created automatically and their values are set by applying the provided map
-	 * function to the vertex ids.
+	/** 
+	 * Creates a graph from a CSV file of edges. Vertices will be created automatically and
+	 * Vertex values are set by the provided mapper.
 	 *
-	 * @param edgesPath a path to a CSV file with the Edges data
+	 * @param edgesPath a path to a CSV file with the Edge data
 	 * @param mapper the mapper function.
-	 * @param context the flink execution environment.
-	 * @return An instance of {@link org.apache.flink.graph.GraphCsvReader} ,on which calling typesEdges() and typesVertices() methods to specify types of the
-	 * Vertex ID, Vertex Value and Edge value returns a Graph
+	 * @param context the execution environment.
+	 * @return An instance of {@link org.apache.flink.graph.GraphCsvReader},
+	 * on which calling methods to specify types of the Vertex ID, Vertex Value and Edge value returns a Graph.
+	 * 
+	 * @see {@link org.apache.flink.graph.GraphCsvReader#types(Class, Class, Class)},
+	 * {@link org.apache.flink.graph.GraphCsvReader#vertexTypes(Class, Class)},
+	 * {@link org.apache.flink.graph.GraphCsvReader#edgeTypes(Class, Class)} and
+	 * {@link org.apache.flink.graph.GraphCsvReader#keyType(Class)}.
 	 */
-	public static GraphCsvReader fromCsvReader(String edgesPath, final MapFunction mapper, ExecutionEnvironment context) {
+	public static <K, VV> GraphCsvReader fromCsvReader(String edgesPath,
+			final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
 		return new GraphCsvReader(edgesPath, mapper, context);
 	}
 
@@ -412,7 +421,6 @@ public class Graph<K, VV, EV> {
 		}
 	}
 
-
 	/**
 	 * Apply a function to the attribute of each vertex in the graph.
 	 * 
@@ -1898,7 +1906,7 @@ public class Graph<K, VV, EV> {
 	 *            the function to apply to the neighborhood
 	 * @param direction
 	 *            the edge direction (in-, out-, all-)
-	 * @return a Dataset containing one value per vertex(vertex key, aggegate edge value)
+	 * @return a Dataset containing one value per vertex(vertex key, aggregate edge value)
 	 * @throws IllegalArgumentException
 	 */
 	public DataSet<Tuple2<K, EV>> reduceOnEdges(ReduceEdgesFunction<EV> reduceEdgesFunction,
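As a usage note for the mapper-based fromCsvReader overload above: when no vertices file is given, vertex values are produced by the user-defined MapFunction applied to the vertex IDs. A minimal sketch, assuming an edge CSV with Long IDs and Double values; the initializer value, field types and class name are illustrative.

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Graph;

public class CsvWithVertexInitializerSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // vertices are induced from the edge CSV file; their values are
        // produced by the mapper applied to each vertex ID
        Graph<Long, Long, Double> graph = Graph
                .fromCsvReader("path/to/edge/input", new MapFunction<Long, Long>() {
                    @Override
                    public Long map(Long vertexId) {
                        return 0L; // illustrative initial vertex value
                    }
                }, env)
                .types(Long.class, Long.class, Double.class);
    }
}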

http://git-wip-us.apache.org/repos/asf/flink/blob/d01d3697/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
index d4a5b30..f64c701 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
@@ -18,62 +18,65 @@
 
 package org.apache.flink.graph;
 import com.google.common.base.Preconditions;
+
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.io.CsvReader;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.types.NullValue;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
 /**
- * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data
- * The class also configures the CSV readers used to read edges(vertices) data such as the field types,
- * the delimiters (row and field),  the fields that should be included or skipped, and other flags
+ * A class to build a Graph using path(s) provided to CSV file(s) with optional vertex and edge data.
+ * The class also configures the CSV readers used to read edge and vertex data such as the field types,
+ * the delimiters (row and field), the fields that should be included or skipped, and other flags,
  * such as whether to skip the initial line as the header.
- * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class.
+ * The configuration is done using the functions provided in the {@link org.apache.flink.api.java.io.CsvReader} class.
  */
-@SuppressWarnings({"unused" , "unchecked"})
-public class GraphCsvReader<K,VV,EV> {
 
-	private final Path vertexPath,edgePath;
+public class GraphCsvReader {
+
+	@SuppressWarnings("unused")
+	private final Path vertexPath, edgePath;
 	private final ExecutionEnvironment executionContext;
-	protected CsvReader EdgeReader;
-	protected CsvReader VertexReader;
-	protected MapFunction<K, VV> mapper;
-	protected Class<K> vertexKey;
-	protected Class<VV> vertexValue;
-	protected Class<EV> edgeValue;
+	protected CsvReader edgeReader;
+	protected CsvReader vertexReader;
+	protected MapFunction<?, ?> mapper;
+	protected Class<?> vertexKey;
+	protected Class<?> vertexValue;
+	protected Class<?> edgeValue;
 
 //--------------------------------------------------------------------------------------------------------------------
-	public GraphCsvReader(Path vertexPath,Path edgePath, ExecutionEnvironment context) {
+	public GraphCsvReader(Path vertexPath, Path edgePath, ExecutionEnvironment context) {
 		this.vertexPath = vertexPath;
 		this.edgePath = edgePath;
-		this.VertexReader = new CsvReader(vertexPath,context);
-		this.EdgeReader = new CsvReader(edgePath,context);
-		this.mapper=null;
-		this.executionContext=context;
+		this.vertexReader = new CsvReader(vertexPath, context);
+		this.edgeReader = new CsvReader(edgePath, context);
+		this.mapper = null;
+		this.executionContext = context;
 	}
 
 	public GraphCsvReader(Path edgePath, ExecutionEnvironment context) {
 		this.vertexPath = null;
 		this.edgePath = edgePath;
-		this.EdgeReader = new CsvReader(edgePath,context);
-		this.VertexReader = null;
+		this.edgeReader = new CsvReader(edgePath, context);
+		this.vertexReader = null;
 		this.mapper = null;
-		this.executionContext=context;
+		this.executionContext = context;
 	}
 
-	public GraphCsvReader(Path edgePath,final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
+	public <K, VV> GraphCsvReader(Path edgePath, final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
 		this.vertexPath = null;
 		this.edgePath = edgePath;
-		this.EdgeReader = new CsvReader(edgePath,context);
-		this.VertexReader = null;
+		this.edgeReader = new CsvReader(edgePath, context);
+		this.vertexReader = null;
 		this.mapper = mapper;
-		this.executionContext=context;
+		this.executionContext = context;
 	}
 
-	public GraphCsvReader (String edgePath,ExecutionEnvironment context) {
+	public GraphCsvReader (String edgePath, ExecutionEnvironment context) {
 		this(new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), context);
 
 	}
@@ -84,102 +87,123 @@ public class GraphCsvReader<K,VV,EV> {
 	}
 
 
-	public GraphCsvReader (String edgePath, final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
-			this(new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")),mapper, context);
+	public <K, VV> GraphCsvReader(String edgePath, final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
+			this(new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), mapper, context);
 	}
 
-	//--------------------------------------------------------------------------------------------------------------------
 	/**
-	 * Specifies the types for the edges fields and returns this instance of GraphCsvReader
-	 *
-	 * @param vertexKey The type of Vetex ID in the Graph.
-	 * @param  edgeValue The type of Edge Value in the returned Graph.
-	 * @return The {@link org.apache.flink.graph.GraphCsvReader}
+	 * Creates a Graph from CSV input with vertex values and edge values.
+	 * The vertex values are specified through a vertices input file or a user-defined map function.
+	 * 
+	 * @param vertexKey the type of the vertex IDs
+	 * @param vertexValue the type of the vertex values
+	 * @param edgeValue the type of the edge values
+	 * @return a Graph with vertex and edge values.
 	 */
-	public GraphCsvReader typesEdges(Class<K> vertexKey, Class<EV> edgeValue) {
-		this.vertexKey = vertexKey;
-		this.edgeValue = edgeValue;
-		return this;
-	}
+	@SuppressWarnings("unchecked")
+	public <K, VV, EV> Graph<K, VV, EV> types(Class<K> vertexKey, Class<VV> vertexValue,
+			Class<EV> edgeValue) {
 
-	/**
-	 * Specifies the types for the edges fields and returns this instance of GraphCsvReader
-	 * This method is overloaded for the case when the type of EdgeValue is NullValue
-	 * @param vertexKey The type of Vetex ID in the Graph.
-	 * @return The {@link org.apache.flink.graph.GraphCsvReader}
-	 */
-	public GraphCsvReader typesEdges(Class<K> vertexKey) {
-		this.vertexKey = vertexKey;
-		this.edgeValue = null;
-		return this;
-	}
+		DataSet<Tuple2<K, VV>> vertices = null;
 
-	/**
-	 * Specifies the types for the vertices fields and returns an instance of Graph
-	 * @param vertexKey The type of Vertex ID in the Graph.
-	 * @param vertexValue The type of Vertex Value in the Graph.
-	 * @return The {@link org.apache.flink.graph.Graph}
-	 */
-	public Graph<K, VV, EV> typesVertices(Class vertexKey, Class vertexValue) {
-		DataSet<Tuple3<K, K, EV>> edges = this.EdgeReader.types(this.vertexKey,this.vertexKey, this.edgeValue);
-		if(mapper == null && this.VertexReader != null) {
-		DataSet<Tuple2<K, VV>> vertices = this.VertexReader.types(vertexKey, vertexValue);
-		return Graph.fromTupleDataSet(vertices, edges, executionContext);
-		} else if(this.mapper != null) {
-		return Graph.fromTupleDataSet(edges, this.mapper, executionContext);
-		} else {
-			return null;
+		if (edgeReader == null) {
+			throw new RuntimeException("The edges input file cannot be null!");
+		}
+
+		DataSet<Tuple3<K, K, EV>> edges = edgeReader.types(vertexKey, vertexKey, edgeValue);
+
+		// the vertex value can be provided by an input file or a user-defined mapper
+		if (vertexReader != null) {
+			vertices = vertexReader.types(vertexKey, vertexValue);
+			return Graph.fromTupleDataSet(vertices, edges, executionContext);
+		}
+		else if (mapper != null) {
+			return Graph.fromTupleDataSet(edges, (MapFunction<K, VV>) mapper, executionContext);
+		}
+		else {
+			throw new RuntimeException("Vertex values have to be specified through a vertices input file"
+					+ " or a user-defined map function.");
 		}
 	}
 
 	/**
-	 * Specifies the types for the vertices fields and returns and instance of Graph
-	 * This method is overloaded for the case when vertices don't have a value
-	 * @param vertexKey The type of Vertex ID in the Graph.
-	 * @return The {@link org.apache.flink.graph.Graph}
+	 * Creates a Graph from CSV input with edge values, but without vertex values.
+	 * @param vertexKey the type of the vertex IDs
+	 * @param edgeValue the type of the edge values
+	 * @return a Graph where the edges are read from an edges CSV file (with values).
 	 */
-	public Graph<K, NullValue, EV> typesVertices(Class vertexKey) {
-		DataSet<Tuple3<K, K, EV>> edges = this.EdgeReader.types(this.vertexKey, this.vertexKey, this.edgeValue);
+	public <K, EV> Graph<K, NullValue, EV> edgeTypes(Class<K> vertexKey, Class<EV> edgeValue) {
+
+		if (edgeReader == null) {
+			throw new RuntimeException("The edges input file cannot be null!");
+		}
+
+		DataSet<Tuple3<K, K, EV>> edges = edgeReader.types(vertexKey, vertexKey, edgeValue);
+
 		return Graph.fromTupleDataSet(edges, executionContext);
 	}
 
 	/**
-	 * Specifies the types for the vertices fields and returns an instance of Graph when Edges don't have a value
-	 * @param vertexKey The type of Vertex ID in the Graph.
-	 * @param vertexValue The type of Vertex Value in the Graph.
-	 * @return The {@link org.apache.flink.graph.Graph}
+	 * Creates a Graph from CSV input without vertex values or edge values.
+	 * @param vertexKey the type of the vertex IDs
+	 * @return a Graph where the vertex IDs are read from the edges input file.
 	 */
-	public Graph<K, VV, NullValue> typesVerticesNullEdge(Class vertexKey, Class vertexValue) {
-		DataSet<Tuple3<K, K, NullValue>> edges= this.EdgeReader.types(this.vertexKey, this.vertexKey)
+	public <K> Graph<K, NullValue, NullValue> keyType(Class<K> vertexKey) {
+
+		if (edgeReader == null) {
+			throw new RuntimeException("The edges input file cannot be null!");
+		}
+
+		@SuppressWarnings("serial")
+		DataSet<Tuple3<K, K, NullValue>> edges = edgeReader.types(vertexKey, vertexKey)
 				.map(new MapFunction<Tuple2<K, K>, Tuple3<K, K, NullValue>>() {
-					public Tuple3<K, K, NullValue> map(Tuple2<K, K> value) {
-						return new Tuple3<K, K, NullValue>(value.f0, value.f1, NullValue.getInstance());
+
+					public Tuple3<K, K, NullValue> map(Tuple2<K, K> edge) {
+						return new Tuple3<K, K, NullValue>(edge.f0, edge.f1, NullValue.getInstance());
 					}
-				});
-		if(this.mapper == null && this.VertexReader != null) {
-		DataSet<Tuple2<K, VV>> vertices = this.VertexReader.types(vertexKey, vertexValue);
-		return Graph.fromTupleDataSet(vertices, edges, executionContext);
-		} else if (this.mapper != null) {
-			return Graph.fromTupleDataSet(edges, mapper, executionContext);
-		} else {
-		return null;
-		}
+				}).withForwardedFields("f0;f1");
+
+		return Graph.fromTupleDataSet(edges, executionContext);
 	}
 
 	/**
-	 * Specifies the types for the vertices fields and returns an instance of Graph when Edges don't have a value
-	 * This method is overloaded for the case when vertices don't have a value
-	 * @param vertexKey The type of Vertex ID in the Graph.
-	 * @return The {@link org.apache.flink.graph.Graph}
+	 * Creates a Graph from CSV input without edge values.
+	 * The vertex values are specified through a vertices input file or a user-defined map function.
+	 * If no vertices input file is provided, the vertex IDs are automatically created from the edges
+	 * input file.
+	 * @param vertexKey the type of the vertex IDs
+	 * @param vertexValue the type of the vertex values
+	 * @return a Graph with vertex values, but no edge values.
 	 */
-	public Graph<K, NullValue, NullValue> typesVerticesNullEdge(Class vertexKey) {
-		DataSet<Tuple3<K, K, NullValue>> edges= this.EdgeReader.types(this.vertexKey, this.vertexKey)
-				.map(new MapFunction<Tuple2<K, K>, Tuple3<K, K, NullValue>>() {
-					public Tuple3<K, K, NullValue> map(Tuple2<K, K> value) {
-						return new Tuple3<K, K, NullValue>(value.f0, value.f1, NullValue.getInstance());
+	@SuppressWarnings({ "serial", "unchecked" })
+	public <K, VV> Graph<K, VV, NullValue> vertexTypes(Class<K> vertexKey, Class<VV> vertexValue) {
+		
+		DataSet<Tuple2<K, VV>> vertices = null;
+
+		if (edgeReader == null) {
+			throw new RuntimeException("The edges input file cannot be null!");
+		}
+
+		DataSet<Tuple3<K, K, NullValue>> edges = edgeReader.types(vertexKey, vertexKey)
+				.map(new MapFunction<Tuple2<K,K>, Tuple3<K, K, NullValue>>() {
+
+					public Tuple3<K, K, NullValue> map(Tuple2<K, K> input) {
+						return new Tuple3<K, K, NullValue>(input.f0, input.f1, NullValue.getInstance());
 					}
-				});
-		return Graph.fromTupleDataSet(edges, executionContext);
+				}).withForwardedFields("f0;f1");
+
+		// the vertex value can be provided by an input file or a user-defined mapper
+		if (vertexReader != null) {
+			vertices = vertexReader.types(vertexKey, vertexValue);
+			return Graph.fromTupleDataSet(vertices, edges, executionContext);
+		}
+		else if (mapper != null) {
+			return Graph.fromTupleDataSet(edges, (MapFunction<K, VV>) mapper, executionContext);
+		}
+		else {
+			throw new RuntimeException("Vertex values have to be specified through a vertices input file"
+					+ " or a user-defined map function.");
+		}
 	}
 
 	/**
@@ -187,10 +211,10 @@ public class GraphCsvReader<K,VV,EV> {
 	 *	({@code '\n'}) is used by default.
 	 *
 	 *@param delimiter The delimiter that separates the rows.
-	 * @return The GraphCsv reader instance itself, to allow for fluent function chaining.
+	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
 	 */
 	public GraphCsvReader lineDelimiterEdges(String delimiter) {
-		this.EdgeReader.lineDelimiter(delimiter);
+		edgeReader.lineDelimiter(delimiter);
 		return this;
 	}
 
@@ -199,11 +223,11 @@ public class GraphCsvReader<K,VV,EV> {
 	 *	({@code '\n'}) is used by default.
 	 *
 	 *@param delimiter The delimiter that separates the rows.
-	 * @return The GraphCsv reader instance itself, to allow for fluent function chaining.
+	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
 	 */
 	public GraphCsvReader lineDelimiterVertices(String delimiter) {
-		if(this.VertexReader !=null) {
-			this.VertexReader.lineDelimiter(delimiter);
+		if(this.vertexReader != null) {
+			this.vertexReader.lineDelimiter(delimiter);
 		}
 		return this;
 	}
@@ -216,8 +240,8 @@ public class GraphCsvReader<K,VV,EV> {
 	 * @return The GraphCsv reader instance itself, to allow for fluent function chaining.
 	 */
 	public GraphCsvReader fieldDelimiterVertices(String delimiter) {
-		if(this.VertexReader !=null) {
-			this.VertexReader.fieldDelimiter(delimiter);
+		if(this.vertexReader != null) {
+			this.vertexReader.fieldDelimiter(delimiter);
 		}
 		return this;
 	}
@@ -230,7 +254,7 @@ public class GraphCsvReader<K,VV,EV> {
 	 * @return The GraphCsv reader instance itself, to allow for fluent function chaining.
 	 */
 	public GraphCsvReader fieldDelimiterEdges(String delimiter) {
-		this.EdgeReader.fieldDelimiter(delimiter);
+		this.edgeReader.fieldDelimiter(delimiter);
 		return this;
 	}
 
@@ -240,10 +264,10 @@ public class GraphCsvReader<K,VV,EV> {
 	 * Leading or tailing whitespaces are not allowed.
 	 *
 	 * @param quoteCharacter The character which is used as quoting character.
-	 * @return The Graph Csv reader instance itself, to allow for fluent function chaining.
+	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
 	 */
 	public GraphCsvReader parseQuotedStringsEdges(char quoteCharacter) {
-		this.EdgeReader.parseQuotedStrings(quoteCharacter);
+		this.edgeReader.parseQuotedStrings(quoteCharacter);
 		return this;
 	}
 
@@ -253,11 +277,11 @@ public class GraphCsvReader<K,VV,EV> {
 	 * Leading or tailing whitespaces are not allowed.
 	 *
 	 * @param quoteCharacter The character which is used as quoting character.
-	 * @return The Graph Csv reader instance itself, to allow for fluent function chaining.
+	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
 	 */
 	public GraphCsvReader parseQuotedStringsVertices(char quoteCharacter) {
-		if(this.VertexReader !=null) {
-			this.VertexReader.parseQuotedStrings(quoteCharacter);
+		if(this.vertexReader != null) {
+			this.vertexReader.parseQuotedStrings(quoteCharacter);
 		}
 		return this;
 	}
@@ -268,11 +292,11 @@ public class GraphCsvReader<K,VV,EV> {
 	 * This function only recognizes comments which start at the beginning of the line!
 	 *
 	 * @param commentPrefix The string that starts the comments.
-	 * @return The Graph csv reader instance itself, to allow for fluent function chaining.
+	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
 	 */
 	public GraphCsvReader ignoreCommentsVertices(String commentPrefix) {
-		if(this.VertexReader !=null) {
-			this.VertexReader.ignoreComments(commentPrefix);
+		if(this.vertexReader != null) {
+			this.vertexReader.ignoreComments(commentPrefix);
 		}
 		return this;
 	}
@@ -283,10 +307,10 @@ public class GraphCsvReader<K,VV,EV> {
 	 * This function only recognizes comments which start at the beginning of the line!
 	 *
 	 * @param commentPrefix The string that starts the comments.
-	 * @return The Graph csv reader instance itself, to allow for fluent function chaining.
+	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
 	 */
 	public GraphCsvReader ignoreCommentsEdges(String commentPrefix) {
-		this.EdgeReader.ignoreComments(commentPrefix);
+		this.edgeReader.ignoreComments(commentPrefix);
 		return this;
 	}
 
@@ -300,11 +324,11 @@ public class GraphCsvReader<K,VV,EV> {
 	 * occurs in the fields array.
 	 *
 	 * @param vertexFields The array of flags that describes which fields are to be included from the CSV file for vertices.
-	 * @return The CSV reader instance itself, to allow for fluent function chaining.
+	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
 	 */
 	public GraphCsvReader includeFieldsVertices(boolean ... vertexFields) {
-		if(this.VertexReader !=null) {
-			this.VertexReader.includeFields(vertexFields);
+		if(this.vertexReader != null) {
+			this.vertexReader.includeFields(vertexFields);
 		}
 		return this;
 	}
@@ -319,10 +343,10 @@ public class GraphCsvReader<K,VV,EV> {
 	 * occurs in the fields array.
 	 *
 	 * @param edgeFields The array of flags that describes which fields are to be included from the CSV file for edges.
-	 * @return The CSV reader instance itself, to allow for fluent function chaining.
+	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
 	 */
 	public GraphCsvReader includeFieldsEdges(boolean ... edgeFields) {
-		this.EdgeReader.includeFields(edgeFields);
+		this.edgeReader.includeFields(edgeFields);
 		return this;
 	}
 
@@ -337,11 +361,11 @@ public class GraphCsvReader<K,VV,EV> {
 	 * the boolean array is {@code '1'}, {@code 'T'}, or {@code 't'} (representing the value {@code true}).
 	 *
 	 * @param mask The string mask defining which fields to include and which to skip.
-	 * @return The Graph Csv reader instance itself, to allow for fluent function chaining.
+	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
 	 */
 	public GraphCsvReader includeFieldsVertices(String mask) {
-		if(this.VertexReader !=null) {
-			this.VertexReader.includeFields(mask);
+		if(this.vertexReader != null) {
+			this.vertexReader.includeFields(mask);
 		}
 		return this;
 	}
@@ -357,10 +381,10 @@ public class GraphCsvReader<K,VV,EV> {
 	 * the boolean array is {@code '1'}, {@code 'T'}, or {@code 't'} (representing the value {@code true}).
 	 *
 	 * @param mask The string mask defining which fields to include and which to skip.
-	 * @return The Graph Csv reader instance itself, to allow for fluent function chaining.
+	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
 	 */
 	public GraphCsvReader includeFieldsEdges(String mask) {
-		this.EdgeReader.includeFields(mask);
+		this.edgeReader.includeFields(mask);
 		return this;
 	}
 
@@ -381,11 +405,11 @@ public class GraphCsvReader<K,VV,EV> {
 	 * </ul>
 	 *
 	 * @param mask The bit mask defining which fields to include and which to skip.
-	 * @return The Graph CSV reader instance itself, to allow for fluent function chaining.
+	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
 	 */
 	public GraphCsvReader includeFieldsVertices(long mask) {
-		if(this.VertexReader !=null) {
-			this.VertexReader.includeFields(mask);
+		if(this.vertexReader != null) {
+			this.vertexReader.includeFields(mask);
 		}
 		return this;
 	}
@@ -407,31 +431,31 @@ public class GraphCsvReader<K,VV,EV> {
 	 * </ul>
 	 *
 	 * @param mask The bit mask defining which fields to include and which to skip.
-	 * @return The Graph CSV reader instance itself, to allow for fluent function chaining.
+	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
 	 */
 	public GraphCsvReader includeFieldsEdges(long mask) {
-		this.EdgeReader.includeFields(mask);
+		this.edgeReader.includeFields(mask);
 		return this;
 	}
 
 	/**
 	 * Sets the CSV reader for the Edges file to ignore the first line. This is useful for files that contain a header line.
 	 *
-	 * @return The Graph CSV reader instance itself, to allow for fluent function chaining.
+	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
 	 */
 	public GraphCsvReader ignoreFirstLineEdges() {
-		this.EdgeReader.ignoreFirstLine();
+		this.edgeReader.ignoreFirstLine();
 		return this;
 	}
 
 	/**
 	 * Sets the CSV reader for the Vertices file to ignore the first line. This is useful for files that contain a header line.
 	 *
-	 * @return The Graph CSV reader instance itself, to allow for fluent function chaining.
+	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
 	 */
 	public GraphCsvReader ignoreFirstLineVertices() {
-		if(this.VertexReader !=null) {
-			this.VertexReader.ignoreFirstLine();
+		if(this.vertexReader != null) {
+			this.vertexReader.ignoreFirstLine();
 		}
 		return this;
 	}
@@ -440,10 +464,10 @@ public class GraphCsvReader<K,VV,EV> {
 	 * Sets the CSV reader for the Edges file  to ignore any invalid lines.
 	 * This is useful for files that contain an empty line at the end, multiple header lines or comments. This would throw an exception otherwise.
 	 *
-	 * @return The CSV reader instance itself, to allow for fluent function chaining.
+	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
 	 */
 	public GraphCsvReader ignoreInvalidLinesEdges() {
-		this.EdgeReader.ignoreInvalidLines();
+		this.edgeReader.ignoreInvalidLines();
 		return this;
 	}
 
@@ -451,11 +475,11 @@ public class GraphCsvReader<K,VV,EV> {
 	 * Sets the CSV reader Vertices file  to ignore any invalid lines.
 	 * This is useful for files that contain an empty line at the end, multiple header lines or comments. This would throw an exception otherwise.
 	 *
-	 * @return The CSV reader instance itself, to allow for fluent function chaining.
+	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
 	 */
 	public GraphCsvReader ignoreInvalidLinesVertices() {
-		if(this.VertexReader !=null) {
-			this.VertexReader.ignoreInvalidLines();
+		if(this.vertexReader != null) {
+			this.vertexReader.ignoreInvalidLines();
 		}
 		return this;
 	}
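The configuration methods above can be chained before the type methods. A minimal sketch that reads a tab-separated edge file with a header line and '#' comment lines, producing a value-less graph; the path, delimiters and class name are illustrative.

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Graph;
import org.apache.flink.types.NullValue;

public class ConfiguredCsvReadSketch {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // tab-separated edge file with a header line and '#' comment lines;
        // the resulting graph carries no vertex or edge values
        Graph<Long, NullValue, NullValue> graph = Graph
                .fromCsvReader("path/to/edge/input", env)
                .ignoreFirstLineEdges()
                .ignoreCommentsEdges("#")
                .fieldDelimiterEdges("\t")
                .lineDelimiterEdges("\n")
                .keyType(Long.class);
    }
}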

http://git-wip-us.apache.org/repos/asf/flink/blob/d01d3697/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
index d8abd4e..bd08190 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
@@ -22,6 +22,8 @@ import org.apache.flink.api.common.ProgramDescription;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData;
@@ -57,8 +59,14 @@ public class ConnectedComponents implements ProgramDescription {
 
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-		//util method getGraph is used
-		Graph<Long, Long, NullValue> graph = ConnectedComponents.getGraph(env);
+		DataSet<Edge<Long, NullValue>> edges = getEdgesDataSet(env);
+
+		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new MapFunction<Long, Long>() {
+			@Override
+			public Long map(Long value) throws Exception {
+				return value;
+			}
+		}, env);
 
 		DataSet<Vertex<Long, Long>> verticesWithMinIds = graph
 				.run(new GSAConnectedComponents(maxIterations)).getVertices();
@@ -110,29 +118,24 @@ public class ConnectedComponents implements ProgramDescription {
 
 		return true;
 	}
-@SuppressWarnings("unchecked")
-	private static Graph<Long, Long, NullValue> getGraph(ExecutionEnvironment env) {
-		Graph<Long, Long, NullValue> graph;
-		if(!fileOutput) {
-			graph = Graph.fromDataSet(ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env),
-					new MapFunction<Long, Long>() {
-
-						public Long map(Long label) {
-							return label;
+
+	@SuppressWarnings("serial")
+	private static DataSet<Edge<Long, NullValue>> getEdgesDataSet(ExecutionEnvironment env) {
+
+		if(fileOutput) {
+			return env.readCsvFile(edgeInputPath)
+					.ignoreComments("#")
+					.fieldDelimiter("\t")
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class)
+					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
+						@Override
+						public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception {
+							return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance());
 						}
-					}, env);
+					});
 		} else {
-			graph = Graph.fromCsvReader(edgeInputPath,new MapFunction<Long, Long>() {
-				public Long map(Long label) {
-					return label;
-				}
-			}, env).ignoreCommentsEdges("#")
-					.fieldDelimiterEdges("\t")
-					.lineDelimiterEdges("\n")
-					.typesEdges(Long.class)
-					.typesVerticesNullEdge(Long.class, Long.class);
-
+			return ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env);
 		}
-		return graph;
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/d01d3697/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
index cfa04e9..635a099 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.ProgramDescription;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
@@ -29,6 +30,7 @@ import org.apache.flink.graph.gsa.ApplyFunction;
 import org.apache.flink.graph.gsa.GatherFunction;
 import org.apache.flink.graph.gsa.SumFunction;
 import org.apache.flink.graph.gsa.Neighbor;
+import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
 
 /**
  * This example shows how to use Gelly's Gather-Sum-Apply iterations.
@@ -59,7 +61,9 @@ public class GSASingleSourceShortestPaths implements ProgramDescription {
 
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-		Graph<Long, Double, Double> graph = GSASingleSourceShortestPaths.getGraph(env);
+		DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
+
+		Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, new InitVertices(srcVertexId), env);
 
 		// Execute the GSA iteration
 		Graph<Long, Double, Double> result = graph.runGatherSumApplyIteration(
@@ -109,7 +113,7 @@ public class GSASingleSourceShortestPaths implements ProgramDescription {
 		public Double gather(Neighbor<Double, Double> neighbor) {
 			return neighbor.getNeighborValue() + neighbor.getEdgeValue();
 		}
-	}
+	};
 
 	@SuppressWarnings("serial")
 	private static final class ChooseMinDistance extends SumFunction<Double, Double, Double> {
@@ -117,7 +121,7 @@ public class GSASingleSourceShortestPaths implements ProgramDescription {
 		public Double sum(Double newValue, Double currentValue) {
 			return Math.min(newValue, currentValue);
 		}
-	}
+	};
 
 	@SuppressWarnings("serial")
 	private static final class UpdateDistance extends ApplyFunction<Long, Double, Double> {
@@ -168,15 +172,15 @@ public class GSASingleSourceShortestPaths implements ProgramDescription {
 		return true;
 	}
 
-	@SuppressWarnings("unchecked")
-	private static Graph<Long, Double, Double> getGraph(ExecutionEnvironment env) {
+	private static DataSet<Edge<Long, Double>> getEdgeDataSet(ExecutionEnvironment env) {
 		if (fileOutput) {
-			return Graph.fromCsvReader(edgesInputPath, new InitVertices(srcVertexId), env).fieldDelimiterEdges("\t")
-					.lineDelimiterEdges("\n")
-					.typesEdges(Long.class, Double.class)
-					.typesVertices(Long.class, Double.class);
+			return env.readCsvFile(edgesInputPath)
+					.fieldDelimiter("\t")
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class, Double.class)
+					.map(new Tuple3ToEdgeMap<Long, Double>());
 		} else {
-			return Graph.fromDataSet(SingleSourceShortestPathsData.getDefaultEdgeDataSet(env), new InitVertices(srcVertexId), env);
+			return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
 		}
 	}
 
@@ -184,4 +188,4 @@ public class GSASingleSourceShortestPaths implements ProgramDescription {
 	public String getDescription() {
 		return "GSA Single Source Shortest Paths";
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/d01d3697/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
index 8d9beeb..591ed26 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.example.utils.ExampleUtils;
 import org.apache.flink.types.NullValue;
@@ -56,7 +57,7 @@ public class GraphMetrics implements ProgramDescription {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 		/** create the graph **/
-		Graph<Long, NullValue, NullValue> graph = GraphMetrics.getGraph(env);
+		Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(getEdgesDataSet(env), env);
 		
 		/** get the number of vertices **/
 		long numVertices = graph.numberOfVertices();
@@ -149,17 +150,21 @@ public class GraphMetrics implements ProgramDescription {
 		return true;
 	}
 
-	@SuppressWarnings({"serial", "unchecked"})
-	private static Graph<Long, NullValue, NullValue> getGraph(ExecutionEnvironment env) {
-		if(fileOutput) {
-			return Graph.fromCsvReader(edgesInputPath, env)
-					.lineDelimiterEdges("\n")
-					.fieldDelimiterEdges("\t")
-					.typesEdges(Long.class)
-					.typesVerticesNullEdge(Long.class);
-
+	@SuppressWarnings("serial")
+	private static DataSet<Edge<Long, NullValue>> getEdgesDataSet(ExecutionEnvironment env) {
+		if (fileOutput) {
+			return env.readCsvFile(edgesInputPath)
+					.lineDelimiter("\n").fieldDelimiter("\t")
+					.types(Long.class, Long.class).map(
+							new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
+
+								public Edge<Long, NullValue> map(Tuple2<Long, Long> value) {
+									return new Edge<Long, NullValue>(value.f0, value.f1, 
+											NullValue.getInstance());
+								}
+					});
 		} else {
-			return Graph.fromDataSet(ExampleUtils.getRandomEdges(env, NUM_VERTICES), env);
+			return ExampleUtils.getRandomEdges(env, NUM_VERTICES);
 		}
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/d01d3697/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java
index adf9c02..c03937d 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java
@@ -33,8 +33,11 @@ import org.apache.flink.graph.spargel.VertexCentricConfiguration;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
 
 /**
- * This example illustrates the usage of vertex-centric iteration's
- * messaging direction configuration option.
+ * This example illustrates how to 
+ * <ul>
+ *  <li> create a Graph directly from CSV files
+ *  <li> use the vertex-centric iteration's messaging direction configuration option
+ * </ul>
  * 
  * Incremental Single Sink Shortest Paths Example. Shortest Paths are incrementally updated
  * upon edge removal.
@@ -102,20 +105,18 @@ public class IncrementalSSSP implements ProgramDescription {
 			// Emit results
 			if(fileOutput) {
 				resultedVertices.writeAsCsv(outputPath, "\n", ",");
+				env.execute("Incremental SSSP Example");
 			} else {
 				resultedVertices.print();
 			}
-
-			env.execute("Incremental SSSP Example");
 		} else {
 			// print the vertices
 			if(fileOutput) {
 				graph.getVertices().writeAsCsv(outputPath, "\n", ",");
+				env.execute("Incremental SSSP Example");
 			} else {
 				graph.getVertices().print();
 			}
-
-			env.execute("Incremental SSSP Example");
 		}
 	}
 
@@ -239,30 +240,20 @@ public class IncrementalSSSP implements ProgramDescription {
 		return true;
 	}
 
-	@SuppressWarnings("unchecked")
 	private static Graph<Long, Double, Double> getGraph(ExecutionEnvironment env) {
 		if(fileOutput) {
 			return Graph.fromCsvReader(verticesInputPath, edgesInputPath, env).lineDelimiterEdges("\n")
-					.typesEdges(Long.class, Double.class)
-					.typesVertices(Long.class, Double.class);
+					.types(Long.class, Double.class, Double.class);
 		} else {
-			System.err.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> " +
-					"<src id edge to be removed> <trg id edge to be removed> <val edge to be removed> " +
-					"<output path> <max iterations>");
 			return Graph.fromDataSet(IncrementalSSSPData.getDefaultVertexDataSet(env), IncrementalSSSPData.getDefaultEdgeDataSet(env), env);
 		}
 	}
 
-	@SuppressWarnings("unchecked")
 	private static Graph<Long, Double, Double> getSSSPGraph(ExecutionEnvironment env) {
 		if(fileOutput) {
 			return Graph.fromCsvReader(verticesInputPath, edgesInSSSPInputPath, env).lineDelimiterEdges("\n")
-					.typesEdges(Long.class, Double.class)
-					.typesVertices(Long.class, Double.class);
+					.types(Long.class, Double.class, Double.class);
 		} else {
-			System.err.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> " +
-					"<src id edge to be removed> <trg id edge to be removed> <val edge to be removed> " +
-					"<output path> <max iterations>");
 			return Graph.fromDataSet(IncrementalSSSPData.getDefaultVertexDataSet(env), IncrementalSSSPData.getDefaultEdgesInSSSP(env), env);
 		}
 	}
@@ -271,9 +262,6 @@ public class IncrementalSSSP implements ProgramDescription {
 		if (fileOutput) {
 			return new Edge<Long, Double>(srcEdgeToBeRemoved, trgEdgeToBeRemoved, valEdgeToBeRemoved);
 		} else {
-			System.err.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> " +
-					"<src id edge to be removed> <trg id edge to be removed> <val edge to be removed> " +
-					"<output path> <max iterations>");
 			return IncrementalSSSPData.getDefaultEdgeToBeRemoved();
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/d01d3697/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
index 95c70bf..3d4d902 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
@@ -52,6 +52,7 @@ import java.util.TreeMap;
 public class GSATriangleCount implements
 		GraphAlgorithm<Long, NullValue, NullValue, DataSet<Tuple1<Integer>>> {
 
+	@SuppressWarnings("serial")
 	@Override
 	public DataSet<Tuple1<Integer>> run(Graph<Long, NullValue, NullValue> input) throws Exception {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d01d3697/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
index 75f174c..67d32a8 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
@@ -401,6 +401,4 @@ public class GatherSumApplyConfigurationITCase extends MultipleProgramsTestBase
 			}
 		}
 	}
-
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d01d3697/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java
index 2b78d32..99c66ec 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java
@@ -46,7 +46,6 @@ public class GraphCreationWithCsvITCase extends MultipleProgramsTestBase {
 	private String expectedResult;
 
 	@Test
-	@SuppressWarnings("unchecked")
 	public void testCreateWithCsvFile() throws Exception {
 		/*
 		 * Test with two Csv files one with Vertex Data and one with Edges data
@@ -61,9 +60,8 @@ public class GraphCreationWithCsvITCase extends MultipleProgramsTestBase {
 				"3,1,to\n";
 		final FileInputSplit split2 = createTempFile(fileContent2);
 
-		Graph<Long,Long,String> graph= Graph.fromCsvReader(split.getPath().toString(),split2.getPath().toString(),env)
-				.typesEdges(Long.class, String.class)
-				.typesVertices(Long.class, Long.class);
+		Graph<Long, Long, String> graph = Graph.fromCsvReader(split.getPath().toString(),split2.getPath().toString(),env)
+				.types(Long.class, Long.class, String.class);
 
 		List<Triplet<Long, Long, String>> result = graph.getTriplets().collect();
 
@@ -75,7 +73,6 @@ public class GraphCreationWithCsvITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
-	@SuppressWarnings("unchecked")
 	public void testCsvWithNullEdge() throws Exception {
 		/*
 		Test fromCsvReader with edge and vertex path and nullvalue for edge
@@ -90,10 +87,8 @@ public class GraphCreationWithCsvITCase extends MultipleProgramsTestBase {
 		final FileInputSplit split = createTempFile(vertexFileContent);
 		final FileInputSplit edgeSplit = createTempFile(edgeFileContent);
 
-		Graph<Long, String, NullValue> graph= Graph.fromCsvReader(split.getPath().toString(), edgeSplit.getPath().toString(),
-				env)
-				.typesEdges(Long.class)
-				.typesVerticesNullEdge(Long.class, String.class);
+		Graph<Long, String, NullValue> graph = Graph.fromCsvReader(split.getPath().toString(), edgeSplit.getPath().toString(),
+				env).vertexTypes(Long.class, String.class);
 
 		List<Triplet<Long, String, NullValue>> result = graph.getTriplets().collect();
 
@@ -105,7 +100,6 @@ public class GraphCreationWithCsvITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
-	@SuppressWarnings("unchecked")
 	public void testCsvWithConstantValueMapper() throws Exception {
 		/*
 		*Test fromCsvReader with edge path and a mapper that assigns a Double constant as value
@@ -117,8 +111,8 @@ public class GraphCreationWithCsvITCase extends MultipleProgramsTestBase {
 		final FileInputSplit split = createTempFile(fileContent);
 
 		Graph<Long, Double, String> graph = Graph.fromCsvReader(split.getPath().toString(),
-				new AssignDoubleValueMapper(), env).typesEdges(Long.class, String.class)
-				.typesVertices(Long.class, Double.class);
+				new AssignDoubleValueMapper(), env).types(Long.class, Double.class, String.class);
+
 		List<Triplet<Long, Double, String>> result = graph.getTriplets().collect();
 		//graph.getTriplets().writeAsCsv(resultPath);
 		expectedResult = "1,2,0.1,0.1,ot\n" + "3,1,0.1,0.1,to\n" + "3,2,0.1,0.1,tt\n";
@@ -126,7 +120,6 @@ public class GraphCreationWithCsvITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
-	@SuppressWarnings("unchecked")
 	public void testCreateWithOnlyEdgesCsvFile() throws Exception {
 		/*
 		 * Test with one Csv file one with Edges data. Also tests the configuration method ignoreFistLineEdges()
@@ -137,11 +130,10 @@ public class GraphCreationWithCsvITCase extends MultipleProgramsTestBase {
 				"3,1,to\n";
 
 		final FileInputSplit split2 = createTempFile(fileContent2);
-		Graph<Long,NullValue,String> graph= Graph.fromCsvReader(split2.getPath().toString(), env)
+		Graph<Long, NullValue, String> graph= Graph.fromCsvReader(split2.getPath().toString(), env)
 				.ignoreFirstLineEdges()
 				.ignoreCommentsVertices("hi")
-				.typesEdges(Long.class, String.class)
-				.typesVertices(Long.class);
+				.edgeTypes(Long.class, String.class);
 
 		List<Triplet<Long, NullValue, String>> result = graph.getTriplets().collect();
 		expectedResult = "1,2,(null),(null),ot\n" +
@@ -152,7 +144,6 @@ public class GraphCreationWithCsvITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
-	@SuppressWarnings("unchecked")
 	public void testCreateCsvFileDelimiterConfiguration() throws Exception {
 		/*
 		 * Test with an Edge and Vertex csv file. Tests the configuration methods FieldDelimiterEdges and
@@ -173,12 +164,11 @@ public class GraphCreationWithCsvITCase extends MultipleProgramsTestBase {
 
 		final FileInputSplit split2 = createTempFile(fileContent2);
 
-		Graph<Long,Long,String> graph= Graph.fromCsvReader(split.getPath().toString(),split2.getPath().toString(),env).
+		Graph<Long, Long, String> graph= Graph.fromCsvReader(split.getPath().toString(),split2.getPath().toString(),env).
 				ignoreFirstLineEdges().ignoreFirstLineVertices().
 				fieldDelimiterEdges(":").fieldDelimiterVertices(";").
 				lineDelimiterEdges("|").
-				typesEdges(Long.class, String.class)
-				.typesVertices(Long.class, Long.class);
+				types(Long.class, Long.class, String.class);
 
 		List<Triplet<Long, Long, String>> result = graph.getTriplets().collect();
 

