flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject [11/24] flink git commit: [FLINK-2833] [gelly] create a flink-libraries module and move gelly there
Date Fri, 09 Oct 2015 16:05:49 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
deleted file mode 100644
index 3dc90fc..0000000
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala.test.operations
-
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.scala._
-import org.apache.flink.graph.Edge
-import org.apache.flink.graph.scala._
-import org.apache.flink.graph.scala.test.TestGraphUtils
-import org.apache.flink.graph.scala.utils.EdgeToTuple3Map
-import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import org.junit.{After, Before, Rule, Test}
-import _root_.scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class JoinWithEdgesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
-MultipleProgramsTestBase(mode) {
-
-  private var expectedResult: String = null
-
-  @Test
-  @throws(classOf[Exception])
-  def testWithEdgesInputDataset {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val result: Graph[Long, Long, Long] = graph.joinWithEdges(graph.getEdges.map(new
-        EdgeToTuple3Map[Long, Long]), new AddValuesMapper)
-    val res = result.getEdges.collect().toList
-    expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,46\n" + "3,4,68\n" + "3,5,70\n" + "4,5," +
-      "90\n" + "5,1,102\n"
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testWithEdgesInputDatasetSugar {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val result: Graph[Long, Long, Long] = graph.joinWithEdges(graph.getEdges.map(new
-        EdgeToTuple3Map[Long, Long]), (originalValue: Long, tupleValue: Long) =>
-      originalValue + tupleValue)
-    val res = result.getEdges.collect().toList
-    expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,46\n" + "3,4,68\n" + "3,5,70\n" + "4,5," +
-      "90\n" + "5,1,102\n"
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testWithEdgesOnSource {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnSource[Long](graph.getEdges
-      .map(new ProjectSourceAndValueMapper), (originalValue: Long, tupleValue: Long) =>
-      originalValue + tupleValue)
-    val res = result.getEdges.collect().toList
-    expectedResult = "1,2,24\n" + "1,3,25\n" + "2,3,46\n" + "3,4,68\n" + "3,5,69\n" + "4,5," +
-      "90\n" + "5,1,102\n"
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testWithEdgesOnSourceSugar {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnSource[Long](graph.getEdges
-      .map(new ProjectSourceAndValueMapper), (originalValue: Long, tupleValue: Long) =>
-      originalValue + tupleValue)
-    val res = result.getEdges.collect().toList
-    expectedResult = "1,2,24\n" + "1,3,25\n" + "2,3,46\n" + "3,4,68\n" + "3,5,69\n" + "4,5," +
-      "90\n" + "5,1,102\n"
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testWithEdgesOnTarget {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnTarget[Long](graph.getEdges
-      .map(new ProjectTargetAndValueMapper), (originalValue: Long, tupleValue: Long) =>
-      originalValue + tupleValue)
-    val res = result.getEdges.collect().toList
-    expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,36\n" + "3,4,68\n" + "3,5,70\n" + "4,5," +
-      "80\n" + "5,1,102\n"
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testWithEdgesOnTargetSugar {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnTarget[Long](graph.getEdges
-      .map(new ProjectTargetAndValueMapper), (originalValue: Long, tupleValue: Long) =>
-      originalValue + tupleValue)
-    val res = result.getEdges.collect().toList
-    expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,36\n" + "3,4,68\n" + "3,5,70\n" + "4,5," +
-      "80\n" + "5,1,102\n"
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-
-
-  final class AddValuesMapper extends MapFunction[(Long, Long), Long] {
-    @throws(classOf[Exception])
-    def map(tuple: (Long, Long)): Long = {
-      tuple._1 + tuple._2
-    }
-  }
-
-  final class ProjectSourceAndValueMapper extends MapFunction[Edge[Long, Long], (Long, Long)] {
-    @throws(classOf[Exception])
-    def map(edge: Edge[Long, Long]): (Long, Long) = {
-      (edge.getSource, edge.getValue)
-    }
-  }
-
-  final class ProjectTargetAndValueMapper extends MapFunction[Edge[Long, Long], (Long, Long)] {
-    @throws(classOf[Exception])
-    def map(edge: Edge[Long, Long]): (Long, Long) = {
-      (edge.getTarget, edge.getValue)
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
deleted file mode 100644
index 98ee8b6..0000000
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala.test.operations
-
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.scala._
-import org.apache.flink.graph.scala._
-import org.apache.flink.graph.scala.test.TestGraphUtils
-import org.apache.flink.graph.scala.utils.VertexToTuple2Map
-import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import org.junit.{After, Before, Rule, Test}
-import _root_.scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class JoinWithVerticesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
-MultipleProgramsTestBase(mode) {
-
-  private var expectedResult: String = null
-
-  @Test
-  @throws(classOf[Exception])
-  def testJoinWithVertexSet {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val result: Graph[Long, Long, Long] = graph.joinWithVertices(graph.getVertices.map(new
-        VertexToTuple2Map[Long, Long]), new AddValuesMapper)
-    val res = result.getVertices.collect().toList
-    expectedResult = "1,2\n" + "2,4\n" + "3,6\n" + "4,8\n" + "5,10\n"
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testJoinWithVertexSetSugar {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val tupleSet = graph.getVertices.map(new VertexToTuple2Map[Long, Long])
-    val result: Graph[Long, Long, Long] = graph.joinWithVertices[Long](tupleSet,
-      (originalvalue: Long, tuplevalue: Long) => originalvalue + tuplevalue)
-    val res = result.getVertices.collect().toList
-    expectedResult = "1,2\n" + "2,4\n" + "3,6\n" + "4,8\n" + "5,10\n"
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-
-
-  final class AddValuesMapper extends MapFunction[(Long, Long), Long] {
-    @throws(classOf[Exception])
-    def map(tuple: (Long, Long)): Long = {
-      tuple._1 + tuple._2
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala
deleted file mode 100644
index bdfd569..0000000
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala.test.operations
-
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.scala._
-import org.apache.flink.graph.Edge
-import org.apache.flink.graph.scala._
-import org.apache.flink.graph.scala.test.TestGraphUtils
-import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import org.junit.{After, Before, Rule, Test}
-import _root_.scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class MapEdgesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
-MultipleProgramsTestBase(mode) {
-
-  private var expectedResult: String = null
-
-  @Test
-  @throws(classOf[Exception])
-  def testWithSameValue {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val res = graph.mapEdges(new AddOneMapper).getEdges.collect().toList
-    expectedResult = "1,2,13\n" +
-      "1,3,14\n" + "" +
-      "2,3,24\n" +
-      "3,4,35\n" +
-      "3,5,36\n" +
-      "4,5,46\n" +
-      "5,1,52\n"
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testWithSameValueSugar {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val res = graph.mapEdges(edge => edge.getValue + 1)
-      .getEdges.collect().toList
-    expectedResult = "1,2,13\n" +
-      "1,3,14\n" + "" +
-      "2,3,24\n" +
-      "3,4,35\n" +
-      "3,5,36\n" +
-      "4,5,46\n" +
-      "5,1,52\n"
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-
-  final class AddOneMapper extends MapFunction[Edge[Long, Long], Long] {
-    @throws(classOf[Exception])
-    def map(edge: Edge[Long, Long]): Long = {
-      edge.getValue + 1
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala
deleted file mode 100644
index 2e51d90..0000000
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala.test.operations
-
-import org.apache.flink.api.common.functions.MapFunction
-import org.apache.flink.api.scala._
-import org.apache.flink.graph.Vertex
-import org.apache.flink.graph.scala._
-import org.apache.flink.graph.scala.test.TestGraphUtils
-import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import org.junit.{After, Before, Rule, Test}
-import _root_.scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class MapVerticesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends
-MultipleProgramsTestBase(mode) {
-
-  private var expectedResult: String = null
-
-  @Test
-  @throws(classOf[Exception])
-  def testWithSameValue {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val res = graph.mapVertices(new AddOneMapper).getVertices.collect().toList
-    expectedResult = "1,2\n" +
-      "2,3\n" +
-      "3,4\n" +
-      "4,5\n" +
-      "5,6\n";
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testWithSameValueSugar {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val res = graph.mapVertices(vertex => vertex.getValue + 1).getVertices.collect().toList
-    expectedResult = "1,2\n" +
-      "2,3\n" +
-      "3,4\n" +
-      "4,5\n" +
-      "5,6\n";
-    TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult)
-  }
-
-  final class AddOneMapper extends MapFunction[Vertex[Long, Long], Long] {
-    @throws(classOf[Exception])
-    def map(vertex: Vertex[Long, Long]): Long = {
-      vertex.getValue + 1
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala
deleted file mode 100644
index dcd1deb..0000000
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala
+++ /dev/null
@@ -1,150 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala.test.operations
-
-import org.apache.flink.api.scala._
-import org.apache.flink.graph._
-import org.apache.flink.graph.scala.test.TestGraphUtils
-import org.apache.flink.graph.scala.{EdgesFunction, EdgesFunctionWithVertexValue, Graph}
-import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
-import org.apache.flink.util.Collector
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import org.junit.{After, Before, Rule, Test}
-import _root_.scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMode)
-  extends MultipleProgramsTestBase(mode) {
-
-  private var expectedResult: String = null
-
-  @Test
-  @throws(classOf[Exception])
-  def testAllNeighborsWithValueGreaterThanFour {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val res = graph.groupReduceOnEdges(new SelectNeighborsValueGreaterThanFour,
-      EdgeDirection.ALL).collect().toList
-    expectedResult = "(5,1)\n" + "(5,3)\n" + "(5,4)"
-    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
-  }
-
-
-  @Test
-  @throws(classOf[Exception])
-  def testAllNeighbors {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val res = graph.groupReduceOnEdges(new SelectNeighbors, EdgeDirection.ALL)
-    .collect().toList
-    expectedResult = "(1,2)\n" + "(1,3)\n" + "(1,5)\n" + "(2,1)\n" + "(2,3)\n" +
-      "(3,1)\n" + "(3,2)\n" + "(3,4)\n" + "(3,5)\n" + "(4,3)\n" + "(4,5)\n" +
-      "(5,1)\n" + "(5,3)\n" + "(5,4)"
-    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testLowestWeightOutNeighborNoValue {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val verticesWithLowestOutNeighbor: DataSet[(Long, Long)] = graph.reduceOnEdges(new
-        SelectMinWeightNeighborNoValue, EdgeDirection.OUT)
-    val res = verticesWithLowestOutNeighbor.collect().toList
-    expectedResult = "(1,12)\n" + "(2,23)\n" + "(3,34)\n" + "(4,45)\n" + "(5,51)\n"
-    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testLowestWeightInNeighborNoValue {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val verticesWithLowestOutNeighbor: DataSet[(Long, Long)] = graph.reduceOnEdges(new
-        SelectMinWeightNeighborNoValue, EdgeDirection.IN)
-    val res = verticesWithLowestOutNeighbor.collect().toList
-    expectedResult = "(1,51)\n" + "(2,12)\n" + "(3,13)\n" + "(4,34)\n" + "(5,35)\n"
-    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testMaxWeightAllNeighbors {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val verticesWithMaxEdgeWeight: DataSet[(Long, Long)] = graph.reduceOnEdges(new
-        SelectMaxWeightNeighborNoValue, EdgeDirection.ALL)
-    val res = verticesWithMaxEdgeWeight.collect().toList
-    expectedResult = "(1,51)\n" + "(2,23)\n" + "(3,35)\n" + "(4,45)\n" + "(5,51)\n"
-    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
-  }
-
-  final class SelectNeighborsValueGreaterThanFour extends EdgesFunctionWithVertexValue[Long,
-    Long, Long, (Long, Long)] {
-    @throws(classOf[Exception])
-    override def iterateEdges(v: Vertex[Long, Long], edges: Iterable[Edge[Long, Long]], out:
-    Collector[(Long, Long)]): Unit = {
-      for (edge <- edges) {
-        if (v.getValue > 4) {
-          if (v.getId == edge.getTarget) {
-            out.collect((v.getId, edge.getSource))
-          }
-          else {
-            out.collect((v.getId, edge.getTarget))
-          }
-        }
-      }
-    }
-  }
-
-  final class SelectNeighbors extends EdgesFunction[Long, Long, (Long, Long)] {
-    @throws(classOf[Exception])
-    override def iterateEdges(edges: Iterable[(Long, Edge[Long, Long])], out: Collector[
-      (Long, Long)]) {
-      for (edge <- edges) {
-        if (edge._1.equals(edge._2.getTarget)) {
-          out.collect(new Tuple2[Long, Long](edge._1, edge._2.getSource))
-        }
-        else {
-          out.collect(new Tuple2[Long, Long](edge._1, edge._2.getTarget))
-        }
-      }
-    }
-  }
-
-  final class SelectMinWeightNeighborNoValue extends ReduceEdgesFunction[Long] {
-    override def reduceEdges(firstEdgeValue: Long, secondEdgeValue: Long): Long = {
-      Math.min(firstEdgeValue, secondEdgeValue)
-    }
-  }
-
-  final class SelectMaxWeightNeighborNoValue extends ReduceEdgesFunction[Long] {
-    override def reduceEdges(firstEdgeValue: Long, secondEdgeValue: Long): Long = {
-      Math.max(firstEdgeValue, secondEdgeValue)
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala
deleted file mode 100644
index aef5493..0000000
--- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala
+++ /dev/null
@@ -1,126 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala.test.operations
-
-import org.apache.flink.api.scala._
-import org.apache.flink.graph.scala.test.TestGraphUtils
-import org.apache.flink.graph.scala.{NeighborsFunctionWithVertexValue, _}
-import org.apache.flink.graph.{Edge, EdgeDirection, ReduceNeighborsFunction, Vertex}
-import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils}
-import org.apache.flink.util.Collector
-import org.junit.rules.TemporaryFolder
-import org.junit.runner.RunWith
-import org.junit.runners.Parameterized
-import org.junit.{After, Before, Rule, Test}
-import _root_.scala.collection.JavaConverters._
-
-@RunWith(classOf[Parameterized])
-class ReduceOnNeighborMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMode)
-  extends MultipleProgramsTestBase(mode) {
-
-  private var expectedResult: String = null
-
-  @Test
-  @throws(classOf[Exception])
-  def testSumOfAllNeighborsNoValue {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val res = graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.ALL)
-    .collect().toList
-    expectedResult = "(1,10)\n" + "(2,4)\n" + "(3,12)\n" + "(4,8)\n" + "(5,8)\n"
-    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testSumOfOutNeighborsNoValue {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val res = graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.OUT).collect().toList
-    expectedResult = "(1,5)\n" + "(2,3)\n" + "(3,9)\n" + "(4,5)\n" + "(5,1)\n"
-    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testSumOfAllNeighbors {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val result = graph.groupReduceOnNeighbors(new SumAllNeighbors, EdgeDirection.ALL)
-    val res = result.collect().toList
-    expectedResult = "(1,11)\n" + "(2,6)\n" + "(3,15)\n" + "(4,12)\n" + "(5,13)\n"
-    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
-  }
-
-  @Test
-  @throws(classOf[Exception])
-  def testSumOfInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo = {
-    val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
-    val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils
-      .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env)
-    val result = graph.groupReduceOnNeighbors(new
-        SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo, EdgeDirection.IN)
-    val res = result.collect().toList
-    expectedResult = "(3,59)\n" + "(3,118)\n" + "(4,204)\n" + "(4,102)\n" + "(5,570)\n" + "(5,285)"
-    TestBaseUtils.compareResultAsText(res.asJava, expectedResult)
-  }
-
-  final class SumNeighbors extends ReduceNeighborsFunction[Long] {
-    override def reduceNeighbors(firstNeighbor: Long, secondNeighbor: Long): Long = {
-      firstNeighbor + secondNeighbor
-    }
-  }
-
-  final class SumAllNeighbors extends NeighborsFunctionWithVertexValue[Long, Long, Long, (Long,
-    Long)] {
-    @throws(classOf[Exception])
-    def iterateNeighbors(vertex: Vertex[Long, Long], neighbors: Iterable[(Edge[Long, Long],
-      Vertex[Long, Long])], out: Collector[(Long, Long)]) {
-      var sum: Long = 0
-      for (neighbor <- neighbors) {
-        sum += neighbor._2.getValue
-      }
-      out.collect((vertex.getId, sum + vertex.getValue))
-    }
-  }
-
-  final class SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo extends
-  NeighborsFunction[Long, Long, Long, (Long, Long)] {
-    @throws(classOf[Exception])
-    def iterateNeighbors(neighbors: Iterable[(Long, Edge[Long, Long], Vertex[Long, Long])],
-                         out: Collector[(Long, Long)]) {
-      var sum: Long = 0
-      var next: (Long, Edge[Long, Long], Vertex[Long, Long]) = null
-      val neighborsIterator: Iterator[(Long, Edge[Long, Long], Vertex[Long, Long])] =
-        neighbors.iterator
-      while (neighborsIterator.hasNext) {
-        next = neighborsIterator.next
-        sum += next._3.getValue * next._2.getValue
-      }
-      if (next._1 > 2) {
-        out.collect(new Tuple2[Long, Long](next._1, sum))
-        out.collect(new Tuple2[Long, Long](next._1, sum * 2))
-      }
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/pom.xml b/flink-staging/flink-gelly/pom.xml
deleted file mode 100644
index 9dce170..0000000
--- a/flink-staging/flink-gelly/pom.xml
+++ /dev/null
@@ -1,67 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-staging</artifactId>
-		<version>0.10-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-	
-	<artifactId>flink-gelly</artifactId>
-	<name>flink-gelly</name>
-
-	<packaging>jar</packaging>
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-java</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-optimizer</artifactId>
-			<version>${project.version}</version>
-			<type>test-jar</type>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>com.google.guava</groupId>
-			<artifactId>guava</artifactId>
-			<version>${guava.version}</version>
-		</dependency>
-	</dependencies>
-</project>

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java
deleted file mode 100644
index d84badb..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph;
-
-import org.apache.flink.api.java.tuple.Tuple3;
-
-/**
- * An Edge represents a link between two {@link Vertex vertices},
- * the source and the target and can carry an attached value.
- * For edges with no value, use {@link org.apache.flink.types.NullValue} as the value type.
- *
- * @param <K> the key type for the sources and target vertices
- * @param <V> the edge value type
- */
-public class Edge<K, V> extends Tuple3<K, K, V>{
-
-	private static final long serialVersionUID = 1L;
-
-	public Edge(){}
-
-	public Edge(K src, K trg, V val) {
-		this.f0 = src;
-		this.f1 = trg;
-		this.f2 = val;
-	}
-
-	/**
-	 * Reverses the direction of this Edge.
-	 * @return a new Edge, where the source is the original Edge's target
-	 * and the target is the original Edge's source.
-	 */
-	public Edge<K, V> reverse() {
-			return new Edge<K, V>(this.f1, this.f0, this.f2);
-	}
-
-	public void setSource(K src) {
-		this.f0 = src;
-	}
-
-	public K getSource() {
-		return this.f0;
-	}
-
-	public void setTarget(K target) {
-		this.f1 = target;
-	}
-
-	public K getTarget() {
-		return f1;
-	}
-
-	public void setValue(V value) {
-		this.f2 = value;
-	}
-
-	public V getValue() {
-		return f2;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java
deleted file mode 100644
index 0a055bb..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph;
-
-/**
- * The EdgeDirection is used to select a node's neighborhood
- * by the {@link Graph#groupReduceOnEdges(EdgesFunction, EdgeDirection)},
- * {@link Graph#groupReduceOnEdges(EdgesFunctionWithVertexValue, EdgeDirection)},
- * {@link Graph#groupReduceOnNeighbors(NeighborsFunction, EdgeDirection)},
- * {@link Graph#groupReduceOnNeighbors(NeighborsFunctionWithVertexValue, EdgeDirection)},
- * {@link Graph#reduceOnEdges(ReduceEdgesFunction, EdgeDirection)} and
- * {@link Graph#reduceOnNeighbors(ReduceNeighborsFunction, EdgeDirection)}
- * methods.
- */
-public enum EdgeDirection {
-	IN,
-	OUT,
-	ALL
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java
deleted file mode 100644
index bf1d6a2..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.util.Collector;
-
-/**
- * Interface to be implemented by the function applied to a vertex neighborhood
- * in the {@link Graph#groupReduceOnEdges(EdgesFunction, EdgeDirection)} method.
- *
- * @param <K> the vertex key type
- * @param <EV> the edge value type
- * @param <O> the type of the return value
- */
-public interface EdgesFunction<K, EV, O> extends Function, Serializable {
-
-	void iterateEdges(Iterable<Tuple2<K, Edge<K, EV>>> edges, Collector<O> out) throws Exception;
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java
deleted file mode 100644
index 0b0ab0e..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.util.Collector;
-
-/**
- * Interface to be implemented by the function applied to a vertex neighborhood
- * in the {@link Graph#groupReduceOnEdges(EdgesFunctionWithVertexValue, EdgeDirection)}
- * method.
- *
- * @param <K> the vertex key type
- * @param <VV> the vertex value type
- * @param <EV> the edge value type
- * @param <O> the type of the return value
- */
-public interface EdgesFunctionWithVertexValue<K, VV, EV, O> extends Function, Serializable {
-
-	void iterateEdges(Vertex<K, VV> v, Iterable<Edge<K, EV>> edges, Collector<O> out) throws Exception;
-}


Mime
View raw message