flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject [20/24] flink git commit: [FLINK-2833] [gelly] create a flink-libraries module and move gelly there
Date Fri, 09 Oct 2015 16:05:58 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
new file mode 100644
index 0000000..f64c701
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+import com.google.common.base.Preconditions;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.io.CsvReader;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.api.java.ExecutionEnvironment;
+
+/**
+ * A class to build a Graph using path(s) provided to CSV file(s) with optional vertex and edge data.
+ * The class also configures the CSV readers used to read edge and vertex data such as the field types,
+ * the delimiters (row and field), the fields that should be included or skipped, and other flags,
+ * such as whether to skip the initial line as the header.
+ * The configuration is done using the functions provided in the {@link org.apache.flink.api.java.io.CsvReader} class.
+ */
+
+public class GraphCsvReader {
+
+	@SuppressWarnings("unused")
+	private final Path vertexPath, edgePath;
+	private final ExecutionEnvironment executionContext;
+	protected CsvReader edgeReader;
+	protected CsvReader vertexReader;
+	protected MapFunction<?, ?> mapper;
+	protected Class<?> vertexKey;
+	protected Class<?> vertexValue;
+	protected Class<?> edgeValue;
+
+//--------------------------------------------------------------------------------------------------------------------
+	public GraphCsvReader(Path vertexPath, Path edgePath, ExecutionEnvironment context) {
+		this.vertexPath = vertexPath;
+		this.edgePath = edgePath;
+		this.vertexReader = new CsvReader(vertexPath, context);
+		this.edgeReader = new CsvReader(edgePath, context);
+		this.mapper = null;
+		this.executionContext = context;
+	}
+
+	public GraphCsvReader(Path edgePath, ExecutionEnvironment context) {
+		this.vertexPath = null;
+		this.edgePath = edgePath;
+		this.edgeReader = new CsvReader(edgePath, context);
+		this.vertexReader = null;
+		this.mapper = null;
+		this.executionContext = context;
+	}
+
+	public <K, VV> GraphCsvReader(Path edgePath, final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
+		this.vertexPath = null;
+		this.edgePath = edgePath;
+		this.edgeReader = new CsvReader(edgePath, context);
+		this.vertexReader = null;
+		this.mapper = mapper;
+		this.executionContext = context;
+	}
+
+	public GraphCsvReader (String edgePath, ExecutionEnvironment context) {
+		this(new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), context);
+
+	}
+
+	public GraphCsvReader(String vertexPath, String edgePath, ExecutionEnvironment context) {
+		this(new Path(Preconditions.checkNotNull(vertexPath, "The file path may not be null.")),
+				new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), context);
+	}
+
+
+	public <K, VV> GraphCsvReader(String edgePath, final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
+			this(new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), mapper, context);
+	}
+
+	/**
+	 * Creates a Graph from CSV input with vertex values and edge values.
+	 * The vertex values are specified through a vertices input file or a user-defined map function.
+	 *
+	 * @param vertexKey the type of the vertex IDs
+	 * @param vertexValue the type of the vertex values
+	 * @param edgeValue the type of the edge values
+	 * @return a Graph with vertex and edge values.
+	 * @throws RuntimeException if neither a vertices input file nor a map function was provided
+	 */
+	@SuppressWarnings("unchecked")
+	public <K, VV, EV> Graph<K, VV, EV> types(Class<K> vertexKey, Class<VV> vertexValue,
+			Class<EV> edgeValue) {
+
+		if (edgeReader == null) {
+			throw new RuntimeException("The edges input file cannot be null!");
+		}
+
+		DataSet<Tuple3<K, K, EV>> edges = edgeReader.types(vertexKey, vertexKey, edgeValue);
+
+		// the vertex value can be provided by an input file or a user-defined mapper
+		if (vertexReader != null) {
+			DataSet<Tuple2<K, VV>> vertices = vertexReader.types(vertexKey, vertexValue);
+			return Graph.fromTupleDataSet(vertices, edges, executionContext);
+		}
+		else if (mapper != null) {
+			return Graph.fromTupleDataSet(edges, (MapFunction<K, VV>) mapper, executionContext);
+		}
+		else {
+			// the trailing space keeps "file" and "or" from running together in the message
+			throw new RuntimeException("Vertex values have to be specified through a vertices input file "
+					+ "or a user-defined map function.");
+		}
+	}
+
+	/**
+	 * Creates a Graph from CSV input with edge values, but without vertex values.
+	 * @param vertexKey the type of the vertex IDs
+	 * @param edgeValue the type of the edge values
+	 * @return a Graph where the edges are read from an edges CSV file (with values).
+	 */
+	public <K, EV> Graph<K, NullValue, EV> edgeTypes(Class<K> vertexKey, Class<EV> edgeValue) {
+
+		if (edgeReader == null) {
+			throw new RuntimeException("The edges input file cannot be null!");
+		}
+
+		DataSet<Tuple3<K, K, EV>> edges = edgeReader.types(vertexKey, vertexKey, edgeValue);
+
+		return Graph.fromTupleDataSet(edges, executionContext);
+	}
+
+	/**
+	 * Creates a Graph from CSV input without vertex values or edge values.
+	 * @param vertexKey the type of the vertex IDs
+	 * @return a Graph where the vertex IDs are read from the edges input file.
+	 */
+	public <K> Graph<K, NullValue, NullValue> keyType(Class<K> vertexKey) {
+
+		if (edgeReader == null) {
+			throw new RuntimeException("The edges input file cannot be null!");
+		}
+
+		@SuppressWarnings("serial")
+		DataSet<Tuple3<K, K, NullValue>> edges = edgeReader.types(vertexKey, vertexKey)
+				.map(new MapFunction<Tuple2<K, K>, Tuple3<K, K, NullValue>>() {
+					// the file only holds (source, target) pairs, so attach a NullValue as the edge value
+					public Tuple3<K, K, NullValue> map(Tuple2<K, K> edge) {
+						return new Tuple3<K, K, NullValue>(edge.f0, edge.f1, NullValue.getInstance());
+					}
+				}).withForwardedFields("f0;f1");
+
+		return Graph.fromTupleDataSet(edges, executionContext);
+	}
+
+	/**
+	 * Creates a Graph from CSV input without edge values.
+	 * The vertex values are specified through a vertices input file or a user-defined map function.
+	 * If no vertices input file is provided, a user-defined map function must have been given; the
+	 * graph is then built from the edges input file and that function.
+	 * @param vertexKey the type of the vertex IDs
+	 * @param vertexValue the type of the vertex values
+	 * @return a Graph with vertex IDs and vertex values, but without edge values.
+	 * @throws RuntimeException if neither a vertices input file nor a map function was provided
+	 */
+	@SuppressWarnings({ "serial", "unchecked" })
+	public <K, VV> Graph<K, VV, NullValue> vertexTypes(Class<K> vertexKey, Class<VV> vertexValue) {
+
+		if (edgeReader == null) {
+			throw new RuntimeException("The edges input file cannot be null!");
+		}
+
+		DataSet<Tuple3<K, K, NullValue>> edges = edgeReader.types(vertexKey, vertexKey)
+				.map(new MapFunction<Tuple2<K,K>, Tuple3<K, K, NullValue>>() {
+
+					public Tuple3<K, K, NullValue> map(Tuple2<K, K> input) {
+						return new Tuple3<K, K, NullValue>(input.f0, input.f1, NullValue.getInstance());
+					}
+				}).withForwardedFields("f0;f1");
+
+		// the vertex value can be provided by an input file or a user-defined mapper
+		if (vertexReader != null) {
+			DataSet<Tuple2<K, VV>> vertices = vertexReader.types(vertexKey, vertexValue);
+			return Graph.fromTupleDataSet(vertices, edges, executionContext);
+		}
+		else if (mapper != null) {
+			return Graph.fromTupleDataSet(edges, (MapFunction<K, VV>) mapper, executionContext);
+		}
+		else {
+			// the trailing space keeps "file" and "or" from running together in the message
+			throw new RuntimeException("Vertex values have to be specified through a vertices input file "
+					+ "or a user-defined map function.");
+		}
+	}
+
+	/**
+	 * Configures the delimiter that separates rows for the CSV reader used to read the edges.
+	 * The newline character ({@code '\n'}) is used by default.
+	 *
+	 * @param delimiter The delimiter that separates the rows.
+	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
+	 */
+	public GraphCsvReader lineDelimiterEdges(String delimiter) {
+		edgeReader.lineDelimiter(delimiter);
+		return this;
+	}
+
+	/**
+	 * Configures the delimiter that separates rows for the CSV reader used to read the vertices.
+	 * The newline character ({@code '\n'}) is used by default.
+	 *
+	 * @param delimiter The delimiter that separates the rows.
+	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
+	 */
+	public GraphCsvReader lineDelimiterVertices(String delimiter) {
+		if(this.vertexReader != null) {
+			this.vertexReader.lineDelimiter(delimiter);
+		}
+		return this;
+	}
+
+	/**
+	 * Configures the delimiter that separates fields in a row for the CSV reader used to read the vertices.
+	 * The comma character ({@code ','}) is used by default.
+	 *
+	 * @param delimiter The delimiter that separates the fields in a row.
+	 * @return The GraphCsv reader instance itself, to allow for fluent function chaining.
+	 */
+	public GraphCsvReader fieldDelimiterVertices(String delimiter) {
+		if(this.vertexReader != null) {
+			this.vertexReader.fieldDelimiter(delimiter);
+		}
+		return this;
+	}
+
+	/**
+	 * Configures the delimiter that separates fields in a row for the CSV reader used to read the edges.
+	 * The comma character ({@code ','}) is used by default.
+	 *
+	 * @param delimiter The delimiter that separates the fields in a row.
+	 * @return The GraphCsv reader instance itself, to allow for fluent function chaining.
+	 */
+	public GraphCsvReader fieldDelimiterEdges(String delimiter) {
+		this.edgeReader.fieldDelimiter(delimiter);
+		return this;
+	}
+
+	/**
+	 * Enables quoted String parsing for Edge Csv Reader. Field delimiters in quoted Strings are ignored.
+	 * A String is parsed as quoted if it starts and ends with a quoting character and as unquoted otherwise.
+	 * Leading or trailing whitespace is not allowed.
+	 *
+	 * @param quoteCharacter The character which is used as quoting character.
+	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
+	 */
+	public GraphCsvReader parseQuotedStringsEdges(char quoteCharacter) {
+		this.edgeReader.parseQuotedStrings(quoteCharacter);
+		return this;
+	}
+
+	/**
+	 * Enables quoted String parsing for Vertex Csv Reader. Field delimiters in quoted Strings are ignored.
+	 * A String is parsed as quoted if it starts and ends with a quoting character and as unquoted otherwise.
+	 * Leading or trailing whitespace is not allowed.
+	 *
+	 * @param quoteCharacter The character which is used as quoting character.
+	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
+	 */
+	public GraphCsvReader parseQuotedStringsVertices(char quoteCharacter) {
+		if(this.vertexReader != null) {
+			this.vertexReader.parseQuotedStrings(quoteCharacter);
+		}
+		return this;
+	}
+
+	/**
+	 * Configures the string that starts comments for the Vertex Csv Reader.
+	 * By default comments will be treated as invalid lines.
+	 * This function only recognizes comments which start at the beginning of the line!
+	 *
+	 * @param commentPrefix The string that starts the comments.
+	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
+	 */
+	public GraphCsvReader ignoreCommentsVertices(String commentPrefix) {
+		if(this.vertexReader != null) {
+			this.vertexReader.ignoreComments(commentPrefix);
+		}
+		return this;
+	}
+
+	/**
+	 * Configures the string that starts comments for the Edge Csv Reader.
+	 * By default comments will be treated as invalid lines.
+	 * This function only recognizes comments which start at the beginning of the line!
+	 *
+	 * @param commentPrefix The string that starts the comments.
+	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
+	 */
+	public GraphCsvReader ignoreCommentsEdges(String commentPrefix) {
+		this.edgeReader.ignoreComments(commentPrefix);
+		return this;
+	}
+
+	/**
+	 * Configures which fields of the CSV file containing vertices data should be included and which should be skipped. The
+	 * parser will look at the first {@code n} fields, where {@code n} is the length of the boolean
+	 * array. The parser will skip over all fields where the boolean value at the corresponding position
+	 * in the array is {@code false}. The result contains the fields where the corresponding position in
+	 * the boolean array is {@code true}.
+	 * The number of fields in the result is consequently equal to the number of times that {@code true}
+	 * occurs in the fields array.
+	 *
+	 * @param vertexFields The array of flags that describes which fields are to be included from the CSV file for vertices.
+	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
+	 */
+	public GraphCsvReader includeFieldsVertices(boolean ... vertexFields) {
+		if(this.vertexReader != null) {
+			this.vertexReader.includeFields(vertexFields);
+		}
+		return this;
+	}
+
+	/**
+	 * Configures which fields of the CSV file containing edges data should be included and which should be skipped. The
+	 * parser will look at the first {@code n} fields, where {@code n} is the length of the boolean
+	 * array. The parser will skip over all fields where the boolean value at the corresponding position
+	 * in the array is {@code false}. The result contains the fields where the corresponding position in
+	 * the boolean array is {@code true}.
+	 * The number of fields in the result is consequently equal to the number of times that {@code true}
+	 * occurs in the fields array.
+	 *
+	 * @param edgeFields The array of flags that describes which fields are to be included from the CSV file for edges.
+	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
+	 */
+	public GraphCsvReader includeFieldsEdges(boolean ... edgeFields) {
+		this.edgeReader.includeFields(edgeFields);
+		return this;
+	}
+
+	/**
+	 * Configures which fields of the CSV file containing vertices data should be included and which should be skipped. The
+	 * positions in the string (read from position 0 to its length) define whether the field at
+	 * the corresponding position in the CSV schema should be included.
+	 * The parser will look at the first {@code n} fields, where {@code n} is the length of the mask string.
+	 * The parser will skip over all fields where the character at the corresponding position
+	 * in the string is {@code '0'}, {@code 'F'}, or {@code 'f'} (representing the value
+	 * {@code false}). The result contains the fields where the character at the corresponding position in
+	 * the string is {@code '1'}, {@code 'T'}, or {@code 't'} (representing the value {@code true}).
+	 *
+	 * @param mask The string mask defining which fields to include and which to skip.
+	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
+	 */
+	public GraphCsvReader includeFieldsVertices(String mask) {
+		if(this.vertexReader != null) {
+			this.vertexReader.includeFields(mask);
+		}
+		return this;
+	}
+
+	/**
+	 * Configures which fields of the CSV file containing edges data should be included and which should be skipped. The
+	 * positions in the string (read from position 0 to its length) define whether the field at
+	 * the corresponding position in the CSV schema should be included.
+	 * The parser will look at the first {@code n} fields, where {@code n} is the length of the mask string.
+	 * The parser will skip over all fields where the character at the corresponding position
+	 * in the string is {@code '0'}, {@code 'F'}, or {@code 'f'} (representing the value
+	 * {@code false}). The result contains the fields where the character at the corresponding position in
+	 * the string is {@code '1'}, {@code 'T'}, or {@code 't'} (representing the value {@code true}).
+	 *
+	 * @param mask The string mask defining which fields to include and which to skip.
+	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
+	 */
+	public GraphCsvReader includeFieldsEdges(String mask) {
+		this.edgeReader.includeFields(mask);
+		return this;
+	}
+
+	/**
+	 * Configures which fields of the CSV file containing vertices data should be included and which should be skipped. The
+	 * bits in the value (read from least significant to most significant) define whether the field at
+	 * the corresponding position in the CSV schema should be included.
+	 * The parser will look at the first {@code n} fields, where {@code n} is the position of the most significant
+	 * non-zero bit.
+	 * The parser will skip over all fields where the corresponding bit is zero, and
+	 * include the fields where the corresponding bit is one.
+	 * <p>
+	 * Examples:
+	 * <ul>
+	 *   <li>A mask of {@code 0x7} would include the first three fields.</li>
+	 *   <li>A mask of {@code 0x26} (binary {@code 100110}) would skip the first field, include fields
+	 *       two and three, skip fields four and five, and include field six.</li>
+	 * </ul>
+	 *
+	 * @param mask The bit mask defining which fields to include and which to skip.
+	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
+	 */
+	public GraphCsvReader includeFieldsVertices(long mask) {
+		if(this.vertexReader != null) {
+			this.vertexReader.includeFields(mask);
+		}
+		return this;
+	}
+
+	/**
+	 * Configures which fields of the CSV file containing edges data should be included and which should be skipped. The
+	 * bits in the value (read from least significant to most significant) define whether the field at
+	 * the corresponding position in the CSV schema should be included.
+	 * The parser will look at the first {@code n} fields, where {@code n} is the position of the most significant
+	 * non-zero bit.
+	 * The parser will skip over all fields where the corresponding bit is zero, and
+	 * include the fields where the corresponding bit is one.
+	 * <p>
+	 * Examples:
+	 * <ul>
+	 *   <li>A mask of {@code 0x7} would include the first three fields.</li>
+	 *   <li>A mask of {@code 0x26} (binary {@code 100110}) would skip the first field, include fields
+	 *       two and three, skip fields four and five, and include field six.</li>
+	 * </ul>
+	 *
+	 * @param mask The bit mask defining which fields to include and which to skip.
+	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
+	 */
+	public GraphCsvReader includeFieldsEdges(long mask) {
+		this.edgeReader.includeFields(mask);
+		return this;
+	}
+
+	/**
+	 * Sets the CSV reader for the Edges file to ignore the first line. This is useful for files that contain a header line.
+	 *
+	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
+	 */
+	public GraphCsvReader ignoreFirstLineEdges() {
+		this.edgeReader.ignoreFirstLine();
+		return this;
+	}
+
+	/**
+	 * Sets the CSV reader for the Vertices file to ignore the first line. This is useful for files that contain a header line.
+	 *
+	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
+	 */
+	public GraphCsvReader ignoreFirstLineVertices() {
+		if(this.vertexReader != null) {
+			this.vertexReader.ignoreFirstLine();
+		}
+		return this;
+	}
+
+	/**
+	 * Sets the CSV reader for the Edges file to ignore any invalid lines.
+	 * This is useful for files that contain an empty line at the end, multiple header lines or comments. This would throw an exception otherwise.
+	 *
+	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
+	 */
+	public GraphCsvReader ignoreInvalidLinesEdges() {
+		this.edgeReader.ignoreInvalidLines();
+		return this;
+	}
+
+	/**
+	 * Sets the CSV reader for the Vertices file to ignore any invalid lines.
+	 * This is useful for files that contain an empty line at the end, multiple header lines or comments. This would throw an exception otherwise.
+	 *
+	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
+	 */
+	public GraphCsvReader ignoreInvalidLinesVertices() {
+		if(this.vertexReader != null) {
+			this.vertexReader.ignoreInvalidLines();
+		}
+		return this;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
new file mode 100644
index 0000000..3215194
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flink.api.common.aggregators.Aggregator;
+import com.google.common.base.Preconditions;
+
+/**
+ * This is used as a base class for vertex-centric iteration or gather-sum-apply iteration configuration.
+ */
+public abstract class IterationConfiguration {
+
+	/** the iteration name **/
+	private String name;
+
+	/** the iteration parallelism **/
+	private int parallelism = -1;
+
+	/** the iteration aggregators **/
+	private Map<String, Aggregator<?>> aggregators = new HashMap<String, Aggregator<?>>();
+
+	/** flag that defines whether the solution set is kept in unmanaged memory (false = managed memory) **/
+	private boolean unmanagedSolutionSet = false;
+
+	/** flag that defines whether the number of vertices option is set **/
+	private boolean optNumVertices = false;
+	
+	public IterationConfiguration() {}
+
+	/**
+	 * Sets the name for the iteration. The name is displayed in logs and messages.
+	 * 
+	 * @param name The name for the iteration.
+	 */
+	public void setName(String name) {
+		this.name = name;
+	}
+
+	/**
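+	 * Gets the name of the iteration, falling back to the given default if no name has been set.
+	 *
+	 * @param defaultName The name to return if no name has been set for the iteration.
+	 * @return The name of the iteration, or {@code defaultName} if none was set.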
+	 */
+	public String getName(String defaultName) {
+		if (name != null) {
+			return name;			
+		}
+		else {
+			return defaultName;
+		}
+	}
+
+	/**
+	 * Sets the parallelism for the iteration.
+	 * 
+	 * @param parallelism The parallelism.
+	 */
+	public void setParallelism(int parallelism) {
+		Preconditions.checkArgument(parallelism > 0 || parallelism == -1, "The parallelism must be positive, or -1 (use default).");
+		this.parallelism = parallelism;
+	}
+	
+	/**
+	 * Gets the iteration's parallelism.
+	 * 
+	 * @return The iteration's parallelism, or -1, if not set.
+	 */
+	public int getParallelism() {
+		return parallelism;
+	}
+
+	/**
+	 * Defines whether the solution set is kept in managed memory (Flink's internal way of keeping objects
+	 * in serialized form) or as a simple object map.
+	 * By default, the solution set runs in managed memory.
+	 * 
+	 * @param unmanaged True, to keep the solution set in unmanaged memory, false otherwise.
+	 */
+	public void setSolutionSetUnmanagedMemory(boolean unmanaged) {
+		this.unmanagedSolutionSet = unmanaged;
+	}
+	
+	/**
+	 * Gets whether the solution set is kept in managed memory (Flink's internal way of keeping object
+	 * in serialized form) or as a simple object map.
+	 * By default, the solution set runs in managed memory.
+	 * 
+	 * @return True, if the solution set is in unmanaged memory, false otherwise.
+	 */
+	public boolean isSolutionSetUnmanagedMemory() {
+		return this.unmanagedSolutionSet;
+	}
+
+	/**
+	 * Gets whether the number of vertices option is set.
+	 * By default, the number of vertices option is not set.
+	 *
+	 * @return True, if the number of vertices option is set, false otherwise.
+	 */
+	public boolean isOptNumVertices() {
+		return optNumVertices;
+	}
+
+	/**
+	 * Sets the number of vertices option.
+	 * By default, the number of vertices option is not set.
+	 *
+	 * @param optNumVertices True, to set this option, false otherwise.
+	 */
+	public void setOptNumVertices(boolean optNumVertices) {
+		this.optNumVertices = optNumVertices;
+	}
+
+	/**
+	 * Registers a new aggregator. Aggregators registered here are available during the execution of the vertex updates
+	 * via {@link org.apache.flink.graph.spargel.VertexUpdateFunction#getIterationAggregator(String)} and
+	 * {@link org.apache.flink.graph.spargel.VertexUpdateFunction#getPreviousIterationAggregate(String)}.
+	 * 
+	 * @param name The name of the aggregator, used to retrieve it and its aggregates during execution. 
+	 * @param aggregator The aggregator.
+	 */
+	public void registerAggregator(String name, Aggregator<?> aggregator) {
+		this.aggregators.put(name, aggregator);
+	}
+
+	/**
+	 * Gets the set of aggregators that are registered for this vertex-centric iteration.
+	 *
+	 * @return a Map of the registered aggregators, where the key is the aggregator name
+	 * and the value is the Aggregator object
+	 */
+	public Map<String, Aggregator<?>> getAggregators() {
+		return this.aggregators;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
new file mode 100644
index 0000000..a21b23d
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.util.Collector;
+
+/**
+ * Interface to be implemented by the function applied to a vertex neighborhood
+ * in the {@link Graph#groupReduceOnNeighbors(NeighborsFunction, EdgeDirection)}
+ * method.
+ *
+ * @param <K> the vertex key type
+ * @param <VV> the vertex value type
+ * @param <EV> the edge value type
+ * @param <O> the type of the return value
+ */
+public interface NeighborsFunction<K, VV, EV, O> extends Function, Serializable {
+
+	void iterateNeighbors(Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> neighbors, Collector<O> out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
new file mode 100644
index 0000000..fdf54fa
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
+
+/**
+ * Interface to be implemented by the function applied to a vertex neighborhood
+ * in the {@link Graph#groupReduceOnNeighbors(NeighborsFunctionWithVertexValue, EdgeDirection)}
+ * method.
+ *
+ * @param <K> the vertex key type
+ * @param <VV> the vertex value type
+ * @param <EV> the edge value type
+ * @param <O> the type of the return value
+ */
+public interface NeighborsFunctionWithVertexValue<K, VV, EV, O> extends Function, Serializable {
+
+	void iterateNeighbors(Vertex<K, VV> vertex, Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighbors, Collector<O> out) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java
new file mode 100644
index 0000000..84eec51
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+/**
+ * Interface to be implemented by the function applied to a vertex neighborhood
+ * in the {@link Graph#reduceOnEdges(org.apache.flink.graph.ReduceEdgesFunction, EdgeDirection)} method.
+ *
+ * @param <EV> the edge value type
+ */
+public interface ReduceEdgesFunction<EV> extends Function, Serializable {
+
+	EV reduceEdges(EV firstEdgeValue, EV secondEdgeValue);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java
new file mode 100644
index 0000000..fc5295d
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.common.functions.Function;
+
+import java.io.Serializable;
+
+/**
+ * Interface to be implemented by the function applied to a vertex neighborhood
+ * in the {@link Graph#reduceOnNeighbors(ReduceNeighborsFunction, EdgeDirection)}
+ * method.
+ *
+ * @param <VV> the vertex value type
+ */
+public interface ReduceNeighborsFunction <VV> extends Function, Serializable {
+
+	VV reduceNeighbors(VV firstNeighborValue, VV secondNeighborValue);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java
new file mode 100644
index 0000000..dee3480
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.java.tuple.Tuple5;
+
+/**
+ * A Triplet stores an edge together with its source and target vertices, laid out as a
+ * Tuple5 of (source id, target id, source value, target value, edge value). See {@link org.apache.flink.graph.Graph#getTriplets()}.
+ *
+ * @param <K> the vertex key type
+ * @param <VV> the vertex value type
+ * @param <EV> the edge value type
+ */
+public class Triplet <K, VV, EV> extends Tuple5<K, K, VV, VV, EV> {
+
+	private static final long serialVersionUID = 1L;
+
+	public Triplet() {}
+
+	/**
+	 * Constructs a Triplet from a given source vertex, target vertex and edge.
+	 *
+	 * @param srcVertex the source vertex; its id and value are copied into f0 and f2
+	 * @param trgVertex the target vertex; its id and value are copied into f1 and f3
+	 * @param edge the edge; only its third field (f2) is copied, the endpoint ids are taken from the vertices
+	 */
+	public Triplet(Vertex<K, VV> srcVertex, Vertex<K, VV> trgVertex, Edge<K, EV> edge) {
+		this.f0 = srcVertex.f0; // source vertex id
+		this.f2 = srcVertex.f1; // source vertex value
+		this.f1 = trgVertex.f0; // target vertex id
+		this.f3 = trgVertex.f1; // target vertex value
+		this.f4 = edge.f2; // edge value
+	}
+
+	/**
+	 * Constructs a Triplet from its source vertex id, target vertex id, source vertex value,
+	 * target vertex value and edge value respectively.
+	 *
+	 * @param srcId the id of the source vertex
+	 * @param trgId the id of the target vertex
+	 * @param srcVal the value of the source vertex
+	 * @param trgVal the value of the target vertex
+	 * @param edgeVal the value of the edge
+	 */
+	public Triplet(K srcId, K trgId, VV srcVal, VV trgVal, EV edgeVal) {
+		super(srcId, trgId, srcVal, trgVal, edgeVal);
+	}
+
+	public Vertex<K, VV> getSrcVertex() { // returns a new Vertex built from f0 (id) and f2 (value)
+		return new Vertex<K, VV>(this.f0, this.f2);
+	}
+
+	public Vertex<K, VV> getTrgVertex() { // returns a new Vertex built from f1 (id) and f3 (value)
+		return new Vertex<K, VV>(this.f1, this.f3);
+	}
+
+	public Edge<K, EV> getEdge() { // returns a new Edge built from f0, f1 (endpoint ids) and f4 (value)
+		return new Edge<K, EV>(this.f0, this.f1, this.f4);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java
new file mode 100644
index 0000000..c5eb973
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ * Represents the graph's nodes. It carries an ID and a value.
+ * For vertices with no value, use {@link org.apache.flink.types.NullValue} as the value type.
+ *
+ * @param <K> the type of the vertex ID (stored in f0)
+ * @param <V> the type of the vertex value (stored in f1)
+ */
+public class Vertex<K, V> extends Tuple2<K, V> {
+
+	private static final long serialVersionUID = 1L;
+
+	public Vertex(){}
+
+	public Vertex(K k, V val) {
+		this.f0 = k; // vertex ID
+		this.f1 = val; // vertex value
+	}
+
+	public K getId() {
+		return this.f0;
+	}
+
+	public V getValue() {
+		return this.f1;
+	}
+
+	public void setId(K id) {
+		this.f0 = id;
+	}
+
+	public void setValue(V val) {
+		this.f1 = val;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
new file mode 100644
index 0000000..cd52e04
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData;
+import org.apache.flink.graph.library.GSAConnectedComponents;
+import org.apache.flink.types.NullValue;
+
+/**
+ * This example shows how to use Gelly's library methods.
+ * You can find all available library methods in {@link org.apache.flink.graph.library}. 
+ * 
+ * In particular, this example uses the {@link org.apache.flink.graph.library.GSAConnectedComponents}
+ * library method to compute the connected components of the input graph.
+ *
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId which are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\n1\t3\n</code> defines two edges,
+ * 1-2 and 1-3.
+ *
+ * Usage <code>ConnectedComponents &lt;edge path&gt; &lt;result path&gt;
+ * &lt;number of iterations&gt; </code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.ConnectedComponentsDefaultData}
+ */
+public class ConnectedComponents implements ProgramDescription {
+
+	@SuppressWarnings("serial")
+	public static void main(String [] args) throws Exception {
+
+		if(!parseParameters(args)) {
+			return;
+		}
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Edge<Long, NullValue>> edges = getEdgesDataSet(env);
+
+		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new MapFunction<Long, Long>() {
+			@Override
+			public Long map(Long value) throws Exception {
+				return value;
+			}
+		}, env);
+
+		DataSet<Vertex<Long, Long>> verticesWithMinIds = graph
+				.run(new GSAConnectedComponents<Long, NullValue>(maxIterations));
+
+		// emit result
+		if (fileOutput) {
+			verticesWithMinIds.writeAsCsv(outputPath, "\n", ",");
+
+			// since file sinks are lazy, we trigger the execution explicitly
+			env.execute("Connected Components Example");
+		} else {
+			verticesWithMinIds.print();
+		}
+	}
+
+	@Override
+	public String getDescription() {
+		return "Connected Components Example";
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String edgeInputPath = null;
+	private static String outputPath = null;
+	private static Integer maxIterations = ConnectedComponentsDefaultData.MAX_ITERATIONS;
+
+	private static boolean parseParameters(String [] args) {
+		if(args.length > 0) {
+			if(args.length != 3) {
+				System.err.println("Usage ConnectedComponents <edge path> <output path> " +
+						"<num iterations>");
+				return false;
+			}
+
+			fileOutput = true;
+			edgeInputPath = args[0];
+			outputPath = args[1];
+			maxIterations = Integer.parseInt(args[2]);
+
+		} else {
+			System.out.println("Executing ConnectedComponents example with default parameters and built-in default data.");
+			System.out.println("Provide parameters to read input data from files.");
+			System.out.println("Usage ConnectedComponents <edge path> <output path> " +
+					"<num iterations>");
+		}
+
+		return true;
+	}
+
+	@SuppressWarnings("serial")
+	private static DataSet<Edge<Long, NullValue>> getEdgesDataSet(ExecutionEnvironment env) {
+
+		if(fileOutput) {
+			return env.readCsvFile(edgeInputPath)
+					.ignoreComments("#")
+					.fieldDelimiter("\t")
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class)
+					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
+						@Override
+						public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception {
+							return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance());
+						}
+					});
+		} else {
+			return ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env);
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java
new file mode 100644
index 0000000..b7e3385
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/EuclideanGraphWeighing.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Triplet;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.EuclideanGraphData;
+
+import java.io.Serializable;
+
+/**
+ * This example shows how to use Gelly's {@link Graph#getTriplets()} and
+ * {@link Graph#joinWithEdges(DataSet, MapFunction)} methods.
+ * 
+ * Given a directed, unweighted graph, with vertex values representing points in a plane,
+ * return a weighted graph where the edge weights are equal to the Euclidean distance between the
+ * src and the trg vertex values.
+ *
+ * <p>
+ * Input files are plain text files and must be formatted as follows:
+ * <ul>
+ * 	<li> Vertices are represented by their vertexIds and vertex values and are separated by newlines,
+ * 	the value being formed of two doubles separated by a comma.
+ * 	For example: <code>1,1.0,1.0\n2,2.0,2.0\n3,3.0,3.0\n</code> defines a data set of three vertices
+ * 	<li> Edges are represented by pairs of srcVertexId, trgVertexId separated by commas.
+ * 	Edges themselves are separated by newlines.
+ * 	For example: <code>1,2\n1,3\n</code> defines two edges 1-2 and 1-3.
+ * </ul>
+ * </p>
+ *
+ * Usage <code>EuclideanGraphWeighing &lt;vertex path&gt; &lt;edge path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.EuclideanGraphData}
+ */
+@SuppressWarnings("serial")
+public class EuclideanGraphWeighing implements ProgramDescription {
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Vertex<Long, Point>> vertices = getVerticesDataSet(env);
+
+		DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
+
+		Graph<Long, Point, Double> graph = Graph.fromDataSet(vertices, edges, env);
+
+		// the edge value will be the Euclidean distance between its src and trg vertex
+		DataSet<Tuple3<Long, Long, Double>> edgesWithEuclideanWeight = graph.getTriplets()
+				.map(new MapFunction<Triplet<Long, Point, Double>, Tuple3<Long, Long, Double>>() {
+
+					@Override
+					public Tuple3<Long, Long, Double> map(Triplet<Long, Point, Double> triplet)
+							throws Exception {
+
+						Vertex<Long, Point> srcVertex = triplet.getSrcVertex();
+						Vertex<Long, Point> trgVertex = triplet.getTrgVertex();
+
+						return new Tuple3<Long, Long, Double>(srcVertex.getId(), trgVertex.getId(),
+								srcVertex.getValue().euclideanDistance(trgVertex.getValue()));
+					}
+				});
+
+		Graph<Long, Point, Double> resultedGraph = graph.joinWithEdges(edgesWithEuclideanWeight,
+				new MapFunction<Tuple2<Double, Double>, Double>() {
+
+					@Override
+					public Double map(Tuple2<Double, Double> distance) throws Exception {
+						return distance.f1;
+					}
+				});
+
+		// retrieve the edges from the final result
+		DataSet<Edge<Long, Double>> result = resultedGraph.getEdges();
+
+		// emit result
+		if (fileOutput) {
+			result.writeAsCsv(outputPath, "\n", ",");
+
+			// since file sinks are lazy, we trigger the execution explicitly
+			env.execute("Euclidean Graph Weighing Example");
+		} else {
+			result.print();
+		}
+
+	}
+
+	@Override
+	public String getDescription() {
+		return "Weighing a graph by computing the Euclidean distance " +
+				"between its vertices";
+	}
+
+	// *************************************************************************
+	//     DATA TYPES
+	// *************************************************************************
+
+	/**
+	 * A simple two-dimensional point.
+	 */
+	public static class Point implements Serializable {
+
+		public double x, y;
+
+		public Point() {}
+
+		public Point(double x, double y) {
+			this.x = x;
+			this.y = y;
+		}
+
+		public double euclideanDistance(Point other) {
+			return Math.sqrt((x-other.x)*(x-other.x) + (y-other.y)*(y-other.y));
+		}
+
+		@Override
+		public String toString() {
+			return x + " " + y;
+		}
+	}
+
+	// ******************************************************************************************************************
+	// UTIL METHODS
+	// ******************************************************************************************************************
+
+	private static boolean fileOutput = false;
+
+	private static String verticesInputPath = null;
+
+	private static String edgesInputPath = null;
+
+	private static String outputPath = null;
+
+	private static boolean parseParameters(String[] args) {
+		// No args: run on built-in data. Exactly 3 args: read/write files. Anything else: print usage and return false.
+		if (args.length > 0) {
+			if (args.length != 3) {
+				System.err.println("Usage: EuclideanGraphWeighing <input vertices path> <input edges path>" +
+						" <output path>");
+				return false;
+			}
+			fileOutput = true;
+			verticesInputPath = args[0];
+			edgesInputPath = args[1];
+			outputPath = args[2];
+		} else {
+			System.out.println("Executing Euclidean Graph Weighing example with default parameters and built-in default data.");
+			System.out.println("Provide parameters to read input data from files.");
+			System.out.println("See the documentation for the correct format of input files.");
+		}
+		return true;
+	}
+
+	private static DataSet<Vertex<Long, Point>> getVerticesDataSet(ExecutionEnvironment env) {
+		if (fileOutput) {
+			return env.readCsvFile(verticesInputPath)
+					.lineDelimiter("\n")
+					.types(Long.class, Double.class, Double.class)
+					.map(new MapFunction<Tuple3<Long, Double, Double>, Vertex<Long, Point>>() {
+
+						@Override
+						public Vertex<Long, Point> map(Tuple3<Long, Double, Double> value) throws Exception {
+							return new Vertex<Long, Point>(value.f0, new Point(value.f1, value.f2));
+						}
+					});
+		} else {
+			return EuclideanGraphData.getDefaultVertexDataSet(env);
+		}
+	}
+
+	private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
+		if (fileOutput) {
+			return env.readCsvFile(edgesInputPath)
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class)
+					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() {
+
+						@Override
+						public Edge<Long, Double> map(Tuple2<Long, Long> tuple2) throws Exception {
+							return new Edge<Long, Double>(tuple2.f0, tuple2.f1, 0.0);
+						}
+					});
+		} else {
+			return EuclideanGraphData.getDefaultEdgeDataSet(env);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
new file mode 100755
index 0000000..635a099
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPaths.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.graph.gsa.Neighbor;
+import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
+
+/**
+ * This example shows how to use Gelly's Gather-Sum-Apply iterations.
+ * 
+ * It is an implementation of the Single-Source-Shortest-Paths algorithm.
+ * For a vertex-centric implementation of the same algorithm, please refer to {@link SingleSourceShortestPaths}. 
+ *
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId, distance which are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\t0.1\n1\t3\t1.4\n</code> defines two edges,
+ * edge 1-2 with distance 0.1, and edge 1-3 with distance 1.4.
+ *
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.SingleSourceShortestPathsData}
+ */
+public class GSASingleSourceShortestPaths implements ProgramDescription {
+
+	// --------------------------------------------------------------------------------------------
+	//  Program
+	// --------------------------------------------------------------------------------------------
+
+	public static void main(String[] args) throws Exception {
+
+		if(!parseParameters(args)) {
+			return;
+		}
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
+
+		Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, new InitVertices(srcVertexId), env);
+
+		// Execute the GSA iteration
+		Graph<Long, Double, Double> result = graph.runGatherSumApplyIteration(
+				new CalculateDistances(), new ChooseMinDistance(), new UpdateDistance(), maxIterations);
+
+		// Extract the vertices as the result
+		DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
+
+		// emit result
+		if(fileOutput) {
+			singleSourceShortestPaths.writeAsCsv(outputPath, "\n", ",");
+
+			// since file sinks are lazy, we trigger the execution explicitly
+			env.execute("GSA Single Source Shortest Paths");
+		} else {
+			singleSourceShortestPaths.print();
+		}
+
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Single Source Shortest Path UDFs
+	// --------------------------------------------------------------------------------------------
+
+	@SuppressWarnings("serial")
+	private static final class InitVertices implements MapFunction<Long, Double>{
+
+		private long srcId;
+
+		public InitVertices(long srcId) {
+			this.srcId = srcId;
+		}
+
+		public Double map(Long id) {
+			if (id.equals(srcId)) {
+				return 0.0;
+			}
+			else {
+				return Double.POSITIVE_INFINITY;
+			}
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class CalculateDistances extends GatherFunction<Double, Double, Double> {
+
+		public Double gather(Neighbor<Double, Double> neighbor) {
+			return neighbor.getNeighborValue() + neighbor.getEdgeValue();
+		}
+	};
+
+	@SuppressWarnings("serial")
+	private static final class ChooseMinDistance extends SumFunction<Double, Double, Double> {
+
+		public Double sum(Double newValue, Double currentValue) {
+			return Math.min(newValue, currentValue);
+		}
+	};
+
+	@SuppressWarnings("serial")
+	private static final class UpdateDistance extends ApplyFunction<Long, Double, Double> {
+
+		public void apply(Double newDistance, Double oldDistance) {
+			if (newDistance < oldDistance) {
+				setResult(newDistance);
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Util methods
+	// --------------------------------------------------------------------------------------------
+
+	private static boolean fileOutput = false;
+
+	private static Long srcVertexId = 1l;
+
+	private static String edgesInputPath = null;
+
+	private static String outputPath = null;
+
+	private static int maxIterations = 5;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			if(args.length != 4) {
+				System.err.println("Usage: GSASingleSourceShortestPaths <source vertex id>" +
+						" <input edges path> <output path> <num iterations>");
+				return false;
+			}
+
+			fileOutput = true;
+			srcVertexId = Long.parseLong(args[0]);
+			edgesInputPath = args[1];
+			outputPath = args[2];
+			maxIterations = Integer.parseInt(args[3]);
+		} else {
+				System.out.println("Executing GSASingle Source Shortest Paths example "
+						+ "with default parameters and built-in default data.");
+				System.out.println("  Provide parameters to read input data from files.");
+				System.out.println("  See the documentation for the correct format of input files.");
+				System.out.println("Usage: GSASingleSourceShortestPaths <source vertex id>" +
+						" <input edges path> <output path> <num iterations>");
+		}
+		return true;
+	}
+
+	private static DataSet<Edge<Long, Double>> getEdgeDataSet(ExecutionEnvironment env) {
+		if (fileOutput) {
+			return env.readCsvFile(edgesInputPath)
+					.fieldDelimiter("\t")
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class, Double.class)
+					.map(new Tuple3ToEdgeMap<Long, Double>());
+		} else {
+			return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
+		}
+	}
+
+	@Override
+	public String getDescription() {
+		return "GSA Single Source Shortest Paths";
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
new file mode 100644
index 0000000..b808e76
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/GraphMetrics.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.aggregation.Aggregations;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.example.utils.ExampleUtils;
+import org.apache.flink.types.NullValue;
+
+/**
+ * This example illustrates how to use Gelly metrics methods and get simple statistics
+ * from the input graph.  
+ * 
+ * The program creates a random graph and computes and prints
+ * the following metrics:
+ * - number of vertices
+ * - number of edges
+ * - average node degree
+ * - the vertex ids with the max/min in- and out-degrees
+ *
+ * The input file is expected to contain one edge per line,
+ * with long IDs and no values, in the following format:
+ * "<sourceVertexID>\t<targetVertexID>".
+ * If no arguments are provided, the example runs with a random graph of 100 vertices.
+ *
+ */
+public class GraphMetrics implements ProgramDescription {
+
+	/**
+	 * Runs the example: builds the graph, derives the metrics and prints them.
+	 *
+	 * @param args empty to run on a random graph, or a single path to an edge file
+	 * @throws Exception propagated from the graph computations or the job execution
+	 */
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// create the graph
+		Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(getEdgesDataSet(env), env);
+
+		// get the number of vertices
+		long numVertices = graph.numberOfVertices();
+
+		// get the number of edges
+		long numEdges = graph.numberOfEdges();
+
+		// compute the average node degree
+		DataSet<Tuple2<Long, Long>> verticesWithDegrees = graph.getDegrees();
+
+		DataSet<Double> avgNodeDegree = verticesWithDegrees
+				.aggregate(Aggregations.SUM, 1).map(new AvgNodeDegreeMapper(numVertices));
+
+		// find the vertex with the maximum in-degree
+		DataSet<Long> maxInDegreeVertex = graph.inDegrees().maxBy(1).map(new ProjectVertexId());
+
+		// find the vertex with the minimum in-degree
+		DataSet<Long> minInDegreeVertex = graph.inDegrees().minBy(1).map(new ProjectVertexId());
+
+		// find the vertex with the maximum out-degree
+		DataSet<Long> maxOutDegreeVertex = graph.outDegrees().maxBy(1).map(new ProjectVertexId());
+
+		// find the vertex with the minimum out-degree
+		DataSet<Long> minOutDegreeVertex = graph.outDegrees().minBy(1).map(new ProjectVertexId());
+
+		// print the results
+		ExampleUtils.printResult(env.fromElements(numVertices), "Total number of vertices");
+		ExampleUtils.printResult(env.fromElements(numEdges), "Total number of edges");
+		ExampleUtils.printResult(avgNodeDegree, "Average node degree");
+		ExampleUtils.printResult(maxInDegreeVertex, "Vertex with Max in-degree");
+		ExampleUtils.printResult(minInDegreeVertex, "Vertex with Min in-degree");
+		ExampleUtils.printResult(maxOutDegreeVertex, "Vertex with Max out-degree");
+		ExampleUtils.printResult(minOutDegreeVertex, "Vertex with Min out-degree");
+
+		env.execute();
+	}
+
+	/**
+	 * Divides the degree sum held in field 1 of the incoming tuple by the number
+	 * of vertices given at construction time.
+	 */
+	@SuppressWarnings("serial")
+	private static final class AvgNodeDegreeMapper implements MapFunction<Tuple2<Long, Long>, Double> {
+
+		private final long numberOfVertices;
+
+		public AvgNodeDegreeMapper(long numberOfVertices) {
+			this.numberOfVertices = numberOfVertices;
+		}
+
+		@Override
+		public Double map(Tuple2<Long, Long> sumTuple) {
+			// divide in floating point: a long division would truncate the
+			// fractional part of the average before the conversion to double
+			return sumTuple.f1.doubleValue() / numberOfVertices;
+		}
+	}
+
+	/**
+	 * Projects a (vertex id, degree) tuple onto its vertex id.
+	 */
+	@SuppressWarnings("serial")
+	private static final class ProjectVertexId implements MapFunction<Tuple2<Long,Long>, Long> {
+		@Override
+		public Long map(Tuple2<Long, Long> value) { return value.f0; }
+	}
+
+	@Override
+	public String getDescription() {
+		return "Graph Metrics Example";
+	}
+
+	// ******************************************************************************************************************
+	// UTIL METHODS
+	// ******************************************************************************************************************
+
+	private static boolean fileOutput = false;
+
+	private static String edgesInputPath = null;
+
+	static final int NUM_VERTICES = 100;
+
+	static final long SEED = 9876;
+
+	/**
+	 * Reads the command-line arguments into the static configuration fields.
+	 *
+	 * @param args empty for the random graph, or exactly one edge file path
+	 * @return false if more than one argument was given, true otherwise
+	 */
+	private static boolean parseParameters(String[] args) {
+
+		if(args.length > 0) {
+			if(args.length != 1) {
+				System.err.println("Usage: GraphMetrics <input edges>");
+				return false;
+			}
+
+			fileOutput = true;
+			edgesInputPath = args[0];
+		} else {
+			System.out.println("Executing Graph Metrics example with default parameters and built-in default data.");
+			System.out.println("  Provide parameters to read input data from files.");
+			System.out.println("  See the documentation for the correct format of input files.");
+			System.out.println("Usage: GraphMetrics <input edges>");
+		}
+		return true;
+	}
+
+	/**
+	 * Supplies the edges of the graph: read from the tab-separated input file when
+	 * one was given, otherwise random edges obtained from {@code ExampleUtils}
+	 * for {@code NUM_VERTICES}.
+	 *
+	 * @param env the execution environment used to create the data set
+	 * @return the edge data set, with {@code NullValue} edge values
+	 */
+	@SuppressWarnings("serial")
+	private static DataSet<Edge<Long, NullValue>> getEdgesDataSet(ExecutionEnvironment env) {
+		if (fileOutput) {
+			return env.readCsvFile(edgesInputPath)
+					.lineDelimiter("\n").fieldDelimiter("\t")
+					.types(Long.class, Long.class).map(
+							new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
+
+								@Override
+								public Edge<Long, NullValue> map(Tuple2<Long, Long> value) {
+									return new Edge<Long, NullValue>(value.f0, value.f1,
+											NullValue.getInstance());
+								}
+					});
+		} else {
+			return ExampleUtils.getRandomEdges(env, NUM_VERTICES);
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java
new file mode 100644
index 0000000..c03937d
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSP.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.IncrementalSSSPData;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexCentricConfiguration;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+
+/**
+ * This example illustrates how to 
+ * <ul>
+ *  <li> create a Graph directly from CSV files
+ *  <li> use the vertex-centric iteration's messaging direction configuration option
+ * </ul>
+ * 
+ * Incremental Single Sink Shortest Paths Example. Shortest Paths are incrementally updated
+ * upon edge removal.
+ *
+ * The program takes as input the graph resulting from an SSSP computation,
+ * an edge to be removed and the initial graph (i.e. before SSSP was computed).
+ * In the following description, SP-graph is used as an abbreviation for
+ * the graph resulting from the SSSP computation. We denote the edges that belong to this
+ * graph by SP-edges.
+ *
+ * - If the removed edge does not belong to the SP-graph, no computation is necessary.
+ * The edge is simply removed from the graph.
+ * - If the removed edge is an SP-edge, then all nodes, whose shortest path contains the removed edge,
+ * potentially require re-computation.
+ * When the edge <u, v> is removed, v checks if it has another out-going SP-edge.
+ * If yes, no further computation is required.
+ * If v has no other out-going SP-edge, it invalidates its current value, by setting it to INF.
+ * Then, it informs all its SP-in-neighbors by sending them an INVALIDATE message.
+ * When a vertex u receives an INVALIDATE message from v, it checks whether it has another out-going SP-edge.
+ * If not, it invalidates its current value and propagates the INVALIDATE message.
+ * The propagation stops when a vertex with an alternative shortest path is reached
+ * or when we reach a vertex with no SP-in-neighbors.
+ *
+ * Usage <code>IncrementalSSSP &lt;vertex path&gt; &lt;edge path&gt; &lt;edges in SSSP&gt;
+ * &lt;src id edge to be removed&gt; &lt;trg id edge to be removed&gt; &lt;val edge to be removed&gt;
+ * &lt;result path&gt; &lt;number of iterations&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.IncrementalSSSPData}
+ */
+@SuppressWarnings("serial")
+public class IncrementalSSSP implements ProgramDescription {
+
+	public static void main(String [] args) throws Exception {
+
+		if(!parseParameters(args)) {
+			return;
+		}
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Edge<Long, Double> edgeToBeRemoved = getEdgeToBeRemoved();
+
+		Graph<Long, Double, Double> graph = IncrementalSSSP.getGraph(env);
+
+		// Assumption: all minimum weight paths are kept
+		Graph<Long, Double, Double> ssspGraph = IncrementalSSSP.getSSSPGraph(env);
+
+		// remove the edge
+		graph.removeEdge(edgeToBeRemoved);
+
+		// configure the iteration
+		VertexCentricConfiguration parameters = new VertexCentricConfiguration();
+
+		if(isInSSSP(edgeToBeRemoved, ssspGraph.getEdges())) {
+
+			parameters.setDirection(EdgeDirection.IN);
+			parameters.setOptDegrees(true);
+
+			// run the vertex centric iteration to propagate info
+			Graph<Long, Double, Double> result = ssspGraph.runVertexCentricIteration(new VertexDistanceUpdater(),
+					new InvalidateMessenger(edgeToBeRemoved), maxIterations, parameters);
+
+			DataSet<Vertex<Long, Double>> resultedVertices = result.getVertices();
+
+			// Emit results
+			if(fileOutput) {
+				resultedVertices.writeAsCsv(outputPath, "\n", ",");
+				env.execute("Incremental SSSP Example");
+			} else {
+				resultedVertices.print();
+			}
+		} else {
+			// print the vertices
+			if(fileOutput) {
+				graph.getVertices().writeAsCsv(outputPath, "\n", ",");
+				env.execute("Incremental SSSP Example");
+			} else {
+				graph.getVertices().print();
+			}
+		}
+	}
+
+	@Override
+	public String getDescription() {
+		return "Incremental Single Sink Shortest Paths Example";
+	}
+
+	// ******************************************************************************************************************
+	// IncrementalSSSP METHODS
+	// ******************************************************************************************************************
+
+	/**
+	 * Function that verifies whether the edge to be removed is part of the SSSP or not.
+	 * If it is, the src vertex will be invalidated.
+	 *
+	 * @param edgeToBeRemoved
+	 * @param edgesInSSSP
+	 * @return
+	 */
+	public static boolean isInSSSP(final Edge<Long, Double> edgeToBeRemoved, DataSet<Edge<Long, Double>> edgesInSSSP) throws Exception {
+
+		return edgesInSSSP.filter(new FilterFunction<Edge<Long, Double>>() {
+			@Override
+			public boolean filter(Edge<Long, Double> edge) throws Exception {
+				return edge.equals(edgeToBeRemoved);
+			}
+		}).count() > 0;
+	}
+
+	public static final class VertexDistanceUpdater extends VertexUpdateFunction<Long, Double, Double> {
+
+		@Override
+		public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) throws Exception {
+			if (inMessages.hasNext()) {
+				Long outDegree = getOutDegree() - 1;
+				// check if the vertex has another SP-Edge
+				if (outDegree > 0) {
+					// there is another shortest path from the source to this vertex
+				} else {
+					// set own value to infinity
+					setNewVertexValue(Double.MAX_VALUE);
+				}
+			}
+		}
+	}
+
+	public static final class InvalidateMessenger extends MessagingFunction<Long, Double, Double, Double> {
+
+		private Edge<Long, Double> edgeToBeRemoved;
+
+		public InvalidateMessenger(Edge<Long, Double> edgeToBeRemoved) {
+			this.edgeToBeRemoved = edgeToBeRemoved;
+		}
+
+		@Override
+		public void sendMessages(Vertex<Long, Double> vertex) throws Exception {
+
+
+			if(getSuperstepNumber() == 1) {
+				if(vertex.getId().equals(edgeToBeRemoved.getSource())) {
+					// activate the edge target
+					sendMessageTo(edgeToBeRemoved.getSource(), Double.MAX_VALUE);
+				}
+			}
+
+			if(getSuperstepNumber() > 1) {
+				// invalidate all edges
+				for(Edge<Long, Double> edge : getEdges()) {
+					sendMessageTo(edge.getSource(), Double.MAX_VALUE);
+				}
+			}
+		}
+	}
+
+	// ******************************************************************************************************************
+	// UTIL METHODS
+	// ******************************************************************************************************************
+
+	private static boolean fileOutput = false;
+
+	private static String verticesInputPath = null;
+
+	private static String edgesInputPath = null;
+
+	private static String edgesInSSSPInputPath = null;
+
+	private static Long srcEdgeToBeRemoved = null;
+
+	private static Long trgEdgeToBeRemoved = null;
+
+	private static Double valEdgeToBeRemoved = null;
+
+	private static String outputPath = null;
+
+	private static int maxIterations = 5;
+
+	private static boolean parseParameters(String[] args) {
+		if (args.length > 0) {
+			if (args.length == 8) {
+				fileOutput = true;
+				verticesInputPath = args[0];
+				edgesInputPath = args[1];
+				edgesInSSSPInputPath = args[2];
+				srcEdgeToBeRemoved = Long.parseLong(args[3]);
+				trgEdgeToBeRemoved = Long.parseLong(args[4]);
+				valEdgeToBeRemoved = Double.parseDouble(args[5]);
+				outputPath = args[6];
+				maxIterations = Integer.parseInt(args[7]);
+			} else {
+				System.out.println("Executing IncrementalSSSP example with default parameters and built-in default data.");
+				System.out.println("Provide parameters to read input data from files.");
+				System.out.println("See the documentation for the correct format of input files.");
+				System.out.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> " +
+						"<src id edge to be removed> <trg id edge to be removed> <val edge to be removed> " +
+						"<output path> <max iterations>");
+
+				return false;
+			}
+		}
+		return true;
+	}
+
+	private static Graph<Long, Double, Double> getGraph(ExecutionEnvironment env) {
+		if(fileOutput) {
+			return Graph.fromCsvReader(verticesInputPath, edgesInputPath, env).lineDelimiterEdges("\n")
+					.types(Long.class, Double.class, Double.class);
+		} else {
+			return Graph.fromDataSet(IncrementalSSSPData.getDefaultVertexDataSet(env), IncrementalSSSPData.getDefaultEdgeDataSet(env), env);
+		}
+	}
+
+	private static Graph<Long, Double, Double> getSSSPGraph(ExecutionEnvironment env) {
+		if(fileOutput) {
+			return Graph.fromCsvReader(verticesInputPath, edgesInSSSPInputPath, env).lineDelimiterEdges("\n")
+					.types(Long.class, Double.class, Double.class);
+		} else {
+			return Graph.fromDataSet(IncrementalSSSPData.getDefaultVertexDataSet(env), IncrementalSSSPData.getDefaultEdgesInSSSP(env), env);
+		}
+	}
+
+	private static Edge<Long, Double> getEdgeToBeRemoved() {
+		if (fileOutput) {
+			return new Edge<Long, Double>(srcEdgeToBeRemoved, trgEdgeToBeRemoved, valEdgeToBeRemoved);
+		} else {
+			return IncrementalSSSPData.getDefaultEdgeToBeRemoved();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
new file mode 100644
index 0000000..0f84dbb
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasure.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.ReduceNeighborsFunction;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.Triplet;
+import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData;
+
+import java.util.HashSet;
+
+/**
+ * This example shows how to use
+ * <ul>
+ *  <li> neighborhood methods
+ *  <li> join with vertices
+ *  <li> triplets
+ * </ul>
+ * 
+ * Given a directed, unweighted graph, return a weighted graph where the edge values are equal
+ * to the Jaccard similarity coefficient - the number of common neighbors divided by the size
+ * of the union of neighbor sets - for the src and target vertices.
+ *
+ * <p>
+ * Input files are plain text files and must be formatted as follows:
+ * <br>
+ * 	Edges are represented by pairs of srcVertexId, trgVertexId separated by tabs.
+ * 	Edges themselves are separated by newlines.
+ * 	For example: <code>1	2\n1	3\n</code> defines two edges 1-2 and 1-3.
+ * </p>
+ *
+ * Usage <code> JaccardSimilarityMeasure &lt;edge path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData}
+ */
+@SuppressWarnings("serial")
+public class JaccardSimilarityMeasure implements ProgramDescription {
+
+	public static void main(String [] args) throws Exception {
+
+		if(!parseParameters(args)) {
+			return;
+		}
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
+
+		Graph<Long, HashSet<Long>, Double> graph = Graph.fromDataSet(edges,
+				new MapFunction<Long, HashSet<Long>>() {
+
+					@Override
+					public HashSet<Long> map(Long id) throws Exception {
+						HashSet<Long> neighbors = new HashSet<Long>();
+						neighbors.add(id);
+
+						return new HashSet<Long>(neighbors);
+					}
+				}, env);
+
+		// create the set of neighbors
+		DataSet<Tuple2<Long, HashSet<Long>>> computedNeighbors =
+				graph.reduceOnNeighbors(new GatherNeighbors(), EdgeDirection.ALL);
+
+		// join with the vertices to update the node values
+		Graph<Long, HashSet<Long>, Double> graphWithVertexValues =
+				graph.joinWithVertices(computedNeighbors, new MapFunction<Tuple2<HashSet<Long>, HashSet<Long>>,
+						HashSet<Long>>() {
+
+					@Override
+					public HashSet<Long> map(Tuple2<HashSet<Long>, HashSet<Long>> tuple2) throws Exception {
+						return tuple2.f1;
+					}
+				});
+
+		// compare neighbors, compute Jaccard
+		DataSet<Edge<Long, Double>> edgesWithJaccardValues =
+				graphWithVertexValues.getTriplets().map(new ComputeJaccard());
+
+		// emit result
+		if (fileOutput) {
+			edgesWithJaccardValues.writeAsCsv(outputPath, "\n", ",");
+
+			// since file sinks are lazy, we trigger the execution explicitly
+			env.execute("Executing Jaccard Similarity Measure");
+		} else {
+			edgesWithJaccardValues.print();
+		}
+
+	}
+
+	@Override
+	public String getDescription() {
+		return "Vertex Jaccard Similarity Measure";
+	}
+
+	/**
+	 * Each vertex will have a HashSet containing its neighbor ids as value.
+	 */
+	private static final class GatherNeighbors implements ReduceNeighborsFunction<HashSet<Long>> {
+
+		@Override
+		public HashSet<Long> reduceNeighbors(HashSet<Long> first, HashSet<Long> second) {
+			first.addAll(second);
+			return new HashSet<Long>(first);
+		}
+	}
+
+	/**
+	 * The edge weight will be the Jaccard coefficient, which is computed as follows:
+	 *
+	 * Consider the edge x-y
+	 * We denote by sizeX and sizeY, the neighbors hash set size of x and y respectively.
+	 * sizeX+sizeY = union + intersection of neighborhoods
+	 * size(hashSetX.addAll(hashSetY)).distinct = union of neighborhoods
+	 * The intersection can then be deduced.
+	 *
+	 * The Jaccard similarity coefficient is then, the intersection/union.
+	 */
+	private static final class ComputeJaccard implements
+			MapFunction<Triplet<Long, HashSet<Long>, Double>, Edge<Long, Double>> {
+
+		@Override
+		public Edge<Long, Double> map(Triplet<Long, HashSet<Long>, Double> triplet) throws Exception {
+
+			Vertex<Long, HashSet<Long>> srcVertex = triplet.getSrcVertex();
+			Vertex<Long, HashSet<Long>> trgVertex = triplet.getTrgVertex();
+
+			Long x = srcVertex.getId();
+			Long y = trgVertex.getId();
+			HashSet<Long> neighborSetY = trgVertex.getValue();
+
+			double unionPlusIntersection = srcVertex.getValue().size() + neighborSetY.size();
+			// within a HashSet, all elements are distinct
+			HashSet<Long> unionSet = new HashSet<Long>();
+			unionSet.addAll(srcVertex.getValue());
+			unionSet.addAll(neighborSetY);
+			double union = unionSet.size();
+			double intersection = unionPlusIntersection - union;
+
+			return new Edge<Long, Double>(x, y, intersection/union);
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String edgeInputPath = null;
+	private static String outputPath = null;
+
+	private static boolean parseParameters(String [] args) {
+		if(args.length > 0) {
+			if(args.length != 2) {
+				System.err.println("Usage JaccardSimilarityMeasure <edge path> <output path>");
+				return false;
+			}
+
+			fileOutput = true;
+			edgeInputPath = args[0];
+			outputPath = args[1];
+		} else {
+			System.out.println("Executing JaccardSimilarityMeasure example with default parameters and built-in default data.");
+			System.out.println("Provide parameters to read input data from files.");
+			System.out.println("Usage JaccardSimilarityMeasure <edge path> <output path>");
+		}
+
+		return true;
+	}
+
+	private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
+
+		if(fileOutput) {
+			return env.readCsvFile(edgeInputPath)
+					.ignoreComments("#")
+					.fieldDelimiter("\t")
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class)
+					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() {
+						@Override
+						public Edge<Long, Double> map(Tuple2<Long, Long> tuple2) throws Exception {
+							return new Edge<Long, Double>(tuple2.f0, tuple2.f1, new Double(0));
+						}
+					});
+		} else {
+			return JaccardSimilarityMeasureData.getDefaultEdgeDataSet(env);
+		}
+	}
+}


Mime
View raw message