flink-commits mailing list archives

From fhue...@apache.org
Subject [04/10] flink git commit: [FLINK-2857] [gelly] Improve Gelly API and documentation. - Improve javadocs of Graph creation methods - Add fromTuple2 creation methods - Rename mapper parameters to vertexInitializer. - Improve javadocs and parameter names of
Date Mon, 19 Oct 2015 16:01:10 GMT
[FLINK-2857] [gelly] Improve Gelly API and documentation.
- Improve javadocs of Graph creation methods
- Add fromTuple2 creation methods
- Rename mapper parameters to vertexInitializer.
- Improve javadocs and parameter names of joinWith* methods
- Improve javadocs of neighborhood methods
- Update docs to reflect api changes

This closes #1263
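The new fromTuple2 creation methods and the renamed vertexValueInitializer parameter can be illustrated with a plain-Java sketch that does not depend on Flink. The Edge, Vertex, Graph and NullValue types and the fromTuple2 helpers below are hypothetical stand-ins, not the Gelly classes. The sketch only mirrors the behavior the updated docs describe: field 0 of each pair becomes the source ID, field 1 becomes the target ID, edge values are NullValue, and each distinct vertex ID is passed through the initializer, or set to NullValue when no initializer is given.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class FromTuple2Sketch {

    /** Hypothetical stand-in for Flink's NullValue: a single "no value" marker. */
    public enum NullValue { INSTANCE }

    public record Edge<K, EV>(K source, K target, EV value) {}

    public record Vertex<K, VV>(K id, VV value) {}

    /** In this sketch a graph is just a vertex list plus an edge list. */
    public record Graph<K, VV, EV>(List<Vertex<K, VV>> vertices, List<Edge<K, EV>> edges) {}

    /**
     * Field 0 of each pair becomes the source ID and field 1 the target ID;
     * every edge value is NullValue. Each distinct vertex ID is passed once
     * through vertexValueInitializer to obtain its vertex value.
     */
    public static <K, VV> Graph<K, VV, NullValue> fromTuple2(
            List<Map.Entry<K, K>> pairs, Function<K, VV> vertexValueInitializer) {
        List<Edge<K, NullValue>> edges = new ArrayList<>();
        // LinkedHashMap drops duplicate IDs and keeps them in first-seen order.
        Map<K, VV> vertexValues = new LinkedHashMap<>();
        for (Map.Entry<K, K> pair : pairs) {
            edges.add(new Edge<>(pair.getKey(), pair.getValue(), NullValue.INSTANCE));
            vertexValues.computeIfAbsent(pair.getKey(), vertexValueInitializer);
            vertexValues.computeIfAbsent(pair.getValue(), vertexValueInitializer);
        }
        List<Vertex<K, VV>> vertices = new ArrayList<>();
        vertexValues.forEach((id, value) -> vertices.add(new Vertex<>(id, value)));
        return new Graph<>(vertices, edges);
    }

    /** Without an initializer, vertex values are NullValue as well. */
    public static <K> Graph<K, NullValue, NullValue> fromTuple2(List<Map.Entry<K, K>> pairs) {
        return fromTuple2(pairs, id -> NullValue.INSTANCE);
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> pairs =
                List.of(Map.entry("a", "b"), Map.entry("b", "c"), Map.entry("a", "c"));
        // Initialize each vertex value with the length of its ID.
        Graph<String, Integer, NullValue> graph = fromTuple2(pairs, String::length);
        System.out.println(graph.vertices() + " / " + graph.edges().size() + " edges");
    }
}
```

The initializer is applied to vertex IDs only, which is why the parameter name vertexValueInitializer describes its role better than mapper did.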


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/640e63be
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/640e63be
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/640e63be

Branch: refs/heads/master
Commit: 640e63beef0b60891178affc7a6e8f0d01a5d000
Parents: da248b1
Author: vasia <vasia@apache.org>
Authored: Fri Oct 16 11:11:40 2015 +0200
Committer: Fabian Hueske <fhueske@apache.org>
Committed: Mon Oct 19 15:39:37 2015 +0200

----------------------------------------------------------------------
 docs/libs/gelly_guide.md                        |  34 +-
 .../org/apache/flink/graph/scala/Graph.scala    | 372 ++++++++------
 .../operations/GraphCreationWithCsvITCase.scala |   2 +-
 .../test/operations/JoinWithEdgesITCase.scala   |   7 +-
 .../operations/JoinWithVerticesITCase.scala     |   8 +-
 .../apache/flink/graph/EdgeJoinFunction.java    |  45 ++
 .../org/apache/flink/graph/EdgesFunction.java   |  19 +
 .../graph/EdgesFunctionWithVertexValue.java     |  20 +-
 .../main/java/org/apache/flink/graph/Graph.java | 498 +++++++++++--------
 .../apache/flink/graph/NeighborsFunction.java   |  19 +
 .../graph/NeighborsFunctionWithVertexValue.java |  20 +
 .../apache/flink/graph/ReduceEdgesFunction.java |   9 +
 .../flink/graph/ReduceNeighborsFunction.java    |   9 +
 .../apache/flink/graph/VertexJoinFunction.java  |  43 ++
 .../graph/example/EuclideanGraphWeighing.java   |   8 +-
 .../graph/example/JaccardSimilarityMeasure.java |   8 +-
 .../flink/graph/example/MusicProfiles.java      |   7 +-
 .../apache/flink/graph/library/GSAPageRank.java |  11 +-
 .../flink/graph/library/GSATriangleCount.java   |  10 +-
 .../apache/flink/graph/library/PageRank.java    |  11 +-
 .../test/operations/GraphCreationITCase.java    |  59 +++
 .../test/operations/JoinWithEdgesITCase.java    |  41 +-
 .../test/operations/JoinWithVerticesITCase.java |  40 +-
 23 files changed, 887 insertions(+), 413 deletions(-)
----------------------------------------------------------------------
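The commit also improves the javadocs of the neighborhood methods (see NeighborsFunction, ReduceNeighborsFunction and ReduceEdgesFunction in the list above). The reduce semantics those docs describe, where a reduce function is called consecutively on pairs of neighbor values until a single value per vertex remains, can be sketched in plain Java. EdgeDirection, Edge and reduceOnNeighbors below are hypothetical stand-ins, not Gelly's API, and the sketch assumes every vertex ID that appears in an edge has a value in vertexValues.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

public class ReduceOnNeighborsSketch {

    public enum EdgeDirection { IN, OUT, ALL }

    public record Edge<K>(K source, K target) {}

    /**
     * For each vertex with at least one neighbor in the given direction, combine
     * the neighbors' vertex values pairwise with reduceFn until one value remains.
     * Returns a map from vertex ID to the aggregate value.
     */
    public static <K, VV> Map<K, VV> reduceOnNeighbors(
            Map<K, VV> vertexValues, List<Edge<K>> edges,
            BinaryOperator<VV> reduceFn, EdgeDirection direction) {
        Map<K, VV> result = new LinkedHashMap<>();
        for (Edge<K> edge : edges) {
            if (direction != EdgeDirection.IN) {
                // OUT or ALL: the target is a neighbor of the source.
                result.merge(edge.source(), vertexValues.get(edge.target()), reduceFn);
            }
            if (direction != EdgeDirection.OUT) {
                // IN or ALL: the source is a neighbor of the target.
                result.merge(edge.target(), vertexValues.get(edge.source()), reduceFn);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> values = Map.of(1, 10, 2, 20, 3, 30);
        List<Edge<Integer>> edges = List.of(new Edge<>(1, 2), new Edge<>(1, 3), new Edge<>(2, 3));
        // Sum of out-neighbor values per vertex; vertex 3 has no out-neighbors.
        System.out.println(reduceOnNeighbors(values, edges, Integer::sum, EdgeDirection.OUT));
        // prints {1=50, 2=30}
    }
}
```

Because the reduce function must return a value of the same type as its inputs, it can be applied in any grouping order, which is what allows a system like Flink to combine partial results.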


http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/docs/libs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md
index 13d304d..646ec7f 100644
--- a/docs/libs/gelly_guide.md
+++ b/docs/libs/gelly_guide.md
@@ -149,6 +149,30 @@ val graph = Graph.fromDataSet(vertices, edges, env)
 </div>
 </div>
 
+* from a `DataSet` of `Tuple2` representing the edges. Gelly will convert each `Tuple2` to an `Edge`, where the first field will be the source ID and the second field will be the target ID. Both vertex and edge values will be set to `NullValue`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+DataSet<Tuple2<String, String>> edges = ...
+
+Graph<String, NullValue, NullValue> graph = Graph.fromTuple2DataSet(edges, env);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val env = ExecutionEnvironment.getExecutionEnvironment
+
+val edges: DataSet[(String, String)] = ...
+
+val graph = Graph.fromTuple2DataSet(edges, env)
+{% endhighlight %}
+</div>
+</div>
+
 * from a `DataSet` of `Tuple3` and an optional `DataSet` of `Tuple2`. In this case, Gelly will convert each `Tuple3` to an `Edge`, where the first field will be the source ID, the second field will be the target ID and the third field will be the edge value. Equivalently, each `Tuple2` will be converted to a `Vertex`, where the first field will be the vertex ID and the second field will be the vertex value:
 
 <div class="codetabs" markdown="1">
@@ -424,8 +448,8 @@ graph.subgraph((vertex => vertex.getValue > 0), (edge => edge.getValue < 0))
     <img alt="Filter Transformations" width="80%" src="fig/gelly-filter.png"/>
 </p>
 
-* <strong>Join</strong>: Gelly provides specialized methods for joining the vertex and edge datasets with other input datasets. `joinWithVertices` joins the vertices with a `Tuple2` input data set. The join is performed using the vertex ID and the first field of the `Tuple2` input as the join keys. The method returns a new `Graph` where the vertex values have been updated according to a provided user-defined map function.
-Similarly, an input dataset can be joined with the edges, using one of three methods. `joinWithEdges` expects an input `DataSet` of `Tuple3` and joins on the composite key of both source and target vertex IDs. `joinWithEdgesOnSource` expects a `DataSet` of `Tuple2` and joins on the source key of the edges and the first attribute of the input dataset and `joinWithEdgesOnTarget` expects a `DataSet` of `Tuple2` and joins on the target key of the edges and the first attribute of the input dataset. All three methods apply a map function on the edge and the input data set values.
+* <strong>Join</strong>: Gelly provides specialized methods for joining the vertex and edge datasets with other input datasets. `joinWithVertices` joins the vertices with a `Tuple2` input data set. The join is performed using the vertex ID and the first field of the `Tuple2` input as the join keys. The method returns a new `Graph` where the vertex values have been updated according to a provided user-defined transformation function.
+Similarly, an input dataset can be joined with the edges using one of three methods. `joinWithEdges` expects an input `DataSet` of `Tuple3` and joins on the composite key of both source and target vertex IDs. `joinWithEdgesOnSource` expects a `DataSet` of `Tuple2` and joins on the source ID of the edges and the first field of the input dataset, while `joinWithEdgesOnTarget` expects a `DataSet` of `Tuple2` and joins on the target ID of the edges and the first field of the input dataset. All three methods apply a transformation function to the edge value and the value of the matched input record.
 Note that if the input dataset contains a key multiple times, all Gelly join methods will only consider the first value encountered.
 
 <div class="codetabs" markdown="1">
@@ -437,9 +461,9 @@ DataSet<Tuple2<Long, Long>> vertexOutDegrees = network.outDegrees();
 
 // assign the transition probabilities as the edge weights
 Graph<Long, Double, Double> networkWithWeights = network.joinWithEdgesOnSource(vertexOutDegrees,
-				new MapFunction<Tuple2<Double, Long>, Double>() {
-					public Double map(Tuple2<Double, Long> value) {
-						return value.f0 / value.f1;
+				new EdgeJoinFunction<Double, Long>() {
+					public Double edgeJoin(Double edgeValue, Long inputValue) {
+						return edgeValue / inputValue;
 					}
 				});
 {% endhighlight %}
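The join behavior described in the guide (match on the edge source ID, apply a two-argument function to the edge value and the matched input value, keep only the first value for a duplicated key) can be sketched without Flink. EdgeJoin, Edge, KeyValue, joinWithEdgesOnSource and outDegrees below are hypothetical names, not Gelly's API, and the sketch additionally assumes that edges without a matching key keep their original value. The usage reproduces the guide's transition-probability example under the assumption that every initial edge weight is 1.0.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class JoinWithEdgesOnSourceSketch {

    /** Hypothetical mirror of the two-argument shape of an edge join function. */
    @FunctionalInterface
    public interface EdgeJoin<EV, T> {
        EV edgeJoin(EV edgeValue, T inputValue);
    }

    public record Edge<K, EV>(K source, K target, EV value) {}

    public record KeyValue<K, T>(K key, T value) {}

    /** Joins on edge source ID == input key and applies fn to each match. */
    public static <K, EV, T> List<Edge<K, EV>> joinWithEdgesOnSource(
            List<Edge<K, EV>> edges, List<KeyValue<K, T>> input, EdgeJoin<EV, T> fn) {
        // Keep only the first value seen for each key.
        Map<K, T> lookup = new HashMap<>();
        for (KeyValue<K, T> kv : input) {
            lookup.putIfAbsent(kv.key(), kv.value());
        }
        List<Edge<K, EV>> result = new ArrayList<>();
        for (Edge<K, EV> edge : edges) {
            // Assumption: edges whose source ID has no match keep their value.
            EV newValue = lookup.containsKey(edge.source())
                    ? fn.edgeJoin(edge.value(), lookup.get(edge.source()))
                    : edge.value();
            result.add(new Edge<>(edge.source(), edge.target(), newValue));
        }
        return result;
    }

    /** Out-degree of every vertex that is the source of at least one edge. */
    public static <K, EV> List<KeyValue<K, Long>> outDegrees(List<Edge<K, EV>> edges) {
        Map<K, Long> counts = new LinkedHashMap<>();
        for (Edge<K, EV> edge : edges) {
            counts.merge(edge.source(), 1L, Long::sum);
        }
        List<KeyValue<K, Long>> degrees = new ArrayList<>();
        counts.forEach((id, count) -> degrees.add(new KeyValue<>(id, count)));
        return degrees;
    }

    public static void main(String[] args) {
        // Assumption: every edge starts with weight 1.0.
        List<Edge<Long, Double>> network = List.of(
                new Edge<>(1L, 2L, 1.0), new Edge<>(1L, 3L, 1.0), new Edge<>(2L, 3L, 1.0));
        // Transition probability = edge weight / out-degree of the source vertex.
        List<Edge<Long, Double>> withWeights = joinWithEdgesOnSource(
                network, outDegrees(network), (edgeValue, outDegree) -> edgeValue / outDegree);
        withWeights.forEach(System.out::println);
    }
}
```

In a distributed run, "first value encountered" depends on the order in which records arrive, so inputs with duplicate keys are best deduplicated beforehand if a specific value must win.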

http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
index 28f3f12..e51453e 100644
--- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
@@ -54,12 +54,13 @@ object Graph {
   /**
   * Creates a graph from a DataSet of edges.
   * Vertices are created automatically and their values are set by applying the provided
-  * map function to the vertex ids.
+  * vertexValueInitializer map function to the vertex ids.
   */
   def fromDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
-  TypeInformation : ClassTag](edges: DataSet[Edge[K, EV]], mapper: MapFunction[K, VV],
-      env: ExecutionEnvironment): Graph[K, VV, EV] = {
-    wrapGraph(jg.Graph.fromDataSet[K, VV, EV](edges.javaSet, mapper, env.getJavaEnv))
+  TypeInformation : ClassTag](edges: DataSet[Edge[K, EV]],
+  vertexValueInitializer: MapFunction[K, VV], env: ExecutionEnvironment): Graph[K, VV, EV] = {
+    wrapGraph(jg.Graph.fromDataSet[K, VV, EV](edges.javaSet, vertexValueInitializer,
+      env.getJavaEnv))
   }
 
   /**
@@ -84,16 +85,22 @@ object Graph {
   /**
   * Creates a graph from a Seq of edges.
   * Vertices are created automatically and their values are set by applying the provided
-  * map function to the vertex ids.
+  * vertexValueInitializer map function to the vertex ids.
   */
   def fromCollection[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
-  TypeInformation : ClassTag](edges: Seq[Edge[K, EV]], mapper: MapFunction[K, VV],
-      env: ExecutionEnvironment): Graph[K, VV, EV] = {
-    wrapGraph(jg.Graph.fromCollection[K, VV, EV](edges.asJavaCollection, mapper, env.getJavaEnv))
+  TypeInformation : ClassTag](edges: Seq[Edge[K, EV]], vertexValueInitializer: MapFunction[K, VV],
+  env: ExecutionEnvironment): Graph[K, VV, EV] = {
+    wrapGraph(jg.Graph.fromCollection[K, VV, EV](edges.asJavaCollection, vertexValueInitializer,
+      env.getJavaEnv))
   }
 
   /**
-  * Creates a Graph from a DataSets of Tuples.
+   * Creates a graph from DataSets of tuples for vertices and for edges.
+   * The first field of the Tuple2 vertex object will become the vertex ID
+   * and the second field will become the vertex value.
+   * The first field of the Tuple3 object for edges will become the source ID,
+   * the second field will become the target ID, and the third field will become
+   * the edge value. 
   */
   def fromTupleDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
   TypeInformation : ClassTag](vertices: DataSet[(K, VV)], edges: DataSet[(K, K, EV)],
@@ -101,11 +108,14 @@ object Graph {
     val javaTupleVertices = vertices.map(v => new jtuple.Tuple2(v._1, v._2)).javaSet
     val javaTupleEdges = edges.map(v => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet
     wrapGraph(jg.Graph.fromTupleDataSet[K, VV, EV](javaTupleVertices, javaTupleEdges,
-        env.getJavaEnv))
+      env.getJavaEnv))
   }
 
   /**
   * Creates a Graph from a DataSet of Tuples representing the edges.
+  * The first field of the Tuple3 object for edges will become the source ID,
+  * the second field will become the target ID, and the third field will become
+  * the edge value. 
   * Vertices are created automatically and their values are set to NullValue.
   */
   def fromTupleDataSet[K: TypeInformation : ClassTag, EV: TypeInformation : ClassTag]
@@ -116,14 +126,45 @@ object Graph {
 
   /**
   * Creates a Graph from a DataSet of Tuples representing the edges.
+  * The first field of the Tuple3 object for edges will become the source ID,
+  * the second field will become the target ID, and the third field will become
+  * the edge value. 
   * Vertices are created automatically and their values are set by applying the provided
-  * map function to the vertex ids.
+  * vertexValueInitializer map function to the vertex ids.
   */
   def fromTupleDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
-  TypeInformation : ClassTag](edges: DataSet[(K, K, EV)], mapper: MapFunction[K, VV],
-      env: ExecutionEnvironment): Graph[K, VV, EV] = {
+  TypeInformation : ClassTag](edges: DataSet[(K, K, EV)],
+  vertexValueInitializer: MapFunction[K, VV], env: ExecutionEnvironment): Graph[K, VV, EV] = {
     val javaTupleEdges = edges.map(v => new jtuple.Tuple3(v._1, v._2, v._3)).javaSet
-    wrapGraph(jg.Graph.fromTupleDataSet[K, VV, EV](javaTupleEdges, mapper, env.getJavaEnv))
+    wrapGraph(jg.Graph.fromTupleDataSet[K, VV, EV](javaTupleEdges, vertexValueInitializer,
+      env.getJavaEnv))
+  }
+
+  /**
+  * Creates a Graph from a DataSet of Tuple2 objects representing the edges.
+  * The first field of the Tuple2 object for edges will become the source ID,
+  * the second field will become the target ID. The edge value will be set to NullValue.
+  * Vertices are created automatically and their values are set to NullValue.
+  */
+  def fromTuple2DataSet[K: TypeInformation : ClassTag](edges: DataSet[(K, K)],
+  env: ExecutionEnvironment): Graph[K, NullValue, NullValue] = {
+    val javaTupleEdges = edges.map(v => new jtuple.Tuple2(v._1, v._2)).javaSet
+    wrapGraph(jg.Graph.fromTuple2DataSet[K](javaTupleEdges, env.getJavaEnv))
+  }
+
+  /**
+  * Creates a Graph from a DataSet of Tuple2 objects representing the edges.
+  * The first field of the Tuple2 object for edges will become the source ID,
+  * the second field will become the target ID. The edge value will be set to NullValue.
+  * Vertices are created automatically and their values are set by applying the provided
+  * vertexValueInitializer map function to the vertex IDs.
+  */
+  def fromTuple2DataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag]
+  (edges: DataSet[(K, K)], vertexValueInitializer: MapFunction[K, VV],
+  env: ExecutionEnvironment): Graph[K, VV, NullValue] = {
+    val javaTupleEdges = edges.map(v => new jtuple.Tuple2(v._1, v._2)).javaSet
+    wrapGraph(jg.Graph.fromTuple2DataSet[K, VV](javaTupleEdges, vertexValueInitializer,
+      env.getJavaEnv))
   }
 
   /**
@@ -160,7 +201,8 @@ object Graph {
   * edges file.
   * @param includedFieldsEdges The fields in the edges file that should be read.
   * By default all fields are read.
-  * @param mapper If no vertex values are provided, this mapper can be used to initialize them.
+  * @param vertexValueInitializer If no vertex values are provided,
+  * this mapper can be used to initialize them, by applying a map transformation on the vertex IDs.
   * 
   */
   // scalastyle:off
@@ -186,7 +228,7 @@ object Graph {
       ignoreCommentsEdges: String = null,
       lenientEdges: Boolean = false,
       includedFieldsEdges: Array[Int] = null,
-      mapper: MapFunction[K, VV] = null) = {
+      vertexValueInitializer: MapFunction[K, VV] = null) = {
 
     // with vertex and edge values
     if (readVertices && hasEdgeValues) {
@@ -229,8 +271,8 @@ object Graph {
         includedFieldsEdges)
 
       // initializer provided
-      if (mapper != null) {
-        fromTupleDataSet[K, VV, EV](edges, mapper, env)
+      if (vertexValueInitializer != null) {
+        fromTupleDataSet[K, VV, EV](edges, vertexValueInitializer, env)
       }
       else {
         fromTupleDataSet[K, EV](edges, env) 
@@ -243,8 +285,8 @@ object Graph {
       lenientEdges, includedFieldsEdges).map(edge => (edge._1, edge._2, NullValue.getInstance))
 
       // no initializer provided
-      if (mapper != null) {
-        fromTupleDataSet[K, VV, NullValue](edges, mapper, env)
+      if (vertexValueInitializer != null) {
+        fromTupleDataSet[K, VV, NullValue](edges, vertexValueInitializer, env)
       }
       else {
         fromTupleDataSet[K, NullValue](edges, env) 
@@ -369,185 +411,215 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
   }
 
   /**
-   * Joins the vertex DataSet of this graph with an input DataSet and applies
-   * a UDF on the resulted values.
-   *
-   * @param inputDataSet the DataSet to join with.
-   * @param mapper the UDF map function to apply.
-   * @return a new graph where the vertex values have been updated.
-   */
-  def joinWithVertices[T: TypeInformation](inputDataSet: DataSet[(K, T)], mapper: MapFunction[
-    (VV, T), VV]): Graph[K, VV, EV] = {
-    val newmapper = new MapFunction[jtuple.Tuple2[VV, T], VV]() {
-      override def map(value: jtuple.Tuple2[VV, T]): VV = {
-        mapper.map((value.f0, value.f1))
-      }
-    }
+   * Joins the vertex DataSet of this graph with an input Tuple2 DataSet and applies
+   * a user-defined transformation on the values of the matched records.
+   * The vertex ID and the first field of the Tuple2 DataSet are used as the join keys.
+   * 
+   * @param inputDataSet the Tuple2 DataSet to join with.
+   * The first field of the Tuple2 is used as the join key and the second field is passed
+   * as a parameter to the transformation function.
+   * @param vertexJoinFunction the transformation function to apply.
+   * The first parameter is the current vertex value and the second parameter is the value
+   * of the matched Tuple2 from the input DataSet.
+   * @return a new Graph, where the vertex values have been updated according to the
+   * result of the vertexJoinFunction.
+   * 
+   * @tparam T the type of the second field of the input Tuple2 DataSet.
+  */
+  def joinWithVertices[T: TypeInformation](inputDataSet: DataSet[(K, T)],
+  vertexJoinFunction: VertexJoinFunction[VV, T]): Graph[K, VV, EV] = {
     val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
       scalatuple._2)).javaSet
-    wrapGraph(jgraph.joinWithVertices[T](javaTupleSet, newmapper))
+    wrapGraph(jgraph.joinWithVertices[T](javaTupleSet, vertexJoinFunction))
   }
 
   /**
-   * Joins the vertex DataSet of this graph with an input DataSet and applies
-   * a UDF on the resulted values.
-   *
-   * @param inputDataSet the DataSet to join with.
-   * @param fun the UDF map function to apply.
-   * @return a new graph where the vertex values have been updated.
-   */
+   * Joins the vertex DataSet of this graph with an input Tuple2 DataSet and applies
+   * a user-defined transformation on the values of the matched records.
+   * The vertex ID and the first field of the Tuple2 DataSet are used as the join keys.
+   * 
+   * @param inputDataSet the Tuple2 DataSet to join with.
+   * The first field of the Tuple2 is used as the join key and the second field is passed
+   * as a parameter to the transformation function.
+   * @param fun the transformation function to apply.
+   * The first parameter is the current vertex value and the second parameter is the value
+   * of the matched Tuple2 from the input DataSet.
+   * @return a new Graph, where the vertex values have been updated according to the
+   * result of fun.
+   * 
+   * @tparam T the type of the second field of the input Tuple2 DataSet.
+  */
   def joinWithVertices[T: TypeInformation](inputDataSet: DataSet[(K, T)], fun: (VV, T) => VV):
   Graph[K, VV, EV] = {
-    val newmapper = new MapFunction[jtuple.Tuple2[VV, T], VV]() {
+    val newVertexJoin = new VertexJoinFunction[VV, T]() {
       val cleanFun = clean(fun)
 
-      override def map(value: jtuple.Tuple2[VV, T]): VV = {
-        cleanFun(value.f0, value.f1)
+      override def vertexJoin(vertexValue: VV, inputValue: T): VV = {
+        cleanFun(vertexValue, inputValue)
       }
     }
     val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
       scalatuple._2)).javaSet
-    wrapGraph(jgraph.joinWithVertices[T](javaTupleSet, newmapper))
+    wrapGraph(jgraph.joinWithVertices[T](javaTupleSet, newVertexJoin))
   }
 
   /**
-   * Joins the edge DataSet with an input DataSet on a composite key of both
-   * source and target and applies a UDF on the resulted values.
-   *
+   * Joins the edge DataSet with an input DataSet on the composite key of both
+   * source and target IDs and applies a user-defined transformation on the values
+   * of the matched records. The first two fields of the input DataSet are used as join keys.
+   * 
    * @param inputDataSet the DataSet to join with.
-   * @param mapper the UDF map function to apply.
-   * @tparam T the return type
-   * @return a new graph where the edge values have been updated.
-   */
-  def joinWithEdges[T: TypeInformation](inputDataSet: DataSet[(K, K, T)], mapper: MapFunction[
-    (EV, T), EV]): Graph[K, VV, EV] = {
-    val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
-      override def map(value: jtuple.Tuple2[EV, T]): EV = {
-        mapper.map((value.f0, value.f1))
-      }
-    }
+   * The first two fields of the Tuple3 are used as the composite join key
+   * and the third field is passed as a parameter to the transformation function.
+   * @param edgeJoinFunction the transformation function to apply.
+   * The first parameter is the current edge value and the second parameter is the value
+   * of the matched Tuple3 from the input DataSet.
+   * 
+   * @tparam T the type of the third field of the input Tuple3 DataSet.
+   * @return a new Graph, where the edge values have been updated according to the
+   * result of the edgeJoinFunction.
+  */
+  def joinWithEdges[T: TypeInformation](inputDataSet: DataSet[(K, K, T)],
+  edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV] = {
     val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple3(scalatuple._1,
       scalatuple._2, scalatuple._3)).javaSet
-    wrapGraph(jgraph.joinWithEdges[T](javaTupleSet, newmapper))
+    wrapGraph(jgraph.joinWithEdges[T](javaTupleSet, edgeJoinFunction))
   }
 
   /**
-   * Joins the edge DataSet with an input DataSet on a composite key of both
-   * source and target and applies a UDF on the resulted values.
-   *
+   * Joins the edge DataSet with an input DataSet on the composite key of both
+   * source and target IDs and applies a user-defined transformation on the values
+   * of the matched records. The first two fields of the input DataSet are used as join keys.
+   * 
    * @param inputDataSet the DataSet to join with.
-   * @param fun the UDF map function to apply.
-   * @tparam T the return type
-   * @return a new graph where the edge values have been updated.
-   */
+   * The first two fields of the Tuple3 are used as the composite join key
+   * and the third field is passed as a parameter to the transformation function.
+   * @param fun the transformation function to apply.
+   * The first parameter is the current edge value and the second parameter is the value
+   * of the matched Tuple3 from the input DataSet.
+   * 
+   * @tparam T the type of the third field of the input Tuple3 DataSet.
+   * @return a new Graph, where the edge values have been updated according to the
+   * result of fun.
+  */
   def joinWithEdges[T: TypeInformation](inputDataSet: DataSet[(K, K, T)], fun: (EV, T) => EV):
   Graph[K, VV, EV] = {
-    val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
+    val newEdgeJoin = new EdgeJoinFunction[EV, T]() {
       val cleanFun = clean(fun)
 
-      override def map(value: jtuple.Tuple2[EV, T]): EV = {
-        cleanFun(value.f0, value.f1)
+      override def edgeJoin(edgeValue: EV, inputValue: T): EV = {
+        cleanFun(edgeValue, inputValue)
       }
     }
     val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple3(scalatuple._1,
       scalatuple._2, scalatuple._3)).javaSet
-    wrapGraph(jgraph.joinWithEdges[T](javaTupleSet, newmapper))
+    wrapGraph(jgraph.joinWithEdges[T](javaTupleSet, newEdgeJoin))
   }
 
   /**
-   * Joins the edge DataSet with an input DataSet on the source key of the
-   * edges and the first attribute of the input DataSet and applies a UDF on
-   * the resulted values. In case the inputDataSet contains the same key more
-   * than once, only the first value will be considered.
-   *
+   * Joins the edge DataSet with an input Tuple2 DataSet and applies a user-defined transformation
+   * on the values of the matched records.
+   * The source ID of the edges input and the first field of the input DataSet
+   * are used as join keys.
+   * 
    * @param inputDataSet the DataSet to join with.
-   * @param mapper the UDF map function to apply.
-   * @tparam T the return type
-   * @return a new graph where the edge values have been updated.
-   */
-  def joinWithEdgesOnSource[T: TypeInformation](inputDataSet: DataSet[(K, T)], mapper:
-  MapFunction[(EV, T), EV]): Graph[K, VV, EV] = {
-    val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
-      override def map(value: jtuple.Tuple2[EV, T]): EV = {
-        mapper.map((value.f0, value.f1))
-      }
-    }
+   * The first field of the Tuple2 is used as the join key
+   * and the second field is passed as a parameter to the transformation function.
+   * @param edgeJoinFunction the transformation function to apply.
+   * The first parameter is the current edge value and the second parameter is the value
+   * of the matched Tuple2 from the input DataSet.
+   * @tparam T the type of the second field of the input Tuple2 DataSet.
+   * @return a new Graph, where the edge values have been updated according to the
+   * result of the edgeJoinFunction.
+  */
+  def joinWithEdgesOnSource[T: TypeInformation](inputDataSet: DataSet[(K, T)],
+  edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV] = {
     val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
       scalatuple._2)).javaSet
-    wrapGraph(jgraph.joinWithEdgesOnSource[T](javaTupleSet, newmapper))
+    wrapGraph(jgraph.joinWithEdgesOnSource[T](javaTupleSet, edgeJoinFunction))
   }
 
   /**
-   * Joins the edge DataSet with an input DataSet on the source key of the
-   * edges and the first attribute of the input DataSet and applies a UDF on
-   * the resulted values. In case the inputDataSet contains the same key more
-   * than once, only the first value will be considered.
-   *
+   * Joins the edge DataSet with an input Tuple2 DataSet and applies a user-defined transformation
+   * on the values of the matched records.
+   * The source ID of the edges input and the first field of the input DataSet
+   * are used as join keys.
+   * 
    * @param inputDataSet the DataSet to join with.
-   * @param fun the UDF map function to apply.
-   * @tparam T the return type
-   * @return a new graph where the edge values have been updated.
-   */
+   * The first field of the Tuple2 is used as the join key
+   * and the second field is passed as a parameter to the transformation function.
+   * @param fun the transformation function to apply.
+   * The first parameter is the current edge value and the second parameter is the value
+   * of the matched Tuple2 from the input DataSet.
+   * @tparam T the type of the second field of the input Tuple2 DataSet.
+   * @return a new Graph, where the edge values have been updated according to the
+   * result of fun.
+  */
   def joinWithEdgesOnSource[T: TypeInformation](inputDataSet: DataSet[(K, T)], fun: (EV, T) =>
     EV): Graph[K, VV, EV] = {
-    val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
+    val newEdgeJoin = new EdgeJoinFunction[EV, T]() {
       val cleanFun = clean(fun)
 
-      override def map(value: jtuple.Tuple2[EV, T]): EV = {
-        cleanFun(value.f0, value.f1)
+      override def edgeJoin(edgeValue: EV, inputValue: T): EV = {
+        cleanFun(edgeValue, inputValue)
       }
     }
     val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
       scalatuple._2)).javaSet
-    wrapGraph(jgraph.joinWithEdgesOnSource[T](javaTupleSet, newmapper))
+    wrapGraph(jgraph.joinWithEdgesOnSource[T](javaTupleSet, newEdgeJoin))
   }
 
   /**
-   * Joins the edge DataSet with an input DataSet on the target key of the
-   * edges and the first attribute of the input DataSet and applies a UDF on
-   * the resulted values. Should the inputDataSet contain the same key more
-   * than once, only the first value will be considered.
-   *
+   * Joins the edge DataSet with an input Tuple2 DataSet and applies a user-defined transformation
+   * on the values of the matched records.
+   * The target ID of the edges input and the first field of the input DataSet
+   * are used as join keys.
+   * 
    * @param inputDataSet the DataSet to join with.
-   * @param mapper the UDF map function to apply.
-   * @tparam T the return type
-   * @return a new graph where the edge values have been updated.
-   */
-  def joinWithEdgesOnTarget[T: TypeInformation](inputDataSet: DataSet[(K, T)], mapper:
-  MapFunction[(EV, T), EV]): Graph[K, VV, EV] = {
-    val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
-      override def map(value: jtuple.Tuple2[EV, T]): EV = {
-        mapper.map((value.f0, value.f1))
-      }
-    }
+   * The first field of the Tuple2 is used as the join key
+   * and the second field is passed as a parameter to the transformation function.
+   * @param edgeJoinFunction the transformation function to apply.
+   * The first parameter is the current edge value and the second parameter is the value
+   * of the matched Tuple2 from the input DataSet.
+   * @tparam T the type of the second field of the input Tuple2 DataSet.
+   * @return a new Graph, where the edge values have been updated according to the
+   * result of the edgeJoinFunction.
+  */
+  def joinWithEdgesOnTarget[T: TypeInformation](inputDataSet: DataSet[(K, T)],
+  edgeJoinFunction: EdgeJoinFunction[EV, T]): Graph[K, VV, EV] = {
     val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
       scalatuple._2)).javaSet
-    wrapGraph(jgraph.joinWithEdgesOnTarget[T](javaTupleSet, newmapper))
+    wrapGraph(jgraph.joinWithEdgesOnTarget[T](javaTupleSet, edgeJoinFunction))
   }
 
   /**
-   * Joins the edge DataSet with an input DataSet on the target key of the
-   * edges and the first attribute of the input DataSet and applies a UDF on
-   * the resulted values. Should the inputDataSet contain the same key more
-   * than once, only the first value will be considered.
-   *
+   * Joins the edge DataSet with an input Tuple2 DataSet and applies a user-defined transformation
+   * on the values of the matched records.
+   * The target ID of the edges input and the first field of the input DataSet
+   * are used as join keys.
+   * 
    * @param inputDataSet the DataSet to join with.
-   * @param fun the UDF map function to apply.
-   * @tparam T the return type
-   * @return a new graph where the edge values have been updated.
-   */
+   * The first field of the Tuple2 is used as the join key
+   * and the second field is passed as a parameter to the transformation function.
+   * @param fun the transformation function to apply.
+   * The first parameter is the current edge value and the second parameter is the value
+   * of the matched Tuple2 from the input DataSet.
+   * @tparam T the type of the second field of the input Tuple2 DataSet.
+   * @return a new Graph, where the edge values have been updated according to the
+   * result of fun.
+  */
   def joinWithEdgesOnTarget[T: TypeInformation](inputDataSet: DataSet[(K, T)], fun: (EV, T) =>
     EV): Graph[K, VV, EV] = {
-    val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
+    val newEdgeJoin = new EdgeJoinFunction[EV, T]() {
       val cleanFun = clean(fun)
 
-      override def map(value: jtuple.Tuple2[EV, T]): EV = {
-        cleanFun(value.f0, value.f1)
+      override def edgeJoin(edgeValue: EV, inputValue: T): EV = {
+        cleanFun(edgeValue, inputValue)
       }
     }
     val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
       scalatuple._2)).javaSet
-    wrapGraph(jgraph.joinWithEdgesOnTarget[T](javaTupleSet, newmapper))
+    wrapGraph(jgraph.joinWithEdgesOnTarget[T](javaTupleSet, newEdgeJoin))
   }
 
   /**
@@ -896,12 +968,17 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
   }
 
   /**
-   * Compute an aggregate over the neighbor values of each
-   * vertex.
-   *
-   * @param reduceNeighborsFunction the function to apply to the neighborhood
-   * @param direction               the edge direction (in-, out-, all-)
-   * @return a Dataset containing one value per vertex (vertex id, aggregate vertex value)
+   * Compute a reduce transformation over the neighbors' vertex values of each vertex.
+   * For each vertex, the transformation consecutively calls a
+   * [[ReduceNeighborsFunction]] until only a single value for each vertex remains.
+   * The [[ReduceNeighborsFunction]] combines a pair of neighbor vertex values
+   * into one new value of the same type.
+   *
+   * @param reduceNeighborsFunction the reduce function to apply to the neighbors of each vertex.
+   * @param direction the edge direction (in-, out-, all-)
+   * @return a DataSet of tuples, with one tuple per vertex.
+   * The first field of each tuple is the vertex ID and the second field
+   * is the aggregate value computed by the provided [[ReduceNeighborsFunction]].
    */
   def reduceOnNeighbors(reduceNeighborsFunction: ReduceNeighborsFunction[VV], direction:
   EdgeDirection): DataSet[(K, VV)] = {
@@ -910,13 +987,18 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
   }
 
   /**
-   * Compute an aggregate over the edge values of each vertex.
-   *
-   * @param reduceEdgesFunction the function to apply to the neighborhood
-   * @param direction           the edge direction (in-, out-, all-)
-   * @return a Dataset containing one value per vertex(vertex key, aggegate edge value)
-   * @throws IllegalArgumentException
-   */
+   * Compute a reduce transformation over the edge values of each vertex.
+   * For each vertex, the transformation consecutively calls a
+   * [[ReduceEdgesFunction]] until only a single value for each vertex remains.
+   * The [[ReduceEdgesFunction]] combines a pair of edge values
+   * into one new value of the same type.
+   *
+   * @param reduceEdgesFunction the reduce function to apply to the edges of each vertex.
+   * @param direction the edge direction (in-, out-, all-)
+   * @return a DataSet of tuples, with one tuple per vertex.
+   * The first field of each tuple is the vertex ID and the second field
+   * is the aggregate edge value computed by the provided [[ReduceEdgesFunction]].
+   */
   def reduceOnEdges(reduceEdgesFunction: ReduceEdgesFunction[EV], direction: EdgeDirection):
   DataSet[(K, EV)] = {
     wrap(jgraph.reduceOnEdges(reduceEdgesFunction, direction)).map(jtuple => (jtuple.f0,

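A minimal plain-Java sketch (no Flink dependency) of the per-vertex result that reduceOnEdges computes: the values of each vertex's edges are combined pairwise until one value per vertex remains, and the direction decides which edges belong to which vertex. It is an illustration under assumptions, not Gelly's implementation: ReduceOnEdgesSketch, Edge, Direction and reduceOnEdges are hypothetical names, and the real operator is a distributed DataSet transformation driven by a ReduceEdgesFunction rather than a loop over an in-memory list.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;

// Plain-Java sketch (no Flink) of the per-vertex result that reduceOnEdges describes.
// ReduceOnEdgesSketch, Edge, Direction and reduceOnEdges are hypothetical stand-ins, not Gelly classes.
final class ReduceOnEdgesSketch {

    enum Direction { IN, OUT, ALL }

    static final class Edge {
        final long source;
        final long target;
        final double value;

        Edge(long source, long target, double value) {
            this.source = source;
            this.target = target;
            this.value = value;
        }
    }

    /** Folds the values of each vertex's edges pairwise into one value per vertex. */
    static Map<Long, Double> reduceOnEdges(List<Edge> edges, BinaryOperator<Double> reduce,
            Direction direction) {
        Map<Long, Double> result = new TreeMap<>();
        for (Edge e : edges) {
            // OUT groups an edge under its source, IN under its target, ALL under both endpoints.
            if (direction != Direction.IN) {
                result.merge(e.source, e.value, reduce);
            }
            if (direction != Direction.OUT) {
                result.merge(e.target, e.value, reduce);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Edge> edges = Arrays.asList(
                new Edge(1L, 2L, 0.5), new Edge(1L, 3L, 0.2), new Edge(2L, 3L, 0.9));
        // Minimum out-edge value per vertex; vertex 3 has no out-edges, so it gets no entry.
        System.out.println(reduceOnEdges(edges, Math::min, Direction.OUT)); // {1=0.2, 2=0.9}
    }
}
```

The TreeMap only keeps the printed output in key order. With Direction.ALL an edge contributes its value to both of its endpoints, matching the "in- plus out-edges" grouping of EdgeDirection#ALL.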
http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphCreationWithCsvITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphCreationWithCsvITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphCreationWithCsvITCase.scala
index 6ceaf16..a963845 100644
--- a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphCreationWithCsvITCase.scala
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphCreationWithCsvITCase.scala
@@ -101,7 +101,7 @@ MultipleProgramsTestBase(mode) {
     val graph = Graph.fromCsvReader[Long, Double, Long](
         readVertices = false,
         pathEdges = edgesSplit.getPath.toString,
-        mapper = new VertexDoubleIdAssigner(),
+        vertexValueInitializer = new VertexDoubleIdAssigner(),
         env = env)
     
     val result = graph.getTriplets.collect()

http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
index 3dc90fc..83fa61b 100644
--- a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
@@ -30,6 +30,7 @@ import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.{After, Before, Rule, Test}
 import _root_.scala.collection.JavaConverters._
+import org.apache.flink.graph.EdgeJoinFunction
 
 @RunWith(classOf[Parameterized])
 class JoinWithEdgesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
@@ -127,10 +128,10 @@ MultipleProgramsTestBase(mode) {
   }
 
 
-  final class AddValuesMapper extends MapFunction[(Long, Long), Long] {
+  final class AddValuesMapper extends EdgeJoinFunction[Long, Long] {
     @throws(classOf[Exception])
-    def map(tuple: (Long, Long)): Long = {
-      tuple._1 + tuple._2
+    def edgeJoin(edgeValue: Long, inputValue: Long): Long = {
+      edgeValue + inputValue
     }
   }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
index 98ee8b6..f2beb7b 100644
--- a/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
+++ b/flink-libraries/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
@@ -29,6 +29,7 @@ import org.junit.runner.RunWith
 import org.junit.runners.Parameterized
 import org.junit.{After, Before, Rule, Test}
 import _root_.scala.collection.JavaConverters._
+import org.apache.flink.graph.VertexJoinFunction
 
 @RunWith(classOf[Parameterized])
 class JoinWithVerticesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
@@ -63,11 +64,10 @@ MultipleProgramsTestBase(mode) {
     TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
   }
 
-
-  final class AddValuesMapper extends MapFunction[(Long, Long), Long] {
+  final class AddValuesMapper extends VertexJoinFunction[Long, Long] {
     @throws(classOf[Exception])
-    def map(tuple: (Long, Long)): Long = {
-      tuple._1 + tuple._2
+    def vertexJoin(vertexValue: Long, inputValue: Long): Long = {
+      vertexValue + inputValue
     }
   }
 

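The AddValuesMapper above adds the input value to the current vertex value. The following plain-Java sketch (no Flink dependency) spells out the joinWithVertices behavior visible in ApplyCoGroupToVertexValues: a vertex with a matching input record gets vertexJoin(vertexValue, inputValue), a vertex without one is emitted unchanged, input keys with no vertex are ignored, and only the first matching input value is used. VertexJoin, JoinWithVerticesSketch and joinWithVertices are hypothetical stand-ins; list order here stands in for group order, which a distributed coGroup probably should not be relied upon to preserve.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Flink) of the joinWithVertices semantics visible in ApplyCoGroupToVertexValues.
// VertexJoin is a hypothetical local stand-in shaped like VertexJoinFunction#vertexJoin.
final class JoinWithVerticesSketch {

    interface VertexJoin<VV, T> {
        VV vertexJoin(VV vertexValue, T inputValue);
    }

    /** vertices maps vertex ID to value; input holds (ID, value) pairs and may repeat an ID. */
    static <K, VV, T> Map<K, VV> joinWithVertices(Map<K, VV> vertices,
            List<Map.Entry<K, T>> input, VertexJoin<VV, T> fn) {
        // Keep only the first input value per key (list order stands in for group order).
        Map<K, T> firstValue = new LinkedHashMap<>();
        for (Map.Entry<K, T> entry : input) {
            firstValue.putIfAbsent(entry.getKey(), entry.getValue());
        }
        Map<K, VV> result = new LinkedHashMap<>();
        for (Map.Entry<K, VV> vertex : vertices.entrySet()) {
            K id = vertex.getKey();
            // Matched vertices get a new value; unmatched vertices are emitted unchanged.
            VV newValue = firstValue.containsKey(id)
                    ? fn.vertexJoin(vertex.getValue(), firstValue.get(id))
                    : vertex.getValue();
            result.put(id, newValue);
        }
        // Input keys without a vertex never appear in the result.
        return result;
    }

    public static void main(String[] args) {
        Map<Long, Long> vertices = new LinkedHashMap<>();
        vertices.put(1L, 10L);
        vertices.put(2L, 20L);
        List<Map.Entry<Long, Long>> input =
                List.of(Map.entry(1L, 5L), Map.entry(1L, 100L), Map.entry(3L, 7L));
        // Same logic as the AddValuesMapper test function: vertexValue + inputValue.
        System.out.println(joinWithVertices(vertices, input, (v, in) -> v + in)); // {1=15, 2=20}
    }
}
```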
http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgeJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgeJoinFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgeJoinFunction.java
new file mode 100644
index 0000000..68d6e53
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgeJoinFunction.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.Function;
+
+/**
+ * Interface to be implemented by the transformation function
+ * applied in {@link Graph#joinWithEdges(org.apache.flink.api.java.DataSet, EdgeJoinFunction)},
+ * {@link Graph#joinWithEdgesOnSource(org.apache.flink.api.java.DataSet, EdgeJoinFunction)}, and
+ * {@link Graph#joinWithEdgesOnTarget(org.apache.flink.api.java.DataSet, EdgeJoinFunction)} methods.
+ *
+ * @param <EV> the edge value type
+ * @param <T> the input value type
+ */
+public interface EdgeJoinFunction<EV, T> extends Function, Serializable {
+
+	/**
+	 * Applies a transformation on the current edge value
+	 * and the value of the matched tuple of the input DataSet.
+	 * 
+	 * @param edgeValue the current edge value
+	 * @param inputValue the value of the matched input tuple (the second field of a Tuple2, or the third field of a Tuple3 in joinWithEdges)
+	 * @return the new edge value
+	 */
+	EV edgeJoin(EV edgeValue, T inputValue) throws Exception;
+}

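To make the source/target variants concrete, here is a plain-Java sketch (no Flink dependency) of joinWithEdgesOnSource and joinWithEdgesOnTarget under simplifying assumptions: every edge whose source (or target) ID matches an input key is transformed with that key's value, and other edges keep their value. EdgeJoin, Edge, JoinWithEdgesSketch and joinOnKey are hypothetical names; EdgeJoin only mirrors the shape of EdgeJoinFunction#edgeJoin, and the input is modelled as a Map, so the "first value per key" rule is already applied.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Plain-Java sketch (no Flink) of joinWithEdgesOnSource / joinWithEdgesOnTarget.
// EdgeJoin, Edge and joinOnKey are hypothetical local stand-ins; EdgeJoin mirrors EdgeJoinFunction#edgeJoin.
final class JoinWithEdgesSketch {

    interface EdgeJoin<EV, T> {
        EV edgeJoin(EV edgeValue, T inputValue);
    }

    static final class Edge<K, EV> {
        final K source;
        final K target;
        final EV value;

        Edge(K source, K target, EV value) {
            this.source = source;
            this.target = target;
            this.value = value;
        }

        @Override
        public String toString() {
            return "(" + source + "," + target + "," + value + ")";
        }
    }

    /**
     * keyOf selects the join key of an edge (its source or its target). The input is a Map,
     * so it already holds at most one value per key.
     */
    static <K, EV, T> List<Edge<K, EV>> joinOnKey(List<Edge<K, EV>> edges, Map<K, T> input,
            Function<Edge<K, EV>, K> keyOf, EdgeJoin<EV, T> fn) {
        List<Edge<K, EV>> result = new ArrayList<>();
        for (Edge<K, EV> e : edges) {
            K key = keyOf.apply(e);
            // Every edge sharing the key is transformed with the same input value;
            // in this sketch, edges without a match keep their value.
            EV newValue = input.containsKey(key) ? fn.edgeJoin(e.value, input.get(key)) : e.value;
            result.add(new Edge<>(e.source, e.target, newValue));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Edge<Long, Long>> edges = List.of(
                new Edge<>(1L, 2L, 10L), new Edge<>(1L, 3L, 20L), new Edge<>(2L, 3L, 30L));
        // "On source": multiply all out-edge values of vertex 1 by 2.
        System.out.println(joinOnKey(edges, Map.of(1L, 2L), e -> e.source, (ev, in) -> ev * in));
        // "On target": add 1 to all in-edge values of vertex 3.
        System.out.println(joinOnKey(edges, Map.of(3L, 1L), e -> e.target, (ev, in) -> ev + in));
    }
}
```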
http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java
index bf1d6a2..07e14e9 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java
@@ -34,5 +34,24 @@ import org.apache.flink.util.Collector;
  */
 public interface EdgesFunction<K, EV, O> extends Function, Serializable {
 
+	/**
+	 * This method is called per vertex and can iterate over all of its neighboring edges
+	 * with the specified direction.
+	 * <p>
+	 * If called with {@link EdgeDirection#OUT} the group will contain
+	 * the out-edges of the grouping vertex.
+	 * If called with {@link EdgeDirection#IN} the group will contain
+	 * the in-edges of the grouping vertex.
+	 * If called with {@link EdgeDirection#ALL} the group will contain
+	 * all edges of the grouping vertex.
+	 * <p>
+	 * The method can emit any number of output elements, including none.
+	 * 
+	 * @param edges the neighboring edges of the grouping vertex.
+	 * The first field of each Tuple2 is the ID of the grouping vertex.
+	 * The second field is the neighboring edge.
+	 * @param out the collector to emit results to
+	 * @throws Exception
+	 */
 	void iterateEdges(Iterable<Tuple2<K, Edge<K, EV>>> edges, Collector<O> out) throws Exception;
 }

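The groups handed to iterateEdges can be pictured with a plain-Java sketch (no Flink dependency): OUT groups each edge under its source, IN under its target, and ALL under both endpoints, and the callback may emit any number of results per group, including none. EdgeGroupsSketch, Direction, Edge, EdgesCallback, groupEdges and applyOnEdges are hypothetical; unlike the real EdgesFunction, the callback here receives the vertex ID as a separate argument instead of as the first field of a Tuple2 per edge.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Consumer;

// Plain-Java sketch (no Flink) of the edge groups an EdgesFunction is called with.
// Direction, Edge, EdgesCallback, groupEdges and applyOnEdges are hypothetical stand-ins.
final class EdgeGroupsSketch {

    enum Direction { IN, OUT, ALL }

    static final class Edge {
        final long source;
        final long target;

        Edge(long source, long target) {
            this.source = source;
            this.target = target;
        }

        @Override
        public String toString() {
            return source + "->" + target;
        }
    }

    /** Groups edges under the grouping vertex: source for OUT, target for IN, both for ALL. */
    static Map<Long, List<Edge>> groupEdges(List<Edge> edges, Direction direction) {
        Map<Long, List<Edge>> groups = new TreeMap<>();
        for (Edge e : edges) {
            if (direction != Direction.IN) {
                groups.computeIfAbsent(e.source, k -> new ArrayList<>()).add(e);
            }
            if (direction != Direction.OUT) {
                groups.computeIfAbsent(e.target, k -> new ArrayList<>()).add(e);
            }
        }
        return groups;
    }

    /** Stand-in for iterateEdges: may call out.accept(...) any number of times, including zero. */
    interface EdgesCallback<O> {
        void iterateEdges(long vertexId, List<Edge> edges, Consumer<O> out);
    }

    static <O> List<O> applyOnEdges(List<Edge> edges, Direction direction, EdgesCallback<O> fn) {
        List<O> results = new ArrayList<>();
        groupEdges(edges, direction).forEach((id, group) -> fn.iterateEdges(id, group, results::add));
        return results;
    }

    public static void main(String[] args) {
        List<Edge> edges = List.of(new Edge(1, 2), new Edge(1, 3), new Edge(2, 3));
        System.out.println(groupEdges(edges, Direction.IN)); // {2=[1->2], 3=[1->3, 2->3]}
        // Emit the ID of every vertex with at least two out-edges; other vertices emit nothing.
        List<Long> busy = applyOnEdges(edges, Direction.OUT,
                (id, group, out) -> { if (group.size() >= 2) out.accept(id); });
        System.out.println(busy); // [1]
    }
}
```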
http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java
index 0b0ab0e..645bd7c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java
@@ -35,5 +35,23 @@ import org.apache.flink.util.Collector;
  */
 public interface EdgesFunctionWithVertexValue<K, VV, EV, O> extends Function, Serializable {
 
-	void iterateEdges(Vertex<K, VV> v, Iterable<Edge<K, EV>> edges, Collector<O> out) throws Exception;
+	/**
+	 * This method is called per vertex and can iterate over all of its neighboring edges
+	 * with the specified direction.
+	 * <p>
+	 * If called with {@link EdgeDirection#OUT} the group will contain
+	 * the out-edges of the grouping vertex.
+	 * If called with {@link EdgeDirection#IN} the group will contain
+	 * the in-edges of the grouping vertex.
+	 * If called with {@link EdgeDirection#ALL} the group will contain
+	 * all edges of the grouping vertex.
+	 * <p>
+	 * The method can emit any number of output elements, including none.
+	 * 
+	 * @param vertex the grouping vertex
+	 * @param edges the neighboring edges of the grouping vertex.
+	 * @param out the collector to emit results to
+	 * @throws Exception
+	 */
+	void iterateEdges(Vertex<K, VV> vertex, Iterable<Edge<K, EV>> edges, Collector<O> out) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index b24f749..6015be4 100755
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -111,11 +111,11 @@ public class Graph<K, VV, EV> {
 	}
 
 	/**
-	 * Creates a graph from a Collection of edges, vertices are induced from the
-	 * edges. Vertices are created automatically and their values are set to
+	 * Creates a graph from a Collection of edges.
+	 * Vertices are created automatically and their values are set to
 	 * NullValue.
 	 * 
-	 * @param edges a Collection of vertices.
+	 * @param edges a Collection of edges.
 	 * @param context the flink execution environment.
 	 * @return the newly created graph.
 	 */
@@ -126,20 +126,20 @@ public class Graph<K, VV, EV> {
 	}
 
 	/**
-	 * Creates a graph from a Collection of edges, vertices are induced from the
-	 * edges and vertex values are calculated by a mapper function. Vertices are
-	 * created automatically and their values are set by applying the provided
-	 * map function to the vertex ids.
+	 * Creates a graph from a Collection of edges.
+	 * Vertices are created automatically and their values are set 
+	 * by applying the provided map function to the vertex IDs.
 	 * 
 	 * @param edges a Collection of edges.
-	 * @param mapper the mapper function.
+	 * @param vertexValueInitializer a map function that initializes the vertex values.
+	 * It is applied to each vertex ID to produce the initial vertex value.
 	 * @param context the flink execution environment.
 	 * @return the newly created graph.
 	 */
 	public static <K, VV, EV> Graph<K, VV, EV> fromCollection(Collection<Edge<K, EV>> edges,
-			final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
+			final MapFunction<K, VV> vertexValueInitializer, ExecutionEnvironment context) {
 
-		return fromDataSet(context.fromCollection(edges), mapper, context);
+		return fromDataSet(context.fromCollection(edges), vertexValueInitializer, context);
 	}
 
 	/**
@@ -157,8 +157,8 @@ public class Graph<K, VV, EV> {
 	}
 
 	/**
-	 * Creates a graph from a DataSet of edges, vertices are induced from the
-	 * edges. Vertices are created automatically and their values are set to
+	 * Creates a graph from a DataSet of edges.
+	 * Vertices are created automatically and their values are set to
 	 * NullValue.
 	 * 
 	 * @param edges a DataSet of edges.
@@ -183,23 +183,23 @@ public class Graph<K, VV, EV> {
 	}
 
 	/**
-	 * Creates a graph from a DataSet of edges, vertices are induced from the
-	 * edges and vertex values are calculated by a mapper function. Vertices are
-	 * created automatically and their values are set by applying the provided
-	 * map function to the vertex ids.
+	 * Creates a graph from a DataSet of edges.
+	 * Vertices are created automatically and their values are set
+	 * by applying the provided map function to the vertex IDs.
 	 * 
 	 * @param edges a DataSet of edges.
-	 * @param mapper the mapper function.
+	 * @param vertexValueInitializer the mapper function that initializes the vertex values.
+	 * It is applied to each vertex ID to produce the initial vertex value.
 	 * @param context the flink execution environment.
 	 * @return the newly created graph.
 	 */
 	public static <K, VV, EV> Graph<K, VV, EV> fromDataSet(DataSet<Edge<K, EV>> edges,
-			final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
+			final MapFunction<K, VV> vertexValueInitializer, ExecutionEnvironment context) {
 
 		TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0);
 
 		TypeInformation<VV> valueType = TypeExtractor.createTypeInfo(
-				MapFunction.class, mapper.getClass(), 1, null, null);
+				MapFunction.class, vertexValueInitializer.getClass(), 1, null, null);
 
 		@SuppressWarnings({ "unchecked", "rawtypes" })
 		TypeInformation<Vertex<K, VV>> returnType = (TypeInformation<Vertex<K, VV>>) new TupleTypeInfo(
@@ -209,7 +209,7 @@ public class Graph<K, VV, EV> {
 				.flatMap(new EmitSrcAndTargetAsTuple1<K, EV>()).distinct()
 				.map(new MapFunction<Tuple1<K>, Vertex<K, VV>>() {
 					public Vertex<K, VV> map(Tuple1<K> value) throws Exception {
-						return new Vertex<K, VV>(value.f0, mapper.map(value.f0));
+						return new Vertex<K, VV>(value.f0, vertexValueInitializer.map(value.f0));
 					}
 				}).returns(returnType).withForwardedFields("f0");
 
@@ -226,13 +226,17 @@ public class Graph<K, VV, EV> {
 	}
 
 	/**
-	 * Creates a graph from a DataSet of Tuple objects for vertices and edges.
-	 * 
-	 * Vertices with value are created from Tuple2, Edges with value are created
-	 * from Tuple3.
+	 * Creates a graph from a DataSet of Tuple2 objects for vertices and 
+	 * Tuple3 objects for edges.
+	 * <p>
+	 * The first field of the Tuple2 vertex object will become the vertex ID
+	 * and the second field will become the vertex value.
+	 * The first field of the Tuple3 object for edges will become the source ID,
+	 * the second field will become the target ID, and the third field will become
+	 * the edge value.
 	 * 
-	 * @param vertices a DataSet of Tuple2.
-	 * @param edges a DataSet of Tuple3.
+	 * @param vertices a DataSet of Tuple2 representing the vertices.
+	 * @param edges a DataSet of Tuple3 representing the edges.
 	 * @param context the flink execution environment.
 	 * @return the newly created graph.
 	 */
@@ -245,13 +249,15 @@ public class Graph<K, VV, EV> {
 	}
 
 	/**
-	 * Creates a graph from a DataSet of Tuple objects for edges, vertices are
-	 * induced from the edges.
+	 * Creates a graph from a DataSet of Tuple3 objects for edges.
+	 * <p>
+	 * The first field of the Tuple3 object will become the source ID,
+	 * the second field will become the target ID, and the third field will become
+	 * the edge value.
+	 * <p>
+	 * Vertices are created automatically and their values are set to NullValue.
 	 * 
-	 * Edges with value are created from Tuple3. Vertices are created
-	 * automatically and their values are set to NullValue.
-	 * 
-	 * @param edges a DataSet of Tuple3.
+	 * @param edges a DataSet of Tuple3 representing the edges.
 	 * @param context the flink execution environment.
 	 * @return the newly created graph.
 	 */
@@ -263,22 +269,78 @@ public class Graph<K, VV, EV> {
 	}
 
 	/**
-	 * Creates a graph from a DataSet of Tuple objects for edges, vertices are
-	 * induced from the edges and vertex values are calculated by a mapper
-	 * function. Edges with value are created from Tuple3. Vertices are created
-	 * automatically and their values are set by applying the provided map
-	 * function to the vertex ids.
+	 * Creates a graph from a DataSet of Tuple3 objects for edges.
+	 * <p>
+	 * Each Tuple3 will become one Edge, where the source ID will be the first field of the Tuple3,
+	 * the target ID will be the second field of the Tuple3
+	 * and the Edge value will be the third field of the Tuple3.
+	 * <p>
+	 * Vertices are created automatically and their values are initialized
+	 * by applying the provided vertexValueInitializer map function to the vertex IDs.
 	 * 
 	 * @param edges a DataSet of Tuple3.
-	 * @param mapper the mapper function.
+	 * @param vertexValueInitializer the mapper function that initializes the vertex values.
+	 * It is applied to each vertex ID to produce the initial vertex value.
 	 * @param context the flink execution environment.
 	 * @return the newly created graph.
 	 */
 	public static <K, VV, EV> Graph<K, VV, EV> fromTupleDataSet(DataSet<Tuple3<K, K, EV>> edges,
-			final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
+			final MapFunction<K, VV> vertexValueInitializer, ExecutionEnvironment context) {
 
 		DataSet<Edge<K, EV>> edgeDataSet = edges.map(new Tuple3ToEdgeMap<K, EV>());
-		return fromDataSet(edgeDataSet, mapper, context);
+		return fromDataSet(edgeDataSet, vertexValueInitializer, context);
+	}
+
+	/**
+	 * Creates a graph from a DataSet of Tuple2 objects for edges.
+	 * Each Tuple2 will become one Edge, where the source ID will be the first field of the Tuple2
+	 * and the target ID will be the second field of the Tuple2.
+	 * <p>
+	 * Edge and vertex values will be set to NullValue.
+	 * 
+	 * @param edges a DataSet of Tuple2.
+	 * @param context the flink execution environment.
+	 * @return the newly created graph.
+	 */
+	public static <K> Graph<K, NullValue, NullValue> fromTuple2DataSet(DataSet<Tuple2<K, K>> edges,
+			ExecutionEnvironment context) {
+
+		DataSet<Edge<K, NullValue>> edgeDataSet = edges.map(
+				new MapFunction<Tuple2<K, K>, Edge<K, NullValue>>() {
+
+					public Edge<K, NullValue> map(Tuple2<K, K> input) {
+						return new Edge<K, NullValue>(input.f0, input.f1, NullValue.getInstance());
+					}
+				}).withForwardedFields("f0; f1");
+		return fromDataSet(edgeDataSet, context);
+	}
+
+	/**
+	 * Creates a graph from a DataSet of Tuple2 objects for edges.
+	 * Each Tuple2 will become one Edge, where the source ID will be the first field of the Tuple2
+	 * and the target ID will be the second field of the Tuple2.
+	 * <p>
+	 * Edge values will be set to NullValue.
+	 * Vertex values are initialized by applying the provided vertexValueInitializer to the vertex IDs.
+	 * 
+	 * @param edges a DataSet of Tuple2, where the first field corresponds to the source ID
+	 * and the second field corresponds to the target ID.
+	 * @param vertexValueInitializer the mapper function that initializes the vertex values.
+	 * It is applied to each vertex ID to produce the initial vertex value.
+	 * @param context the flink execution environment.
+	 * @return the newly created graph.
+	 */
+	public static <K, VV> Graph<K, VV, NullValue> fromTuple2DataSet(DataSet<Tuple2<K, K>> edges,
+			final MapFunction<K, VV> vertexValueInitializer, ExecutionEnvironment context) {
+
+		DataSet<Edge<K, NullValue>> edgeDataSet = edges.map(
+				new MapFunction<Tuple2<K, K>, Edge<K, NullValue>>() {
+
+					public Edge<K, NullValue> map(Tuple2<K, K> input) {
+						return new Edge<K, NullValue>(input.f0, input.f1, NullValue.getInstance());
+					}
+				}).withForwardedFields("f0; f1");
+		return fromDataSet(edgeDataSet, vertexValueInitializer, context);
 	}
 
 	/**
@@ -318,10 +380,11 @@ public class Graph<K, VV, EV> {
 
 	/** 
 	 * Creates a graph from a CSV file of edges. Vertices will be created automatically and
-	 * Vertex values are set by the provided mapper.
+	 * their values are initialized by applying the provided vertexValueInitializer to the vertex IDs.
 	 *
 	 * @param edgesPath a path to a CSV file with the Edge data
-	 * @param mapper the mapper function.
+	 * @param vertexValueInitializer the mapper function that initializes the vertex values.
+	 * It is applied to each vertex ID to produce the initial vertex value.
 	 * @param context the execution environment.
 	 * @return An instance of {@link org.apache.flink.graph.GraphCsvReader},
 	 * on which calling methods to specify types of the Vertex ID, Vertex Value and Edge value returns a Graph.
@@ -332,8 +395,8 @@ public class Graph<K, VV, EV> {
 	 * {@link org.apache.flink.graph.GraphCsvReader#keyType(Class)}.
 	 */
 	public static <K, VV> GraphCsvReader fromCsvReader(String edgesPath,
-			final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
-		return new GraphCsvReader(edgesPath, mapper, context);
+			final MapFunction<K, VV> vertexValueInitializer, ExecutionEnvironment context) {
+		return new GraphCsvReader(edgesPath, vertexValueInitializer, context);
 	}
 
 	/**
@@ -501,29 +564,37 @@ public class Graph<K, VV, EV> {
 	}
 
 	/**
-	 * Joins the vertex DataSet of this graph with an input DataSet and applies
-	 * a UDF on the resulted values.
+	 * Joins the vertex DataSet of this graph with an input Tuple2 DataSet and applies
+	 * a user-defined transformation on the values of the matched records.
+	 * The vertex ID and the first field of the Tuple2 DataSet are used as the join keys.
 	 * 
-	 * @param inputDataSet the DataSet to join with.
-	 * @param mapper the UDF map function to apply.
-	 * @return a new graph where the vertex values have been updated.
-	 */
+	 * @param inputDataSet the Tuple2 DataSet to join with.
+	 * The first field of the Tuple2 is used as the join key and the second field is passed
+	 * as a parameter to the transformation function. 
+	 * @param vertexJoinFunction the transformation function to apply.
+	 * The first parameter is the current vertex value and the second parameter is the value
+	 * of the matched Tuple2 from the input DataSet.
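+	 * @param <T> the type of the second field of the input Tuple2 DataSet.
+	 * @return a new Graph, where the vertex values have been updated according to the
+	 * result of the vertexJoinFunction. Vertices without a matching input record keep their value.
+	 * If the input DataSet contains the same key more than once, only the first value will be considered.
+	 */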
 	public <T> Graph<K, VV, EV> joinWithVertices(DataSet<Tuple2<K, T>> inputDataSet, 
-			final MapFunction<Tuple2<VV, T>, VV> mapper) {
+			final VertexJoinFunction<VV, T> vertexJoinFunction) {
 
 		DataSet<Vertex<K, VV>> resultedVertices = this.getVertices()
 				.coGroup(inputDataSet).where(0).equalTo(0)
-				.with(new ApplyCoGroupToVertexValues<K, VV, T>(mapper));
+				.with(new ApplyCoGroupToVertexValues<K, VV, T>(vertexJoinFunction));
 		return new Graph<K, VV, EV>(resultedVertices, this.edges, this.context);
 	}
 
 	private static final class ApplyCoGroupToVertexValues<K, VV, T>
 			implements CoGroupFunction<Vertex<K, VV>, Tuple2<K, T>, Vertex<K, VV>> {
 
-		private MapFunction<Tuple2<VV, T>, VV> mapper;
+		private VertexJoinFunction<VV, T> vertexJoinFunction;
 
-		public ApplyCoGroupToVertexValues(MapFunction<Tuple2<VV, T>, VV> mapper) {
-			this.mapper = mapper;
+		public ApplyCoGroupToVertexValues(VertexJoinFunction<VV, T> vertexJoinFunction) {
+			this.vertexJoinFunction = vertexJoinFunction;
 		}
 
 		@Override
@@ -537,42 +608,46 @@ public class Graph<K, VV, EV> {
 				if (inputIterator.hasNext()) {
 					final Tuple2<K, T> inputNext = inputIterator.next();
 
-					collector.collect(new Vertex<K, VV>(inputNext.f0, mapper
-							.map(new Tuple2<VV, T>(vertexIterator.next().f1,
-									inputNext.f1))));
+					collector.collect(new Vertex<K, VV>(inputNext.f0, vertexJoinFunction
+							.vertexJoin(vertexIterator.next().f1, inputNext.f1)));
 				} else {
 					collector.collect(vertexIterator.next());
 				}
-
 			}
 		}
 	}
 
 	/**
-	 * Joins the edge DataSet with an input DataSet on a composite key of both
-	 * source and target and applies a UDF on the resulted values.
+	 * Joins the edge DataSet with an input DataSet on the composite key of both
+	 * source and target IDs and applies a user-defined transformation on the values
+	 * of the matched records. The first two fields of the input DataSet are used as join keys.
 	 * 
 	 * @param inputDataSet the DataSet to join with.
-	 * @param mapper the UDF map function to apply.
-	 * @param <T> the return type
-	 * @return a new graph where the edge values have been updated.
-	 */
+	 * The first two fields of the Tuple3 are used as the composite join key
+	 * and the third field is passed as a parameter to the transformation function. 
+	 * @param edgeJoinFunction the transformation function to apply.
+	 * The first parameter is the current edge value and the second parameter is the third field
+	 * of the matched Tuple3 from the input DataSet.
+	 * @param <T> the type of the third field of the input Tuple3 DataSet.
+	 * @return a new Graph, where the edge values have been updated according to the
+	 * result of the edgeJoinFunction.
+	 */
 	public <T> Graph<K, VV, EV> joinWithEdges(DataSet<Tuple3<K, K, T>> inputDataSet,
-			final MapFunction<Tuple2<EV, T>, EV> mapper) {
+			final EdgeJoinFunction<EV, T> edgeJoinFunction) {
 
 		DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
 				.coGroup(inputDataSet).where(0, 1).equalTo(0, 1)
-				.with(new ApplyCoGroupToEdgeValues<K, EV, T>(mapper));
+				.with(new ApplyCoGroupToEdgeValues<K, EV, T>(edgeJoinFunction));
 		return new Graph<K, VV, EV>(this.vertices, resultedEdges, this.context);
 	}
 
 	private static final class ApplyCoGroupToEdgeValues<K, EV, T>
 			implements CoGroupFunction<Edge<K, EV>, Tuple3<K, K, T>, Edge<K, EV>> {
 
-		private MapFunction<Tuple2<EV, T>, EV> mapper;
+		private EdgeJoinFunction<EV, T> edgeJoinFunction;
 
-		public ApplyCoGroupToEdgeValues(MapFunction<Tuple2<EV, T>, EV> mapper) {
-			this.mapper = mapper;
+		public ApplyCoGroupToEdgeValues(EdgeJoinFunction<EV, T> edgeJoinFunction) {
+			this.edgeJoinFunction = edgeJoinFunction;
 		}
 
 		@Override
@@ -587,8 +662,8 @@ public class Graph<K, VV, EV> {
 					final Tuple3<K, K, T> inputNext = inputIterator.next();
 
 					collector.collect(new Edge<K, EV>(inputNext.f0,
-							inputNext.f1, mapper.map(new Tuple2<EV, T>(
-									edgesIterator.next().f2, inputNext.f2))));
+							inputNext.f1, edgeJoinFunction.edgeJoin(
+									edgesIterator.next().f2, inputNext.f2)));
 				} else {
 					collector.collect(edgesIterator.next());
 				}
@@ -597,22 +672,26 @@ public class Graph<K, VV, EV> {
 	}
 
 	/**
-	 * Joins the edge DataSet with an input DataSet on the source key of the
-	 * edges and the first attribute of the input DataSet and applies a UDF on
-	 * the resulted values. In case the inputDataSet contains the same key more
-	 * than once, only the first value will be considered.
+	 * Joins the edge DataSet with an input Tuple2 DataSet and applies a user-defined transformation
+	 * on the values of the matched records. The source ID of the edges and the first field of the input
+	 * DataSet are used as join keys. If the input contains a key more than once, only the first value will be considered.
 	 * 
 	 * @param inputDataSet the DataSet to join with.
-	 * @param mapper the UDF map function to apply.
-	 * @param <T> the return type
-	 * @return a new graph where the edge values have been updated.
-	 */
+	 * The first field of the Tuple2 is used as the join key
+	 * and the second field is passed as a parameter to the transformation function. 
+	 * @param edgeJoinFunction the transformation function to apply.
+	 * The first parameter is the current edge value and the second parameter is the second field
+	 * of the matched Tuple2 from the input DataSet.
+	 * @param <T> the type of the second field of the input Tuple2 DataSet.
+	 * @return a new Graph, where the edge values have been updated according to the
+	 * result of the edgeJoinFunction.
+	 */
 	public <T> Graph<K, VV, EV> joinWithEdgesOnSource(DataSet<Tuple2<K, T>> inputDataSet,
-			final MapFunction<Tuple2<EV, T>, EV> mapper) {
+			final EdgeJoinFunction<EV, T> edgeJoinFunction) {
 
 		DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
 				.coGroup(inputDataSet).where(0).equalTo(0)
-				.with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(mapper));
+				.with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(edgeJoinFunction));
 
 		return new Graph<K, VV, EV>(this.vertices, resultedEdges, this.context);
 	}
@@ -620,11 +699,10 @@ public class Graph<K, VV, EV> {
 	private static final class ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>
 			implements CoGroupFunction<Edge<K, EV>, Tuple2<K, T>, Edge<K, EV>> {
 
-		private MapFunction<Tuple2<EV, T>, EV> mapper;
+		private EdgeJoinFunction<EV, T> edgeJoinFunction;
 
-		public ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget(
-				MapFunction<Tuple2<EV, T>, EV> mapper) {
-			this.mapper = mapper;
+		public ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget(EdgeJoinFunction<EV, T> edgeJoinFunction) {
+			this.edgeJoinFunction = edgeJoinFunction;
 		}
 
 		@Override
@@ -641,8 +719,7 @@ public class Graph<K, VV, EV> {
 					Edge<K, EV> edgesNext = edgesIterator.next();
 
 					collector.collect(new Edge<K, EV>(edgesNext.f0,
-							edgesNext.f1, mapper.map(new Tuple2<EV, T>(
-									edgesNext.f2, inputNext.f1))));
+							edgesNext.f1, edgeJoinFunction.edgeJoin(edgesNext.f2, inputNext.f1)));
 				}
 
 			} else {
@@ -654,22 +731,26 @@ public class Graph<K, VV, EV> {
 	}
 
 	/**
-	 * Joins the edge DataSet with an input DataSet on the target key of the
-	 * edges and the first attribute of the input DataSet and applies a UDF on
-	 * the resulted values. Should the inputDataSet contain the same key more
-	 * than once, only the first value will be considered.
+	 * Joins the edge DataSet with an input Tuple2 DataSet and applies a user-defined transformation
+	 * on the values of the matched records.
+	 * The target ID of the edges and the first field of the input DataSet are used as join keys.
+	 * If the input DataSet contains the same key more than once, only the first value will be considered.
 	 * 
 	 * @param inputDataSet the DataSet to join with.
-	 * @param mapper the UDF map function to apply.
-	 * @param <T> the return type
-	 * @return a new graph where the edge values have been updated.
-	 */
+	 * The first field of the Tuple2 is used as the join key
+	 * and the second field is passed as a parameter to the transformation function. 
+	 * @param edgeJoinFunction the transformation function to apply.
+	 * The first parameter is the current edge value and the second parameter is the value
+	 * of the matched Tuple2 from the input DataSet.
+	 * @param <T> the type of the second field of the input Tuple2 DataSet.
+	 * @return a new Graph, where the edge values have been updated according to the
+	 * result of the edgeJoinFunction.
+	*/
 	public <T> Graph<K, VV, EV> joinWithEdgesOnTarget(DataSet<Tuple2<K, T>> inputDataSet,
-			final MapFunction<Tuple2<EV, T>, EV> mapper) {
+			final EdgeJoinFunction<EV, T> edgeJoinFunction) {
 
 		DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
 				.coGroup(inputDataSet).where(1).equalTo(0)
-				.with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(mapper));
+				.with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(edgeJoinFunction));
 
 		return new Graph<K, VV, EV>(this.vertices, resultedEdges, this.context);
 	}
@@ -798,20 +879,21 @@ public class Graph<K, VV, EV> {
 	}
 
 	/**
-	 * Compute an aggregate over the edges of each vertex. The function applied
-	 * on the edges has access to the vertex value.
+	 * Groups by vertex and computes a GroupReduce transformation over the edge values of each vertex.
+	 * The edgesFunction applied on the edges has access to both the id and the value
+	 * of the grouping vertex.
+	 * 
+	 * For each vertex, the edgesFunction can iterate over all edges of this vertex
+	 * with the specified direction, and emit any number of output elements, including none.
 	 * 
-	 * @param edgesFunction
-	 *            the function to apply to the neighborhood
-	 * @param direction
-	 *            the edge direction (in-, out-, all-)
-	 * @param <T>
-	 *            the output type
-	 * @return a dataset of a T
+	 * @param edgesFunction the group reduce function to apply to the neighboring edges of each vertex.
+	 * @param direction the edge direction (in-, out-, all-).
+	 * @param <T> the output type
+	 * @return a DataSet containing elements of type T
 	 * @throws IllegalArgumentException
-	 */
+	*/
 	public <T> DataSet<T> groupReduceOnEdges(EdgesFunctionWithVertexValue<K, VV, EV, T> edgesFunction,
-											EdgeDirection direction) throws IllegalArgumentException {
+			EdgeDirection direction) throws IllegalArgumentException {
 
 		switch (direction) {
 		case IN:
@@ -829,21 +911,22 @@ public class Graph<K, VV, EV> {
 	}
 
 	/**
-	 * Compute an aggregate over the edges of each vertex. The function applied
-	 * on the edges has access to the vertex value.
-	 *
-	 * @param edgesFunction
-	 *            the function to apply to the neighborhood
-	 * @param direction
-	 *            the edge direction (in-, out-, all-)
-	 * @param <T>
-	 *            the output type
+	 * Groups by vertex and computes a GroupReduce transformation over the edge values of each vertex.
+	 * The edgesFunction applied on the edges has access to both the id and the value
+	 * of the grouping vertex.
+	 * 
+	 * For each vertex, the edgesFunction can iterate over all edges of this vertex
+	 * with the specified direction, and emit any number of output elements, including none.
+	 * 
+	 * @param edgesFunction the group reduce function to apply to the neighboring edges of each vertex.
+	 * @param direction the edge direction (in-, out-, all-).
+	 * @param <T> the output type
 	 * @param typeInfo the explicit return type.
-	 * @return a dataset of a T
+	 * @return a DataSet containing elements of type T
 	 * @throws IllegalArgumentException
-	 */
+	*/
 	public <T> DataSet<T> groupReduceOnEdges(EdgesFunctionWithVertexValue<K, VV, EV, T> edgesFunction,
-											EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
+			EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
 
 		switch (direction) {
 			case IN:
@@ -861,20 +944,21 @@ public class Graph<K, VV, EV> {
 	}
 
 	/**
-	 * Compute an aggregate over the edges of each vertex. The function applied
-	 * on the edges only has access to the vertex id (not the vertex value).
+	 * Groups by vertex and computes a GroupReduce transformation over the edge values of each vertex.
+	 * The edgesFunction applied on the edges only has access to the vertex id (not the vertex value)
+	 * of the grouping vertex.
 	 * 
-	 * @param edgesFunction
-	 *            the function to apply to the neighborhood
-	 * @param direction
-	 *            the edge direction (in-, out-, all-)
-	 * @param <T>
-	 *            the output type
-	 * @return a dataset of T
+	 * For each vertex, the edgesFunction can iterate over all edges of this vertex
+	 * with the specified direction, and emit any number of output elements, including none.
+	 * 
+	 * @param edgesFunction the group reduce function to apply to the neighboring edges of each vertex.
+	 * @param direction the edge direction (in-, out-, all-).
+	 * @param <T> the output type
+	 * @return a DataSet containing elements of type T
 	 * @throws IllegalArgumentException
-	 */
+	*/
 	public <T> DataSet<T> groupReduceOnEdges(EdgesFunction<K, EV, T> edgesFunction,
-											EdgeDirection direction) throws IllegalArgumentException {
+			EdgeDirection direction) throws IllegalArgumentException {
 
 		switch (direction) {
 		case IN:
@@ -894,21 +978,22 @@ public class Graph<K, VV, EV> {
 	}
 
 	/**
-	 * Compute an aggregate over the edges of each vertex. The function applied
-	 * on the edges only has access to the vertex id (not the vertex value).
-	 *
-	 * @param edgesFunction
-	 *            the function to apply to the neighborhood
-	 * @param direction
-	 *            the edge direction (in-, out-, all-)
-	 * @param <T>
-	 *            the output type
+	 * Groups by vertex and computes a GroupReduce transformation over the edge values of each vertex.
+	 * The edgesFunction applied on the edges only has access to the vertex id (not the vertex value)
+	 * of the grouping vertex.
+	 * 
+	 * For each vertex, the edgesFunction can iterate over all edges of this vertex
+	 * with the specified direction, and emit any number of output elements, including none.
+	 * 
+	 * @param edgesFunction the group reduce function to apply to the neighboring edges of each vertex.
+	 * @param direction the edge direction (in-, out-, all-).
+	 * @param <T> the output type
 	 * @param typeInfo the explicit return type.
-	 * @return a dataset of T
+	 * @return a DataSet containing elements of type T
 	 * @throws IllegalArgumentException
-	 */
+	*/
 	public <T> DataSet<T> groupReduceOnEdges(EdgesFunction<K, EV, T> edgesFunction,
-											EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
+			EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
 
 		switch (direction) {
 			case IN:
@@ -1515,18 +1600,22 @@ public class Graph<K, VV, EV> {
 	}
 
 	/**
-	 * Compute an aggregate over the neighbors (edges and vertices) of each
-	 * vertex. The function applied on the neighbors has access to the vertex
-	 * value.
+	 * Groups by vertex and computes a GroupReduce transformation over the neighbors (both edges and vertices)
+	 * of each vertex. The neighborsFunction applied on the neighbors has access to both the vertex id
+	 * and the vertex value of the grouping vertex.
 	 * 
-	 * @param neighborsFunction the function to apply to the neighborhood
-	 * @param direction the edge direction (in-, out-, all-)
+	 * For each vertex, the neighborsFunction can iterate over all neighbors of this vertex
+	 * with the specified direction, and emit any number of output elements, including none.
+	 * 
+	 * @param neighborsFunction the group reduce function to apply to the neighboring edges and vertices
+	 * of each vertex.
+	 * @param direction the edge direction (in-, out-, all-).
 	 * @param <T> the output type
-	 * @return a dataset of a T
+	 * @return a DataSet containing elements of type T
 	 * @throws IllegalArgumentException
-	 */
+	*/
 	public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunctionWithVertexValue<K, VV, EV, T> neighborsFunction,
-												EdgeDirection direction) throws IllegalArgumentException {
+			EdgeDirection direction) throws IllegalArgumentException {
 		switch (direction) {
 		case IN:
 			// create <edge-sourceVertex> pairs
@@ -1558,19 +1647,23 @@ public class Graph<K, VV, EV> {
 	}
 
 	/**
-	 * Compute an aggregate over the neighbors (edges and vertices) of each
-	 * vertex. The function applied on the neighbors has access to the vertex
-	 * value.
-	 *
-	 * @param neighborsFunction the function to apply to the neighborhood
-	 * @param direction the edge direction (in-, out-, all-)
+	 * Groups by vertex and computes a GroupReduce transformation over the neighbors (both edges and vertices)
+	 * of each vertex. The neighborsFunction applied on the neighbors has access to both the vertex id
+	 * and the vertex value of the grouping vertex.
+	 * 
+	 * For each vertex, the neighborsFunction can iterate over all neighbors of this vertex
+	 * with the specified direction, and emit any number of output elements, including none.
+	 * 
+	 * @param neighborsFunction the group reduce function to apply to the neighboring edges and vertices
+	 * of each vertex.
+	 * @param direction the edge direction (in-, out-, all-).
 	 * @param <T> the output type
-	 * @param typeInfo the explicit return type.
-	 * @return a dataset of a T
+	 * @param typeInfo the explicit return type
+	 * @return a DataSet containing elements of type T
 	 * @throws IllegalArgumentException
-	 */
+	*/
 	public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunctionWithVertexValue<K, VV, EV, T> neighborsFunction,
-												EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
+			EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
 		switch (direction) {
 			case IN:
 				// create <edge-sourceVertex> pairs
@@ -1601,20 +1694,23 @@ public class Graph<K, VV, EV> {
 		}
 	}
 
-
 	/**
-	 * Compute an aggregate over the neighbors (edges and vertices) of each
-	 * vertex. The function applied on the neighbors only has access to the
-	 * vertex id (not the vertex value).
+	 * Groups by vertex and computes a GroupReduce transformation over the neighbors (both edges and vertices)
+	 * of each vertex. The neighborsFunction applied on the neighbors only has access to the vertex id
+	 * (not the vertex value) of the grouping vertex.
 	 * 
-	 * @param neighborsFunction the function to apply to the neighborhood
-	 * @param direction the edge direction (in-, out-, all-)
+	 * For each vertex, the neighborsFunction can iterate over all neighbors of this vertex
+	 * with the specified direction, and emit any number of output elements, including none.
+	 * 
+	 * @param neighborsFunction the group reduce function to apply to the neighboring edges and vertices
+	 * of each vertex.
+	 * @param direction the edge direction (in-, out-, all-).
 	 * @param <T> the output type
-	 * @return a dataset of a T
+	 * @return a DataSet containing elements of type T
 	 * @throws IllegalArgumentException
-	 */
+	*/
 	public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunction<K, VV, EV, T> neighborsFunction,
-												EdgeDirection direction) throws IllegalArgumentException {
+			EdgeDirection direction) throws IllegalArgumentException {
 		switch (direction) {
 		case IN:
 			// create <edge-sourceVertex> pairs
@@ -1647,19 +1743,23 @@ public class Graph<K, VV, EV> {
 	}
 
 	/**
-	 * Compute an aggregate over the neighbors (edges and vertices) of each
-	 * vertex. The function applied on the neighbors only has access to the
-	 * vertex id (not the vertex value).
-	 *
-	 * @param neighborsFunction the function to apply to the neighborhood
-	 * @param direction the edge direction (in-, out-, all-)
+	 * Groups by vertex and computes a GroupReduce transformation over the neighbors (both edges and vertices)
+	 * of each vertex. The neighborsFunction applied on the neighbors only has access to the vertex id
+	 * (not the vertex value) of the grouping vertex.
+	 * 
+	 * For each vertex, the neighborsFunction can iterate over all neighbors of this vertex
+	 * with the specified direction, and emit any number of output elements, including none.
+	 * 
+	 * @param neighborsFunction the group reduce function to apply to the neighboring edges and vertices
+	 * of each vertex.
+	 * @param direction the edge direction (in-, out-, all-).
 	 * @param <T> the output type
-	 * @param typeInfo the explicit return type.
-	 * @return a dataset of a T
+	 * @param typeInfo the explicit return type
+	 * @return a DataSet containing elements of type T
 	 * @throws IllegalArgumentException
-	 */
+	*/
 	public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunction<K, VV, EV, T> neighborsFunction,
-												EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
+			EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
 		switch (direction) {
 			case IN:
 				// create <edge-sourceVertex> pairs
@@ -1842,16 +1942,21 @@ public class Graph<K, VV, EV> {
 	}
 
 	/**
-	 * Compute an aggregate over the neighbor values of each
-	 * vertex.
+	 * Compute a reduce transformation over the neighbors' vertex values of each vertex.
+	 * For each vertex, the transformation consecutively calls a
+	 * {@link ReduceNeighborsFunction} until only a single value for each vertex remains.
+	 * The {@link ReduceNeighborsFunction} combines a pair of neighbor vertex values
+	 * into one new value of the same type.
 	 *
-	 * @param reduceNeighborsFunction the function to apply to the neighborhood
+	 * @param reduceNeighborsFunction the reduce function to apply to the neighbors of each vertex.
 	 * @param direction the edge direction (in-, out-, all-)
-	 * @return a Dataset containing one value per vertex (vertex id, aggregate vertex value)
+	 * @return a DataSet of Tuple2, with one tuple per vertex.
+	 * The first field of the Tuple2 is the vertex ID and the second field
+	 * is the aggregate value computed by the provided {@link ReduceNeighborsFunction}.
 	 * @throws IllegalArgumentException
-	 */
+	*/
 	public DataSet<Tuple2<K, VV>> reduceOnNeighbors(ReduceNeighborsFunction<VV> reduceNeighborsFunction,
-									EdgeDirection direction) throws IllegalArgumentException {
+			EdgeDirection direction) throws IllegalArgumentException {
 		switch (direction) {
 			case IN:
 				// create <vertex-source value> pairs
@@ -1900,17 +2005,20 @@ public class Graph<K, VV, EV> {
 	}
 
 	/**
-	 * Compute an aggregate over the edge values of each vertex.
+	 * Compute a reduce transformation over the edge values of each vertex.
+	 * For each vertex, the transformation consecutively calls a
+	 * {@link ReduceEdgesFunction} until only a single value for each vertex remains.
+	 * The {@link ReduceEdgesFunction} combines two edge values into one new value of the same type.
 	 *
-	 * @param reduceEdgesFunction
-	 *            the function to apply to the neighborhood
-	 * @param direction
-	 *            the edge direction (in-, out-, all-)
-	 * @return a Dataset containing one value per vertex(vertex key, aggregate edge value)
+	 * @param reduceEdgesFunction the reduce function to apply to the edges of each vertex.
+	 * @param direction the edge direction (in-, out-, all-)
+	 * @return a DataSet of Tuple2, with one tuple per vertex.
+	 * The first field of the Tuple2 is the vertex ID and the second field
+	 * is the aggregate value computed by the provided {@link ReduceEdgesFunction}.
 	 * @throws IllegalArgumentException
-	 */
+	*/
 	public DataSet<Tuple2<K, EV>> reduceOnEdges(ReduceEdgesFunction<EV> reduceEdgesFunction,
-								EdgeDirection direction) throws IllegalArgumentException {
+			EdgeDirection direction) throws IllegalArgumentException {
 
 		switch (direction) {
 			case IN:
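
The joinWithEdgesOnSource contract documented above can be sketched without a cluster. The block below is a minimal plain-Java model, assuming no Flink dependency: `EdgeJoinModel`, `Edge`, `Pair` and the local `EdgeJoinFunction` are hypothetical stand-ins, not the Gelly classes. Like the coGroup in this diff, it applies the function to every edge whose source ID matches an input key, uses only one input value per key, and emits unmatched edges unchanged.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of Graph#joinWithEdgesOnSource (no Flink dependency).
// Edge, Pair and EdgeJoinFunction are simplified, hypothetical stand-ins for the Gelly types.
class EdgeJoinModel {

    // Stand-in for EdgeJoinFunction<EV, T>: (current edge value, matched input value) -> new edge value.
    interface EdgeJoinFunction<EV, T> {
        EV edgeJoin(EV edgeValue, T inputValue);
    }

    // Minimal edge: source ID, target ID and value (Gelly keeps these in fields f0, f1, f2).
    static final class Edge<K, EV> {
        final K source;
        final K target;
        final EV value;

        Edge(K source, K target, EV value) {
            this.source = source;
            this.target = target;
            this.value = value;
        }
    }

    // Stand-in for an input Tuple2<K, T>: the join key and the value passed to the function.
    static final class Pair<K, T> {
        final K key;
        final T value;

        Pair(K key, T value) {
            this.key = key;
            this.value = value;
        }
    }

    static <K, EV, T> List<Edge<K, EV>> joinWithEdgesOnSource(
            List<Edge<K, EV>> edges, List<Pair<K, T>> input, EdgeJoinFunction<EV, T> fn) {
        // Keep one input value per key: the coGroup in the diff reads only the first element
        // of each input group (in Flink, which element comes first is not guaranteed).
        Map<K, T> valuePerKey = new HashMap<>();
        for (Pair<K, T> p : input) {
            if (!valuePerKey.containsKey(p.key)) {
                valuePerKey.put(p.key, p.value);
            }
        }
        List<Edge<K, EV>> result = new ArrayList<>();
        for (Edge<K, EV> e : edges) {
            if (valuePerKey.containsKey(e.source)) {
                // Source ID matched: replace the edge value with the function's result.
                result.add(new Edge<>(e.source, e.target, fn.edgeJoin(e.value, valuePerKey.get(e.source))));
            } else {
                // No match: the edge is emitted unchanged.
                result.add(e);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Edge<Long, Double>> edges = Arrays.asList(
                new Edge<>(1L, 2L, 10.0), new Edge<>(1L, 3L, 20.0), new Edge<>(2L, 3L, 5.0));
        List<Pair<Long, Double>> factors = Arrays.asList(new Pair<>(1L, 2.0));
        // Scale every out-edge of vertex 1 by its factor; the edge 2 -> 3 has no match.
        for (Edge<Long, Double> e : joinWithEdgesOnSource(edges, factors, (ev, f) -> ev * f)) {
            System.out.println(e.source + " -> " + e.target + ": " + e.value);
        }
    }
}
```

With Gelly itself, the lambda would instead implement `org.apache.flink.graph.EdgeJoinFunction<Double, Double>` and be passed to `Graph#joinWithEdgesOnSource`; the target-side variant differs only in joining on the target ID.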

http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
index a21b23d..1a32204 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
@@ -36,5 +36,24 @@ import org.apache.flink.util.Collector;
  */
 public interface NeighborsFunction<K, VV, EV, O> extends Function, Serializable {
 
+	/**
+	 * This method is called per vertex and can iterate over all of its neighbors
+	 * with the specified direction.
+	 * <p>
+	 * If called with {@link EdgeDirection#OUT} the group will contain
+	 * the out-edges and neighboring vertices of the grouping vertex.
+	 * If called with {@link EdgeDirection#IN} the group will contain
+	 * the in-edges and neighboring vertices of the grouping vertex.
+	 * If called with {@link EdgeDirection#ALL} the group will contain
+	 * all edges and neighboring vertices of the grouping vertex.
+	 * <p>
+	 * The method can emit any number of output elements, including none.
+	 * 
+	 * @param neighbors the neighbors of the grouping vertex.
+	 * The first field of each Tuple3 is the ID of the grouping vertex.
+	 * The second field is the neighboring edge, and the third field is the neighboring vertex.
+	 * @param out the collector to emit results to
+	 * @throws Exception
+	*/
 	void iterateNeighbors(Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> neighbors, Collector<O> out) throws Exception;
 }
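
The iterateNeighbors contract above (one call per grouping vertex, Tuple3 of grouping ID, edge and neighbor, zero or more outputs) can be illustrated with a small plain-Java model of `groupReduceOnNeighbors` for `EdgeDirection.OUT`. This is a sketch under stated assumptions: there is no Flink dependency, a `Consumer` plays the role of the `Collector`, and `NeighborsModel`, `Neighbor` and `weightedSumAtLeast` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Plain-Java model of groupReduceOnNeighbors(NeighborsFunction, EdgeDirection.OUT); no Flink dependency.
// All types below are simplified, hypothetical stand-ins for the Gelly/Flink types.
class NeighborsModel {

    static final class Vertex {
        final long id;
        final double value;

        Vertex(long id, double value) {
            this.id = id;
            this.value = value;
        }
    }

    static final class Edge {
        final long source;
        final long target;
        final double value;

        Edge(long source, long target, double value) {
            this.source = source;
            this.target = target;
            this.value = value;
        }
    }

    // Stand-in for Tuple3<K, Edge<K, EV>, Vertex<K, VV>>: (grouping vertex ID, edge, neighboring vertex).
    static final class Neighbor {
        final long groupingId;
        final Edge edge;
        final Vertex vertex;

        Neighbor(long groupingId, Edge edge, Vertex vertex) {
            this.groupingId = groupingId;
            this.edge = edge;
            this.vertex = vertex;
        }
    }

    // Stand-in for NeighborsFunction: called once per group; may emit zero or more results.
    interface NeighborsFunction<O> {
        void iterateNeighbors(Iterable<Neighbor> neighbors, Consumer<O> out);
    }

    static <O> List<O> groupReduceOnOutNeighbors(List<Vertex> vertices, List<Edge> edges, NeighborsFunction<O> fn) {
        Map<Long, Vertex> byId = new HashMap<>();
        for (Vertex v : vertices) {
            byId.put(v.id, v);
        }
        // OUT direction: group (source ID, out-edge, target vertex) triples by the source vertex.
        // In this model, vertices without out-edges form no group.
        Map<Long, List<Neighbor>> groups = new LinkedHashMap<>();
        for (Edge e : edges) {
            groups.computeIfAbsent(e.source, k -> new ArrayList<>())
                    .add(new Neighbor(e.source, e, byId.get(e.target)));
        }
        List<O> results = new ArrayList<>();
        for (List<Neighbor> group : groups.values()) {
            fn.iterateNeighbors(group, results::add);
        }
        return results;
    }

    // Example: emit the grouping vertex ID if the sum of (edge value * neighbor value)
    // over its out-neighbors reaches the threshold; otherwise emit nothing.
    static NeighborsFunction<Long> weightedSumAtLeast(double threshold) {
        return (neighbors, out) -> {
            long id = -1L;
            double sum = 0.0;
            for (Neighbor n : neighbors) {
                id = n.groupingId;
                sum += n.edge.value * n.vertex.value;
            }
            if (sum >= threshold) {
                out.accept(id);
            }
        };
    }

    public static void main(String[] args) {
        List<Vertex> vertices = Arrays.asList(new Vertex(1, 1.0), new Vertex(2, 2.0), new Vertex(3, 3.0));
        List<Edge> edges = Arrays.asList(new Edge(1, 2, 2.0), new Edge(1, 3, 3.0), new Edge(2, 3, 1.0));
        System.out.println(groupReduceOnOutNeighbors(vertices, edges, weightedSumAtLeast(10.0)));
    }
}
```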

http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
index fdf54fa..657238c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
@@ -36,5 +36,25 @@ import org.apache.flink.util.Collector;
  */
 public interface NeighborsFunctionWithVertexValue<K, VV, EV, O> extends Function, Serializable {
 
+	/**
+	 * This method is called per vertex and can iterate over all of its neighbors
+	 * with the specified direction.
+	 * <p>
+	 * If called with {@link EdgeDirection#OUT} the group will contain
+	 * the out-edges and neighboring vertices of the grouping vertex.
+	 * If called with {@link EdgeDirection#IN} the group will contain
+	 * the in-edges and neighboring vertices of the grouping vertex.
+	 * If called with {@link EdgeDirection#ALL} the group will contain
+	 * all edges and neighboring vertices of the grouping vertex.
+	 * <p>
+	 * The method can emit any number of output elements, including none.
+	 * 
+	 * @param vertex the grouping Vertex
+	 * @param neighbors the neighbors of the grouping vertex.
+	 * The first field of each Tuple2 is the neighboring edge,
+	 * and the second field is the neighboring vertex.
+	 * @param out the collector to emit results to
+	 * @throws Exception
+	 */
 	void iterateNeighbors(Vertex<K, VV> vertex, Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighbors, Collector<O> out) throws Exception;
 }
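
The variant above differs in that the grouping vertex is passed separately and each neighbor is a Tuple2 of edge and vertex. A minimal plain-Java sketch of this for `EdgeDirection.IN` follows, assuming no Flink dependency; `NeighborsWithValueModel`, `EdgeAndNeighbor` and `aboveInNeighborAverage` are hypothetical names. It is also an assumption of this sketch that every vertex is visited, with an empty `Iterable` when it has no in-neighbors.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Plain-Java model of groupReduceOnNeighbors(NeighborsFunctionWithVertexValue, EdgeDirection.IN).
// All types below are simplified, hypothetical stand-ins for the Gelly/Flink types.
class NeighborsWithValueModel {

    static final class Vertex {
        final long id;
        final double value;

        Vertex(long id, double value) {
            this.id = id;
            this.value = value;
        }
    }

    static final class Edge {
        final long source;
        final long target;

        Edge(long source, long target) {
            this.source = source;
            this.target = target;
        }
    }

    // Stand-in for Tuple2<Edge<K, EV>, Vertex<K, VV>>: (neighboring edge, neighboring vertex).
    static final class EdgeAndNeighbor {
        final Edge edge;
        final Vertex neighbor;

        EdgeAndNeighbor(Edge edge, Vertex neighbor) {
            this.edge = edge;
            this.neighbor = neighbor;
        }
    }

    // Stand-in for NeighborsFunctionWithVertexValue: also receives the grouping vertex.
    interface NeighborsFunctionWithVertexValue<O> {
        void iterateNeighbors(Vertex vertex, Iterable<EdgeAndNeighbor> neighbors, Consumer<O> out);
    }

    static <O> List<O> groupReduceOnInNeighbors(List<Vertex> vertices, List<Edge> edges,
            NeighborsFunctionWithVertexValue<O> fn) {
        Map<Long, Vertex> byId = new HashMap<>();
        for (Vertex v : vertices) {
            byId.put(v.id, v);
        }
        // IN direction: pair every in-edge of a vertex with the edge's source vertex.
        Map<Long, List<EdgeAndNeighbor>> inNeighbors = new HashMap<>();
        for (Edge e : edges) {
            inNeighbors.computeIfAbsent(e.target, k -> new ArrayList<>())
                    .add(new EdgeAndNeighbor(e, byId.get(e.source)));
        }
        List<O> results = new ArrayList<>();
        for (Vertex v : vertices) {
            // Sketch choice: every vertex is visited; one without in-neighbors gets an empty Iterable.
            List<EdgeAndNeighbor> group = inNeighbors.getOrDefault(v.id, Collections.emptyList());
            fn.iterateNeighbors(v, group, results::add);
        }
        return results;
    }

    // Example: emit the vertex ID if its value exceeds the average value of its in-neighbors.
    static NeighborsFunctionWithVertexValue<Long> aboveInNeighborAverage() {
        return (vertex, neighbors, out) -> {
            double sum = 0.0;
            int count = 0;
            for (EdgeAndNeighbor n : neighbors) {
                sum += n.neighbor.value;
                count++;
            }
            if (count > 0 && vertex.value > sum / count) {
                out.accept(vertex.id);
            }
        };
    }

    public static void main(String[] args) {
        List<Vertex> vertices = Arrays.asList(new Vertex(1, 1.0), new Vertex(2, 2.0), new Vertex(3, 3.0));
        List<Edge> edges = Arrays.asList(new Edge(1, 2), new Edge(1, 3), new Edge(2, 3), new Edge(3, 1));
        System.out.println(groupReduceOnInNeighbors(vertices, edges, aboveInNeighborAverage()));
    }
}
```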

http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java
index 84eec51..e7631a1 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java
@@ -30,5 +30,14 @@ import java.io.Serializable;
  */
 public interface ReduceEdgesFunction<EV> extends Function, Serializable {
 
+	/**
+	 * Combines two neighboring edge values into one new value of the same type.
+	 * For each vertex, this function is consecutively called,
+	 * until only a single value for each vertex remains.
+	 * 
+	 * @param firstEdgeValue the first neighboring edge value to combine
+	 * @param secondEdgeValue the second neighboring edge value to combine
+	 * @return the combined value of both input values
+	 */
 	EV reduceEdges(EV firstEdgeValue, EV secondEdgeValue);
 }
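
The "consecutively called until a single value per vertex remains" behavior of reduceEdges maps naturally onto a pairwise fold. A minimal plain-Java model of `reduceOnEdges` for `EdgeDirection.OUT` is sketched below, assuming no Flink dependency; `ReduceEdgesModel` and its nested types are hypothetical. Because the javadoc does not specify the order in which values are combined, a function that is associative and commutative (such as `Math::min` here) is the safe choice.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of reduceOnEdges(ReduceEdgesFunction, EdgeDirection.OUT); no Flink dependency.
// ReduceEdgesFunction and Edge are simplified, hypothetical stand-ins for the Gelly types.
class ReduceEdgesModel {

    // Stand-in for ReduceEdgesFunction<EV>: combines two edge values into one of the same type.
    interface ReduceEdgesFunction<EV> {
        EV reduceEdges(EV firstEdgeValue, EV secondEdgeValue);
    }

    static final class Edge<EV> {
        final long source;
        final long target;
        final EV value;

        Edge(long source, long target, EV value) {
            this.source = source;
            this.target = target;
            this.value = value;
        }
    }

    // One (vertex ID -> aggregate) entry per vertex with at least one out-edge,
    // standing in for the DataSet<Tuple2<K, EV>> returned by reduceOnEdges.
    static <EV> Map<Long, EV> reduceOnOutEdges(List<Edge<EV>> edges, ReduceEdgesFunction<EV> fn) {
        Map<Long, EV> result = new LinkedHashMap<>();
        for (Edge<EV> e : edges) {
            // merge() keeps the first value as-is and folds each later value in with fn.
            result.merge(e.source, e.value, fn::reduceEdges);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Edge<Double>> edges = Arrays.asList(
                new Edge<>(1, 2, 4.0), new Edge<>(1, 3, 1.5), new Edge<>(2, 3, 7.0));
        // Lightest out-edge weight per vertex.
        ReduceEdgesFunction<Double> min = Math::min;
        System.out.println(reduceOnOutEdges(edges, min));
    }
}
```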

http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java
index fc5295d..5b423e2 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java
@@ -31,5 +31,14 @@ import java.io.Serializable;
  */
 public interface ReduceNeighborsFunction <VV> extends Function, Serializable {
 
+	/**
+	 * Combines two neighboring vertex values into one new value of the same type.
+	 * For each vertex, this function is consecutively called,
+	 * until only a single value for each vertex remains.
+	 * 
+	 * @param firstNeighborValue the first neighboring vertex value to combine
+	 * @param secondNeighborValue the second neighboring vertex value to combine
+	 * @return the combined value of both input values
+	 */
 	VV reduceNeighbors(VV firstNeighborValue, VV secondNeighborValue);
 }
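
reduceNeighbors works the same way, but folds the values of neighboring vertices rather than edge values. The sketch below models `reduceOnNeighbors` for `EdgeDirection.IN` in plain Java, assuming no Flink dependency and that every edge endpoint has a vertex value; `ReduceNeighborsModel` and its nested types are hypothetical names.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of reduceOnNeighbors(ReduceNeighborsFunction, EdgeDirection.IN); no Flink dependency.
// ReduceNeighborsFunction and Edge are simplified, hypothetical stand-ins for the Gelly types.
class ReduceNeighborsModel {

    // Stand-in for ReduceNeighborsFunction<VV>: combines two neighbor vertex values into one.
    interface ReduceNeighborsFunction<VV> {
        VV reduceNeighbors(VV firstNeighborValue, VV secondNeighborValue);
    }

    static final class Edge {
        final long source;
        final long target;

        Edge(long source, long target) {
            this.source = source;
            this.target = target;
        }
    }

    // vertexValues: vertex ID -> value (assumed to contain every edge endpoint).
    // Returns one (vertex ID -> aggregate) entry per vertex with at least one in-neighbor,
    // standing in for the DataSet<Tuple2<K, VV>> returned by reduceOnNeighbors.
    static <VV> Map<Long, VV> reduceOnInNeighbors(Map<Long, VV> vertexValues, List<Edge> edges,
            ReduceNeighborsFunction<VV> fn) {
        Map<Long, VV> result = new LinkedHashMap<>();
        for (Edge e : edges) {
            // IN direction: the neighbor of the edge's target is the edge's source.
            result.merge(e.target, vertexValues.get(e.source), fn::reduceNeighbors);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Long, Double> values = new HashMap<>();
        values.put(1L, 1.0);
        values.put(2L, 2.0);
        values.put(3L, 4.0);
        List<Edge> edges = Arrays.asList(new Edge(1, 3), new Edge(2, 3), new Edge(1, 2));
        // Sum of in-neighbor values per vertex.
        ReduceNeighborsFunction<Double> sum = Double::sum;
        System.out.println(reduceOnInNeighbors(values, edges, sum));
    }
}
```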

http://git-wip-us.apache.org/repos/asf/flink/blob/640e63be/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/VertexJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/VertexJoinFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/VertexJoinFunction.java
new file mode 100644
index 0000000..a30d1a2
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/VertexJoinFunction.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import java.io.Serializable;
+
+import org.apache.flink.api.common.functions.Function;
+
+/**
+ * Interface to be implemented by the transformation function
+ * applied in {@link Graph#joinWithVertices(DataSet, VertexJoinFunction)} method.
+ *
+ * @param <VV> the vertex value type
+ * @param <T> the input value type
+ */
+public interface VertexJoinFunction<VV, T> extends Function, Serializable {
+
+	/**
+	 * Applies a transformation on the current vertex value
+	 * and the value of the matched tuple of the input DataSet.
+	 * 
+	 * @param vertexValue the current vertex value
+	 * @param inputValue the value of the matched Tuple2 input
+	 * @return the new vertex value
+	 */
+	VV vertexJoin(VV vertexValue, T inputValue) throws Exception;
+}
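
The new VertexJoinFunction takes the current vertex value and the matched input value and returns the new vertex value. A minimal plain-Java model of `joinWithVertices` is sketched below, assuming no Flink dependency and unique input keys; `VertexJoinModel` is a hypothetical name, and the local interface drops the `throws Exception` clause of the real one for brevity. Mirroring the edge-join coGroup in this diff, the sketch leaves unmatched vertices unchanged.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java model of Graph#joinWithVertices(DataSet, VertexJoinFunction); no Flink dependency.
class VertexJoinModel {

    // Simplified stand-in for org.apache.flink.graph.VertexJoinFunction<VV, T>
    // (the real interface also declares "throws Exception").
    interface VertexJoinFunction<VV, T> {
        VV vertexJoin(VV vertexValue, T inputValue);
    }

    // vertices: vertex ID -> value; input: key -> value (unique keys in this sketch).
    static <VV, T> Map<Long, VV> joinWithVertices(Map<Long, VV> vertices, Map<Long, T> input,
            VertexJoinFunction<VV, T> fn) {
        Map<Long, VV> result = new LinkedHashMap<>();
        for (Map.Entry<Long, VV> v : vertices.entrySet()) {
            Long id = v.getKey();
            if (input.containsKey(id)) {
                // Matched: the new vertex value is the function's result.
                result.put(id, fn.vertexJoin(v.getValue(), input.get(id)));
            } else {
                // Unmatched: the vertex keeps its current value.
                result.put(id, v.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Long, Double> ranks = new LinkedHashMap<>();
        ranks.put(1L, 1.0);
        ranks.put(2L, 3.0);
        ranks.put(3L, 5.0);
        Map<Long, Integer> outDegrees = new HashMap<>();
        outDegrees.put(1L, 2);
        outDegrees.put(2L, 3);
        // Divide each vertex value by its out-degree, where one is known.
        System.out.println(joinWithVertices(ranks, outDegrees, (rank, degree) -> rank / degree));
    }
}
```

With Gelly on the classpath, the same lambda logic would go into an implementation of `VertexJoinFunction<Double, Integer>` passed to `Graph#joinWithVertices`.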

