flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject [1/2] flink git commit: [FLINK-1520] [gelly] Read edges and vertices from CSV files
Date Thu, 24 Sep 2015 09:40:49 GMT
Repository: flink
Updated Branches:
  refs/heads/master bc21de2e4 -> d01d36970


[FLINK-1520] [gelly] Read edges and vertices from CSV files

This squashes the following commit:

[FLINK-1520] [gelly]Changed the methods for specifying types.
Created a new file for tests. Made appropriate changes in gelly_guide.md


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/702277fa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/702277fa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/702277fa

Branch: refs/heads/master
Commit: 702277faa47baf34133cdf3c7d13b13ac62887dd
Parents: bc21de2
Author: Shivani <shghatge@gmail.com>
Authored: Wed Jun 17 15:37:36 2015 +0200
Committer: vasia <vasia@apache.org>
Committed: Thu Sep 24 11:08:56 2015 +0200

----------------------------------------------------------------------
 docs/libs/gelly_guide.md                        |  17 +
 .../main/java/org/apache/flink/graph/Graph.java |  46 ++
 .../org/apache/flink/graph/GraphCsvReader.java  | 462 +++++++++++++++++++
 .../graph/example/ConnectedComponents.java      |  49 +-
 .../example/GSASingleSourceShortestPaths.java   |  24 +-
 .../flink/graph/example/GraphMetrics.java       |  27 +-
 .../flink/graph/example/IncrementalSSSP.java    |  68 +--
 .../example/SingleSourceShortestPaths.java      |   2 +-
 .../operations/GraphCreationWithCsvITCase.java  | 214 +++++++++
 9 files changed, 805 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/702277fa/docs/libs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md
index 562df31..b7af56b 100644
--- a/docs/libs/gelly_guide.md
+++ b/docs/libs/gelly_guide.md
@@ -104,6 +104,23 @@ DataSet<Tuple3<String, String, Double>> edgeTuples = env.readCsvFile("path/to/ed
 Graph<String, Long, Double> graph = Graph.fromTupleDataSet(vertexTuples, edgeTuples, env);
 {% endhighlight %}
 
+* from a CSV file with three fields and an optional CSV file with 2 fields. In this case, Gelly will convert each row from the CSV file containing edges data to an `Edge`, where the first field will be the source ID, the second field will be the target ID and the third field will be the edge value. Equivalently, each row from the optional CSV file containing vertices will be converted to a `Vertex`, where the first field will be the vertex ID and the second field will be the vertex value. A `typesEdges()` method is called on the GraphCsvReader object returned by `fromCsvReader()` to inform the CsvReader of the types of the fields for Edges. If Edge doesn't have a value, only the type of the Vertex Key is passed. `typesEdges()` returns a GraphCsvReader, on which calling `typesVertices()` or `typesVerticesNullEdge()` returns the instance of Graph:
+
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+Graph<String, Long, NullValue> graph = Graph.fromCsvReader("path/to/vertex/input", "path/to/edge/input", env).typesEdges(String.class).typesVerticesNullEdge(String.class, Long.class);
+{% endhighlight %}
+
+If Vertices don't have a value, the overloaded `typesVerticesNullEdge()` or `typesVertices()` method should be used.
+
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+Graph<String, NullValue, Long> graph = Graph.fromCsvReader("path/to/vertex/input", "path/to/edge/input", env).typesEdges(String.class, Long.class).typesVerticesNullEdge(String.class);
+{% endhighlight %}
+
+
 * from a `Collection` of edges and an optional `Collection` of vertices:
 
 {% highlight java %}

http://git-wip-us.apache.org/repos/asf/flink/blob/702277fa/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 84085c9..0153837 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -282,6 +282,52 @@ public class Graph<K, VV, EV> {
 	}
 
 	/**
+	* Creates a graph from CSV files.
+	*
+	* Vertices with value are created from a CSV file with 2 fields
+	* Edges with value are created from a CSV file with 3 fields
+	* @param verticesPath path to a CSV file with the Vertices data.
+	* @param edgesPath path to a CSV file with the Edges data
+	* @param context the flink execution environment.
+	* @return An instance of {@link org.apache.flink.graph.GraphCsvReader} , on which calling typesEdges() and typesVertices() methods to specify types of the
+	* Vertex ID, Vertex Value and Edge value returns a Graph
+	*/
+	public static  GraphCsvReader fromCsvReader(String verticesPath, String edgesPath, ExecutionEnvironment context) {
+		return new GraphCsvReader(verticesPath, edgesPath, context);
+	}
+	/** Creates a graph from a CSV file for Edges. Vertices are
+	* induced from the edges.
+	*
+	* Edges with value are created from a CSV file with 3 fields. Vertices are created
+	* automatically and their values are set to NullValue.
+	*
+	* @param edgesPath a path to a CSV file with the Edges data
+	* @param context the flink execution environment.
+	* @return An instance of {@link org.apache.flink.graph.GraphCsvReader} , on which calling typesEdges() and typesVertices() methods to specify types of the
+	* Vertex ID, Vertex Value and Edge value returns a Graph
+	*/
+	public static GraphCsvReader fromCsvReader(String edgesPath, ExecutionEnvironment context) {
+		return new GraphCsvReader(edgesPath, context);
+	}
+
+	/**
+	 * Creates a graph from a CSV file for Edges. Vertices are
+	 * induced from the edges and vertex values are calculated by a mapper
+	 * function.  Edges with value are created from a CSV file with 3 fields.
+	 * Vertices are created automatically and their values are set by applying the provided map
+	 * function to the vertex ids.
+	 *
+	 * @param edgesPath a path to a CSV file with the Edges data
+	 * @param mapper the mapper function.
+	 * @param context the flink execution environment.
+	 * @return An instance of {@link org.apache.flink.graph.GraphCsvReader} ,on which calling typesEdges() and typesVertices() methods to specify types of the
+	 * Vertex ID, Vertex Value and Edge value returns a Graph
+	 */
+	public static GraphCsvReader fromCsvReader(String edgesPath, final MapFunction mapper, ExecutionEnvironment context) {
+		return new GraphCsvReader(edgesPath, mapper, context);
+	}
+
+	/**
 	 * @return the flink execution environment.
 	 */
 	public ExecutionEnvironment getContext() {

http://git-wip-us.apache.org/repos/asf/flink/blob/702277fa/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
new file mode 100644
index 0000000..d4a5b30
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
@@ -0,0 +1,462 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+import com.google.common.base.Preconditions;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.io.CsvReader;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.types.NullValue;
+/**
+ * A class to build a Graph using path(s) provided to CSV file(s) with edge (vertices) data
+ * The class also configures the CSV readers used to read edges(vertices) data such as the field types,
+ * the delimiters (row and field),  the fields that should be included or skipped, and other flags
+ * such as whether to skip the initial line as the header.
+ * The configuration is done using the functions provided in The {@link org.apache.flink.api.java.io.CsvReader} class.
+ */
+@SuppressWarnings({"unused" , "unchecked"})
+public class GraphCsvReader<K,VV,EV> {
+
+	private final Path vertexPath,edgePath;
+	private final ExecutionEnvironment executionContext;
+	protected CsvReader EdgeReader;
+	protected CsvReader VertexReader;
+	protected MapFunction<K, VV> mapper;
+	protected Class<K> vertexKey;
+	protected Class<VV> vertexValue;
+	protected Class<EV> edgeValue;
+
+//--------------------------------------------------------------------------------------------------------------------
+	public GraphCsvReader(Path vertexPath,Path edgePath, ExecutionEnvironment context) {
+		this.vertexPath = vertexPath;
+		this.edgePath = edgePath;
+		this.VertexReader = new CsvReader(vertexPath,context);
+		this.EdgeReader = new CsvReader(edgePath,context);
+		this.mapper=null;
+		this.executionContext=context;
+	}
+
+	public GraphCsvReader(Path edgePath, ExecutionEnvironment context) {
+		this.vertexPath = null;
+		this.edgePath = edgePath;
+		this.EdgeReader = new CsvReader(edgePath,context);
+		this.VertexReader = null;
+		this.mapper = null;
+		this.executionContext=context;
+	}
+
+	public GraphCsvReader(Path edgePath,final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
+		this.vertexPath = null;
+		this.edgePath = edgePath;
+		this.EdgeReader = new CsvReader(edgePath,context);
+		this.VertexReader = null;
+		this.mapper = mapper;
+		this.executionContext=context;
+	}
+
+	public GraphCsvReader (String edgePath,ExecutionEnvironment context) {
+		this(new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), context);
+
+	}
+
+	public GraphCsvReader(String vertexPath, String edgePath, ExecutionEnvironment context) {
+		this(new Path(Preconditions.checkNotNull(vertexPath, "The file path may not be null.")),
+				new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), context);
+	}
+
+
+	public GraphCsvReader (String edgePath, final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
+			this(new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")),mapper, context);
+	}
+
+	//--------------------------------------------------------------------------------------------------------------------
+	/**
+	 * Specifies the types for the edges fields and returns this instance of GraphCsvReader
+	 *
+	 * @param vertexKey The type of Vertex ID in the Graph.
+	 * @param edgeValue The type of Edge Value in the returned Graph.
+	 * @return The {@link org.apache.flink.graph.GraphCsvReader}
+	 */
+	public GraphCsvReader typesEdges(Class<K> vertexKey, Class<EV> edgeValue) {
+		this.vertexKey = vertexKey;
+		this.edgeValue = edgeValue;
+		return this;
+	}
+
+	/**
+	 * Specifies the types for the edges fields and returns this instance of GraphCsvReader
+	 * This method is overloaded for the case when the type of EdgeValue is NullValue
+	 * @param vertexKey The type of Vertex ID in the Graph.
+	 * @return The {@link org.apache.flink.graph.GraphCsvReader}
+	 */
+	public GraphCsvReader typesEdges(Class<K> vertexKey) {
+		this.vertexKey = vertexKey;
+		this.edgeValue = null;
+		return this;
+	}
+
+	/**
+	 * Specifies the types for the vertices fields and returns an instance of Graph
+	 * @param vertexKey The type of Vertex ID in the Graph.
+	 * @param vertexValue The type of Vertex Value in the Graph.
+	 * @return The {@link org.apache.flink.graph.Graph}
+	 */
+	public Graph<K, VV, EV> typesVertices(Class vertexKey, Class vertexValue) {
+		DataSet<Tuple3<K, K, EV>> edges = this.EdgeReader.types(this.vertexKey,this.vertexKey, this.edgeValue);
+		if(mapper == null && this.VertexReader != null) {
+		DataSet<Tuple2<K, VV>> vertices = this.VertexReader.types(vertexKey, vertexValue);
+		return Graph.fromTupleDataSet(vertices, edges, executionContext);
+		} else if(this.mapper != null) {
+		return Graph.fromTupleDataSet(edges, this.mapper, executionContext);
+		} else {
+			return null;
+		}
+	}
+
+	/**
+	 * Specifies the types for the vertices fields and returns an instance of Graph
+	 * This method is overloaded for the case when vertices don't have a value
+	 * @param vertexKey The type of Vertex ID in the Graph.
+	 * @return The {@link org.apache.flink.graph.Graph}
+	 */
+	public Graph<K, NullValue, EV> typesVertices(Class vertexKey) {
+		DataSet<Tuple3<K, K, EV>> edges = this.EdgeReader.types(this.vertexKey, this.vertexKey, this.edgeValue);
+		return Graph.fromTupleDataSet(edges, executionContext);
+	}
+
+	/**
+	 * Specifies the types for the vertices fields and returns an instance of Graph when Edges don't have a value
+	 * @param vertexKey The type of Vertex ID in the Graph.
+	 * @param vertexValue The type of Vertex Value in the Graph.
+	 * @return The {@link org.apache.flink.graph.Graph}
+	 */
+	public Graph<K, VV, NullValue> typesVerticesNullEdge(Class vertexKey, Class vertexValue) {
+		DataSet<Tuple3<K, K, NullValue>> edges= this.EdgeReader.types(this.vertexKey, this.vertexKey)
+				.map(new MapFunction<Tuple2<K, K>, Tuple3<K, K, NullValue>>() {
+					public Tuple3<K, K, NullValue> map(Tuple2<K, K> value) {
+						return new Tuple3<K, K, NullValue>(value.f0, value.f1, NullValue.getInstance());
+					}
+				});
+		if(this.mapper == null && this.VertexReader != null) {
+		DataSet<Tuple2<K, VV>> vertices = this.VertexReader.types(vertexKey, vertexValue);
+		return Graph.fromTupleDataSet(vertices, edges, executionContext);
+		} else if (this.mapper != null) {
+			return Graph.fromTupleDataSet(edges, mapper, executionContext);
+		} else {
+		return null;
+		}
+	}
+
+	/**
+	 * Specifies the types for the vertices fields and returns an instance of Graph when Edges don't have a value
+	 * This method is overloaded for the case when vertices don't have a value
+	 * @param vertexKey The type of Vertex ID in the Graph.
+	 * @return The {@link org.apache.flink.graph.Graph}
+	 */
+	public Graph<K, NullValue, NullValue> typesVerticesNullEdge(Class vertexKey) {
+		DataSet<Tuple3<K, K, NullValue>> edges= this.EdgeReader.types(this.vertexKey, this.vertexKey)
+				.map(new MapFunction<Tuple2<K, K>, Tuple3<K, K, NullValue>>() {
+					public Tuple3<K, K, NullValue> map(Tuple2<K, K> value) {
+						return new Tuple3<K, K, NullValue>(value.f0, value.f1, NullValue.getInstance());
+					}
+				});
+		return Graph.fromTupleDataSet(edges, executionContext);
+	}
+
+	/**
+	 *Configures the Delimiter that separates rows for the CSV reader used to read the edges
+	 *	({@code '\n'}) is used by default.
+	 *
+	 *@param delimiter The delimiter that separates the rows.
+	 * @return The GraphCsv reader instance itself, to allow for fluent function chaining.
+	 */
+	public GraphCsvReader lineDelimiterEdges(String delimiter) {
+		this.EdgeReader.lineDelimiter(delimiter);
+		return this;
+	}
+
+	/**
+	 *Configures the Delimiter that separates rows for the CSV reader used to read the vertices
+	 *	({@code '\n'}) is used by default.
+	 *
+	 *@param delimiter The delimiter that separates the rows.
+	 * @return The GraphCsv reader instance itself, to allow for fluent function chaining.
+	 */
+	public GraphCsvReader lineDelimiterVertices(String delimiter) {
+		if(this.VertexReader !=null) {
+			this.VertexReader.lineDelimiter(delimiter);
+		}
+		return this;
+	}
+
+	/**
+	 *Configures the Delimiter that separates fields in a row for the CSV reader used to read the vertices
+	 * ({@code ','}) is used by default.
+	 *
+	 * @param delimiter The delimiter that separates the fields in a row.
+	 * @return The GraphCsv reader instance itself, to allow for fluent function chaining.
+	 */
+	public GraphCsvReader fieldDelimiterVertices(String delimiter) {
+		if(this.VertexReader !=null) {
+			this.VertexReader.fieldDelimiter(delimiter);
+		}
+		return this;
+	}
+
+	/**
+	 *Configures the Delimiter that separates fields in a row for the CSV reader used to read the edges
+	 * ({@code ','}) is used by default.
+	 *
+	 * @param delimiter The delimiter that separates the fields in a row.
+	 * @return The GraphCsv reader instance itself, to allow for fluent function chaining.
+	 */
+	public GraphCsvReader fieldDelimiterEdges(String delimiter) {
+		this.EdgeReader.fieldDelimiter(delimiter);
+		return this;
+	}
+
+	/**
+	 * Enables quoted String parsing for Edge Csv Reader. Field delimiters in quoted Strings are ignored.
+	 * A String is parsed as quoted if it starts and ends with a quoting character and as unquoted otherwise.
+	 * Leading or tailing whitespaces are not allowed.
+	 *
+	 * @param quoteCharacter The character which is used as quoting character.
+	 * @return The Graph Csv reader instance itself, to allow for fluent function chaining.
+	 */
+	public GraphCsvReader parseQuotedStringsEdges(char quoteCharacter) {
+		this.EdgeReader.parseQuotedStrings(quoteCharacter);
+		return this;
+	}
+
+	/**
+	 * Enables quoted String parsing for Vertex Csv Reader. Field delimiters in quoted Strings are ignored.
+	 * A String is parsed as quoted if it starts and ends with a quoting character and as unquoted otherwise.
+	 * Leading or tailing whitespaces are not allowed.
+	 *
+	 * @param quoteCharacter The character which is used as quoting character.
+	 * @return The Graph Csv reader instance itself, to allow for fluent function chaining.
+	 */
+	public GraphCsvReader parseQuotedStringsVertices(char quoteCharacter) {
+		if(this.VertexReader !=null) {
+			this.VertexReader.parseQuotedStrings(quoteCharacter);
+		}
+		return this;
+	}
+
+	/**
+	 * Configures the string that starts comments for the Vertex Csv Reader.
+	 * By default comments will be treated as invalid lines.
+	 * This function only recognizes comments which start at the beginning of the line!
+	 *
+	 * @param commentPrefix The string that starts the comments.
+	 * @return The Graph csv reader instance itself, to allow for fluent function chaining.
+	 */
+	public GraphCsvReader ignoreCommentsVertices(String commentPrefix) {
+		if(this.VertexReader !=null) {
+			this.VertexReader.ignoreComments(commentPrefix);
+		}
+		return this;
+	}
+
+	/**
+	 * Configures the string that starts comments for the Edge Csv Reader.
+	 * By default comments will be treated as invalid lines.
+	 * This function only recognizes comments which start at the beginning of the line!
+	 *
+	 * @param commentPrefix The string that starts the comments.
+	 * @return The Graph csv reader instance itself, to allow for fluent function chaining.
+	 */
+	public GraphCsvReader ignoreCommentsEdges(String commentPrefix) {
+		this.EdgeReader.ignoreComments(commentPrefix);
+		return this;
+	}
+
+	/**
+	 * Configures which fields of the CSV file containing vertices data should be included and which should be skipped. The
+	 * parser will look at the first {@code n} fields, where {@code n} is the length of the boolean
+	 * array. The parser will skip over all fields where the boolean value at the corresponding position
+	 * in the array is {@code false}. The result contains the fields where the corresponding position in
+	 * the boolean array is {@code true}.
+	 * The number of fields in the result is consequently equal to the number of times that {@code true}
+	 * occurs in the fields array.
+	 *
+	 * @param vertexFields The array of flags that describes which fields are to be included from the CSV file for vertices.
+	 * @return The CSV reader instance itself, to allow for fluent function chaining.
+	 */
+	public GraphCsvReader includeFieldsVertices(boolean ... vertexFields) {
+		if(this.VertexReader !=null) {
+			this.VertexReader.includeFields(vertexFields);
+		}
+		return this;
+	}
+
+	/**
+	 * Configures which fields of the CSV file containing edges data should be included and which should be skipped. The
+	 * parser will look at the first {@code n} fields, where {@code n} is the length of the boolean
+	 * array. The parser will skip over all fields where the boolean value at the corresponding position
+	 * in the array is {@code false}. The result contains the fields where the corresponding position in
+	 * the boolean array is {@code true}.
+	 * The number of fields in the result is consequently equal to the number of times that {@code true}
+	 * occurs in the fields array.
+	 *
+	 * @param edgeFields The array of flags that describes which fields are to be included from the CSV file for edges.
+	 * @return The CSV reader instance itself, to allow for fluent function chaining.
+	 */
+	public GraphCsvReader includeFieldsEdges(boolean ... edgeFields) {
+		this.EdgeReader.includeFields(edgeFields);
+		return this;
+	}
+
+	/**
+	 * Configures which fields of the CSV file containing vertices data should be included and which should be skipped. The
+	 * positions in the string (read from position 0 to its length) define whether the field at
+	 * the corresponding position in the CSV schema should be included.
+	 * parser will look at the first {@code n} fields, where {@code n} is the length of the mask string
+	 * The parser will skip over all fields where the character at the corresponding position
+	 * in the string is {@code '0'}, {@code 'F'}, or {@code 'f'} (representing the value
+	 * {@code false}). The result contains the fields where the corresponding position in
+	 * the boolean array is {@code '1'}, {@code 'T'}, or {@code 't'} (representing the value {@code true}).
+	 *
+	 * @param mask The string mask defining which fields to include and which to skip.
+	 * @return The Graph Csv reader instance itself, to allow for fluent function chaining.
+	 */
+	public GraphCsvReader includeFieldsVertices(String mask) {
+		if(this.VertexReader !=null) {
+			this.VertexReader.includeFields(mask);
+		}
+		return this;
+	}
+
+	/**
+	 * Configures which fields of the CSV file containing edges data should be included and which should be skipped. The
+	 * positions in the string (read from position 0 to its length) define whether the field at
+	 * the corresponding position in the CSV schema should be included.
+	 * parser will look at the first {@code n} fields, where {@code n} is the length of the mask string
+	 * The parser will skip over all fields where the character at the corresponding position
+	 * in the string is {@code '0'}, {@code 'F'}, or {@code 'f'} (representing the value
+	 * {@code false}). The result contains the fields where the corresponding position in
+	 * the boolean array is {@code '1'}, {@code 'T'}, or {@code 't'} (representing the value {@code true}).
+	 *
+	 * @param mask The string mask defining which fields to include and which to skip.
+	 * @return The Graph Csv reader instance itself, to allow for fluent function chaining.
+	 */
+	public GraphCsvReader includeFieldsEdges(String mask) {
+		this.EdgeReader.includeFields(mask);
+		return this;
+	}
+
+	/**
+	 * Configures which fields of the CSV file containing vertices data should be included and which should be skipped. The
+	 * bits in the value (read from least significant to most significant) define whether the field at
+	 * the corresponding position in the CSV schema should be included.
+	 * parser will look at the first {@code n} fields, where {@code n} is the position of the most significant
+	 * non-zero bit.
+	 * The parser will skip over all fields where the character at the corresponding bit is zero, and
+	 * include the fields where the corresponding bit is one.
+	 * <p>
+	 * Examples:
+	 * <ul>
+	 *   <li>A mask of {@code 0x7} would include the first three fields.</li>
+	 *   <li>A mask of {@code 0x26} (binary {@code 100110} would skip the first fields, include fields
+	 *       two and three, skip fields four and five, and include field six.</li>
+	 * </ul>
+	 *
+	 * @param mask The bit mask defining which fields to include and which to skip.
+	 * @return The Graph CSV reader instance itself, to allow for fluent function chaining.
+	 */
+	public GraphCsvReader includeFieldsVertices(long mask) {
+		if(this.VertexReader !=null) {
+			this.VertexReader.includeFields(mask);
+		}
+		return this;
+	}
+
+	/**
+	 * Configures which fields of the CSV file containing edges data should be included and which should be skipped. The
+	 * bits in the value (read from least significant to most significant) define whether the field at
+	 * the corresponding position in the CSV schema should be included.
+	 * parser will look at the first {@code n} fields, where {@code n} is the position of the most significant
+	 * non-zero bit.
+	 * The parser will skip over all fields where the character at the corresponding bit is zero, and
+	 * include the fields where the corresponding bit is one.
+	 * <p>
+	 * Examples:
+	 * <ul>
+	 *   <li>A mask of {@code 0x7} would include the first three fields.</li>
+	 *   <li>A mask of {@code 0x26} (binary {@code 100110} would skip the first fields, include fields
+	 *       two and three, skip fields four and five, and include field six.</li>
+	 * </ul>
+	 *
+	 * @param mask The bit mask defining which fields to include and which to skip.
+	 * @return The Graph CSV reader instance itself, to allow for fluent function chaining.
+	 */
+	public GraphCsvReader includeFieldsEdges(long mask) {
+		this.EdgeReader.includeFields(mask);
+		return this;
+	}
+
+	/**
+	 * Sets the CSV reader for the Edges file to ignore the first line. This is useful for files that contain a header line.
+	 *
+	 * @return The Graph CSV reader instance itself, to allow for fluent function chaining.
+	 */
+	public GraphCsvReader ignoreFirstLineEdges() {
+		this.EdgeReader.ignoreFirstLine();
+		return this;
+	}
+
+	/**
+	 * Sets the CSV reader for the Vertices file to ignore the first line. This is useful for files that contain a header line.
+	 *
+	 * @return The Graph CSV reader instance itself, to allow for fluent function chaining.
+	 */
+	public GraphCsvReader ignoreFirstLineVertices() {
+		if(this.VertexReader !=null) {
+			this.VertexReader.ignoreFirstLine();
+		}
+		return this;
+	}
+
+	/**
+	 * Sets the CSV reader for the Edges file  to ignore any invalid lines.
+	 * This is useful for files that contain an empty line at the end, multiple header lines or comments. This would throw an exception otherwise.
+	 *
+	 * @return The CSV reader instance itself, to allow for fluent function chaining.
+	 */
+	public GraphCsvReader ignoreInvalidLinesEdges() {
+		this.EdgeReader.ignoreInvalidLines();
+		return this;
+	}
+
+	/**
+	 * Sets the CSV reader Vertices file  to ignore any invalid lines.
+	 * This is useful for files that contain an empty line at the end, multiple header lines or comments. This would throw an exception otherwise.
+	 *
+	 * @return The CSV reader instance itself, to allow for fluent function chaining.
+	 */
+	public GraphCsvReader ignoreInvalidLinesVertices() {
+		if(this.VertexReader !=null) {
+			this.VertexReader.ignoreInvalidLines();
+		}
+		return this;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/702277fa/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
index 2b17a14..d8abd4e 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
@@ -22,8 +22,6 @@ import org.apache.flink.api.common.ProgramDescription;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData;
@@ -59,14 +57,8 @@ public class ConnectedComponents implements ProgramDescription {
 
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-		DataSet<Edge<Long, NullValue>> edges = getEdgesDataSet(env);
-
-		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new MapFunction<Long, Long>() {
-			@Override
-			public Long map(Long value) throws Exception {
-				return value;
-			}
-		}, env);
+		//util method getGraph is used
+		Graph<Long, Long, NullValue> graph = ConnectedComponents.getGraph(env);
 
 		DataSet<Vertex<Long, Long>> verticesWithMinIds = graph
 				.run(new GSAConnectedComponents(maxIterations)).getVertices();
@@ -118,24 +110,29 @@ public class ConnectedComponents implements ProgramDescription {
 
 		return true;
 	}
-
-	@SuppressWarnings("serial")
-	private static DataSet<Edge<Long, NullValue>> getEdgesDataSet(ExecutionEnvironment env) {
-
-		if(fileOutput) {
-			return env.readCsvFile(edgeInputPath)
-					.ignoreComments("#")
-					.fieldDelimiter("\t")
-					.lineDelimiter("\n")
-					.types(Long.class, Long.class)
-					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
-						@Override
-						public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception {
-							return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance());
+@SuppressWarnings("unchecked")
+	private static Graph<Long, Long, NullValue> getGraph(ExecutionEnvironment env) {
+		Graph<Long, Long, NullValue> graph;
+		if(!fileOutput) {
+			graph = Graph.fromDataSet(ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env),
+					new MapFunction<Long, Long>() {
+
+						public Long map(Long label) {
+							return label;
 						}
-					});
+					}, env);
 		} else {
-			return ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env);
+			graph = Graph.fromCsvReader(edgeInputPath,new MapFunction<Long, Long>() {
+				public Long map(Long label) {
+					return label;
+				}
+			}, env).ignoreCommentsEdges("#")
+					.fieldDelimiterEdges("\t")
+					.lineDelimiterEdges("\n")
+					.typesEdges(Long.class)
+					.typesVerticesNullEdge(Long.class, Long.class);
+
 		}
+		return graph;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/702277fa/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
index 9ea8fe2..cfa04e9 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.ProgramDescription;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
@@ -30,7 +29,6 @@ import org.apache.flink.graph.gsa.ApplyFunction;
 import org.apache.flink.graph.gsa.GatherFunction;
 import org.apache.flink.graph.gsa.SumFunction;
 import org.apache.flink.graph.gsa.Neighbor;
-import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
 
 /**
  * This example shows how to use Gelly's Gather-Sum-Apply iterations.
@@ -61,9 +59,7 @@ public class GSASingleSourceShortestPaths implements ProgramDescription {
 
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-		DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
-
-		Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, new InitVertices(srcVertexId), env);
+		Graph<Long, Double, Double> graph = GSASingleSourceShortestPaths.getGraph(env);
 
 		// Execute the GSA iteration
 		Graph<Long, Double, Double> result = graph.runGatherSumApplyIteration(
@@ -113,7 +109,7 @@ public class GSASingleSourceShortestPaths implements ProgramDescription {
 		public Double gather(Neighbor<Double, Double> neighbor) {
 			return neighbor.getNeighborValue() + neighbor.getEdgeValue();
 		}
-	};
+	}
 
 	@SuppressWarnings("serial")
 	private static final class ChooseMinDistance extends SumFunction<Double, Double, Double> {
@@ -121,7 +117,7 @@ public class GSASingleSourceShortestPaths implements ProgramDescription {
 		public Double sum(Double newValue, Double currentValue) {
 			return Math.min(newValue, currentValue);
 		}
-	};
+	}
 
 	@SuppressWarnings("serial")
 	private static final class UpdateDistance extends ApplyFunction<Long, Double, Double> {
@@ -172,15 +168,15 @@ public class GSASingleSourceShortestPaths implements ProgramDescription {
 		return true;
 	}
 
-	private static DataSet<Edge<Long, Double>> getEdgeDataSet(ExecutionEnvironment env) {
+	@SuppressWarnings("unchecked")
+	private static Graph<Long, Double, Double> getGraph(ExecutionEnvironment env) {
 		if (fileOutput) {
-			return env.readCsvFile(edgesInputPath)
-					.fieldDelimiter("\t")
-					.lineDelimiter("\n")
-					.types(Long.class, Long.class, Double.class)
-					.map(new Tuple3ToEdgeMap<Long, Double>());
+			return Graph.fromCsvReader(edgesInputPath, new InitVertices(srcVertexId), env).fieldDelimiterEdges("\t")
+					.lineDelimiterEdges("\n")
+					.typesEdges(Long.class, Double.class)
+					.typesVertices(Long.class, Double.class);
 		} else {
-			return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
+			return Graph.fromDataSet(SingleSourceShortestPathsData.getDefaultEdgeDataSet(env), new InitVertices(srcVertexId), env);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/702277fa/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
index 6c4d0c2..8d9beeb 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.example.utils.ExampleUtils;
 import org.apache.flink.types.NullValue;
@@ -57,7 +56,7 @@ public class GraphMetrics implements ProgramDescription {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 		/** create the graph **/
-		Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(getEdgesDataSet(env), env);
+		Graph<Long, NullValue, NullValue> graph = GraphMetrics.getGraph(env);
 		
 		/** get the number of vertices **/
 		long numVertices = graph.numberOfVertices();
@@ -150,21 +149,17 @@ public class GraphMetrics implements ProgramDescription {
 		return true;
 	}
 
-	@SuppressWarnings("serial")
-	private static DataSet<Edge<Long, NullValue>> getEdgesDataSet(ExecutionEnvironment env) {
-		if (fileOutput) {
-			return env.readCsvFile(edgesInputPath)
-					.lineDelimiter("\n").fieldDelimiter("\t")
-					.types(Long.class, Long.class).map(
-							new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
-
-								public Edge<Long, NullValue> map(Tuple2<Long, Long> value) {
-									return new Edge<Long, NullValue>(value.f0, value.f1, 
-											NullValue.getInstance());
-								}
-					});
+	@SuppressWarnings({"serial", "unchecked"})
+	private static Graph<Long, NullValue, NullValue> getGraph(ExecutionEnvironment env) {
+		if(fileOutput) {
+			return Graph.fromCsvReader(edgesInputPath, env)
+					.lineDelimiterEdges("\n")
+					.fieldDelimiterEdges("\t")
+					.typesEdges(Long.class)
+					.typesVerticesNullEdge(Long.class);
+
 		} else {
-			return ExampleUtils.getRandomEdges(env, NUM_VERTICES);
+			return Graph.fromDataSet(ExampleUtils.getRandomEdges(env, NUM_VERTICES), env);
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/702277fa/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java
index cc672b2..adf9c02 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java
@@ -31,8 +31,6 @@ import org.apache.flink.graph.spargel.MessageIterator;
 import org.apache.flink.graph.spargel.MessagingFunction;
 import org.apache.flink.graph.spargel.VertexCentricConfiguration;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
-import org.apache.flink.graph.utils.Tuple2ToVertexMap;
-import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
 
 /**
  * This example illustrates the usage of vertex-centric iteration's
@@ -77,18 +75,12 @@ public class IncrementalSSSP implements ProgramDescription {
 
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-		DataSet<Vertex<Long, Double>> vertices = getVerticesDataSet(env);
-
-		DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
-
-		DataSet<Edge<Long, Double>> edgesInSSSP = getEdgesinSSSPDataSet(env);
-
 		Edge<Long, Double> edgeToBeRemoved = getEdgeToBeRemoved();
 
-		Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
+		Graph<Long, Double, Double> graph = IncrementalSSSP.getGraph(env);
 
 		// Assumption: all minimum weight paths are kept
-		Graph<Long, Double, Double> ssspGraph = Graph.fromDataSet(vertices, edgesInSSSP, env);
+		Graph<Long, Double, Double> ssspGraph = IncrementalSSSP.getSSSPGraph(env);
 
 		// remove the edge
 		graph.removeEdge(edgeToBeRemoved);
@@ -96,7 +88,7 @@ public class IncrementalSSSP implements ProgramDescription {
 		// configure the iteration
 		VertexCentricConfiguration parameters = new VertexCentricConfiguration();
 
-		if(isInSSSP(edgeToBeRemoved, edgesInSSSP)) {
+		if(isInSSSP(edgeToBeRemoved, ssspGraph.getEdges())) {
 
 			parameters.setDirection(EdgeDirection.IN);
 			parameters.setOptDegrees(true);
@@ -110,24 +102,20 @@ public class IncrementalSSSP implements ProgramDescription {
 			// Emit results
 			if(fileOutput) {
 				resultedVertices.writeAsCsv(outputPath, "\n", ",");
-
-				// since file sinks are lazy, we trigger the execution explicitly
-				env.execute("Incremental SSSP Example");
 			} else {
 				resultedVertices.print();
 			}
 
+			env.execute("Incremental SSSP Example");
 		} else {
 			// print the vertices
 			if(fileOutput) {
-				vertices.writeAsCsv(outputPath, "\n", ",");
-
-				// since file sinks are lazy, we trigger the execution explicitly
-				env.execute("Incremental SSSP Example");
+				graph.getVertices().writeAsCsv(outputPath, "\n", ",");
 			} else {
-				vertices.print();
+				graph.getVertices().print();
 			}
 
+			env.execute("Incremental SSSP Example");
 		}
 	}
 
@@ -251,45 +239,31 @@ public class IncrementalSSSP implements ProgramDescription {
 		return true;
 	}
 
-	private static DataSet<Vertex<Long, Double>> getVerticesDataSet(ExecutionEnvironment env) {
-		if (fileOutput) {
-			return env.readCsvFile(verticesInputPath)
-					.lineDelimiter("\n")
-					.types(Long.class, Double.class)
-					.map(new Tuple2ToVertexMap<Long, Double>());
+	@SuppressWarnings("unchecked")
+	private static Graph<Long, Double, Double> getGraph(ExecutionEnvironment env) {
+		if(fileOutput) {
+			return Graph.fromCsvReader(verticesInputPath, edgesInputPath, env).lineDelimiterEdges("\n")
+					.typesEdges(Long.class, Double.class)
+					.typesVertices(Long.class, Double.class);
 		} else {
 			System.err.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> " +
 					"<src id edge to be removed> <trg id edge to be removed> <val edge to be removed> " +
 					"<output path> <max iterations>");
-			return IncrementalSSSPData.getDefaultVertexDataSet(env);
+			return Graph.fromDataSet(IncrementalSSSPData.getDefaultVertexDataSet(env), IncrementalSSSPData.getDefaultEdgeDataSet(env), env);
 		}
 	}
 
-	private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
-		if (fileOutput) {
-			return env.readCsvFile(edgesInputPath)
-					.lineDelimiter("\n")
-					.types(Long.class, Long.class, Double.class)
-					.map(new Tuple3ToEdgeMap<Long, Double>());
-		} else {
-			System.err.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> " +
-					"<src id edge to be removed> <trg id edge to be removed> <val edge to be removed> " +
-					"<output path> <max iterations>");
-			return IncrementalSSSPData.getDefaultEdgeDataSet(env);
-		}
-	}
-
-	private static DataSet<Edge<Long, Double>> getEdgesinSSSPDataSet(ExecutionEnvironment env) {
-		if (fileOutput) {
-			return env.readCsvFile(edgesInSSSPInputPath)
-					.lineDelimiter("\n")
-					.types(Long.class, Long.class, Double.class)
-					.map(new Tuple3ToEdgeMap<Long, Double>());
+	@SuppressWarnings("unchecked")
+	private static Graph<Long, Double, Double> getSSSPGraph(ExecutionEnvironment env) {
+		if(fileOutput) {
+			return Graph.fromCsvReader(verticesInputPath, edgesInSSSPInputPath, env).lineDelimiterEdges("\n")
+					.typesEdges(Long.class, Double.class)
+					.typesVertices(Long.class, Double.class);
 		} else {
 			System.err.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> " +
 					"<src id edge to be removed> <trg id edge to be removed> <val edge to be removed> " +
 					"<output path> <max iterations>");
-			return IncrementalSSSPData.getDefaultEdgesInSSSP(env);
+			return Graph.fromDataSet(IncrementalSSSPData.getDefaultVertexDataSet(env), IncrementalSSSPData.getDefaultEdgesInSSSP(env), env);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/702277fa/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java
index 391ebaf..ef09bff 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPaths.java
@@ -196,4 +196,4 @@ public class SingleSourceShortestPaths implements ProgramDescription {
 	public String getDescription() {
 		return "Vertex-centric Single Source Shortest Paths";
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/702277fa/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java
new file mode 100644
index 0000000..2b78d32
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithCsvITCase.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.operations;
+
+import com.google.common.base.Charsets;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Triplet;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class GraphCreationWithCsvITCase extends MultipleProgramsTestBase {
+
+	public GraphCreationWithCsvITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	private String expectedResult;
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testCreateWithCsvFile() throws Exception {
+		/*
+		 * Test with two CSV files: one with vertex data and one with edge data
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		final String fileContent =  "1,1\n"+
+				"2,2\n"+
+				"3,3\n";
+		final FileInputSplit split = createTempFile(fileContent);
+		final String fileContent2 =  "1,2,ot\n"+
+				"3,2,tt\n"+
+				"3,1,to\n";
+		final FileInputSplit split2 = createTempFile(fileContent2);
+
+		Graph<Long,Long,String> graph= Graph.fromCsvReader(split.getPath().toString(),split2.getPath().toString(),env)
+				.typesEdges(Long.class, String.class)
+				.typesVertices(Long.class, Long.class);
+
+		List<Triplet<Long, Long, String>> result = graph.getTriplets().collect();
+
+		expectedResult = "1,2,1,2,ot\n" +
+				"3,2,3,2,tt\n" +
+				"3,1,3,1,to\n";
+
+		compareResultAsTuples(result, expectedResult);
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testCsvWithNullEdge() throws Exception {
+		/*
+		Test fromCsvReader with vertex and edge paths and NullValue as the edge value type
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		final String vertexFileContent = "1,one\n"+
+				"2,two\n"+
+				"3,three\n";
+		final String edgeFileContent = "1,2\n"+
+				"3,2\n"+
+				"3,1\n";
+		final FileInputSplit split = createTempFile(vertexFileContent);
+		final FileInputSplit edgeSplit = createTempFile(edgeFileContent);
+
+		Graph<Long, String, NullValue> graph= Graph.fromCsvReader(split.getPath().toString(), edgeSplit.getPath().toString(),
+				env)
+				.typesEdges(Long.class)
+				.typesVerticesNullEdge(Long.class, String.class);
+
+		List<Triplet<Long, String, NullValue>> result = graph.getTriplets().collect();
+
+		expectedResult = "1,2,one,two,(null)\n"+
+				"3,2,three,two,(null)\n"+
+				"3,1,three,one,(null)\n";
+
+		compareResultAsTuples(result, expectedResult);
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testCsvWithConstantValueMapper() throws Exception {
+		/*
+		 * Test fromCsvReader with an edge path and a mapper that assigns a Double constant as the vertex value
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		final String fileContent =  "1,2,ot\n"+
+				"3,2,tt\n"+
+				"3,1,to\n";
+		final FileInputSplit split = createTempFile(fileContent);
+
+		Graph<Long, Double, String> graph = Graph.fromCsvReader(split.getPath().toString(),
+				new AssignDoubleValueMapper(), env).typesEdges(Long.class, String.class)
+				.typesVertices(Long.class, Double.class);
+		List<Triplet<Long, Double, String>> result = graph.getTriplets().collect();
+		//graph.getTriplets().writeAsCsv(resultPath);
+		expectedResult = "1,2,0.1,0.1,ot\n" + "3,1,0.1,0.1,to\n" + "3,2,0.1,0.1,tt\n";
+		compareResultAsTuples(result, expectedResult);
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testCreateWithOnlyEdgesCsvFile() throws Exception {
+		/*
+		 * Test with one CSV file containing edge data. Also tests the configuration method ignoreFirstLineEdges()
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		final String fileContent2 =  "header\n1,2,ot\n"+
+				"3,2,tt\n"+
+				"3,1,to\n";
+
+		final FileInputSplit split2 = createTempFile(fileContent2);
+		Graph<Long,NullValue,String> graph= Graph.fromCsvReader(split2.getPath().toString(), env)
+				.ignoreFirstLineEdges()
+				.ignoreCommentsVertices("hi")
+				.typesEdges(Long.class, String.class)
+				.typesVertices(Long.class);
+
+		List<Triplet<Long, NullValue, String>> result = graph.getTriplets().collect();
+		expectedResult = "1,2,(null),(null),ot\n" +
+				"3,2,(null),(null),tt\n" +
+				"3,1,(null),(null),to\n";
+
+		compareResultAsTuples(result, expectedResult);
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testCreateCsvFileDelimiterConfiguration() throws Exception {
+		/*
+		 * Test with an edge and a vertex CSV file. Tests the configuration methods fieldDelimiterEdges and
+		 * fieldDelimiterVertices.
+		 * Also tests the configuration methods lineDelimiterEdges and lineDelimiterVertices.
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		final String fileContent =  "header\n1;1\n"+
+				"2;2\n"+
+				"3;3\n";
+
+		final FileInputSplit split = createTempFile(fileContent);
+
+		final String fileContent2 =  "header|1:2:ot|"+
+				"3:2:tt|"+
+				"3:1:to|";
+
+		final FileInputSplit split2 = createTempFile(fileContent2);
+
+		Graph<Long,Long,String> graph= Graph.fromCsvReader(split.getPath().toString(),split2.getPath().toString(),env).
+				ignoreFirstLineEdges().ignoreFirstLineVertices().
+				fieldDelimiterEdges(":").fieldDelimiterVertices(";").
+				lineDelimiterEdges("|").
+				typesEdges(Long.class, String.class)
+				.typesVertices(Long.class, Long.class);
+
+		List<Triplet<Long, Long, String>> result = graph.getTriplets().collect();
+
+		expectedResult = "1,2,1,2,ot\n" +
+				"3,2,3,2,tt\n" +
+				"3,1,3,1,to\n";
+
+		compareResultAsTuples(result, expectedResult);
+
+	}
+
+	/*----------------------------------------------------------------------------------------------------------------*/
+	@SuppressWarnings("serial")
+	private static final class AssignDoubleValueMapper implements MapFunction<Long, Double> {
+		public Double map(Long value) {
+			return 0.1d;
+		}
+	}
+
+	private FileInputSplit createTempFile(String content) throws IOException {
+		File tempFile = File.createTempFile("test_contents", "tmp");
+		tempFile.deleteOnExit();
+
+		OutputStreamWriter wrt = new OutputStreamWriter(
+				new FileOutputStream(tempFile), Charsets.UTF_8
+		);
+		wrt.write(content);
+		wrt.close();
+
+		return new FileInputSplit(0, new Path(tempFile.toURI().toString()), 0,
+							tempFile.length(), new String[] {"localhost"});
+	}
+}


Mime
View raw message