flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From va...@apache.org
Subject [5/7] flink git commit: [gelly] refactored tests; removed duplicate data from TestGraphUtils
Date Wed, 04 Mar 2015 20:33:07 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java
new file mode 100644
index 0000000..dfb315e
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithEdgesITCase.java
@@ -0,0 +1,520 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.operations;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.test.TestGraphUtils;
+import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
+import org.apache.flink.graph.utils.EdgeToTuple3Map;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class JoinWithEdgesITCase extends MultipleProgramsTestBase {
+
+	/**
+	 * Passes the given execution mode on to MultipleProgramsTestBase.
+	 *
+	 * @param mode execution mode for this test instance (presumably injected
+	 *             by the Parameterized runner declared on the class)
+	 */
+	public JoinWithEdgesITCase(MultipleProgramsTestBase.ExecutionMode mode){
+		super(mode);
+	}
+
+    private String resultPath;
+    private String expectedResult;
+
+    @Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	/**
+	 * Creates a new file in the temporary folder before each test and stores
+	 * its URI string in resultPath, the location the tests write their CSV to.
+	 */
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
+	}
+
+	/**
+	 * Runs after each test: hands the expectedResult assigned by the test and
+	 * the resultPath chosen in before() to compareResultsByLinesInMemory.
+	 */
+	@After
+	public void after() throws Exception{
+		compareResultsByLinesInMemory(expectedResult, resultPath);
+	}
+
+	@Test
+	public void testWithEdgesInputDataset() throws Exception {
+		// joinWithEdges where the joined DataSet is the graph's own edge
+		// DataSet, mapped through EdgeToTuple3Map.
+		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> input = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(environment),
+				TestGraphUtils.getLongLongEdgeData(environment), environment);
+
+		Graph<Long, Long, Long> joined = input.joinWithEdges(
+				input.getEdges().map(new EdgeToTuple3Map<Long, Long>()),
+				new AddValuesMapper());
+
+		joined.getEdges().writeAsCsv(resultPath);
+		environment.execute();
+
+		expectedResult = "1,2,24\n"
+				+ "1,3,26\n"
+				+ "2,3,46\n"
+				+ "3,4,68\n"
+				+ "3,5,70\n"
+				+ "4,5,90\n"
+				+ "5,1,102\n";
+	}
+
+	@Test
+	public void testWithLessElements() throws Exception {
+		// joinWithEdges with an input DataSet of the same type as the edge
+		// DataSet but with fewer elements (built from the first three edges).
+		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> input = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(environment),
+				TestGraphUtils.getLongLongEdgeData(environment), environment);
+
+		Graph<Long, Long, Long> joined = input.joinWithEdges(
+				input.getEdges().first(3).map(new EdgeToTuple3Map<Long, Long>()),
+				new AddValuesMapper());
+
+		joined.getEdges().writeAsCsv(resultPath);
+		environment.execute();
+
+		expectedResult = "1,2,24\n"
+				+ "1,3,26\n"
+				+ "2,3,46\n"
+				+ "3,4,34\n"
+				+ "3,5,35\n"
+				+ "4,5,45\n"
+				+ "5,1,51\n";
+	}
+
+	@Test
+	public void testWithLessElementsDifferentType() throws Exception {
+		// joinWithEdges with an input DataSet that has fewer elements than the
+		// edge DataSet and carries values of a different type (Boolean).
+		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> input = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(environment),
+				TestGraphUtils.getLongLongEdgeData(environment), environment);
+
+		Graph<Long, Long, Long> joined = input.joinWithEdges(
+				input.getEdges().first(3).map(new BooleanEdgeValueMapper()),
+				new DoubleIfTrueMapper());
+
+		joined.getEdges().writeAsCsv(resultPath);
+		environment.execute();
+
+		expectedResult = "1,2,24\n"
+				+ "1,3,26\n"
+				+ "2,3,46\n"
+				+ "3,4,34\n"
+				+ "3,5,35\n"
+				+ "4,5,45\n"
+				+ "5,1,51\n";
+	}
+
+	@Test
+	public void testWithNoCommonKeys() throws Exception {
+		// joinWithEdges with a TestGraphUtils Tuple3 DataSet whose keys differ
+		// from the edge DataSet's; DoubleValueMapper doubles the tuple's second field.
+		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> input = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(environment),
+				TestGraphUtils.getLongLongEdgeData(environment), environment);
+
+		Graph<Long, Long, Long> joined = input.joinWithEdges(
+				TestGraphUtils.getLongLongLongTuple3Data(environment),
+				new DoubleValueMapper());
+
+		joined.getEdges().writeAsCsv(resultPath);
+		environment.execute();
+
+		expectedResult = "1,2,24\n"
+				+ "1,3,26\n"
+				+ "2,3,46\n"
+				+ "3,4,68\n"
+				+ "3,5,35\n"
+				+ "4,5,45\n"
+				+ "5,1,51\n";
+	}
+
+	@Test
+	public void testWithCustomType() throws Exception {
+		// joinWithEdges with input values of a custom parameterized type.
+		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> input = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(environment),
+				TestGraphUtils.getLongLongEdgeData(environment), environment);
+
+		Graph<Long, Long, Long> joined = input.joinWithEdges(
+				TestGraphUtils.getLongLongCustomTuple3Data(environment),
+				new CustomValueMapper());
+
+		joined.getEdges().writeAsCsv(resultPath);
+		environment.execute();
+
+		expectedResult = "1,2,10\n"
+				+ "1,3,20\n"
+				+ "2,3,30\n"
+				+ "3,4,40\n"
+				+ "3,5,35\n"
+				+ "4,5,45\n"
+				+ "5,1,51\n";
+	}
+
+	@Test
+	public void testWithEdgesOnSource() throws Exception {
+		// joinWithEdgesOnSource where the joined DataSet is derived from the
+		// graph's own edges (source id and value of every edge).
+		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> input = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(environment),
+				TestGraphUtils.getLongLongEdgeData(environment), environment);
+
+		Graph<Long, Long, Long> joined = input.joinWithEdgesOnSource(
+				input.getEdges().map(new ProjectSourceAndValueMapper()),
+				new AddValuesMapper());
+
+		joined.getEdges().writeAsCsv(resultPath);
+		environment.execute();
+
+		expectedResult = "1,2,24\n"
+				+ "1,3,25\n"
+				+ "2,3,46\n"
+				+ "3,4,68\n"
+				+ "3,5,69\n"
+				+ "4,5,90\n"
+				+ "5,1,102\n";
+	}
+
+	@Test
+	public void testOnSourceWithLessElements() throws Exception {
+		// joinWithEdgesOnSource with an input DataSet of the same type but with
+		// fewer elements than the edge DataSet (built from the first three edges).
+		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> input = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(environment),
+				TestGraphUtils.getLongLongEdgeData(environment), environment);
+
+		Graph<Long, Long, Long> joined = input.joinWithEdgesOnSource(
+				input.getEdges().first(3).map(new ProjectSourceAndValueMapper()),
+				new AddValuesMapper());
+
+		joined.getEdges().writeAsCsv(resultPath);
+		environment.execute();
+
+		expectedResult = "1,2,24\n"
+				+ "1,3,25\n"
+				+ "2,3,46\n"
+				+ "3,4,34\n"
+				+ "3,5,35\n"
+				+ "4,5,45\n"
+				+ "5,1,51\n";
+	}
+
+	@Test
+	public void testOnSourceWithDifferentType() throws Exception {
+		// joinWithEdgesOnSource with an input DataSet that has fewer elements
+		// than the edge DataSet and carries values of a different type (Boolean).
+		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> input = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(environment),
+				TestGraphUtils.getLongLongEdgeData(environment), environment);
+
+		Graph<Long, Long, Long> joined = input.joinWithEdgesOnSource(
+				input.getEdges().first(3).map(new ProjectSourceWithTrueMapper()),
+				new DoubleIfTrueMapper());
+
+		joined.getEdges().writeAsCsv(resultPath);
+		environment.execute();
+
+		expectedResult = "1,2,24\n"
+				+ "1,3,26\n"
+				+ "2,3,46\n"
+				+ "3,4,34\n"
+				+ "3,5,35\n"
+				+ "4,5,45\n"
+				+ "5,1,51\n";
+	}
+
+	@Test
+	public void testOnSourceWithNoCommonKeys() throws Exception {
+		// joinWithEdgesOnSource with a TestGraphUtils Tuple2 DataSet whose keys differ
+		// from the edge DataSet's; DoubleValueMapper doubles the tuple's second field.
+		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> input = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(environment),
+				TestGraphUtils.getLongLongEdgeData(environment), environment);
+
+		Graph<Long, Long, Long> joined = input.joinWithEdgesOnSource(
+				TestGraphUtils.getLongLongTuple2SourceData(environment),
+				new DoubleValueMapper());
+
+		joined.getEdges().writeAsCsv(resultPath);
+		environment.execute();
+
+		expectedResult = "1,2,20\n"
+				+ "1,3,20\n"
+				+ "2,3,60\n"
+				+ "3,4,80\n"
+				+ "3,5,80\n"
+				+ "4,5,120\n"
+				+ "5,1,51\n";
+	}
+
+	@Test
+	public void testOnSourceWithCustom() throws Exception {
+		// joinWithEdgesOnSource with input values of a custom parameterized type.
+		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> input = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(environment),
+				TestGraphUtils.getLongLongEdgeData(environment), environment);
+
+		Graph<Long, Long, Long> joined = input.joinWithEdgesOnSource(
+				TestGraphUtils.getLongCustomTuple2SourceData(environment),
+				new CustomValueMapper());
+
+		joined.getEdges().writeAsCsv(resultPath);
+		environment.execute();
+
+		expectedResult = "1,2,10\n"
+				+ "1,3,10\n"
+				+ "2,3,30\n"
+				+ "3,4,40\n"
+				+ "3,5,40\n"
+				+ "4,5,45\n"
+				+ "5,1,51\n";
+	}
+
+	@Test
+	public void testWithEdgesOnTarget() throws Exception {
+		// joinWithEdgesOnTarget where the joined DataSet is derived from the
+		// graph's own edges (target id and value of every edge).
+		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> input = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(environment),
+				TestGraphUtils.getLongLongEdgeData(environment), environment);
+
+		Graph<Long, Long, Long> joined = input.joinWithEdgesOnTarget(
+				input.getEdges().map(new ProjectTargetAndValueMapper()),
+				new AddValuesMapper());
+
+		joined.getEdges().writeAsCsv(resultPath);
+		environment.execute();
+
+		expectedResult = "1,2,24\n"
+				+ "1,3,26\n"
+				+ "2,3,36\n"
+				+ "3,4,68\n"
+				+ "3,5,70\n"
+				+ "4,5,80\n"
+				+ "5,1,102\n";
+	}
+
+	@Test
+	public void testWithOnTargetWithLessElements() throws Exception {
+		// joinWithEdgesOnTarget with an input DataSet of the same type as the
+		// edge DataSet but with fewer elements (built from the first three edges).
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+
+		Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges().first(3)
+				.map(new ProjectTargetAndValueMapper()), new AddValuesMapper());
+
+		// Register the CSV sink and run the job exactly once (a duplicated
+		// writeAsCsv/execute pair used to follow here); after() compares the
+		// written file with expectedResult.
+		result.getEdges().writeAsCsv(resultPath);
+		env.execute();
+
+		expectedResult = "1,2,24\n" +
+				"1,3,26\n" +
+				"2,3,36\n" +
+				"3,4,34\n" +
+				"3,5,35\n" +
+				"4,5,45\n" +
+				"5,1,51\n";
+	}
+
+	@Test
+	public void testOnTargetWithDifferentType() throws Exception {
+		// joinWithEdgesOnTarget with an input DataSet that has fewer elements
+		// than the edge DataSet and carries values of a different type (Boolean).
+		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> input = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(environment),
+				TestGraphUtils.getLongLongEdgeData(environment), environment);
+
+		Graph<Long, Long, Long> joined = input.joinWithEdgesOnTarget(
+				input.getEdges().first(3).map(new ProjectTargetWithTrueMapper()),
+				new DoubleIfTrueMapper());
+
+		joined.getEdges().writeAsCsv(resultPath);
+		environment.execute();
+
+		expectedResult = "1,2,24\n"
+				+ "1,3,26\n"
+				+ "2,3,46\n"
+				+ "3,4,34\n"
+				+ "3,5,35\n"
+				+ "4,5,45\n"
+				+ "5,1,51\n";
+	}
+
+	@Test
+	public void testOnTargetWithNoCommonKeys() throws Exception {
+		// joinWithEdgesOnTarget with a TestGraphUtils Tuple2 DataSet whose keys differ
+		// from the edge DataSet's; DoubleValueMapper doubles the tuple's second field.
+		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> input = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(environment),
+				TestGraphUtils.getLongLongEdgeData(environment), environment);
+
+		Graph<Long, Long, Long> joined = input.joinWithEdgesOnTarget(
+				TestGraphUtils.getLongLongTuple2TargetData(environment),
+				new DoubleValueMapper());
+
+		joined.getEdges().writeAsCsv(resultPath);
+		environment.execute();
+
+		expectedResult = "1,2,20\n"
+				+ "1,3,40\n"
+				+ "2,3,40\n"
+				+ "3,4,80\n"
+				+ "3,5,35\n"
+				+ "4,5,45\n"
+				+ "5,1,140\n";
+	}
+
+	@Test
+	public void testOnTargetWithCustom() throws Exception {
+		// joinWithEdgesOnTarget with input values of a custom parameterized type.
+		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> input = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(environment),
+				TestGraphUtils.getLongLongEdgeData(environment), environment);
+
+		Graph<Long, Long, Long> joined = input.joinWithEdgesOnTarget(
+				TestGraphUtils.getLongCustomTuple2TargetData(environment),
+				new CustomValueMapper());
+
+		joined.getEdges().writeAsCsv(resultPath);
+		environment.execute();
+
+		expectedResult = "1,2,10\n"
+				+ "1,3,20\n"
+				+ "2,3,20\n"
+				+ "3,4,40\n"
+				+ "3,5,35\n"
+				+ "4,5,45\n"
+				+ "5,1,51\n";
+	}
+
+	/** Returns the sum of the two Long fields of the given tuple. */
+	@SuppressWarnings("serial")
+	private static final class AddValuesMapper implements MapFunction<Tuple2<Long, Long>, Long> {
+		@Override
+		public Long map(Tuple2<Long, Long> tuple) throws Exception {
+			final long sum = tuple.f0 + tuple.f1;
+			return sum;
+		}
+	}
+
+	/** Turns an edge into a (source, target, true) triple, discarding the edge value. */
+	@SuppressWarnings("serial")
+	private static final class BooleanEdgeValueMapper implements MapFunction<Edge<Long, Long>, Tuple3<Long, Long, Boolean>> {
+		@Override
+		public Tuple3<Long, Long, Boolean> map(Edge<Long, Long> edge) throws Exception {
+			final Long sourceId = edge.getSource();
+			final Long targetId = edge.getTarget();
+			return new Tuple3<Long, Long, Boolean>(sourceId, targetId, true);
+		}
+	}
+
+	/** Returns twice the first field when the second field is true, otherwise the first field unchanged. */
+	@SuppressWarnings("serial")
+	private static final class DoubleIfTrueMapper implements MapFunction<Tuple2<Long, Boolean>, Long> {
+		@Override
+		public Long map(Tuple2<Long, Boolean> tuple) throws Exception {
+			// Guard clause: a false flag passes the value through untouched.
+			if (!tuple.f1) {
+				return tuple.f0;
+			}
+			return tuple.f0 * 2;
+		}
+	}
+
+	/** Returns twice the second field of the given tuple; the first field is ignored. */
+	@SuppressWarnings("serial")
+	private static final class DoubleValueMapper implements MapFunction<Tuple2<Long, Long>, Long> {
+		@Override
+		public Long map(Tuple2<Long, Long> tuple) throws Exception {
+			final long doubled = tuple.f1 * 2;
+			return doubled;
+		}
+	}
+
+	/** Returns the int field of the tuple's custom-typed second field, widened to a Long. */
+	@SuppressWarnings("serial")
+	private static final class CustomValueMapper implements MapFunction<Tuple2<Long, DummyCustomParameterizedType<Float>>, Long> {
+		@Override
+		public Long map(Tuple2<Long, DummyCustomParameterizedType<Float>> tuple) throws Exception {
+			final DummyCustomParameterizedType<Float> custom = tuple.f1;
+			final long widened = (long) custom.getIntField();
+			return widened;
+		}
+	}
+
+	/** Projects an edge onto a (source id, edge value) pair. */
+	@SuppressWarnings("serial")
+	private static final class ProjectSourceAndValueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Long>> {
+		@Override
+		public Tuple2<Long, Long> map(Edge<Long, Long> edge) throws Exception {
+			final Long sourceId = edge.getSource();
+			final Long edgeValue = edge.getValue();
+			return new Tuple2<Long, Long>(sourceId, edgeValue);
+		}
+	}
+
+	/** Projects an edge onto a (source id, true) pair. */
+	@SuppressWarnings("serial")
+	private static final class ProjectSourceWithTrueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Boolean>> {
+		@Override
+		public Tuple2<Long, Boolean> map(Edge<Long, Long> edge) throws Exception {
+			final Long sourceId = edge.getSource();
+			return new Tuple2<Long, Boolean>(sourceId, true);
+		}
+	}
+
+	/** Projects an edge onto a (target id, edge value) pair. */
+	@SuppressWarnings("serial")
+	private static final class ProjectTargetAndValueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Long>> {
+		@Override
+		public Tuple2<Long, Long> map(Edge<Long, Long> edge) throws Exception {
+			final Long targetId = edge.getTarget();
+			final Long edgeValue = edge.getValue();
+			return new Tuple2<Long, Long>(targetId, edgeValue);
+		}
+	}
+
+	/** Projects an edge onto a (target id, true) pair. */
+	@SuppressWarnings("serial")
+	private static final class ProjectTargetWithTrueMapper implements MapFunction<Edge<Long, Long>, Tuple2<Long, Boolean>> {
+		@Override
+		public Tuple2<Long, Boolean> map(Edge<Long, Long> edge) throws Exception {
+			final Long targetId = edge.getTarget();
+			return new Tuple2<Long, Boolean>(targetId, true);
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java
new file mode 100644
index 0000000..28a0441
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/JoinWithVerticesITCase.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.operations;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.test.TestGraphUtils;
+import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
+import org.apache.flink.graph.utils.VertexToTuple2Map;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class JoinWithVerticesITCase extends MultipleProgramsTestBase {
+
+	/**
+	 * Passes the given execution mode on to MultipleProgramsTestBase.
+	 *
+	 * @param mode execution mode for this test instance (presumably injected
+	 *             by the Parameterized runner declared on the class)
+	 */
+	public JoinWithVerticesITCase(MultipleProgramsTestBase.ExecutionMode mode){
+		super(mode);
+	}
+
+    private String resultPath;
+    private String expectedResult;
+
+    @Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	/**
+	 * Creates a new file in the temporary folder before each test and stores
+	 * its URI string in resultPath, the location the tests write their CSV to.
+	 */
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
+	}
+
+	/**
+	 * Runs after each test: hands the expectedResult assigned by the test and
+	 * the resultPath chosen in before() to compareResultsByLinesInMemory.
+	 */
+	@After
+	public void after() throws Exception{
+		compareResultsByLinesInMemory(expectedResult, resultPath);
+	}
+
+	@Test
+	public void testJoinWithVertexSet() throws Exception {
+		// joinWithVertices where the joined DataSet is the graph's own vertex
+		// DataSet, mapped through VertexToTuple2Map.
+		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> input = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(environment),
+				TestGraphUtils.getLongLongEdgeData(environment), environment);
+
+		Graph<Long, Long, Long> joined = input.joinWithVertices(
+				input.getVertices().map(new VertexToTuple2Map<Long, Long>()),
+				new AddValuesMapper());
+
+		joined.getVertices().writeAsCsv(resultPath);
+		environment.execute();
+
+		expectedResult = "1,2\n"
+				+ "2,4\n"
+				+ "3,6\n"
+				+ "4,8\n"
+				+ "5,10\n";
+	}
+
+	@Test
+	public void testWithLessElements() throws Exception {
+		// joinWithVertices with an input DataSet of the same type as the vertex
+		// DataSet but with fewer elements (built from the first three vertices).
+		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> input = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(environment),
+				TestGraphUtils.getLongLongEdgeData(environment), environment);
+
+		Graph<Long, Long, Long> joined = input.joinWithVertices(
+				input.getVertices().first(3).map(new VertexToTuple2Map<Long, Long>()),
+				new AddValuesMapper());
+
+		joined.getVertices().writeAsCsv(resultPath);
+		environment.execute();
+
+		expectedResult = "1,2\n"
+				+ "2,4\n"
+				+ "3,6\n"
+				+ "4,4\n"
+				+ "5,5\n";
+	}
+
+	@Test
+	public void testWithDifferentType() throws Exception {
+		// joinWithVertices with an input DataSet that has fewer elements than the
+		// vertex DataSet and carries values of a different type (Boolean).
+		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> input = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(environment),
+				TestGraphUtils.getLongLongEdgeData(environment), environment);
+
+		Graph<Long, Long, Long> joined = input.joinWithVertices(
+				input.getVertices().first(3).map(new ProjectIdWithTrue()),
+				new DoubleIfTrueMapper());
+
+		joined.getVertices().writeAsCsv(resultPath);
+		environment.execute();
+
+		expectedResult = "1,2\n"
+				+ "2,4\n"
+				+ "3,6\n"
+				+ "4,4\n"
+				+ "5,5\n";
+	}
+
+	@Test
+	public void testWithDifferentKeys() throws Exception {
+		// joinWithVertices with a TestGraphUtils Tuple2 DataSet whose keys differ from
+		// the vertex DataSet's; ProjectSecondMapper returns the tuple's second field.
+		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> input = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(environment),
+				TestGraphUtils.getLongLongEdgeData(environment), environment);
+
+		Graph<Long, Long, Long> joined = input.joinWithVertices(
+				TestGraphUtils.getLongLongTuple2Data(environment),
+				new ProjectSecondMapper());
+
+		joined.getVertices().writeAsCsv(resultPath);
+		environment.execute();
+
+		expectedResult = "1,10\n"
+				+ "2,20\n"
+				+ "3,30\n"
+				+ "4,40\n"
+				+ "5,5\n";
+	}
+
+	@Test
+	public void testWithCustomType() throws Exception {
+		// joinWithVertices with input values of a custom parameterized type.
+		final ExecutionEnvironment environment = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> input = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(environment),
+				TestGraphUtils.getLongLongEdgeData(environment), environment);
+
+		Graph<Long, Long, Long> joined = input.joinWithVertices(
+				TestGraphUtils.getLongCustomTuple2Data(environment),
+				new CustomValueMapper());
+
+		joined.getVertices().writeAsCsv(resultPath);
+		environment.execute();
+
+		expectedResult = "1,10\n"
+				+ "2,20\n"
+				+ "3,30\n"
+				+ "4,40\n"
+				+ "5,5\n";
+	}
+
+	/** Returns the sum of the two Long fields of the given tuple. */
+	@SuppressWarnings("serial")
+	private static final class AddValuesMapper implements MapFunction<Tuple2<Long, Long>, Long> {
+		@Override
+		public Long map(Tuple2<Long, Long> tuple) throws Exception {
+			final long sum = tuple.f0 + tuple.f1;
+			return sum;
+		}
+	}
+
+	/** Projects a vertex onto an (id, true) pair, discarding the vertex value. */
+	@SuppressWarnings("serial")
+	private static final class ProjectIdWithTrue implements MapFunction<Vertex<Long, Long>, Tuple2<Long, Boolean>> {
+		@Override
+		public Tuple2<Long, Boolean> map(Vertex<Long, Long> vertex) throws Exception {
+			final Long vertexId = vertex.getId();
+			return new Tuple2<Long, Boolean>(vertexId, true);
+		}
+	}
+
+	/** Returns twice the first field when the second field is true, otherwise the first field unchanged. */
+	@SuppressWarnings("serial")
+	private static final class DoubleIfTrueMapper implements MapFunction<Tuple2<Long, Boolean>, Long> {
+		@Override
+		public Long map(Tuple2<Long, Boolean> tuple) throws Exception {
+			// Guard clause: a false flag passes the value through untouched.
+			if (!tuple.f1) {
+				return tuple.f0;
+			}
+			return tuple.f0 * 2;
+		}
+	}
+
+	/** Returns the second field of the given tuple as is; the first field is ignored. */
+	@SuppressWarnings("serial")
+	private static final class ProjectSecondMapper implements MapFunction<Tuple2<Long, Long>, Long> {
+		@Override
+		public Long map(Tuple2<Long, Long> tuple) throws Exception {
+			final Long second = tuple.f1;
+			return second;
+		}
+	}
+
+	/** Returns the int field of the tuple's custom-typed second field, widened to a Long. */
+	@SuppressWarnings("serial")
+	private static final class CustomValueMapper implements MapFunction<Tuple2<Long, DummyCustomParameterizedType<Float>>, Long> {
+		@Override
+		public Long map(Tuple2<Long, DummyCustomParameterizedType<Float>> tuple) throws Exception {
+			final DummyCustomParameterizedType<Float> custom = tuple.f1;
+			final long widened = (long) custom.getIntField();
+			return widened;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapEdgesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapEdgesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapEdgesITCase.java
new file mode 100644
index 0000000..a5c01cf
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapEdgesITCase.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.operations;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.test.TestGraphUtils;
+import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
+import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class MapEdgesITCase extends MultipleProgramsTestBase {
+
+	/**
+	 * Creates the test case for one run of the parameterized suite.
+	 *
+	 * @param mode execution mode, passed straight to the superclass constructor
+	 */
+	public MapEdgesITCase(MultipleProgramsTestBase.ExecutionMode mode){
+		super(mode);
+	}
+
+    private String resultPath;
+    private String expectedResult;
+
+    @Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	/** Creates a new file in the temporary folder and stores its URI string in {@code resultPath}. */
+	@Before
+	public void before() throws Exception{
+		final String outputUri = tempFolder.newFile().toURI().toString();
+		resultPath = outputUri;
+	}
+
+	/**
+	 * Runs after each test and hands {@code expectedResult} together with
+	 * {@code resultPath} to {@code compareResultsByLinesInMemory}.
+	 */
+	@After
+	public void after() throws Exception{
+		final String expected = expectedResult;
+		final String outputLocation = resultPath;
+		compareResultsByLinesInMemory(expected, outputLocation);
+	}
+
+	/**
+	 * Test mapEdges() keeping the same value type: every edge is passed
+	 * through {@code AddOneMapper}.
+	 */
+	@Test
+	public void testWithSameValue() throws Exception {
+		final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+
+		final Graph<Long, Long, Long> inputGraph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(execEnv),
+				TestGraphUtils.getLongLongEdgeData(execEnv),
+				execEnv);
+
+		final AddOneMapper incrementer = new AddOneMapper();
+		final DataSet<Edge<Long, Long>> incrementedEdges = inputGraph.mapEdges(incrementer).getEdges();
+
+		incrementedEdges.writeAsCsv(resultPath);
+		execEnv.execute();
+
+		// Picked up by after(), which runs the comparison.
+		expectedResult = "1,2,13\n"
+				+ "1,3,14\n"
+				+ "2,3,24\n"
+				+ "3,4,35\n"
+				+ "3,5,36\n"
+				+ "4,5,46\n"
+				+ "5,1,52\n";
+	}
+
+	/**
+	 * Test mapEdges() and change the value type to String
+	 * (via {@code ToStringMapper}).
+	 */
+	@Test
+	public void testWithStringValue() throws Exception {
+		final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+
+		final Graph<Long, Long, Long> inputGraph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(execEnv),
+				TestGraphUtils.getLongLongEdgeData(execEnv),
+				execEnv);
+
+		final ToStringMapper stringifier = new ToStringMapper();
+		final DataSet<Edge<Long, String>> stringEdges = inputGraph.mapEdges(stringifier).getEdges();
+
+		stringEdges.writeAsCsv(resultPath);
+		execEnv.execute();
+
+		// Picked up by after(), which runs the comparison.
+		expectedResult = "1,2,string(12)\n"
+				+ "1,3,string(13)\n"
+				+ "2,3,string(23)\n"
+				+ "3,4,string(34)\n"
+				+ "3,5,string(35)\n"
+				+ "4,5,string(45)\n"
+				+ "5,1,string(51)\n";
+	}
+
+	/**
+	 * Test mapEdges() and change the value type to a Tuple1
+	 * (via {@code ToTuple1Mapper}).
+	 */
+	@Test
+	public void testWithTuple1Type() throws Exception {
+		final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+
+		final Graph<Long, Long, Long> inputGraph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(execEnv),
+				TestGraphUtils.getLongLongEdgeData(execEnv),
+				execEnv);
+
+		final ToTuple1Mapper wrapper = new ToTuple1Mapper();
+		final DataSet<Edge<Long, Tuple1<Long>>> tupleEdges = inputGraph.mapEdges(wrapper).getEdges();
+
+		tupleEdges.writeAsCsv(resultPath);
+		execEnv.execute();
+
+		// Picked up by after(), which runs the comparison.
+		expectedResult = "1,2,(12)\n"
+				+ "1,3,(13)\n"
+				+ "2,3,(23)\n"
+				+ "3,4,(34)\n"
+				+ "3,5,(35)\n"
+				+ "4,5,(45)\n"
+				+ "5,1,(51)\n";
+	}
+
+	/**
+	 * Test mapEdges() and change the value type to a custom type
+	 * (via {@code ToCustomTypeMapper}).
+	 */
+	@Test
+	public void testWithCustomType() throws Exception {
+		final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+
+		final Graph<Long, Long, Long> inputGraph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(execEnv),
+				TestGraphUtils.getLongLongEdgeData(execEnv),
+				execEnv);
+
+		final ToCustomTypeMapper converter = new ToCustomTypeMapper();
+		final DataSet<Edge<Long, DummyCustomType>> customEdges = inputGraph.mapEdges(converter).getEdges();
+
+		customEdges.writeAsCsv(resultPath);
+		execEnv.execute();
+
+		// Picked up by after(), which runs the comparison.
+		expectedResult = "1,2,(T,12)\n"
+				+ "1,3,(T,13)\n"
+				+ "2,3,(T,23)\n"
+				+ "3,4,(T,34)\n"
+				+ "3,5,(T,35)\n"
+				+ "4,5,(T,45)\n"
+				+ "5,1,(T,51)\n";
+	}
+
+	/**
+	 * Test mapEdges() and change the value type to a parameterized custom type
+	 * (via {@code ToCustomParametrizedTypeMapper}).
+	 */
+	@Test
+	public void testWithParametrizedCustomType() throws Exception {
+		final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+
+		final Graph<Long, Long, Long> inputGraph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(execEnv),
+				TestGraphUtils.getLongLongEdgeData(execEnv),
+				execEnv);
+
+		final ToCustomParametrizedTypeMapper converter = new ToCustomParametrizedTypeMapper();
+		final DataSet<Edge<Long, DummyCustomParameterizedType<Double>>> customEdges =
+				inputGraph.mapEdges(converter).getEdges();
+
+		customEdges.writeAsCsv(resultPath);
+		execEnv.execute();
+
+		// Picked up by after(), which runs the comparison.
+		expectedResult = "1,2,(12.0,12)\n"
+				+ "1,3,(13.0,13)\n"
+				+ "2,3,(23.0,23)\n"
+				+ "3,4,(34.0,34)\n"
+				+ "3,5,(35.0,35)\n"
+				+ "4,5,(45.0,45)\n"
+				+ "5,1,(51.0,51)\n";
+	}
+
+	/** Maps an edge to its value increased by one. */
+	@SuppressWarnings("serial")
+	private static final class AddOneMapper implements MapFunction<Edge<Long, Long>, Long> {
+		public Long map(Edge<Long, Long> edge) throws Exception {
+			final long current = edge.getValue();
+			return current + 1;
+		}
+	}
+
+	/**
+	 * Maps an edge to the text {@code string(<value>)}.
+	 *
+	 * <p>The number is formatted with {@link java.util.Locale#ROOT} so the digits do
+	 * not depend on the JVM's default locale; the expected test output is plain
+	 * ASCII digits.
+	 */
+	@SuppressWarnings("serial")
+	private static final class ToStringMapper implements MapFunction<Edge<Long, Long>, String> {
+		public String map(Edge<Long, Long> edge) throws Exception {
+			// Without an explicit locale, %d may be rendered with localized digits.
+			return String.format(java.util.Locale.ROOT, "string(%d)", edge.getValue());
+		}
+	}
+
+	/** Wraps the edge value in a single-field tuple. */
+	@SuppressWarnings("serial")
+	private static final class ToTuple1Mapper implements MapFunction<Edge<Long, Long>, Tuple1<Long>> {
+		public Tuple1<Long> map(Edge<Long, Long> edge) throws Exception {
+			return new Tuple1<Long>(edge.getValue());
+		}
+	}
+
+	/** Builds a {@code DummyCustomType} whose int field holds the edge value narrowed to an int. */
+	@SuppressWarnings("serial")
+	private static final class ToCustomTypeMapper implements MapFunction<Edge<Long, Long>, DummyCustomType> {
+		public DummyCustomType map(Edge<Long, Long> edge) throws Exception {
+			final DummyCustomType result = new DummyCustomType();
+			final int narrowed = edge.getValue().intValue();
+			result.setIntField(narrowed);
+			return result;
+		}
+	}
+
+	/**
+	 * Builds a {@code DummyCustomParameterizedType<Double>} from an edge: the int
+	 * field receives the edge value narrowed to an int, the generic field the same
+	 * value as a Double.
+	 */
+	@SuppressWarnings("serial")
+	private static final class ToCustomParametrizedTypeMapper implements MapFunction<Edge<Long, Long>, 
+		DummyCustomParameterizedType<Double>> {
+
+		public DummyCustomParameterizedType<Double> map(Edge<Long, Long> edge) throws Exception {
+			DummyCustomParameterizedType<Double> dummyValue = new DummyCustomParameterizedType<Double>();
+			final Long weight = edge.getValue();
+			dummyValue.setIntField(weight.intValue());
+			// Double.valueOf replaces the deprecated Double(double) constructor.
+			dummyValue.setTField(Double.valueOf(weight.doubleValue()));
+			return dummyValue;
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapVerticesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapVerticesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapVerticesITCase.java
new file mode 100644
index 0000000..0d92fc9
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/MapVerticesITCase.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.operations;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.test.TestGraphUtils;
+import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType;
+import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class MapVerticesITCase extends MultipleProgramsTestBase {
+
+	/**
+	 * Creates the test case for one run of the parameterized suite.
+	 *
+	 * @param mode execution mode, passed straight to the superclass constructor
+	 */
+	public MapVerticesITCase(MultipleProgramsTestBase.ExecutionMode mode){
+		super(mode);
+	}
+
+    private String resultPath;
+    private String expectedResult;
+
+    @Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	/** Creates a new file in the temporary folder and stores its URI string in {@code resultPath}. */
+	@Before
+	public void before() throws Exception{
+		final String outputUri = tempFolder.newFile().toURI().toString();
+		resultPath = outputUri;
+	}
+
+	/**
+	 * Runs after each test and hands {@code expectedResult} together with
+	 * {@code resultPath} to {@code compareResultsByLinesInMemory}.
+	 */
+	@After
+	public void after() throws Exception{
+		final String expected = expectedResult;
+		final String outputLocation = resultPath;
+		compareResultsByLinesInMemory(expected, outputLocation);
+	}
+
+	/**
+	 * Test mapVertices() keeping the same value type: every vertex is passed
+	 * through {@code AddOneMapper}.
+	 */
+	@Test
+	public void testWithSameValue() throws Exception {
+		final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+
+		final Graph<Long, Long, Long> inputGraph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(execEnv),
+				TestGraphUtils.getLongLongEdgeData(execEnv),
+				execEnv);
+
+		final AddOneMapper incrementer = new AddOneMapper();
+		final DataSet<Vertex<Long, Long>> incrementedVertices = inputGraph.mapVertices(incrementer).getVertices();
+
+		incrementedVertices.writeAsCsv(resultPath);
+		execEnv.execute();
+
+		// Picked up by after(), which runs the comparison.
+		expectedResult = "1,2\n"
+				+ "2,3\n"
+				+ "3,4\n"
+				+ "4,5\n"
+				+ "5,6\n";
+	}
+
+	/**
+	 * Test mapVertices() and change the value type to String
+	 * (via {@code ToStringMapper}).
+	 */
+	@Test
+	public void testWithStringValue() throws Exception {
+		final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+
+		final Graph<Long, Long, Long> inputGraph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(execEnv),
+				TestGraphUtils.getLongLongEdgeData(execEnv),
+				execEnv);
+
+		final ToStringMapper speller = new ToStringMapper();
+		final DataSet<Vertex<Long, String>> stringVertices = inputGraph.mapVertices(speller).getVertices();
+
+		stringVertices.writeAsCsv(resultPath);
+		execEnv.execute();
+
+		// Picked up by after(), which runs the comparison.
+		expectedResult = "1,one\n"
+				+ "2,two\n"
+				+ "3,three\n"
+				+ "4,four\n"
+				+ "5,five\n";
+	}
+
+	/**
+	 * Test mapVertices() and change the value type to a Tuple1
+	 * (via {@code ToTuple1Mapper}).
+	 */
+	@Test
+	public void testWithtuple1Value() throws Exception {
+		final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+
+		final Graph<Long, Long, Long> inputGraph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(execEnv),
+				TestGraphUtils.getLongLongEdgeData(execEnv),
+				execEnv);
+
+		final ToTuple1Mapper wrapper = new ToTuple1Mapper();
+		final DataSet<Vertex<Long, Tuple1<Long>>> tupleVertices = inputGraph.mapVertices(wrapper).getVertices();
+
+		tupleVertices.writeAsCsv(resultPath);
+		execEnv.execute();
+
+		// Picked up by after(), which runs the comparison.
+		expectedResult = "1,(1)\n"
+				+ "2,(2)\n"
+				+ "3,(3)\n"
+				+ "4,(4)\n"
+				+ "5,(5)\n";
+	}
+
+	/**
+	 * Test mapVertices() and change the value type to a custom type
+	 * (via {@code ToCustomTypeMapper}).
+	 */
+	@Test
+	public void testWithCustomType() throws Exception {
+		final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+
+		final Graph<Long, Long, Long> inputGraph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(execEnv),
+				TestGraphUtils.getLongLongEdgeData(execEnv),
+				execEnv);
+
+		final ToCustomTypeMapper converter = new ToCustomTypeMapper();
+		final DataSet<Vertex<Long, DummyCustomType>> customVertices = inputGraph.mapVertices(converter).getVertices();
+
+		customVertices.writeAsCsv(resultPath);
+		execEnv.execute();
+
+		// Picked up by after(), which runs the comparison.
+		expectedResult = "1,(T,1)\n"
+				+ "2,(T,2)\n"
+				+ "3,(T,3)\n"
+				+ "4,(T,4)\n"
+				+ "5,(T,5)\n";
+	}
+
+	/**
+	 * Test mapVertices() and change the value type to a parameterized custom type
+	 * (via {@code ToCustomParametrizedTypeMapper}).
+	 */
+	@Test
+	public void testWithCustomParametrizedType() throws Exception {
+		final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+
+		final Graph<Long, Long, Long> inputGraph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(execEnv),
+				TestGraphUtils.getLongLongEdgeData(execEnv),
+				execEnv);
+
+		final ToCustomParametrizedTypeMapper converter = new ToCustomParametrizedTypeMapper();
+		final DataSet<Vertex<Long, DummyCustomParameterizedType<Double>>> customVertices =
+				inputGraph.mapVertices(converter).getVertices();
+
+		customVertices.writeAsCsv(resultPath);
+		execEnv.execute();
+
+		// Picked up by after(), which runs the comparison.
+		expectedResult = "1,(1.0,1)\n"
+				+ "2,(2.0,2)\n"
+				+ "3,(3.0,3)\n"
+				+ "4,(4.0,4)\n"
+				+ "5,(5.0,5)\n";
+	}
+
+	/** Maps a vertex to its value increased by one. */
+	@SuppressWarnings("serial")
+	private static final class AddOneMapper implements MapFunction<Vertex<Long, Long>, Long> {
+		public Long map(Vertex<Long, Long> value) throws Exception {
+			final long current = value.getValue();
+			return current + 1;
+		}
+	}
+
+	/** Spells out vertex values 1 through 5 in English; any other value maps to the empty string. */
+	@SuppressWarnings("serial")
+	private static final class ToStringMapper implements MapFunction<Vertex<Long, Long>, String> {
+		public String map(Vertex<Long, Long> vertex) throws Exception {
+			// Index i holds the name of the number i + 1.
+			final String[] names = {"one", "two", "three", "four", "five"};
+			final long number = vertex.getValue();
+			if (number < 1 || number > names.length) {
+				return "";
+			}
+			return names[(int) (number - 1)];
+		}
+	}
+
+	/** Wraps the vertex value in a single-field tuple. */
+	@SuppressWarnings("serial")
+	private static final class ToTuple1Mapper implements MapFunction<Vertex<Long, Long>, Tuple1<Long>> {
+		public Tuple1<Long> map(Vertex<Long, Long> vertex) throws Exception {
+			return new Tuple1<Long>(vertex.getValue());
+		}
+	}
+
+	/** Builds a {@code DummyCustomType} whose int field holds the vertex value narrowed to an int. */
+	@SuppressWarnings("serial")
+	private static final class ToCustomTypeMapper implements MapFunction<Vertex<Long, Long>, DummyCustomType> {
+		public DummyCustomType map(Vertex<Long, Long> vertex) throws Exception {
+			final DummyCustomType result = new DummyCustomType();
+			final int narrowed = vertex.getValue().intValue();
+			result.setIntField(narrowed);
+			return result;
+		}
+	}
+
+	/**
+	 * Builds a {@code DummyCustomParameterizedType<Double>} from a vertex: the int
+	 * field receives the vertex value narrowed to an int, the generic field the
+	 * same value as a Double.
+	 */
+	@SuppressWarnings("serial")
+	private static final class ToCustomParametrizedTypeMapper implements MapFunction<Vertex<Long, Long>, 
+		DummyCustomParameterizedType<Double>> {
+
+		public DummyCustomParameterizedType<Double> map(Vertex<Long, Long> vertex) throws Exception {
+			DummyCustomParameterizedType<Double> dummyValue = new DummyCustomParameterizedType<Double>();
+			final Long vertexValue = vertex.getValue();
+			dummyValue.setIntField(vertexValue.intValue());
+			// Double.valueOf replaces the deprecated Double(double) constructor.
+			dummyValue.setTField(Double.valueOf(vertexValue.doubleValue()));
+			return dummyValue;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
new file mode 100644
index 0000000..73dec2a
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
@@ -0,0 +1,318 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.operations;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.EdgesFunction;
+import org.apache.flink.graph.EdgesFunctionWithVertexValue;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.test.TestGraphUtils;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
+
+	/**
+	 * Creates the test case for one run of the parameterized suite.
+	 *
+	 * @param mode execution mode, passed straight to the superclass constructor
+	 */
+	public ReduceOnEdgesMethodsITCase(MultipleProgramsTestBase.ExecutionMode mode){
+		super(mode);
+	}
+
+    private String resultPath;
+    private String expectedResult;
+
+    @Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	/** Creates a new file in the temporary folder and stores its URI string in {@code resultPath}. */
+	@Before
+	public void before() throws Exception{
+		final String outputUri = tempFolder.newFile().toURI().toString();
+		resultPath = outputUri;
+	}
+
+	/**
+	 * Runs after each test and hands {@code expectedResult} together with
+	 * {@code resultPath} to {@code compareResultsByLinesInMemory}.
+	 */
+	@After
+	public void after() throws Exception{
+		final String expected = expectedResult;
+		final String outputLocation = resultPath;
+		compareResultsByLinesInMemory(expected, outputLocation);
+	}
+
+	/**
+	 * Get the lowest-weight out-neighbor for each vertex
+	 * (using {@code SelectMinWeightNeighbor} over outgoing edges).
+	 */
+	@Test
+	public void testLowestWeightOutNeighbor() throws Exception {
+		final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+		final Graph<Long, Long, Long> inputGraph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(execEnv),
+				TestGraphUtils.getLongLongEdgeData(execEnv),
+				execEnv);
+
+		final SelectMinWeightNeighbor selector = new SelectMinWeightNeighbor();
+		final DataSet<Tuple2<Long, Long>> lowestOutNeighbors =
+				inputGraph.reduceOnEdges(selector, EdgeDirection.OUT);
+
+		lowestOutNeighbors.writeAsCsv(resultPath);
+		execEnv.execute();
+
+		// Picked up by after(), which runs the comparison.
+		expectedResult = "1,2\n"
+				+ "2,3\n"
+				+ "3,4\n"
+				+ "4,5\n"
+				+ "5,1\n";
+	}
+
+	/**
+	 * Get the lowest-weight in-neighbor for each vertex
+	 * (using {@code SelectMinWeightInNeighbor} over incoming edges).
+	 */
+	@Test
+	public void testLowestWeightInNeighbor() throws Exception {
+		final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+		final Graph<Long, Long, Long> inputGraph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(execEnv),
+				TestGraphUtils.getLongLongEdgeData(execEnv),
+				execEnv);
+
+		final SelectMinWeightInNeighbor selector = new SelectMinWeightInNeighbor();
+		final DataSet<Tuple2<Long, Long>> lowestInNeighbors =
+				inputGraph.reduceOnEdges(selector, EdgeDirection.IN);
+
+		lowestInNeighbors.writeAsCsv(resultPath);
+		execEnv.execute();
+
+		// Picked up by after(), which runs the comparison.
+		expectedResult = "1,5\n"
+				+ "2,1\n"
+				+ "3,1\n"
+				+ "4,3\n"
+				+ "5,3\n";
+	}
+
+	/**
+	 * Get the maximum weight among all edges of a vertex
+	 * (using {@code SelectMaxWeightNeighbor} over edges in both directions).
+	 */
+	@Test
+	public void testMaxWeightEdge() throws Exception {
+		final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+		final Graph<Long, Long, Long> inputGraph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(execEnv),
+				TestGraphUtils.getLongLongEdgeData(execEnv),
+				execEnv);
+
+		final SelectMaxWeightNeighbor selector = new SelectMaxWeightNeighbor();
+		final DataSet<Tuple2<Long, Long>> maxEdgeWeights =
+				inputGraph.reduceOnEdges(selector, EdgeDirection.ALL);
+
+		maxEdgeWeights.writeAsCsv(resultPath);
+		execEnv.execute();
+
+		// Picked up by after(), which runs the comparison.
+		expectedResult = "1,51\n"
+				+ "2,23\n"
+				+ "3,35\n"
+				+ "4,45\n"
+				+ "5,51\n";
+	}
+
+	/**
+	 * Get the lowest-weight out-neighbor for each vertex, using the
+	 * {@code EdgesFunction} variant ({@code SelectMinWeightNeighborNoValue}).
+	 */
+	@Test
+	public void testLowestWeightOutNeighborNoValue() throws Exception {
+		final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+		final Graph<Long, Long, Long> inputGraph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(execEnv),
+				TestGraphUtils.getLongLongEdgeData(execEnv),
+				execEnv);
+
+		final SelectMinWeightNeighborNoValue selector = new SelectMinWeightNeighborNoValue();
+		final DataSet<Tuple2<Long, Long>> lowestOutNeighbors =
+				inputGraph.reduceOnEdges(selector, EdgeDirection.OUT);
+
+		lowestOutNeighbors.writeAsCsv(resultPath);
+		execEnv.execute();
+
+		// Picked up by after(), which runs the comparison.
+		expectedResult = "1,2\n"
+				+ "2,3\n"
+				+ "3,4\n"
+				+ "4,5\n"
+				+ "5,1\n";
+	}
+
+	/**
+	 * Get the lowest-weight in-neighbor for each vertex, using the
+	 * {@code EdgesFunction} variant ({@code SelectMinWeightInNeighborNoValue}).
+	 */
+	@Test
+	public void testLowestWeightInNeighborNoValue() throws Exception {
+		final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+		final Graph<Long, Long, Long> inputGraph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(execEnv),
+				TestGraphUtils.getLongLongEdgeData(execEnv),
+				execEnv);
+
+		final SelectMinWeightInNeighborNoValue selector = new SelectMinWeightInNeighborNoValue();
+		final DataSet<Tuple2<Long, Long>> lowestInNeighbors =
+				inputGraph.reduceOnEdges(selector, EdgeDirection.IN);
+
+		lowestInNeighbors.writeAsCsv(resultPath);
+		execEnv.execute();
+
+		// Picked up by after(), which runs the comparison.
+		expectedResult = "1,5\n"
+				+ "2,1\n"
+				+ "3,1\n"
+				+ "4,3\n"
+				+ "5,3\n";
+	}
+
+	/**
+	 * Get the maximum weight among all edges of a vertex, using the
+	 * {@code EdgesFunction} variant ({@code SelectMaxWeightNeighborNoValue}).
+	 */
+	@Test
+	public void testMaxWeightAllNeighbors() throws Exception {
+		final ExecutionEnvironment execEnv = ExecutionEnvironment.getExecutionEnvironment();
+		final Graph<Long, Long, Long> inputGraph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(execEnv),
+				TestGraphUtils.getLongLongEdgeData(execEnv),
+				execEnv);
+
+		final SelectMaxWeightNeighborNoValue selector = new SelectMaxWeightNeighborNoValue();
+		final DataSet<Tuple2<Long, Long>> maxEdgeWeights =
+				inputGraph.reduceOnEdges(selector, EdgeDirection.ALL);
+
+		maxEdgeWeights.writeAsCsv(resultPath);
+		execEnv.execute();
+
+		// Picked up by after(), which runs the comparison.
+		expectedResult = "1,51\n"
+				+ "2,23\n"
+				+ "3,35\n"
+				+ "4,45\n"
+				+ "5,51\n";
+	}
+
+	/**
+	 * Emits (vertex id, target id of the edge with the smallest value). On equal
+	 * values the edge seen first wins; with no edges the second field is 0.
+	 */
+	@SuppressWarnings("serial")
+	private static final class SelectMinWeightNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
+
+		public Tuple2<Long, Long> iterateEdges(
+				Vertex<Long, Long> v,
+				Iterable<Edge<Long, Long>> edges) {
+
+			long lowestWeight = Long.MAX_VALUE;
+			long lightestTargetId = 0;
+
+			for (Edge<Long, Long> candidate : edges) {
+				final long candidateWeight = candidate.getValue();
+				if (candidateWeight >= lowestWeight) {
+					continue;
+				}
+				lowestWeight = candidateWeight;
+				lightestTargetId = candidate.getTarget();
+			}
+			return new Tuple2<Long, Long>(v.getId(), lightestTargetId);
+		}
+	}
+
+	/**
+	 * Emits (vertex id, largest edge value among the given edges); with no edges
+	 * the second field is {@code Long.MIN_VALUE}.
+	 */
+	@SuppressWarnings("serial")
+	private static final class SelectMaxWeightNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
+
+		public Tuple2<Long, Long> iterateEdges(Vertex<Long, Long> v,
+				Iterable<Edge<Long, Long>> edges) {
+
+			long heaviest = Long.MIN_VALUE;
+
+			for (Edge<Long, Long> candidate : edges) {
+				final long candidateWeight = candidate.getValue();
+				heaviest = Math.max(heaviest, candidateWeight);
+			}
+			return new Tuple2<Long, Long>(v.getId(), heaviest);
+		}
+	}
+
+	/**
+	 * Emits (vertex id, target id of the edge with the smallest value). The vertex
+	 * id is taken from the first field of the first element; with no elements the
+	 * result is (-1, 0).
+	 */
+	@SuppressWarnings("serial")
+	private static final class SelectMinWeightNeighborNoValue implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
+
+		public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) {
+
+			long lowestWeight = Long.MAX_VALUE;
+			long lightestTargetId = 0;
+			long ownerId = -1;
+			boolean ownerKnown = false;
+
+			for (Tuple2<Long, Edge<Long, Long>> entry : edges) {
+				final Edge<Long, Long> candidate = entry.f1;
+				final long candidateWeight = candidate.getValue();
+				if (candidateWeight < lowestWeight) {
+					lowestWeight = candidateWeight;
+					lightestTargetId = candidate.getTarget();
+				}
+				// Only the first element supplies the vertex id.
+				if (!ownerKnown) {
+					ownerId = entry.f0;
+					ownerKnown = true;
+				}
+			}
+			return new Tuple2<Long, Long>(ownerId, lightestTargetId);
+		}
+	}
+
+	/**
+	 * Emits (vertex id, largest edge value). The vertex id is taken from the first
+	 * field of the first element; with no elements the result is
+	 * (-1, {@code Long.MIN_VALUE}).
+	 */
+	@SuppressWarnings("serial")
+	private static final class SelectMaxWeightNeighborNoValue implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
+
+		public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) {
+
+			long heaviest = Long.MIN_VALUE;
+			long ownerId = -1;
+			boolean ownerKnown = false;
+
+			for (Tuple2<Long, Edge<Long, Long>> entry : edges) {
+				final long candidateWeight = entry.f1.getValue();
+				heaviest = Math.max(heaviest, candidateWeight);
+				// Only the first element supplies the vertex id.
+				if (!ownerKnown) {
+					ownerId = entry.f0;
+					ownerKnown = true;
+				}
+			}
+			return new Tuple2<Long, Long>(ownerId, heaviest);
+		}
+	}
+
+	/**
+	 * Emits (vertex id, source id of the edge with the smallest value). On equal
+	 * values the edge seen first wins; with no edges the second field is 0.
+	 */
+	@SuppressWarnings("serial")
+	private static final class SelectMinWeightInNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
+
+		public Tuple2<Long, Long> iterateEdges(
+				Vertex<Long, Long> v,
+				Iterable<Edge<Long, Long>> edges) {
+
+			long lowestWeight = Long.MAX_VALUE;
+			long lightestSourceId = 0;
+
+			for (Edge<Long, Long> candidate : edges) {
+				final long candidateWeight = candidate.getValue();
+				if (candidateWeight >= lowestWeight) {
+					continue;
+				}
+				lowestWeight = candidateWeight;
+				lightestSourceId = candidate.getSource();
+			}
+			return new Tuple2<Long, Long>(v.getId(), lightestSourceId);
+		}
+	}
+
+	/**
+	 * Emits (vertex id, source id of the edge with the smallest value). The vertex
+	 * id is taken from the first field of the first element; with no elements the
+	 * result is (-1, 0).
+	 */
+	@SuppressWarnings("serial")
+	private static final class SelectMinWeightInNeighborNoValue implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
+
+		public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) {
+
+			long lowestWeight = Long.MAX_VALUE;
+			long lightestSourceId = 0;
+			long ownerId = -1;
+			boolean ownerKnown = false;
+
+			for (Tuple2<Long, Edge<Long, Long>> entry : edges) {
+				final Edge<Long, Long> candidate = entry.f1;
+				final long candidateWeight = candidate.getValue();
+				if (candidateWeight < lowestWeight) {
+					lowestWeight = candidateWeight;
+					lightestSourceId = candidate.getSource();
+				}
+				// Only the first element supplies the vertex id.
+				if (!ownerKnown) {
+					ownerId = entry.f0;
+					ownerKnown = true;
+				}
+			}
+			return new Tuple2<Long, Long>(ownerId, lightestSourceId);
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
new file mode 100644
index 0000000..c1e982f
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.operations;
+
+import java.util.Iterator;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.NeighborsFunction;
+import org.apache.flink.graph.NeighborsFunctionWithVertexValue;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.test.TestGraphUtils;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
+
+	/** Location handed to the CSV sink; reassigned before every test. */
+	private String resultPath;
+
+	/** Text each test expects to find at {@link #resultPath}; set by the test itself. */
+	private String expectedResult;
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	public ReduceOnNeighborMethodsITCase(MultipleProgramsTestBase.ExecutionMode mode) {
+		super(mode);
+	}
+
+	/** Creates a new file in the temporary folder and stores its URI string as the result path. */
+	@Before
+	public void before() throws Exception {
+		this.resultPath = tempFolder.newFile().toURI().toString();
+	}
+
+	/** Hands the expectation and the result path to compareResultsByLinesInMemory. */
+	@After
+	public void after() throws Exception {
+		compareResultsByLinesInMemory(this.expectedResult, this.resultPath);
+	}
+
+	/**
+	 * Sums, for each vertex, the values of its out-neighbors
+	 * (function receives the vertex itself).
+	 */
+	@Test
+	public void testSumOfOutNeighbors() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		final Graph<Long, Long, Long> inputGraph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env),
+				env);
+
+		final DataSet<Tuple2<Long, Long>> outNeighborSums =
+				inputGraph.reduceOnNeighbors(new SumOutNeighbors(), EdgeDirection.OUT);
+		outNeighborSums.writeAsCsv(resultPath);
+		env.execute();
+
+		// checked against the written file once the test method has returned
+		expectedResult = "1,5\n" +
+				"2,3\n" +
+				"3,9\n" +
+				"4,5\n" +
+				"5,1\n";
+	}
+
+	/**
+	 * Sums, for each vertex, the in-neighbor values multiplied by the
+	 * weight of the connecting edge (function receives the vertex itself).
+	 */
+	@Test
+	public void testSumOfInNeighbors() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		final Graph<Long, Long, Long> inputGraph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env),
+				env);
+
+		final DataSet<Tuple2<Long, Long>> weightedInSums =
+				inputGraph.reduceOnNeighbors(new SumInNeighbors(), EdgeDirection.IN);
+		weightedInSums.writeAsCsv(resultPath);
+		env.execute();
+
+		// checked against the written file once the test method has returned
+		expectedResult = "1,255\n" +
+				"2,12\n" +
+				"3,59\n" +
+				"4,102\n" +
+				"5,285\n";
+	}
+
+	/**
+	 * Sums, for each vertex, the values of all its neighbors
+	 * plus the vertex's own value.
+	 */
+	@Test
+	public void testSumOfOAllNeighbors() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		final Graph<Long, Long, Long> inputGraph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env),
+				env);
+
+		final DataSet<Tuple2<Long, Long>> allNeighborSums =
+				inputGraph.reduceOnNeighbors(new SumAllNeighbors(), EdgeDirection.ALL);
+		allNeighborSums.writeAsCsv(resultPath);
+		env.execute();
+
+		// checked against the written file once the test method has returned
+		expectedResult = "1,11\n" +
+				"2,6\n" +
+				"3,15\n" +
+				"4,12\n" +
+				"5,13\n";
+	}
+
+	/**
+	 * Sums, for each vertex, the values of its out-neighbors
+	 * (function receives only the vertex id).
+	 */
+	@Test
+	public void testSumOfOutNeighborsNoValue() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		final Graph<Long, Long, Long> inputGraph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env),
+				env);
+
+		final DataSet<Tuple2<Long, Long>> outNeighborSums =
+				inputGraph.reduceOnNeighbors(new SumOutNeighborsNoValue(), EdgeDirection.OUT);
+		outNeighborSums.writeAsCsv(resultPath);
+		env.execute();
+
+		// checked against the written file once the test method has returned
+		expectedResult = "1,5\n" +
+				"2,3\n" +
+				"3,9\n" +
+				"4,5\n" +
+				"5,1\n";
+	}
+
+	/**
+	 * Sums, for each vertex, the in-neighbor values multiplied by the
+	 * weight of the connecting edge (function receives only the vertex id).
+	 */
+	@Test
+	public void testSumOfInNeighborsNoValue() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		final Graph<Long, Long, Long> inputGraph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env),
+				env);
+
+		final DataSet<Tuple2<Long, Long>> weightedInSums =
+				inputGraph.reduceOnNeighbors(new SumInNeighborsNoValue(), EdgeDirection.IN);
+		weightedInSums.writeAsCsv(resultPath);
+		env.execute();
+
+		// checked against the written file once the test method has returned
+		expectedResult = "1,255\n" +
+				"2,12\n" +
+				"3,59\n" +
+				"4,102\n" +
+				"5,285\n";
+	}
+
+	/**
+	 * Sums, for each vertex, the values of all its neighbors
+	 * (function receives only the vertex id, so the own value is not added).
+	 */
+	@Test
+	public void testSumOfAllNeighborsNoValue() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		final Graph<Long, Long, Long> inputGraph = Graph.fromDataSet(
+				TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env),
+				env);
+
+		final DataSet<Tuple2<Long, Long>> allNeighborSums =
+				inputGraph.reduceOnNeighbors(new SumAllNeighborsNoValue(), EdgeDirection.ALL);
+		allNeighborSums.writeAsCsv(resultPath);
+		env.execute();
+
+		// checked against the written file once the test method has returned
+		expectedResult = "1,10\n" +
+				"2,4\n" +
+				"3,12\n" +
+				"4,8\n" +
+				"5,8\n";
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SumOutNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long, 
+	Tuple2<Long, Long>> {
+
+		public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex,
+				Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
+			
+			long sum = 0;
+			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
+				sum += neighbor.f1.getValue();
+			}
+			return new Tuple2<Long, Long>(vertex.getId(), sum);
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SumInNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long, 
+		Tuple2<Long, Long>> {
+		
+		public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex,
+				Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
+		
+			long sum = 0;
+			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
+				sum += neighbor.f0.getValue() * neighbor.f1.getValue();
+			}
+			return new Tuple2<Long, Long>(vertex.getId(), sum);
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SumAllNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long, 
+		Tuple2<Long, Long>> {
+
+		public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex,
+		Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
+	
+			long sum = 0;
+			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
+				sum += neighbor.f1.getValue();
+			}
+			return new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue());
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SumOutNeighborsNoValue implements NeighborsFunction<Long, Long, Long, 
+		Tuple2<Long, Long>> {
+
+		public Tuple2<Long, Long> iterateNeighbors(
+				Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
+
+			long sum = 0;
+			Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
+			Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
+					neighbors.iterator();
+			while(neighborsIterator.hasNext()) {
+				next = neighborsIterator.next();
+				sum += next.f2.getValue();
+			}
+			return new Tuple2<Long, Long>(next.f0, sum);
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SumInNeighborsNoValue implements NeighborsFunction<Long, Long, Long, 
+		Tuple2<Long, Long>> {
+		
+		public Tuple2<Long, Long> iterateNeighbors(
+				Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
+		
+			long sum = 0;
+			Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
+			Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
+					neighbors.iterator();
+			while(neighborsIterator.hasNext()) {
+				next = neighborsIterator.next();
+				sum += next.f2.getValue() * next.f1.getValue();
+			}
+			return new Tuple2<Long, Long>(next.f0, sum);
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SumAllNeighborsNoValue implements NeighborsFunction<Long, Long, Long, 
+		Tuple2<Long, Long>> {
+
+		public Tuple2<Long, Long> iterateNeighbors(
+				Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
+	
+			long sum = 0;
+			Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
+			Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
+					neighbors.iterator();
+			while(neighborsIterator.hasNext()) {
+				next = neighborsIterator.next();
+				sum += next.f2.getValue();
+			}
+			return new Tuple2<Long, Long>(next.f0, sum);
+		}
+	}
+}
\ No newline at end of file


Mime
View raw message