flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [3/7] flink git commit: [FLINK-1962] Add Gelly Scala API
Date Tue, 18 Aug 2015 14:38:17 GMT
[FLINK-1962] Add Gelly Scala API

This closes #1004


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/240e8895
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/240e8895
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/240e8895

Branch: refs/heads/master
Commit: 240e8895c6e1d5ea6f67370d276cb58fd0ecddb8
Parents: d2e88ff
Author: Pieter-Jan Van Aeken <pieterjan.vanaeken@euranova.eu>
Authored: Mon Aug 10 14:06:52 2015 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Tue Aug 18 16:37:41 2015 +0200

----------------------------------------------------------------------
 .../flink/api/scala/ExecutionEnvironment.scala  |   4 +
 flink-staging/flink-gelly-scala/pom.xml         | 198 +++++
 .../flink/graph/scala/EdgesFunction.scala       |  35 +
 .../scala/EdgesFunctionWithVertexValue.scala    |  33 +
 .../org/apache/flink/graph/scala/Graph.scala    | 734 +++++++++++++++++++
 .../flink/graph/scala/NeighborsFunction.scala   |  37 +
 .../NeighborsFunctionWithVertexValue.scala      |  40 +
 .../org/apache/flink/graph/scala/package.scala  |  30 +
 .../graph/scala/utils/EdgeToTuple3Map.scala     |  31 +
 .../graph/scala/utils/VertexToTuple2Map.scala   |  31 +
 .../flink/graph/scala/test/TestGraphUtils.scala |  55 ++
 .../scala/test/operations/DegreesITCase.scala   |  88 +++
 .../test/operations/GraphMutationsITCase.scala  | 171 +++++
 .../test/operations/GraphOperationsITCase.scala | 238 ++++++
 .../test/operations/JoinWithEdgesITCase.scala   | 170 +++++
 .../operations/JoinWithVerticesITCase.scala     |  93 +++
 .../scala/test/operations/MapEdgesITCase.scala  | 102 +++
 .../test/operations/MapVerticesITCase.scala     |  99 +++
 .../operations/ReduceOnEdgesMethodsITCase.scala | 173 +++++
 .../ReduceOnNeighborMethodsITCase.scala         | 144 ++++
 .../main/java/org/apache/flink/graph/Graph.java | 179 +++++
 flink-staging/pom.xml                           |   1 +
 22 files changed, 2686 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/240e8895/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
----------------------------------------------------------------------
diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
index d53c54c..17311e9 100644
--- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
+++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala
@@ -65,6 +65,10 @@ import scala.reflect.ClassTag
 class ExecutionEnvironment(javaEnv: JavaEnv) {
 
   /**
+   * @return the Java Execution environment.
+   */
+  def getJavaEnv: JavaEnv = javaEnv
+  /**
    * Gets the config object.
    */
   def getConfig: ExecutionConfig = {

http://git-wip-us.apache.org/repos/asf/flink/blob/240e8895/flink-staging/flink-gelly-scala/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/pom.xml b/flink-staging/flink-gelly-scala/pom.xml
new file mode 100644
index 0000000..390dbb8
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/pom.xml
@@ -0,0 +1,198 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>flink-staging</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>0.10-SNAPSHOT</version>
+        <relativePath>..</relativePath>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>flink-gelly-scala</artifactId>
+
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-scala</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-gelly</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
+            <version>${guava.version}</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <!-- Scala Compiler -->
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <version>3.1.4</version>
+                <executions>
+                    <!-- Run scala compiler in the process-resources phase, so that dependencies on
+                        scala classes can be resolved later in the (Java) compile phase -->
+                    <execution>
+                        <id>scala-compile-first</id>
+                        <phase>process-resources</phase>
+                        <goals>
+                            <goal>compile</goal>
+                        </goals>
+                    </execution>
+
+                    <!-- Run scala compiler in the process-test-resources phase, so that dependencies on
+                         scala classes can be resolved later in the (Java) test-compile phase -->
+                    <execution>
+                        <id>scala-test-compile</id>
+                        <phase>process-test-resources</phase>
+                        <goals>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <jvmArgs>
+                        <jvmArg>-Xms128m</jvmArg>
+                        <jvmArg>-Xmx512m</jvmArg>
+                    </jvmArgs>
+                    <compilerPlugins combine.children="append">
+                        <compilerPlugin>
+                            <groupId>org.scalamacros</groupId>
+                            <artifactId>paradise_${scala.version}</artifactId>
+                            <version>${scala.macros.version}</version>
+                        </compilerPlugin>
+                    </compilerPlugins>
+                </configuration>
+            </plugin>
+
+            <!-- Eclipse Integration -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-eclipse-plugin</artifactId>
+                <version>2.8</version>
+                <configuration>
+                    <downloadSources>true</downloadSources>
+                    <projectnatures>
+                        <projectnature>org.scala-ide.sdt.core.scalanature</projectnature>
+                        <projectnature>org.eclipse.jdt.core.javanature</projectnature>
+                    </projectnatures>
+                    <buildcommands>
+                        <buildcommand>org.scala-ide.sdt.core.scalabuilder</buildcommand>
+                    </buildcommands>
+                    <classpathContainers>
+                        <classpathContainer>org.scala-ide.sdt.launching.SCALA_CONTAINER</classpathContainer>
+                        <classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
+                    </classpathContainers>
+                    <excludes>
+                        <exclude>org.scala-lang:scala-library</exclude>
+                        <exclude>org.scala-lang:scala-compiler</exclude>
+                    </excludes>
+                    <sourceIncludes>
+                        <sourceInclude>**/*.scala</sourceInclude>
+                        <sourceInclude>**/*.java</sourceInclude>
+                    </sourceIncludes>
+                </configuration>
+            </plugin>
+
+            <!-- Adding scala source directories to build path -->
+            <plugin>
+                <groupId>org.codehaus.mojo</groupId>
+                <artifactId>build-helper-maven-plugin</artifactId>
+                <version>1.7</version>
+                <executions>
+                    <!-- Add src/main/scala to eclipse build path -->
+                    <execution>
+                        <id>add-source</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>add-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>src/main/scala</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                    <!-- Add src/test/scala to eclipse build path -->
+                    <execution>
+                        <id>add-test-source</id>
+                        <phase>generate-test-sources</phase>
+                        <goals>
+                            <goal>add-test-source</goal>
+                        </goals>
+                        <configuration>
+                            <sources>
+                                <source>src/test/scala</source>
+                            </sources>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.scalastyle</groupId>
+                <artifactId>scalastyle-maven-plugin</artifactId>
+                <version>0.5.0</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>check</goal>
+                        </goals>
+                    </execution>
+                </executions>
+                <configuration>
+                    <verbose>false</verbose>
+                    <failOnViolation>true</failOnViolation>
+                    <includeTestSourceDirectory>true</includeTestSourceDirectory>
+                    <failOnWarning>false</failOnWarning>
+                    <sourceDirectory>${basedir}/src/main/scala</sourceDirectory>
+                    <testSourceDirectory>${basedir}/src/test/scala</testSourceDirectory>
+                    <configLocation>${project.basedir}/../../tools/maven/scalastyle-config.xml</configLocation>
+                    <outputFile>${project.basedir}/scalastyle-output.xml</outputFile>
+                    <outputEncoding>UTF-8</outputEncoding>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/240e8895/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/EdgesFunction.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/EdgesFunction.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/EdgesFunction.scala
new file mode 100644
index 0000000..70a5fdf
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/EdgesFunction.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala
+
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.graph.Edge
+import org.apache.flink.util.Collector
+
+abstract class EdgesFunction[K, EV, T] extends org.apache.flink.graph.EdgesFunction[K, EV, T] {
+
+  def iterateEdges(edges: Iterable[(K, Edge[K, EV])], out: Collector[T])
+
+  override def iterateEdges(edges: java.lang.Iterable[Tuple2[K, Edge[K, EV]]], out:
+  Collector[T]): Unit = {
+    val scalaIterable = scala.collection.JavaConversions.iterableAsScalaIterable(edges)
+      .map(jtuple => (jtuple.f0, jtuple.f1))
+    iterateEdges(scalaIterable, out)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/240e8895/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/EdgesFunctionWithVertexValue.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/EdgesFunctionWithVertexValue.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/EdgesFunctionWithVertexValue.scala
new file mode 100644
index 0000000..82589b6
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/EdgesFunctionWithVertexValue.scala
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala
+
+import org.apache.flink.graph.{Edge, Vertex}
+import org.apache.flink.util.Collector
+
+abstract class EdgesFunctionWithVertexValue[K, VV, EV, T] extends org.apache.flink.graph
+.EdgesFunctionWithVertexValue[K, VV, EV, T] {
+  @throws(classOf[Exception])
+  def iterateEdges(v: Vertex[K, VV], edges: Iterable[Edge[K, EV]], out: Collector[T])
+
+  override def iterateEdges(v: Vertex[K, VV], edges: java.lang.Iterable[Edge[K, EV]], out:
+  Collector[T]) = {
+    iterateEdges(v, scala.collection.JavaConversions.iterableAsScalaIterable(edges), out)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/240e8895/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
new file mode 100644
index 0000000..738fd90
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
@@ -0,0 +1,734 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala
+
+import org.apache.flink.api.common.functions.{FilterFunction, MapFunction}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.{tuple => jtuple}
+import org.apache.flink.api.scala._
+import org.apache.flink.graph._
+import org.apache.flink.graph.gsa.{ApplyFunction, GSAConfiguration, GatherFunction, SumFunction}
+import org.apache.flink.graph.spargel.{MessagingFunction, VertexCentricConfiguration, VertexUpdateFunction}
+import org.apache.flink.{graph => jg}
+
+import _root_.scala.collection.JavaConverters._
+import _root_.scala.reflect.ClassTag
+
+object Graph {
+  def fromDataSet[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
+  TypeInformation : ClassTag](vertices: DataSet[Vertex[K, VV]], edges: DataSet[Edge[K, EV]],
+                              env: ExecutionEnvironment): Graph[K, VV, EV] = {
+    wrapGraph(jg.Graph.fromDataSet[K, VV, EV](vertices.javaSet, edges.javaSet, env.getJavaEnv))
+  }
+
+  def fromCollection[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
+  TypeInformation : ClassTag](vertices: Seq[Vertex[K, VV]], edges: Seq[Edge[K, EV]], env:
+  ExecutionEnvironment): Graph[K, VV, EV] = {
+    wrapGraph(jg.Graph.fromCollection[K, VV, EV](vertices.asJavaCollection, edges
+      .asJavaCollection, env.getJavaEnv))
+  }
+}
+
+/**
+ * Represents a graph consisting of {@link Edge edges} and {@link Vertex vertices}.
+ * @param jgraph the underlying java api Graph.
+ * @tparam K the key type for vertex and edge identifiers
+ * @tparam VV the value type for vertices
+ * @tparam EV the value type for edges
+ * @see org.apache.flink.graph.Edge
+ * @see org.apache.flink.graph.Vertex
+ */
+final class Graph[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag, EV:
+TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
+
+  private[flink] def getWrappedGraph = jgraph
+
+
+  private[flink] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
+    if (jgraph.getContext.getConfig.isClosureCleanerEnabled) {
+      ClosureCleaner.clean(f, checkSerializable)
+    }
+    ClosureCleaner.ensureSerializable(f)
+    f
+  }
+
+  /**
+   * @return the vertex DataSet.
+   */
+  def getVertices = wrap(jgraph.getVertices)
+
+  /**
+   * @return the edge DataSet.
+   */
+  def getEdges = wrap(jgraph.getEdges)
+
+  /**
+   * @return the vertex DataSet as Tuple2.
+   */
+  def getVerticesAsTuple2(): DataSet[(K, VV)] = {
+    wrap(jgraph.getVerticesAsTuple2).map(jtuple => (jtuple.f0, jtuple.f1))
+  }
+
+  /**
+   * @return the edge DataSet as Tuple3.
+   */
+  def getEdgesAsTuple3(): DataSet[(K, K, EV)] = {
+    wrap(jgraph.getEdgesAsTuple3).map(jtuple => (jtuple.f0, jtuple.f1, jtuple.f2))
+  }
+
+  /**
+   * Apply a function to the attribute of each vertex in the graph.
+   *
+   * @param mapper the map function to apply.
+   * @return a new graph
+   */
+  def mapVertices[NV: TypeInformation : ClassTag](mapper: MapFunction[Vertex[K, VV], NV]):
+  Graph[K, NV, EV] = {
+    new Graph[K, NV, EV](jgraph.mapVertices[NV](
+      mapper,
+      createTypeInformation[Vertex[K, NV]]
+    ))
+  }
+
+  /**
+   * Apply a function to the attribute of each vertex in the graph.
+   *
+   * @param fun the map function to apply.
+   * @return a new graph
+   */
+  def mapVertices[NV: TypeInformation : ClassTag](fun: Vertex[K, VV] => NV): Graph[K, NV, EV] = {
+    val mapper: MapFunction[Vertex[K, VV], NV] = new MapFunction[Vertex[K, VV], NV] {
+      val cleanFun = clean(fun)
+
+      def map(in: Vertex[K, VV]): NV = cleanFun(in)
+    }
+    new Graph[K, NV, EV](jgraph.mapVertices[NV](mapper, createTypeInformation[Vertex[K, NV]]))
+  }
+
+  /**
+   * Apply a function to the attribute of each edge in the graph.
+   *
+   * @param mapper the map function to apply.
+   * @return a new graph
+   */
+  def mapEdges[NV: TypeInformation : ClassTag](mapper: MapFunction[Edge[K, EV], NV]): Graph[K,
+    VV, NV] = {
+    new Graph[K, VV, NV](jgraph.mapEdges[NV](
+      mapper,
+      createTypeInformation[Edge[K, NV]]
+    ))
+  }
+
+  /**
+   * Apply a function to the attribute of each edge in the graph.
+   *
+   * @param fun the map function to apply.
+   * @return a new graph
+   */
+  def mapEdges[NV: TypeInformation : ClassTag](fun: Edge[K, EV] => NV): Graph[K, VV, NV] = {
+    val mapper: MapFunction[Edge[K, EV], NV] = new MapFunction[Edge[K, EV], NV] {
+      val cleanFun = clean(fun)
+
+      def map(in: Edge[K, EV]): NV = cleanFun(in)
+    }
+    new Graph[K, VV, NV](jgraph.mapEdges[NV](mapper, createTypeInformation[Edge[K, NV]]))
+  }
+
+  /**
+   * Joins the vertex DataSet of this graph with an input DataSet and applies
+   * a UDF on the resulted values.
+   *
+   * @param inputDataSet the DataSet to join with.
+   * @param mapper the UDF map function to apply.
+   * @return a new graph where the vertex values have been updated.
+   */
+  def joinWithVertices[T: TypeInformation](inputDataSet: DataSet[(K, T)], mapper: MapFunction[
+    (VV, T), VV]): Graph[K, VV, EV] = {
+    val newmapper = new MapFunction[jtuple.Tuple2[VV, T], VV]() {
+      override def map(value: jtuple.Tuple2[VV, T]): VV = {
+        mapper.map((value.f0, value.f1))
+      }
+    }
+    val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
+      scalatuple._2)).javaSet
+    wrapGraph(jgraph.joinWithVertices[T](javaTupleSet, newmapper))
+  }
+
+  /**
+   * Joins the vertex DataSet of this graph with an input DataSet and applies
+   * a UDF on the resulted values.
+   *
+   * @param inputDataSet the DataSet to join with.
+   * @param fun the UDF map function to apply.
+   * @return a new graph where the vertex values have been updated.
+   */
+  def joinWithVertices[T: TypeInformation](inputDataSet: DataSet[(K, T)], fun: (VV, T) => VV):
+  Graph[K, VV, EV] = {
+    val newmapper = new MapFunction[jtuple.Tuple2[VV, T], VV]() {
+      val cleanFun = clean(fun)
+
+      override def map(value: jtuple.Tuple2[VV, T]): VV = {
+        cleanFun(value.f0, value.f1)
+      }
+    }
+    val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
+      scalatuple._2)).javaSet
+    wrapGraph(jgraph.joinWithVertices[T](javaTupleSet, newmapper))
+  }
+
+  /**
+   * Joins the edge DataSet with an input DataSet on a composite key of both
+   * source and target and applies a UDF on the resulted values.
+   *
+   * @param inputDataSet the DataSet to join with.
+   * @param mapper the UDF map function to apply.
+   * @tparam T the return type
+   * @return a new graph where the edge values have been updated.
+   */
+  def joinWithEdges[T: TypeInformation](inputDataSet: DataSet[(K, K, T)], mapper: MapFunction[
+    (EV, T), EV]): Graph[K, VV, EV] = {
+    val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
+      override def map(value: jtuple.Tuple2[EV, T]): EV = {
+        mapper.map((value.f0, value.f1))
+      }
+    }
+    val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple3(scalatuple._1,
+      scalatuple._2, scalatuple._3)).javaSet
+    wrapGraph(jgraph.joinWithEdges[T](javaTupleSet, newmapper))
+  }
+
+  /**
+   * Joins the edge DataSet with an input DataSet on a composite key of both
+   * source and target and applies a UDF on the resulted values.
+   *
+   * @param inputDataSet the DataSet to join with.
+   * @param fun the UDF map function to apply.
+   * @tparam T the return type
+   * @return a new graph where the edge values have been updated.
+   */
+  def joinWithEdges[T: TypeInformation](inputDataSet: DataSet[(K, K, T)], fun: (EV, T) => EV):
+  Graph[K, VV, EV] = {
+    val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
+      val cleanFun = clean(fun)
+
+      override def map(value: jtuple.Tuple2[EV, T]): EV = {
+        cleanFun(value.f0, value.f1)
+      }
+    }
+    val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple3(scalatuple._1,
+      scalatuple._2, scalatuple._3)).javaSet
+    wrapGraph(jgraph.joinWithEdges[T](javaTupleSet, newmapper))
+  }
+
+  /**
+   * Joins the edge DataSet with an input DataSet on the source key of the
+   * edges and the first attribute of the input DataSet and applies a UDF on
+   * the resulted values. In case the inputDataSet contains the same key more
+   * than once, only the first value will be considered.
+   *
+   * @param inputDataSet the DataSet to join with.
+   * @param mapper the UDF map function to apply.
+   * @tparam T the return type
+   * @return a new graph where the edge values have been updated.
+   */
+  def joinWithEdgesOnSource[T: TypeInformation](inputDataSet: DataSet[(K, T)], mapper:
+  MapFunction[(EV, T), EV]): Graph[K, VV, EV] = {
+    val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
+      override def map(value: jtuple.Tuple2[EV, T]): EV = {
+        mapper.map((value.f0, value.f1))
+      }
+    }
+    val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
+      scalatuple._2)).javaSet
+    wrapGraph(jgraph.joinWithEdgesOnSource[T](javaTupleSet, newmapper))
+  }
+
+  /**
+   * Joins the edge DataSet with an input DataSet on the source key of the
+   * edges and the first attribute of the input DataSet and applies a UDF on
+   * the resulted values. In case the inputDataSet contains the same key more
+   * than once, only the first value will be considered.
+   *
+   * @param inputDataSet the DataSet to join with.
+   * @param fun the UDF map function to apply.
+   * @tparam T the return type
+   * @return a new graph where the edge values have been updated.
+   */
+  def joinWithEdgesOnSource[T: TypeInformation](inputDataSet: DataSet[(K, T)], fun: (EV, T) =>
+    EV): Graph[K, VV, EV] = {
+    val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
+      val cleanFun = clean(fun)
+
+      override def map(value: jtuple.Tuple2[EV, T]): EV = {
+        cleanFun(value.f0, value.f1)
+      }
+    }
+    val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
+      scalatuple._2)).javaSet
+    wrapGraph(jgraph.joinWithEdgesOnSource[T](javaTupleSet, newmapper))
+  }
+
+  /**
+   * Joins the edge DataSet with an input DataSet on the target key of the
+   * edges and the first attribute of the input DataSet and applies a UDF on
+   * the resulted values. Should the inputDataSet contain the same key more
+   * than once, only the first value will be considered.
+   *
+   * @param inputDataSet the DataSet to join with.
+   * @param mapper the UDF map function to apply.
+   * @tparam T the return type
+   * @return a new graph where the edge values have been updated.
+   */
+  def joinWithEdgesOnTarget[T: TypeInformation](inputDataSet: DataSet[(K, T)], mapper:
+  MapFunction[(EV, T), EV]): Graph[K, VV, EV] = {
+    val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
+      override def map(value: jtuple.Tuple2[EV, T]): EV = {
+        mapper.map((value.f0, value.f1))
+      }
+    }
+    val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
+      scalatuple._2)).javaSet
+    wrapGraph(jgraph.joinWithEdgesOnTarget[T](javaTupleSet, newmapper))
+  }
+
+  /**
+   * Joins the edge DataSet with an input DataSet on the target key of the
+   * edges and the first attribute of the input DataSet and applies a UDF on
+   * the resulted values. Should the inputDataSet contain the same key more
+   * than once, only the first value will be considered.
+   *
+   * @param inputDataSet the DataSet to join with.
+   * @param fun the UDF map function to apply.
+   * @tparam T the return type
+   * @return a new graph where the edge values have been updated.
+   */
+  def joinWithEdgesOnTarget[T: TypeInformation](inputDataSet: DataSet[(K, T)], fun: (EV, T) =>
+    EV): Graph[K, VV, EV] = {
+    val newmapper = new MapFunction[jtuple.Tuple2[EV, T], EV]() {
+      val cleanFun = clean(fun)
+
+      override def map(value: jtuple.Tuple2[EV, T]): EV = {
+        cleanFun(value.f0, value.f1)
+      }
+    }
+    val javaTupleSet = inputDataSet.map(scalatuple => new jtuple.Tuple2(scalatuple._1,
+      scalatuple._2)).javaSet
+    wrapGraph(jgraph.joinWithEdgesOnTarget[T](javaTupleSet, newmapper))
+  }
+
+  /**
+   * Apply filtering functions to the graph and return a sub-graph that
+   * satisfies the predicates for both vertices and edges.
+   *
+   * @param vertexFilter the filter function for vertices.
+   * @param edgeFilter the filter function for edges.
+   * @return the resulting sub-graph.
+   */
+  def subgraph(vertexFilter: FilterFunction[Vertex[K, VV]], edgeFilter: FilterFunction[Edge[K,
+    EV]]) = {
+    wrapGraph(jgraph.subgraph(vertexFilter, edgeFilter))
+  }
+
+  /**
+   * Apply filtering functions to the graph and return a sub-graph that
+   * satisfies the predicates for both vertices and edges.
+   *
+   * @param vertexFilterFun the filter function for vertices.
+   * @param edgeFilterFun the filter function for edges.
+   * @return the resulting sub-graph.
+   */
+  def subgraph(vertexFilterFun: Vertex[K, VV] => Boolean, edgeFilterFun: Edge[K, EV] =>
+    Boolean) = {
+    val vertexFilter = new FilterFunction[Vertex[K, VV]] {
+      val cleanVertexFun = clean(vertexFilterFun)
+
+      override def filter(value: Vertex[K, VV]): Boolean = cleanVertexFun(value)
+    }
+
+    val edgeFilter = new FilterFunction[Edge[K, EV]] {
+      val cleanEdgeFun = clean(edgeFilterFun)
+
+      override def filter(value: Edge[K, EV]): Boolean = cleanEdgeFun(value)
+    }
+
+    wrapGraph(jgraph.subgraph(vertexFilter, edgeFilter))
+  }
+
+  /**
+   * Apply a filtering function to the graph and return a sub-graph that
+   * satisfies the predicates only for the vertices.
+   *
+   * @param vertexFilter the filter function for vertices.
+   * @return the resulting sub-graph.
+   */
+  def filterOnVertices(vertexFilter: FilterFunction[Vertex[K, VV]]) = {
+    wrapGraph(jgraph.filterOnVertices(vertexFilter))
+  }
+
+  /**
+   * Apply a filtering function to the graph and return a sub-graph that
+   * satisfies the predicates only for the vertices.
+   *
+   * @param vertexFilterFun the filter function for vertices.
+   * @return the resulting sub-graph.
+   */
+  def filterOnVertices(vertexFilterFun: Vertex[K, VV] => Boolean) = {
+    val vertexFilter = new FilterFunction[Vertex[K, VV]] {
+      val cleanVertexFun = clean(vertexFilterFun)
+
+      override def filter(value: Vertex[K, VV]): Boolean = cleanVertexFun(value)
+    }
+
+    wrapGraph(jgraph.filterOnVertices(vertexFilter))
+  }
+
+  /**
+   * Apply a filtering function to the graph and return a sub-graph that
+   * satisfies the predicates only for the edges.
+   *
+   * @param edgeFilter the filter function for edges.
+   * @return the resulting sub-graph.
+   */
+  def filterOnEdges(edgeFilter: FilterFunction[Edge[K, EV]]) = {
+    wrapGraph(jgraph.filterOnEdges(edgeFilter))
+  }
+
+  /**
+   * Apply a filtering function to the graph and return a sub-graph that
+   * satisfies the predicates only for the edges.
+   *
+   * @param edgeFilterFun the filter function for edges.
+   * @return the resulting sub-graph.
+   */
+  def filterOnEdges(edgeFilterFun: Edge[K, EV] => Boolean) = {
+    val edgeFilter = new FilterFunction[Edge[K, EV]] {
+      val cleanEdgeFun = clean(edgeFilterFun)
+
+      override def filter(value: Edge[K, EV]): Boolean = cleanEdgeFun(value)
+    }
+
+    wrapGraph(jgraph.filterOnEdges(edgeFilter))
+  }
+
+  /**
+   * Return the in-degree of all vertices in the graph
+   *
+   * @return A DataSet of Tuple2<vertexId, inDegree>
+   */
+  def inDegrees(): DataSet[(K, Long)] = {
+    wrap(jgraph.inDegrees).map(javatuple => (javatuple.f0, javatuple.f1))
+  }
+
+  /**
+   * Return the out-degree of all vertices in the graph
+   *
+   * @return A DataSet of Tuple2<vertexId, outDegree>
+   */
+  def outDegrees(): DataSet[(K, Long)] = {
+    wrap(jgraph.outDegrees).map(javatuple => (javatuple.f0, javatuple.f1))
+  }
+
+  /**
+   * Return the degree of all vertices in the graph
+   *
+   * @return A DataSet of Tuple2<vertexId, degree>
+   */
+  def getDegrees(): DataSet[(K, Long)] = {
+    wrap(jgraph.getDegrees).map(javatuple => (javatuple.f0, javatuple.f1))
+  }
+
+  /**
+   * This operation adds all inverse-direction edges to the graph.
+   *
+   * @return the undirected graph.
+   */
+  def getUndirected(): Graph[K, VV, EV] = {
+    new Graph(jgraph.getUndirected)
+  }
+
+  /**
+   * Reverse the direction of the edges in the graph
+   *
+   * @return a new graph with all edges reversed
+   * @throws UnsupportedOperationException
+   */
+  def reverse(): Graph[K, VV, EV] = {
+    new Graph(jgraph.reverse())
+  }
+
+  /**
+   * Compute an aggregate over the edges of each vertex. The function applied
+   * on the edges has access to the vertex value.
+   *
+   * @param edgesFunction the function to apply to the neighborhood
+   * @param direction     the edge direction (in-, out-, all-)
+   * @tparam T           the output type
+   * @return a dataset of a T
+   */
+  def groupReduceOnEdges[T: TypeInformation : ClassTag](edgesFunction:
+                                                        EdgesFunctionWithVertexValue[K, VV, EV,
+                                                          T], direction: EdgeDirection):
+  DataSet[T] = {
+    wrap(jgraph.groupReduceOnEdges(edgesFunction, direction, createTypeInformation[T]))
+  }
+
+  /**
+   * Compute an aggregate over the edges of each vertex. The function applied
+   * on the edges has access to the vertex value.
+   *
+   * @param edgesFunction the function to apply to the neighborhood
+   * @param direction     the edge direction (in-, out-, all-)
+   * @tparam T           the output type
+   * @return a dataset of a T
+   */
+  def groupReduceOnEdges[T: TypeInformation : ClassTag](edgesFunction: EdgesFunction[K, EV, T],
+                                                        direction: EdgeDirection): DataSet[T] = {
+    wrap(jgraph.groupReduceOnEdges(edgesFunction, direction, createTypeInformation[T]))
+  }
+
+  /**
+   * Compute an aggregate over the neighbors (edges and vertices) of each
+   * vertex. The function applied on the neighbors has access to the vertex
+   * value.
+   *
+   * @param neighborsFunction the function to apply to the neighborhood
+   * @param direction         the edge direction (in-, out-, all-)
+   * @tparam T               the output type
+   * @return a dataset of a T
+   */
+  def groupReduceOnNeighbors[T: TypeInformation : ClassTag](neighborsFunction:
+                                                            NeighborsFunctionWithVertexValue[K,
+                                                              VV, EV, T], direction:
+                                                            EdgeDirection): DataSet[T] = {
+    wrap(jgraph.groupReduceOnNeighbors(neighborsFunction, direction, createTypeInformation[T]))
+  }
+
+  /**
+   * Compute an aggregate over the neighbors (edges and vertices) of each
+   * vertex.
+   *
+   * @param neighborsFunction the function to apply to the neighborhood
+   * @param direction         the edge direction (in-, out-, all-)
+   * @tparam T               the output type
+   * @return a dataset of a T
+   */
+  def groupReduceOnNeighbors[T: TypeInformation : ClassTag](neighborsFunction:
+                                                            NeighborsFunction[K, VV, EV, T],
+                                                            direction: EdgeDirection):
+  DataSet[T] = {
+    wrap(jgraph.groupReduceOnNeighbors(neighborsFunction, direction, createTypeInformation[T]))
+  }
+
+  /**
+   * @return a long integer representing the number of vertices
+   */
+  def numberOfVertices(): Long = {
+    jgraph.numberOfVertices()
+  }
+
+  /**
+   * @return a long integer representing the number of edges
+   */
+  def numberOfEdges(): Long = {
+    jgraph.numberOfEdges()
+  }
+
+  /**
+   * @return The IDs of the vertices as DataSet
+   */
+  def getVertexIds(): DataSet[K] = {
+    wrap(jgraph.getVertexIds)
+  }
+
+  /**
+   * @return The IDs of the edges as DataSet
+   */
+  def getEdgeIds(): DataSet[(K, K)] = {
+    wrap(jgraph.getEdgeIds).map(jtuple => (jtuple.f0, jtuple.f1))
+  }
+
+  /**
+   * Adds the input vertex to the graph. If the vertex already
+   * exists in the graph, it will not be added again.
+   *
+   * @param vertex the vertex to be added
+   * @return the new graph containing the existing vertices as well as the one just added
+   */
+  def addVertex(vertex: Vertex[K, VV]) = {
+    wrapGraph(jgraph.addVertex(vertex))
+  }
+
+  /**
+   * Adds the given edge to the graph. If the source and target vertices do
+   * not exist in the graph, they will also be added.
+   *
+   * @param source the source vertex of the edge
+   * @param target the target vertex of the edge
+   * @param edgeValue the edge value
+   * @return the new graph containing the existing vertices and edges plus the
+   *         newly added edge
+   */
+  def addEdge(source: Vertex[K, VV], target: Vertex[K, VV], edgeValue: EV) = {
+    wrapGraph(jgraph.addEdge(source, target, edgeValue))
+  }
+
+  /**
+   * Removes the given vertex and its edges from the graph.
+   *
+   * @param vertex the vertex to remove
+   * @return the new graph containing the existing vertices and edges without
+   *         the removed vertex and its edges
+   */
+  def removeVertex(vertex: Vertex[K, VV]) = {
+    wrapGraph(jgraph.removeVertex(vertex))
+  }
+
+  /**
+   * Removes all edges that match the given edge from the graph.
+   *
+   * @param edge the edge to remove
+   * @return the new graph containing the existing vertices and edges without
+   *         the removed edges
+   */
+  def removeEdge(edge: Edge[K, EV]) = {
+    wrapGraph(jgraph.removeEdge(edge))
+  }
+
+  /**
+   * Performs union on the vertices and edges sets of the input graphs
+   * removing duplicate vertices but maintaining duplicate edges.
+   *
+   * @param graph the graph to perform union with
+   * @return a new graph
+   */
+  def union(graph: Graph[K, VV, EV]) = {
+    wrapGraph(jgraph.union(graph.getWrappedGraph))
+  }
+
+  /**
+   * Compute an aggregate over the neighbor values of each
+   * vertex.
+   *
+   * @param reduceNeighborsFunction the function to apply to the neighborhood
+   * @param direction               the edge direction (in-, out-, all-)
+   * @return a Dataset containing one value per vertex (vertex id, aggregate vertex value)
+   */
+  def reduceOnNeighbors(reduceNeighborsFunction: ReduceNeighborsFunction[VV], direction:
+  EdgeDirection): DataSet[(K, VV)] = {
+    wrap(jgraph.reduceOnNeighbors(reduceNeighborsFunction, direction)).map(jtuple => (jtuple
+      .f0, jtuple.f1))
+  }
+
+  /**
+   * Compute an aggregate over the edge values of each vertex.
+   *
+   * @param reduceEdgesFunction the function to apply to the neighborhood
+   * @param direction           the edge direction (in-, out-, all-)
+   * @return a Dataset containing one value per vertex(vertex key, aggegate edge value)
+   * @throws IllegalArgumentException
+   */
+  def reduceOnEdges(reduceEdgesFunction: ReduceEdgesFunction[EV], direction: EdgeDirection):
+  DataSet[(K, EV)] = {
+    wrap(jgraph.reduceOnEdges(reduceEdgesFunction, direction)).map(jtuple => (jtuple.f0,
+      jtuple.f1))
+  }
+
+  def run(algorithm: GraphAlgorithm[K, VV, EV]) = {
+    wrapGraph(jgraph.run(algorithm))
+  }
+
+  /**
+   * Runs a Vertex-Centric iteration on the graph.
+   * No configuration options are provided.
+   *
+   * @param vertexUpdateFunction the vertex update function
+   * @param messagingFunction the messaging function
+   * @param maxIterations maximum number of iterations to perform
+   *
+   * @return the updated Graph after the vertex-centric iteration has converged or
+   *         after maximumNumberOfIterations.
+   */
+  def runVertexCentricIteration[M](vertexUpdateFunction: VertexUpdateFunction[K, VV, M],
+                                   messagingFunction: MessagingFunction[K, VV, M, EV],
+                                   maxIterations: Int): Graph[K, VV, EV] = {
+    wrapGraph(jgraph.runVertexCentricIteration(vertexUpdateFunction, messagingFunction,
+      maxIterations))
+  }
+
+  /**
+   * Runs a Vertex-Centric iteration on the graph with configuration options.
+   *
+   * @param vertexUpdateFunction the vertex update function
+   * @param messagingFunction the messaging function
+   * @param maxIterations maximum number of iterations to perform
+   * @param parameters the iteration configuration parameters
+   *
+   * @return the updated Graph after the vertex-centric iteration has converged or
+   *         after maximumNumberOfIterations.
+   */
+  def runVertexCentricIteration[M](vertexUpdateFunction: VertexUpdateFunction[K, VV, M],
+                                   messagingFunction: MessagingFunction[K, VV, M, EV],
+                                   maxIterations: Int, parameters: VertexCentricConfiguration):
+  Graph[K, VV, EV] = {
+    wrapGraph(jgraph.runVertexCentricIteration(vertexUpdateFunction, messagingFunction,
+      maxIterations, parameters))
+  }
+
+  /**
+   * Runs a Gather-Sum-Apply iteration on the graph.
+   * No configuration options are provided.
+   *
+   * @param gatherFunction the gather function collects information about adjacent
+   *                       vertices and edges
+   * @param sumFunction the sum function aggregates the gathered information
+   * @param applyFunction the apply function updates the vertex values with the aggregates
+   * @param maxIterations maximum number of iterations to perform
+   * @tparam M the intermediate type used between gather, sum and apply
+   *
+   * @return the updated Graph after the gather-sum-apply iteration has converged or
+   *         after maximumNumberOfIterations.
+   */
+  def runGatherSumApplyIteration[M](gatherFunction: GatherFunction[VV, EV, M], sumFunction:
+  SumFunction[VV, EV, M], applyFunction: ApplyFunction[K, VV, M], maxIterations: Int): Graph[K,
+    VV, EV] = {
+    wrapGraph(jgraph.runGatherSumApplyIteration(gatherFunction, sumFunction, applyFunction,
+      maxIterations))
+  }
+
+  /**
+   * Runs a Gather-Sum-Apply iteration on the graph with configuration options.
+   *
+   * @param gatherFunction the gather function collects information about adjacent
+   *                       vertices and edges
+   * @param sumFunction the sum function aggregates the gathered information
+   * @param applyFunction the apply function updates the vertex values with the aggregates
+   * @param maxIterations maximum number of iterations to perform
+   * @param parameters the iteration configuration parameters
+   * @tparam M the intermediate type used between gather, sum and apply
+   *
+   * @return the updated Graph after the gather-sum-apply iteration has converged or
+   *         after maximumNumberOfIterations.
+   */
+  def runGatherSumApplyIteration[M](gatherFunction: GatherFunction[VV, EV, M], sumFunction:
+  SumFunction[VV, EV, M], applyFunction: ApplyFunction[K, VV, M], maxIterations: Int,
+                                    parameters: GSAConfiguration): Graph[K, VV, EV] = {
+    wrapGraph(jgraph.runGatherSumApplyIteration(gatherFunction, sumFunction, applyFunction,
+      maxIterations, parameters))
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/240e8895/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunction.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunction.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunction.scala
new file mode 100644
index 0000000..ca15dab
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunction.scala
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala
+
+import org.apache.flink.api.java.tuple.Tuple3
+import org.apache.flink.graph.{Edge, Vertex}
+import org.apache.flink.util.Collector
+
+
+abstract class NeighborsFunction[K, VV, EV, T] extends org.apache.flink.graph
+.NeighborsFunction[K, VV, EV, T] {
+
+  def iterateNeighbors(neighbors: Iterable[(K, Edge[K, EV], Vertex[K, VV])], out: Collector[T])
+
+  override def iterateNeighbors(neighbors: java.lang.Iterable[Tuple3[K, Edge[K, EV], Vertex[K,
+    VV]]], out: Collector[T]) = {
+    val scalaIterable = scala.collection.JavaConversions.iterableAsScalaIterable(neighbors)
+      .map(jtuple => (jtuple.f0, jtuple.f1, jtuple.f2))
+    iterateNeighbors(scalaIterable, out)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/240e8895/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunctionWithVertexValue.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunctionWithVertexValue.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunctionWithVertexValue.scala
new file mode 100644
index 0000000..cefc277
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/NeighborsFunctionWithVertexValue.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala
+
+import java.lang
+
+import org.apache.flink.api.java.tuple.Tuple2
+import org.apache.flink.graph.{Edge, Vertex}
+import org.apache.flink.util.Collector
+
+
+abstract class NeighborsFunctionWithVertexValue[K, VV, EV, T] extends org.apache.flink.graph
+.NeighborsFunctionWithVertexValue[K, VV, EV, T] {
+
+  def iterateNeighbors(vertex: Vertex[K, VV], neighbors: Iterable[(Edge[K, EV], Vertex[K, VV])
+    ], out: Collector[T]): Unit
+
+  override def iterateNeighbors(vertex: Vertex[K, VV], neighbors: lang.Iterable[Tuple2[Edge[K,
+    EV], Vertex[K, VV]]], out: Collector[T]): Unit = {
+    val scalaIterable = scala.collection.JavaConversions.iterableAsScalaIterable(neighbors)
+      .map(jtuple => (jtuple.f0, jtuple.f1))
+    iterateNeighbors(vertex, scalaIterable, out)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/240e8895/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/package.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/package.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/package.scala
new file mode 100644
index 0000000..159a100
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/package.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.graph.{Graph => JGraph}
+
+import _root_.scala.reflect.ClassTag
+
+
+package object scala {
+  private[flink] def wrapGraph[K: TypeInformation : ClassTag, VV: TypeInformation : ClassTag,
+  EV: TypeInformation : ClassTag](javagraph: JGraph[K, VV, EV]) = new Graph[K, VV, EV](javagraph)
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/240e8895/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala
new file mode 100644
index 0000000..0d7d2af
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/EdgeToTuple3Map.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.utils
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.graph.Edge
+
+class EdgeToTuple3Map[K, EV] extends MapFunction[Edge[K, EV], (K, K, EV)] {
+
+  private val serialVersionUID: Long = 1L
+
+  override def map(value: Edge[K, EV]): (K, K, EV) = {
+    (value.getSource, value.getTarget, value.getValue)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/240e8895/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala
new file mode 100644
index 0000000..de77832
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/utils/VertexToTuple2Map.scala
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.utils
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.graph.Vertex
+
+class VertexToTuple2Map[K, VV] extends MapFunction[Vertex[K, VV], (K, VV)] {
+
+  private val serialVersionUID: Long = 1L
+
+  override def map(value: Vertex[K, VV]): (K, VV) = {
+    (value.getId, value.getValue)
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/240e8895/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/TestGraphUtils.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/TestGraphUtils.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/TestGraphUtils.scala
new file mode 100644
index 0000000..1c2cf54
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/TestGraphUtils.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.{Edge, Vertex}
+
+object TestGraphUtils {
+
+    def getLongLongVertexData(env: ExecutionEnvironment): DataSet[Vertex[Long, Long]] = {
+        return env.fromCollection(getLongLongVertices)
+    }
+
+    def getLongLongEdgeData(env: ExecutionEnvironment): DataSet[Edge[Long, Long]] = {
+        return env.fromCollection(getLongLongEdges)
+    }
+
+    def getLongLongVertices: List[Vertex[Long, Long]] = {
+        List(
+            new Vertex[Long, Long](1L, 1L),
+            new Vertex[Long, Long](2L, 2L),
+            new Vertex[Long, Long](3L, 3L),
+            new Vertex[Long, Long](4L, 4L),
+            new Vertex[Long, Long](5L, 5L)
+        )
+    }
+
+    def getLongLongEdges: List[Edge[Long, Long]] = {
+        List(
+            new Edge[Long, Long](1L, 2L, 12L),
+            new Edge[Long, Long](1L, 3L, 13L),
+            new Edge[Long, Long](2L, 3L, 23L),
+            new Edge[Long, Long](3L, 4L, 34L),
+            new Edge[Long, Long](3L, 5L, 35L),
+            new Edge[Long, Long](4L, 5L, 45L),
+            new Edge[Long, Long](5L, 1L, 51L)
+        )
+    }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/240e8895/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala
new file mode 100644
index 0000000..98dbbe9
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/DegreesITCase.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test.operations
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.scala.test.TestGraphUtils
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{After, Before, Rule, Test}
+
+@RunWith(classOf[Parameterized])
+class DegreesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
+MultipleProgramsTestBase(mode) {
+
+  private var resultPath: String = null
+  private var expectedResult: String = null
+
+  var tempFolder: TemporaryFolder = new TemporaryFolder()
+
+  @Rule
+  def getFolder(): TemporaryFolder = {
+    tempFolder;
+  }
+
+  @Before
+  @throws(classOf[Exception])
+  def before {
+    resultPath = tempFolder.newFile.toURI.toString
+  }
+
+  @After
+  @throws(classOf[Exception])
+  def after {
+    TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testInDegrees {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    graph.inDegrees().writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,1\n" + "2,1\n" + "3,2\n" + "4,1\n" + "5,2\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testOutDegrees {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    graph.outDegrees().writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,2\n" + "2,1\n" + "3,2\n" + "4,1\n" + "5,1\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testGetDegrees {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    graph.getDegrees().writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,3\n" + "2,2\n" + "3,4\n" + "4,2\n" + "5,3\n"
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/240e8895/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala
new file mode 100644
index 0000000..687b0a7
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphMutationsITCase.scala
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test.operations
+
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.scala.test.TestGraphUtils
+import org.apache.flink.graph.{Edge, Vertex}
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{After, Before, Rule, Test}
+
+@RunWith(classOf[Parameterized])
+class GraphMutationsITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
+MultipleProgramsTestBase(mode) {
+
+  private var resultPath: String = null
+  private var expectedResult: String = null
+
+  var tempFolder: TemporaryFolder = new TemporaryFolder()
+
+  @Rule
+  def getFolder(): TemporaryFolder = {
+    tempFolder;
+  }
+
+  @Before
+  @throws(classOf[Exception])
+  def before {
+    resultPath = tempFolder.newFile.toURI.toString
+  }
+
+  @After
+  @throws(classOf[Exception])
+  def after {
+    TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testAddVertex {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+
+    val newgraph = graph.addVertex(new Vertex[Long, Long](6L, 6L))
+    newgraph.getVerticesAsTuple2.writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + "6,6\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testAddVertexExisting {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val newgraph = graph.addVertex(new Vertex[Long, Long](1L, 1L))
+    newgraph.getVerticesAsTuple2.writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testAddVertexNoEdges {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val newgraph = graph.addVertex(new Vertex[Long, Long](6L, 6L))
+    newgraph.getVerticesAsTuple2.writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,1\n" + "2,2\n" + "3,3\n" + "4,4\n" + "5,5\n" + "6,6\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testRemoveVertex {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val newgraph = graph.removeVertex(new Vertex[Long, Long](5L, 5L))
+    newgraph.getEdgesAsTuple3.writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testRemoveInvalidVertex {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val newgraph = graph.removeVertex(new Vertex[Long, Long](6L, 6L))
+    newgraph.getEdgesAsTuple3.writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
+      "45\n" + "5,1,51\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testAddEdge {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val newgraph = graph.addEdge(new Vertex[Long, Long](6L, 6L), new Vertex[Long, Long](1L,
+      1L), 61L)
+    newgraph.getEdgesAsTuple3.writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
+      "45\n" + "5,1,51\n" + "6,1,61\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testAddExistingEdge {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val newgraph = graph.addEdge(new Vertex[Long, Long](1L, 1L), new Vertex[Long, Long](2L,
+      2L), 12L)
+    newgraph.getEdgesAsTuple3.writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,2,12\n" + "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5," +
+      "35\n" + "4,5,45\n" + "5,1,51\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testRemoveEdge {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val newgraph = graph.removeEdge(new Edge[Long, Long](5L, 1L, 51L))
+    newgraph.getEdgesAsTuple3.writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testRemoveInvalidEdge {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val newgraph = graph.removeEdge(new Edge[Long, Long](6L, 1L, 61L))
+    newgraph.getEdgesAsTuple3.writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
+      "45\n" + "5,1,51\n"
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/240e8895/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala
new file mode 100644
index 0000000..d49e565
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/GraphOperationsITCase.scala
@@ -0,0 +1,238 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.functions.FilterFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.scala.test.TestGraphUtils
+import org.apache.flink.graph.{Edge, Vertex}
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{After, Before, Rule, Test}
+
+@RunWith(classOf[Parameterized])
+class GraphOperationsITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
+MultipleProgramsTestBase(mode) {
+
+  private var resultPath: String = null
+  private var expectedResult: String = null
+
+  var tempFolder: TemporaryFolder = new TemporaryFolder()
+
+  @Rule
+  def getFolder(): TemporaryFolder = {
+    tempFolder;
+  }
+
+  @Before
+  @throws(classOf[Exception])
+  def before {
+    resultPath = tempFolder.newFile.toURI.toString
+  }
+
+  @After
+  @throws(classOf[Exception])
+  def after {
+    TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testUndirected {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    graph.getUndirected().getEdgesAsTuple3().writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,2,12\n" + "2,1,12\n" + "1,3,13\n" + "3,1,13\n" + "2,3,23\n" + "3,2," +
+      "23\n" + "3,4,34\n" + "4,3,34\n" + "3,5,35\n" + "5,3,35\n" + "4,5,45\n" + "5,4,45\n" +
+      "5,1,51\n" + "1,5,51\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testReverse {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    graph.reverse().getEdgesAsTuple3().writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "2,1,12\n" + "3,1,13\n" + "3,2,23\n" + "4,3,34\n" + "5,3,35\n" + "5,4," +
+      "45\n" + "1,5,51\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSubGraph {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    graph.subgraph(new FilterFunction[Vertex[Long, Long]] {
+      @throws(classOf[Exception])
+      def filter(vertex: Vertex[Long, Long]): Boolean = {
+        return (vertex.getValue > 2)
+      }
+    }, new FilterFunction[Edge[Long, Long]] {
+
+      @throws(classOf[Exception])
+      override def filter(edge: Edge[Long, Long]): Boolean = {
+        return (edge.getValue > 34)
+      }
+    }).getEdgesAsTuple3().writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "3,5,35\n" + "4,5,45\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testSubGraphSugar {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    graph.subgraph(
+      vertex => vertex.getValue > 2,
+      edge => edge.getValue > 34
+    ).getEdgesAsTuple3().writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "3,5,35\n" + "4,5,45\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testFilterOnVertices {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    graph.filterOnVertices(new FilterFunction[Vertex[Long, Long]] {
+      @throws(classOf[Exception])
+      def filter(vertex: Vertex[Long, Long]): Boolean = {
+        vertex.getValue > 2
+      }
+    }).getEdgesAsTuple3().writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testFilterOnVerticesSugar {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    graph.filterOnVertices(
+      vertex => vertex.getValue > 2
+    ).getEdgesAsTuple3().writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "3,4,34\n" + "3,5,35\n" + "4,5,45\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testFilterOnEdges {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    graph.filterOnEdges(new FilterFunction[Edge[Long, Long]] {
+      @throws(classOf[Exception])
+      def filter(edge: Edge[Long, Long]): Boolean = {
+        edge.getValue > 34
+      }
+    }).getEdgesAsTuple3().writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "3,5,35\n" + "4,5,45\n" + "5,1,51\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testFilterOnEdgesSugar {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    graph.filterOnEdges(
+      edge => edge.getValue > 34
+    ).getEdgesAsTuple3().writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "3,5,35\n" + "4,5,45\n" + "5,1,51\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testNumberOfVertices {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    env.fromElements(graph.numberOfVertices).writeAsText(resultPath)
+    env.execute
+    expectedResult = "5"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testNumberOfEdges {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    env.fromElements(graph.numberOfEdges).writeAsText(resultPath)
+    env.execute
+    expectedResult = "7"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testVertexIds {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    graph.getVertexIds.writeAsText(resultPath)
+    env.execute
+    expectedResult = "1\n2\n3\n4\n5\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testEdgesIds {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    graph.getEdgeIds.writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,2\n" + "1,3\n" + "2,3\n" + "3,4\n" + "3,5\n" + "4,5\n" + "5,1\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testUnion {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val vertices: List[Vertex[Long, Long]] = List[Vertex[Long, Long]](
+      new Vertex[Long, Long](6L, 6L)
+    )
+    val edges: List[Edge[Long, Long]] = List[Edge[Long, Long]](
+      new Edge[Long, Long](6L, 1L, 61L)
+    )
+
+    val newgraph = graph.union(Graph.fromCollection(vertices, edges, env))
+    newgraph.getEdgesAsTuple3.writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,2,12\n" + "1,3,13\n" + "2,3,23\n" + "3,4,34\n" + "3,5,35\n" + "4,5," +
+      "45\n" + "5,1,51\n" + "6,1,61\n"
+  }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/240e8895/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
new file mode 100644
index 0000000..e19463e
--- /dev/null
+++ b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.scala.test.operations
+
+import org.apache.flink.api.common.functions.MapFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.Edge
+import org.apache.flink.graph.scala._
+import org.apache.flink.graph.scala.test.TestGraphUtils
+import org.apache.flink.graph.scala.utils.EdgeToTuple3Map
+import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
+import org.junit.rules.TemporaryFolder
+import org.junit.runner.RunWith
+import org.junit.runners.Parameterized
+import org.junit.{After, Before, Rule, Test}
+
+@RunWith(classOf[Parameterized])
+class JoinWithEdgesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
+MultipleProgramsTestBase(mode) {
+
+  private var resultPath: String = null
+  private var expectedResult: String = null
+
+  var tempFolder: TemporaryFolder = new TemporaryFolder()
+
+  @Rule
+  def getFolder(): TemporaryFolder = {
+    tempFolder;
+  }
+
+  @Before
+  @throws(classOf[Exception])
+  def before {
+    resultPath = tempFolder.newFile.toURI.toString
+  }
+
+  @After
+  @throws(classOf[Exception])
+  def after {
+    TestBaseUtils.compareResultsByLinesInMemory(expectedResult, resultPath)
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testWithEdgesInputDataset {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val result: Graph[Long, Long, Long] = graph.joinWithEdges(graph.getEdges.map(new
+        EdgeToTuple3Map[Long, Long]), new AddValuesMapper)
+    result.getEdgesAsTuple3().writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,46\n" + "3,4,68\n" + "3,5,70\n" + "4,5," +
+      "90\n" + "5,1,102\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testWithEdgesInputDatasetSugar {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val result: Graph[Long, Long, Long] = graph.joinWithEdges(graph.getEdges.map(new
+        EdgeToTuple3Map[Long, Long]), (originalValue: Long, tupleValue: Long) =>
+      originalValue + tupleValue)
+    result.getEdgesAsTuple3().writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,46\n" + "3,4,68\n" + "3,5,70\n" + "4,5," +
+      "90\n" + "5,1,102\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testWithEdgesOnSource {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnSource[Long](graph.getEdges
+      .map(new ProjectSourceAndValueMapper), (originalValue: Long, tupleValue: Long) =>
+      originalValue + tupleValue)
+    result.getEdgesAsTuple3().writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,2,24\n" + "1,3,25\n" + "2,3,46\n" + "3,4,68\n" + "3,5,69\n" + "4,5," +
+      "90\n" + "5,1,102\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testWithEdgesOnSourceSugar {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnSource[Long](graph.getEdges
+      .map(new ProjectSourceAndValueMapper), (originalValue: Long, tupleValue: Long) =>
+      originalValue + tupleValue)
+    result.getEdgesAsTuple3().writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,2,24\n" + "1,3,25\n" + "2,3,46\n" + "3,4,68\n" + "3,5,69\n" + "4,5," +
+      "90\n" + "5,1,102\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testWithEdgesOnTarget {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnTarget[Long](graph.getEdges
+      .map(new ProjectTargetAndValueMapper), (originalValue: Long, tupleValue: Long) =>
+      originalValue + tupleValue)
+    result.getEdgesAsTuple3().writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,36\n" + "3,4,68\n" + "3,5,70\n" + "4,5," +
+      "80\n" + "5,1,102\n"
+  }
+
+  @Test
+  @throws(classOf[Exception])
+  def testWithEdgesOnTargetSugar {
+    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
+      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
+    val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnTarget[Long](graph.getEdges
+      .map(new ProjectTargetAndValueMapper), (originalValue: Long, tupleValue: Long) =>
+      originalValue + tupleValue)
+    result.getEdgesAsTuple3().writeAsCsv(resultPath)
+    env.execute
+    expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,36\n" + "3,4,68\n" + "3,5,70\n" + "4,5," +
+      "80\n" + "5,1,102\n"
+  }
+
+
+  final class AddValuesMapper extends MapFunction[(Long, Long), Long] {
+    @throws(classOf[Exception])
+    def map(tuple: (Long, Long)): Long = {
+      tuple._1 + tuple._2
+    }
+  }
+
+  final class ProjectSourceAndValueMapper extends MapFunction[Edge[Long, Long], (Long, Long)] {
+    @throws(classOf[Exception])
+    def map(edge: Edge[Long, Long]): (Long, Long) = {
+      (edge.getSource, edge.getValue)
+    }
+  }
+
+  final class ProjectTargetAndValueMapper extends MapFunction[Edge[Long, Long], (Long, Long)] {
+    @throws(classOf[Exception])
+    def map(edge: Edge[Long, Long]): (Long, Long) = {
+      (edge.getTarget, edge.getValue)
+    }
+  }
+
+}


Mime
View raw message