flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [18/50] [abbrv] flink git commit: [FLINK-1201] [gelly] joinWithEdges implemented and tested
Date Wed, 11 Feb 2015 10:49:20 GMT
[FLINK-1201] [gelly] joinWithEdges implemented and tested


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e0c10ec8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e0c10ec8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e0c10ec8

Branch: refs/heads/master
Commit: e0c10ec8f8aff10eaa186d04dda252e105b4eb0b
Parents: b2c89cc
Author: andralungu <lungu.andra@gmail.com>
Authored: Thu Jan 8 18:16:31 2015 +0100
Committer: Stephan Ewen <sewen@apache.org>
Committed: Wed Feb 11 10:46:14 2015 +0100

----------------------------------------------------------------------
 .../main/java/org/apache/flink/graph/Graph.java | 123 ++++
 .../apache/flink/graph/test/TestGraphUtils.java |  90 +++
 .../flink/graph/test/TestJoinWithEdges.java     | 584 +++++++++++++++++++
 3 files changed, 797 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e0c10ec8/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 1cd5c90..71a701b 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -222,6 +222,129 @@ public class Graph<K extends Comparable<K> & Serializable,
VV extends Serializab
 	}
 
 	/**
+	 * Method that joins the edge DataSet with an input DataSet on a composite key of both source
and target
+	 * and applies a UDF on the resulted values.
+	 * @param inputDataSet
+	 * @param mapper - the UDF applied
+	 * @param <T>
+	 * @return - a new graph where the edge values have been updated.
+	 */
+	public <T> Graph<K, VV, EV> joinWithEdges(DataSet<Tuple3<K, K, T>>
inputDataSet,
+											  final MapFunction<Tuple2<EV, T>, EV> mapper) {
+
+		DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
+				.coGroup(inputDataSet).where(0,1).equalTo(0,1)
+				.with(new ApplyCoGroupToEdgeValues<K, EV, T>(mapper));
+
+		return Graph.create(this.getVertices(), resultedEdges, this.getContext());
+	}
+
+	private static final class ApplyCoGroupToEdgeValues<K extends Comparable<K> &
Serializable,
+			EV extends Serializable, T>
+			implements CoGroupFunction<Edge<K, EV>, Tuple3<K, K, T>, Edge<K, EV>>
{
+
+		private MapFunction<Tuple2<EV, T>, EV> mapper;
+
+		public ApplyCoGroupToEdgeValues(MapFunction<Tuple2<EV, T>, EV> mapper) {
+			this.mapper = mapper;
+		}
+
+		@Override
+		public void coGroup(Iterable<Edge<K, EV>> iterableDS1,
+							Iterable<Tuple3<K, K, T>> iterableDS2,
+							Collector<Edge<K, EV>> collector) throws Exception {
+
+			Iterator<Edge<K, EV>> iteratorDS1 = iterableDS1.iterator();
+			Iterator<Tuple3<K, K, T>> iteratorDS2 = iterableDS2.iterator();
+
+			if(iteratorDS2.hasNext() && iteratorDS1.hasNext()) {
+				Tuple3<K, K, T> iteratorDS2Next = iteratorDS2.next();
+
+				collector.collect(new Edge<K, EV>(iteratorDS2Next.f0, iteratorDS2Next.f1, mapper
+						.map(new Tuple2<EV, T>(iteratorDS1.next().f2, iteratorDS2Next.f2))));
+
+			} else if(iteratorDS1.hasNext()) {
+				collector.collect(iteratorDS1.next());
+			}
+		}
+	}
+
+	/**
+	 * Method that joins the edge DataSet with an input DataSet on the source key of the edges
and the first attribute
+	 * of the input DataSet and applies a UDF on the resulted values.
+	 * Should the inputDataSet contain the same key more than once, only the first value will
be considered.
+	 * @param inputDataSet
+	 * @param mapper - the UDF applied
+	 * @param <T>
+	 * @return - a new graph where the edge values have been updated.
+	 */
+	public <T> Graph<K, VV, EV> joinWithEdgesOnSource(DataSet<Tuple2<K, T>>
inputDataSet,
+												 final MapFunction<Tuple2<EV, T>, EV> mapper) {
+
+		DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
+				.coGroup(inputDataSet).where(0).equalTo(0)
+				.with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(mapper));
+
+		return Graph.create(this.getVertices(), resultedEdges, this.getContext());
+	}
+
+	private static final class ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K extends Comparable<K>
& Serializable,
+			EV extends Serializable, T>
+			implements CoGroupFunction<Edge<K, EV>, Tuple2<K, T>, Edge<K, EV>>
{
+
+		private MapFunction<Tuple2<EV, T>, EV> mapper;
+
+		public ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget(MapFunction<Tuple2<EV, T>,
EV> mapper) {
+			this.mapper = mapper;
+		}
+
+
+		@Override
+		public void coGroup(Iterable<Edge<K, EV>> iterableDS1,
+							Iterable<Tuple2<K, T>> iterableDS2,
+							Collector<Edge<K, EV>> collector) throws Exception {
+
+			Iterator<Edge<K, EV>> iteratorDS1 = iterableDS1.iterator();
+			Iterator<Tuple2<K, T>> iteratorDS2 = iterableDS2.iterator();
+
+			if(iteratorDS2.hasNext()) {
+				Tuple2<K, T> iteratorDS2Next = iteratorDS2.next();
+
+				while(iteratorDS1.hasNext()) {
+					Edge<K, EV> iteratorDS1Next = iteratorDS1.next();
+
+					collector.collect(new Edge<K, EV>(iteratorDS1Next.f0, iteratorDS1Next.f1, mapper
+							.map(new Tuple2<EV, T>(iteratorDS1Next.f2, iteratorDS2Next.f1))));
+				}
+
+			} else {
+				while(iteratorDS1.hasNext()) {
+					collector.collect(iteratorDS1.next());
+				}
+			}
+		}
+	}
+
+	/**
+	 * Method that joins the edge DataSet with an input DataSet on the target key of the edges
and the first attribute
+	 * of the input DataSet and applies a UDF on the resulted values.
+	 * Should the inputDataSet contain the same key more than once, only the first value will
be considered.
+	 * @param inputDataSet
+	 * @param mapper - the UDF applied
+	 * @param <T>
+	 * @return - a new graph where the edge values have been updated.
+	 */
+	public <T> Graph<K, VV, EV> joinWithEdgesOnTarget(DataSet<Tuple2<K, T>>
inputDataSet,
+													  final MapFunction<Tuple2<EV, T>, EV> mapper) {
+
+		DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
+				.coGroup(inputDataSet).where(1).equalTo(0)
+				.with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(mapper));
+
+		return Graph.create(this.getVertices(), resultedEdges, this.getContext());
+	}
+
+	/**
      * Apply value-based filtering functions to the graph 
      * and return a sub-graph that satisfies the predicates
      * for both vertex values and edge values.

http://git-wip-us.apache.org/repos/asf/flink/blob/e0c10ec8/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java
index 9816619..d5062c5 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java
@@ -7,6 +7,7 @@ import java.util.List;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
 
 public class TestGraphUtils {
 
@@ -62,6 +63,48 @@ public class TestGraphUtils {
 		return env.fromCollection(tuples);
 	}
 
+	public static final DataSet<Tuple2<Long, Long>> getLongLongTuple2SourceData(
+			ExecutionEnvironment env) {
+		List<Tuple2<Long, Long>> tuples = new ArrayList<Tuple2<Long, Long>>();
+		tuples.add(new Tuple2<Long, Long>(1L, 10L));
+		tuples.add(new Tuple2<Long, Long>(1L, 20L));
+		tuples.add(new Tuple2<Long, Long>(2L, 30L));
+		tuples.add(new Tuple2<Long, Long>(3L, 40L));
+		tuples.add(new Tuple2<Long, Long>(3L, 50L));
+		tuples.add(new Tuple2<Long, Long>(4L, 60L));
+		tuples.add(new Tuple2<Long, Long>(6L, 70L));
+
+		return env.fromCollection(tuples);
+	}
+
+	public static final DataSet<Tuple2<Long, Long>> getLongLongTuple2TargetData(
+			ExecutionEnvironment env) {
+		List<Tuple2<Long, Long>> tuples = new ArrayList<Tuple2<Long, Long>>();
+		tuples.add(new Tuple2<Long, Long>(2L, 10L));
+		tuples.add(new Tuple2<Long, Long>(3L, 20L));
+		tuples.add(new Tuple2<Long, Long>(3L, 30L));
+		tuples.add(new Tuple2<Long, Long>(4L, 40L));
+		tuples.add(new Tuple2<Long, Long>(6L, 50L));
+		tuples.add(new Tuple2<Long, Long>(6L, 60L));
+		tuples.add(new Tuple2<Long, Long>(1L, 70L));
+
+		return env.fromCollection(tuples);
+	}
+
+	public static final DataSet<Tuple3<Long, Long, Long>> getLongLongLongTuple3Data(
+			ExecutionEnvironment env) {
+		List<Tuple3<Long, Long, Long>> tuples = new ArrayList<>();
+		tuples.add(new Tuple3<Long, Long, Long>(1L, 2L, 12L));
+		tuples.add(new Tuple3<Long, Long, Long>(1L, 3L, 13L));
+		tuples.add(new Tuple3<Long, Long, Long>(2L, 3L, 23L));
+		tuples.add(new Tuple3<Long, Long, Long>(3L, 4L, 34L));
+		tuples.add(new Tuple3<Long, Long, Long>(3L, 6L, 36L));
+		tuples.add(new Tuple3<Long, Long, Long>(4L, 6L, 46L));
+		tuples.add(new Tuple3<Long, Long, Long>(6L, 1L, 61L));
+
+		return env.fromCollection(tuples);
+	}
+
 	public static final DataSet<Tuple2<Long, DummyCustomParameterizedType<Float>>>
getLongCustomTuple2Data(
 			ExecutionEnvironment env) {
 		List<Tuple2<Long, DummyCustomParameterizedType<Float>>> tuples = new
ArrayList<Tuple2<Long,
@@ -77,6 +120,53 @@ public class TestGraphUtils {
 		return env.fromCollection(tuples);
 	}
 
+	public static final DataSet<Tuple2<Long, DummyCustomParameterizedType<Float>>>
getLongCustomTuple2SourceData(
+			ExecutionEnvironment env) {
+		List<Tuple2<Long, DummyCustomParameterizedType<Float>>> tuples = new
ArrayList<Tuple2<Long,
+				DummyCustomParameterizedType<Float>>>();
+		tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(1L,
+				new DummyCustomParameterizedType<Float>(10, 10f)));
+		tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(1L,
+				new DummyCustomParameterizedType<Float>(20, 20f)));
+		tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(2L,
+				new DummyCustomParameterizedType<Float>(30, 30f)));
+		tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(3L,
+				new DummyCustomParameterizedType<Float>(40, 40f)));
+
+		return env.fromCollection(tuples);
+	}
+
+	public static final DataSet<Tuple2<Long, DummyCustomParameterizedType<Float>>>
getLongCustomTuple2TargetData(
+			ExecutionEnvironment env) {
+		List<Tuple2<Long, DummyCustomParameterizedType<Float>>> tuples = new
ArrayList<Tuple2<Long,
+				DummyCustomParameterizedType<Float>>>();
+		tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(2L,
+				new DummyCustomParameterizedType<Float>(10, 10f)));
+		tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(3L,
+				new DummyCustomParameterizedType<Float>(20, 20f)));
+		tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(3L,
+				new DummyCustomParameterizedType<Float>(30, 30f)));
+		tuples.add(new Tuple2<Long, DummyCustomParameterizedType<Float>>(4L,
+				new DummyCustomParameterizedType<Float>(40, 40f)));
+
+		return env.fromCollection(tuples);
+	}
+
+	public static final DataSet<Tuple3<Long, Long, DummyCustomParameterizedType<Float>>>
getLongLongCustomTuple3Data(
+			ExecutionEnvironment env) {
+		List<Tuple3<Long, Long, DummyCustomParameterizedType<Float>>> tuples
= new ArrayList<>();
+		tuples.add(new Tuple3<Long, Long, DummyCustomParameterizedType<Float>>(1L,
2L,
+				new DummyCustomParameterizedType<Float>(10, 10f)));
+		tuples.add(new Tuple3<Long, Long, DummyCustomParameterizedType<Float>>(1L,
3L,
+				new DummyCustomParameterizedType<Float>(20, 20f)));
+		tuples.add(new Tuple3<Long, Long, DummyCustomParameterizedType<Float>>(2L,
3L,
+				new DummyCustomParameterizedType<Float>(30, 30f)));
+		tuples.add(new Tuple3<Long, Long, DummyCustomParameterizedType<Float>>(3L,
4L,
+				new DummyCustomParameterizedType<Float>(40, 40f)));
+
+		return env.fromCollection(tuples);
+	}
+
 	/**
 	 * A graph with invalid vertex ids
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/e0c10ec8/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java
b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java
new file mode 100644
index 0000000..711cd61
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestJoinWithEdges.java
@@ -0,0 +1,584 @@
+package flink.graphs;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.test.util.JavaProgramTestBase;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.LinkedList;
+
+@RunWith(Parameterized.class)
+public class TestJoinWithEdges extends JavaProgramTestBase {
+
+    private static int NUM_PROGRAMS = 15;
+
+    private int curProgId = config.getInteger("ProgramId", -1);
+    private String resultPath;
+    private String expectedResult;
+
+    public TestJoinWithEdges(Configuration config) {
+        super(config);
+    }
+
+    @Override
+    protected void preSubmit() throws Exception {
+        resultPath = getTempDirPath("result");
+    }
+
+    @Override
+    protected void testProgram() throws Exception {
+        expectedResult = GraphProgs.runProgram(curProgId, resultPath);
+    }
+
+    @Override
+    protected void postSubmit() throws Exception {
+        compareResultsByLinesInMemory(expectedResult, resultPath);
+    }
+
+    @Parameterized.Parameters
+    public static Collection<Object[]> getConfigurations() throws IOException {
+
+        LinkedList<Configuration> tConfigs = new LinkedList<Configuration>();
+
+        for(int i=1; i <= NUM_PROGRAMS; i++) {
+            Configuration config = new Configuration();
+            config.setInteger("ProgramId", i);
+            tConfigs.add(config);
+        }
+
+        return toParameterList(tConfigs);
+    }
+
+    private static class GraphProgs {
+
+        @SuppressWarnings("serial")
+        public static String runProgram(int progId, String resultPath) throws Exception {
+
+            switch (progId) {
+                case 1: {
+				/*
+				 * Test joinWithEdges with the input DataSet parameter identical
+				 * to the edge DataSet
+				 */
+                    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                            TestGraphUtils.getLongLongEdgeData(env), env);
+
+                    Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges()
+                                    .map(new MapFunction<Edge<Long, Long>, Tuple3<Long,
Long, Long>>() {
+                                        @Override
+                                        public Tuple3<Long, Long, Long> map(Edge<Long,
Long> edge) throws Exception {
+                                            return new Tuple3<Long, Long, Long>(edge.getSource(),
+                                                    edge.getTarget(), edge.getValue());
+                                        }
+                                    }),
+                            new MapFunction<Tuple2<Long, Long>, Long>() {
+
+                                @Override
+                                public Long map(Tuple2<Long, Long> tuple) throws Exception
{
+                                    return tuple.f0 + tuple.f1;
+                                }
+                            });
+
+                    result.getEdges().writeAsCsv(resultPath);
+                    env.execute();
+
+                    return "1,2,24\n" +
+                            "1,3,26\n" +
+                            "2,3,46\n" +
+                            "3,4,68\n" +
+                            "3,5,70\n" +
+                            "4,5,90\n" +
+                            "5,1,102\n";
+                }
+                case 2: {
+                /*
+				 * Test joinWithEdges with the input DataSet passed as a parameter containing
+				 * less elements than the edge DataSet, but of the same type
+				 */
+                    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                            TestGraphUtils.getLongLongEdgeData(env), env);
+
+                    Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges().first(3)
+                                    .map(new MapFunction<Edge<Long, Long>, Tuple3<Long,
Long, Long>>() {
+                                        @Override
+                                        public Tuple3<Long, Long, Long> map(Edge<Long,
Long> edge) throws Exception {
+                                            return new Tuple3<Long, Long, Long>(edge.getSource(),
+                                                    edge.getTarget(), edge.getValue());
+                                        }
+                                    }),
+                            new MapFunction<Tuple2<Long, Long>, Long>() {
+
+                                @Override
+                                public Long map(Tuple2<Long, Long> tuple) throws Exception
{
+                                    return tuple.f0 + tuple.f1;
+                                }
+                            });
+
+                    result.getEdges().writeAsCsv(resultPath);
+                    env.execute();
+
+                    return "1,2,24\n" +
+                            "1,3,26\n" +
+                            "2,3,46\n" +
+                            "3,4,34\n" +
+                            "3,5,35\n" +
+                            "4,5,45\n" +
+                            "5,1,51\n";
+                }
+                case 3: {
+                /*
+				 * Test joinWithEdges with the input DataSet passed as a parameter containing
+				 * less elements than the edge DataSet and of a different type(Boolean)
+				 */
+                    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                            TestGraphUtils.getLongLongEdgeData(env), env);
+
+                    Graph<Long, Long, Long> result = graph.joinWithEdges(graph.getEdges().first(3)
+                                    .map(new MapFunction<Edge<Long, Long>, Tuple3<Long,
Long, Boolean>>() {
+                                        @Override
+                                        public Tuple3<Long, Long, Boolean> map(Edge<Long,
Long> edge) throws Exception {
+                                            return new Tuple3<Long, Long, Boolean>(edge.getSource(),
+                                                    edge.getTarget(), true);
+                                        }
+                                    }),
+                            new MapFunction<Tuple2<Long, Boolean>, Long>() {
+
+                                @Override
+                                public Long map(Tuple2<Long, Boolean> tuple) throws
Exception {
+                                    if(tuple.f1) {
+                                        return tuple.f0 * 2;
+                                    }
+                                    else {
+                                        return tuple.f0;
+                                    }
+                                }
+                            });
+
+                    result.getEdges().writeAsCsv(resultPath);
+                    env.execute();
+
+                    return "1,2,24\n" +
+                            "1,3,26\n" +
+                            "2,3,46\n" +
+                            "3,4,34\n" +
+                            "3,5,35\n" +
+                            "4,5,45\n" +
+                            "5,1,51\n";
+                }
+                case 4: {
+                /*
+				 * Test joinWithEdges with the input DataSet containing different keys than the edge
DataSet
+				 * - the iterator becomes empty.
+				 */
+                    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                            TestGraphUtils.getLongLongEdgeData(env), env);
+
+                    Graph<Long, Long, Long> result = graph.joinWithEdges(TestGraphUtils.getLongLongLongTuple3Data(env),
+                            new MapFunction<Tuple2<Long, Long>, Long>() {
+
+                                @Override
+                                public Long map(Tuple2<Long, Long> tuple) throws Exception
{
+                                    return tuple.f1 * 2;
+                                }
+                            });
+
+                    result.getEdges().writeAsCsv(resultPath);
+                    env.execute();
+
+                    return "1,2,24\n" +
+                            "1,3,26\n" +
+                            "2,3,46\n" +
+                            "3,4,68\n" +
+                            "3,5,35\n" +
+                            "4,5,45\n" +
+                            "5,1,51\n";
+                }
+                case 5: {
+                /*
+    		     * Test joinWithEdges with a DataSet containing custom parametrised type input
values
+    			 */
+                    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                            TestGraphUtils.getLongLongEdgeData(env), env);
+
+                    Graph<Long, Long, Long> result = graph.joinWithEdges(TestGraphUtils.getLongLongCustomTuple3Data(env),
+                            new MapFunction<Tuple2<Long, TestGraphUtils.DummyCustomParameterizedType<Float>>,
Long>() {
+                                public Long map(Tuple2<Long, TestGraphUtils.DummyCustomParameterizedType<Float>>
tuple) throws Exception {
+                                    return (long) tuple.f1.getIntField();
+                                }
+                            });
+
+                    result.getEdges().writeAsCsv(resultPath);
+                    env.execute();
+
+                    return "1,2,10\n" +
+                            "1,3,20\n" +
+                            "2,3,30\n" +
+                            "3,4,40\n" +
+                            "3,5,35\n" +
+                            "4,5,45\n" +
+                            "5,1,51\n";
+                }
+                case 6: {
+                /*
+				 * Test joinWithEdgesOnSource with the input DataSet parameter identical
+				 * to the edge DataSet
+				 */
+                    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                            TestGraphUtils.getLongLongEdgeData(env), env);
+
+                    Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(graph.getEdges()
+                                    .map(new MapFunction<Edge<Long, Long>, Tuple2<Long,
Long>>() {
+                                        @Override
+                                        public Tuple2<Long, Long> map(Edge<Long,
Long> edge) throws Exception {
+                                            return new Tuple2<Long, Long>(edge.getSource(),
edge.getValue());
+                                        }
+                                    }),
+                            new MapFunction<Tuple2<Long, Long>, Long>() {
+
+                                @Override
+                                public Long map(Tuple2<Long, Long> tuple) throws Exception
{
+                                    return tuple.f0 + tuple.f1;
+                                }
+                            });
+
+                    result.getEdges().writeAsCsv(resultPath);
+                    env.execute();
+
+                    return "1,2,24\n" +
+                            "1,3,25\n" +
+                            "2,3,46\n" +
+                            "3,4,68\n" +
+                            "3,5,69\n" +
+                            "4,5,90\n" +
+                            "5,1,102\n";
+                }
+                case 7: {
+                /*
+				 * Test joinWithEdgesOnSource with the input DataSet passed as a parameter containing
+				 * less elements than the edge DataSet, but of the same type
+				 */
+                    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                            TestGraphUtils.getLongLongEdgeData(env), env);
+
+                    Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(graph.getEdges().first(3)
+                                    .map(new MapFunction<Edge<Long, Long>, Tuple2<Long,
Long>>() {
+                                        @Override
+                                        public Tuple2<Long, Long> map(Edge<Long,
Long> edge) throws Exception {
+                                            return new Tuple2<Long, Long>(edge.getSource(),
edge.getValue());
+                                        }
+                                    }),
+                            new MapFunction<Tuple2<Long, Long>, Long>() {
+
+                                @Override
+                                public Long map(Tuple2<Long, Long> tuple) throws Exception
{
+                                    return tuple.f0 + tuple.f1;
+                                }
+                            });
+
+                    result.getEdges().writeAsCsv(resultPath);
+                    env.execute();
+
+                    return "1,2,24\n" +
+                            "1,3,25\n" +
+                            "2,3,46\n" +
+                            "3,4,34\n" +
+                            "3,5,35\n" +
+                            "4,5,45\n" +
+                            "5,1,51\n";
+                }
+                case 8: {
+                /*
+				 * Test joinWithEdgesOnSource with the input DataSet passed as a parameter containing
+				 * less elements than the edge DataSet and of a different type(Boolean)
+				 */
+                    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                            TestGraphUtils.getLongLongEdgeData(env), env);
+
+                    Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(graph.getEdges().first(3)
+                                    .map(new MapFunction<Edge<Long, Long>, Tuple2<Long,
Boolean>>() {
+                                        @Override
+                                        public Tuple2<Long, Boolean> map(Edge<Long,
Long> edge) throws Exception {
+                                            return new Tuple2<Long, Boolean>(edge.getSource(),
true);
+                                        }
+                                    }),
+                            new MapFunction<Tuple2<Long, Boolean>, Long>() {
+
+                                @Override
+                                public Long map(Tuple2<Long, Boolean> tuple) throws
Exception {
+                                    if (tuple.f1) {
+                                        return tuple.f0 * 2;
+                                    } else {
+                                        return tuple.f0;
+                                    }
+                                }
+                            });
+
+                    result.getEdges().writeAsCsv(resultPath);
+                    env.execute();
+
+                    return "1,2,24\n" +
+                            "1,3,26\n" +
+                            "2,3,46\n" +
+                            "3,4,34\n" +
+                            "3,5,35\n" +
+                            "4,5,45\n" +
+                            "5,1,51\n";
+                }
+                case 9: {
+                /*
+				 * Test joinWithEdgesOnSource with the input DataSet containing different keys than the
edge DataSet
+				 * - the iterator becomes empty.
+				 */
+                    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                            TestGraphUtils.getLongLongEdgeData(env), env);
+
+                    Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(TestGraphUtils.getLongLongTuple2SourceData(env),
+                            new MapFunction<Tuple2<Long, Long>, Long>() {
+
+                                @Override
+                                public Long map(Tuple2<Long, Long> tuple) throws Exception
{
+                                    return tuple.f1 * 2;
+                                }
+                            });
+
+                    result.getEdges().writeAsCsv(resultPath);
+                    env.execute();
+
+                    return "1,2,20\n" +
+                            "1,3,20\n" +
+                            "2,3,60\n" +
+                            "3,4,80\n" +
+                            "3,5,80\n" +
+                            "4,5,120\n" +
+                            "5,1,51\n";
+                }
+                case 10: {
+                /*
+    		     * Test joinWithEdgesOnSource with a DataSet containing custom parametrised type
input values
+    			 */
+                    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                            TestGraphUtils.getLongLongEdgeData(env), env);
+
+                    Graph<Long, Long, Long> result = graph.joinWithEdgesOnSource(TestGraphUtils.getLongCustomTuple2SourceData(env),
+                            new MapFunction<Tuple2<Long, TestGraphUtils.DummyCustomParameterizedType<Float>>,
Long>() {
+                                public Long map(Tuple2<Long, TestGraphUtils.DummyCustomParameterizedType<Float>>
tuple) throws Exception {
+                                    return (long) tuple.f1.getIntField();
+                                }
+                            });
+
+                    result.getEdges().writeAsCsv(resultPath);
+                    env.execute();
+
+                    return "1,2,10\n" +
+                            "1,3,10\n" +
+                            "2,3,30\n" +
+                            "3,4,40\n" +
+                            "3,5,40\n" +
+                            "4,5,45\n" +
+                            "5,1,51\n";
+                }
+                case 11: {
+                /*
+				 * Test joinWithEdgesOnTarget with the input DataSet parameter identical
+				 * to the edge DataSet
+				 */
+                    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                            TestGraphUtils.getLongLongEdgeData(env), env);
+
+                    Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges()
+                                    .map(new MapFunction<Edge<Long, Long>, Tuple2<Long,
Long>>() {
+                                        @Override
+                                        public Tuple2<Long, Long> map(Edge<Long,
Long> edge) throws Exception {
+                                            return new Tuple2<Long, Long>(edge.getTarget(),
edge.getValue());
+                                        }
+                                    }),
+                            new MapFunction<Tuple2<Long, Long>, Long>() {
+
+                                @Override
+                                public Long map(Tuple2<Long, Long> tuple) throws Exception
{
+                                    return tuple.f0 + tuple.f1;
+                                }
+                            });
+
+                    result.getEdges().writeAsCsv(resultPath);
+                    env.execute();
+
+                    return "1,2,24\n" +
+                            "1,3,26\n" +
+                            "2,3,36\n" +
+                            "3,4,68\n" +
+                            "3,5,70\n" +
+                            "4,5,80\n" +
+                            "5,1,102\n";
+                }
+                case 12: {
+                /*
+				 * Test joinWithEdgesOnTarget with the input DataSet passed as a parameter containing
+				 * less elements than the edge DataSet, but of the same type
+				 */
+                    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                            TestGraphUtils.getLongLongEdgeData(env), env);
+
+                    Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges().first(3)
+                                    .map(new MapFunction<Edge<Long, Long>, Tuple2<Long,
Long>>() {
+                                        @Override
+                                        public Tuple2<Long, Long> map(Edge<Long,
Long> edge) throws Exception {
+                                            return new Tuple2<Long, Long>(edge.getTarget(),
edge.getValue());
+                                        }
+                                    }),
+                            new MapFunction<Tuple2<Long, Long>, Long>() {
+
+                                @Override
+                                public Long map(Tuple2<Long, Long> tuple) throws Exception
{
+                                    return tuple.f0 + tuple.f1;
+                                }
+                            });
+
+                    result.getEdges().writeAsCsv(resultPath);
+                    env.execute();
+
+                    result.getEdges().writeAsCsv(resultPath);
+                    env.execute();
+
+                    return "1,2,24\n" +
+                            "1,3,26\n" +
+                            "2,3,36\n" +
+                            "3,4,34\n" +
+                            "3,5,35\n" +
+                            "4,5,45\n" +
+                            "5,1,51\n";
+                }
+                case 13: {
+                /*
+				 * Test joinWithEdgesOnTarget with the input DataSet passed as a parameter containing
+				 * less elements than the edge DataSet and of a different type(Boolean)
+				 */
+                    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                            TestGraphUtils.getLongLongEdgeData(env), env);
+
+                    Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(graph.getEdges().first(3)
+                                    .map(new MapFunction<Edge<Long, Long>, Tuple2<Long,
Boolean>>() {
+                                        @Override
+                                        public Tuple2<Long, Boolean> map(Edge<Long,
Long> edge) throws Exception {
+                                            return new Tuple2<Long, Boolean>(edge.getTarget(),
true);
+                                        }
+                                    }),
+                            new MapFunction<Tuple2<Long, Boolean>, Long>() {
+
+                                @Override
+                                public Long map(Tuple2<Long, Boolean> tuple) throws
Exception {
+                                    if (tuple.f1) {
+                                        return tuple.f0 * 2;
+                                    } else {
+                                        return tuple.f0;
+                                    }
+                                }
+                            });
+
+                    result.getEdges().writeAsCsv(resultPath);
+                    env.execute();
+
+                    return "1,2,24\n" +
+                            "1,3,26\n" +
+                            "2,3,46\n" +
+                            "3,4,34\n" +
+                            "3,5,35\n" +
+                            "4,5,45\n" +
+                            "5,1,51\n";
+                }
+                case 14: {
+                /*
+				 * Test joinWithEdgesOnTarget with the input DataSet containing different keys than the
edge DataSet
+				 * - the iterator becomes empty.
+				 */
+                    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                            TestGraphUtils.getLongLongEdgeData(env), env);
+
+                    Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(TestGraphUtils.getLongLongTuple2TargetData(env),
+                            new MapFunction<Tuple2<Long, Long>, Long>() {
+
+                                @Override
+                                public Long map(Tuple2<Long, Long> tuple) throws Exception
{
+                                    return tuple.f1 * 2;
+                                }
+                            });
+
+                    result.getEdges().writeAsCsv(resultPath);
+                    env.execute();
+
+                    return "1,2,20\n" +
+                            "1,3,40\n" +
+                            "2,3,40\n" +
+                            "3,4,80\n" +
+                            "3,5,35\n" +
+                            "4,5,45\n" +
+                            "5,1,140\n";
+                }
+                case 15: {
+                /*
+    		     * Test joinWithEdgesOnTarget with a DataSet containing custom parametrised type
input values
+    			 */
+                    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+                    Graph<Long, Long, Long> graph = Graph.create(TestGraphUtils.getLongLongVertexData(env),
+                            TestGraphUtils.getLongLongEdgeData(env), env);
+
+                    Graph<Long, Long, Long> result = graph.joinWithEdgesOnTarget(TestGraphUtils.getLongCustomTuple2TargetData(env),
+                            new MapFunction<Tuple2<Long, TestGraphUtils.DummyCustomParameterizedType<Float>>,
Long>() {
+                                public Long map(Tuple2<Long, TestGraphUtils.DummyCustomParameterizedType<Float>>
tuple) throws Exception {
+                                    return (long) tuple.f1.getIntField();
+                                }
+                            });
+
+                    result.getEdges().writeAsCsv(resultPath);
+                    env.execute();
+
+                    return "1,2,10\n" +
+                            "1,3,20\n" +
+                            "2,3,20\n" +
+                            "3,4,40\n" +
+                            "3,5,35\n" +
+                            "4,5,45\n" +
+                            "5,1,51\n";
+                }
+                default:
+                    throw new IllegalArgumentException("Invalid program id");
+            }
+        }
+    }
+}
+


Mime
View raw message