flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [5/5] flink git commit: [FLINK-3965] [gelly] Delegating GraphAlgorithm
Date Fri, 01 Jul 2016 19:14:16 GMT
[FLINK-3965] [gelly] Delegating GraphAlgorithm

A delegating GraphAlgorithm wraps a GraphAlgorithm result with a
delegating proxy object. The delegated object can be replaced when the
same algorithm is run on the same input with a mergeable configuration.
This allows algorithms to be composed of implicitly reusable algorithms
without publicly sharing intermediate DataSets.

This closes #2032


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/149e7a01
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/149e7a01
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/149e7a01

Branch: refs/heads/master
Commit: 149e7a01445b4ba494409472dc8b0b15c7221e9e
Parents: 7ab6837
Author: Greg Hogan <code@greghogan.com>
Authored: Wed May 25 06:43:41 2016 -0400
Committer: Greg Hogan <code@greghogan.com>
Committed: Fri Jul 1 14:35:42 2016 -0400

----------------------------------------------------------------------
 .../java/org/apache/flink/api/java/DataSet.java |   2 +-
 .../annotate/directed/EdgeDegreesPair.java      |  27 ++-
 .../annotate/directed/EdgeSourceDegrees.java    |  27 ++-
 .../annotate/directed/EdgeTargetDegrees.java    |  27 ++-
 .../degree/annotate/directed/VertexDegrees.java |  43 ++++-
 .../annotate/directed/VertexInDegree.java       |  42 ++++-
 .../annotate/directed/VertexOutDegree.java      |  42 ++++-
 .../annotate/undirected/EdgeDegreePair.java     |  36 +++-
 .../annotate/undirected/EdgeSourceDegree.java   |  34 +++-
 .../annotate/undirected/EdgeTargetDegree.java   |  34 +++-
 .../annotate/undirected/VertexDegree.java       |  52 +++++-
 .../degree/filter/undirected/MaximumDegree.java |  57 ++++--
 .../graph/asm/simple/directed/Simplify.java     |  26 ++-
 .../graph/asm/simple/undirected/Simplify.java   |  34 +++-
 .../asm/translate/TranslateEdgeValues.java      |  35 +++-
 .../graph/asm/translate/TranslateGraphIds.java  |  35 +++-
 .../asm/translate/TranslateVertexValues.java    |  35 +++-
 .../directed/LocalClusteringCoefficient.java    |  26 ++-
 .../clustering/directed/TriangleListing.java    |  35 +++-
 .../undirected/LocalClusteringCoefficient.java  |  27 ++-
 .../clustering/undirected/TriangleListing.java  |  34 +++-
 .../flink/graph/library/link_analysis/HITS.java |  32 +++-
 .../graph/library/similarity/AdamicAdar.java    |  35 +++-
 .../graph/library/similarity/JaccardIndex.java  |  39 +++-
 .../flink/graph/utils/proxy/Delegate.java       |  95 ++++++++++
 .../proxy/GraphAlgorithmDelegatingDataSet.java  | 150 +++++++++++++++
 .../proxy/GraphAlgorithmDelegatingGraph.java    | 160 ++++++++++++++++
 .../graph/utils/proxy/OptionalBoolean.java      | 135 ++++++++++++++
 .../annotate/undirected/VertexDegreeTest.java   |   2 +-
 .../graph/utils/proxy/OptionalBooleanTest.java  | 181 +++++++++++++++++++
 30 files changed, 1424 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 530de4b..e3b2ec2 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -1749,7 +1749,7 @@ public abstract class DataSet<T> {
 	// --------------------------------------------------------------------------------------------
 	
 	protected static void checkSameExecutionContext(DataSet<?> set1, DataSet<?> set2) {
-		if (set1.context != set2.context) {
+		if (set1.getExecutionEnvironment() != set2.getExecutionEnvironment()) {
 			throw new IllegalArgumentException("The two inputs have different execution contexts.");
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
index 40af5ce..be19613 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
@@ -24,10 +24,11 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.util.Preconditions;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
@@ -40,7 +41,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class EdgeDegreesPair<K, VV, EV>
-implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>>> {
+extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple3<EV, Degrees, Degrees>>> {
 
 	// Optional configuration
 	private int parallelism = PARALLELISM_DEFAULT;
@@ -58,7 +59,27 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple3<EV, Degrees, Degrees
 	}
 
 	@Override
-	public DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> run(Graph<K, VV, EV> input)
+	protected String getAlgorithmName() {
+		return EdgeDegreesPair.class.getName();
+	}
+
+	@Override
+	protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+		Preconditions.checkNotNull(other);
+
+		if (! EdgeDegreesPair.class.isAssignableFrom(other.getClass())) {
+			return false;
+		}
+
+		EdgeDegreesPair rhs = (EdgeDegreesPair) other;
+
+		parallelism = Math.min(parallelism, rhs.parallelism);
+
+		return true;
+	}
+
+	@Override
+	public DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> runInternal(Graph<K, VV, EV> input)
 			throws Exception {
 		// s, t, d(s)
 		DataSet<Edge<K, Tuple2<EV, Degrees>>> edgeSourceDegrees = input

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
index e08ee56..ee3175e 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
@@ -23,10 +23,11 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.util.Preconditions;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
@@ -39,7 +40,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class EdgeSourceDegrees<K, VV, EV>
-implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, Degrees>>>> {
+extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>> {
 
 	// Optional configuration
 	private int parallelism = PARALLELISM_DEFAULT;
@@ -57,7 +58,27 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, Degrees>>>> {
 	}
 
 	@Override
-	public DataSet<Edge<K, Tuple2<EV, Degrees>>> run(Graph<K, VV, EV> input)
+	protected String getAlgorithmName() {
+		return EdgeSourceDegrees.class.getName();
+	}
+
+	@Override
+	protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+		Preconditions.checkNotNull(other);
+
+		if (! EdgeSourceDegrees.class.isAssignableFrom(other.getClass())) {
+			return false;
+		}
+
+		EdgeSourceDegrees rhs = (EdgeSourceDegrees) other;
+
+		parallelism = Math.min(parallelism, rhs.parallelism);
+
+		return true;
+	}
+
+	@Override
+	public DataSet<Edge<K, Tuple2<EV, Degrees>>> runInternal(Graph<K, VV, EV> input)
 			throws Exception {
 		// s, d(s)
 		DataSet<Vertex<K, Degrees>> vertexDegrees = input

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
index 5110513..6ba47f2 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
@@ -23,10 +23,11 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.util.Preconditions;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
@@ -39,7 +40,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class EdgeTargetDegrees<K, VV, EV>
-implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, Degrees>>>> {
+extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>> {
 
 	// Optional configuration
 	private int parallelism = PARALLELISM_DEFAULT;
@@ -57,7 +58,27 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, Degrees>>>> {
 	}
 
 	@Override
-	public DataSet<Edge<K, Tuple2<EV, Degrees>>> run(Graph<K, VV, EV> input)
+	protected String getAlgorithmName() {
+		return EdgeTargetDegrees.class.getName();
+	}
+
+	@Override
+	protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+		Preconditions.checkNotNull(other);
+
+		if (! EdgeTargetDegrees.class.isAssignableFrom(other.getClass())) {
+			return false;
+		}
+
+		EdgeTargetDegrees rhs = (EdgeTargetDegrees) other;
+
+		parallelism = Math.min(parallelism, rhs.parallelism);
+
+		return true;
+	}
+
+	@Override
+	public DataSet<Edge<K, Tuple2<EV, Degrees>>> runInternal(Graph<K, VV, EV> input)
 			throws Exception {
 		// t, d(t)
 		DataSet<Vertex<K, Degrees>> vertexDegrees = input

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
index 1f1d4ab..363ad2e 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
@@ -30,13 +30,15 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.EdgeOrder;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
 import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.ByteValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
@@ -48,10 +50,10 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class VertexDegrees<K, VV, EV>
-implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Degrees>>> {
+extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, Degrees>> {
 
 	// Optional configuration
-	private boolean includeZeroDegreeVertices = false;
+	private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(false, true);
 
 	private int parallelism = PARALLELISM_DEFAULT;
 
@@ -65,7 +67,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Degrees>>> {
 	 * @return this
 	 */
 	public VertexDegrees<K, VV, EV> setIncludeZeroDegreeVertices(boolean includeZeroDegreeVertices) {
-		this.includeZeroDegreeVertices = includeZeroDegreeVertices;
+		this.includeZeroDegreeVertices.set(includeZeroDegreeVertices);
 
 		return this;
 	}
@@ -83,7 +85,36 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Degrees>>> {
 	}
 
 	@Override
-	public DataSet<Vertex<K, Degrees>> run(Graph<K, VV, EV> input)
+	protected String getAlgorithmName() {
+		return VertexOutDegree.class.getName();
+	}
+
+	@Override
+	protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+		Preconditions.checkNotNull(other);
+
+		if (! VertexDegrees.class.isAssignableFrom(other.getClass())) {
+			return false;
+		}
+
+		VertexDegrees rhs = (VertexDegrees) other;
+
+		// verify that configurations can be merged
+
+		if (includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices)) {
+			return false;
+		}
+
+		// merge configurations
+
+		includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
+		parallelism = Math.min(parallelism, rhs.parallelism);
+
+		return true;
+	}
+
+	@Override
+	public DataSet<Vertex<K, Degrees>> runInternal(Graph<K, VV, EV> input)
 			throws Exception {
 		// s, t, bitmask
 		DataSet<Tuple3<K, K, ByteValue>> edgesWithOrder = input.getEdges()
@@ -103,7 +134,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Degrees>>> {
 				.setParallelism(parallelism)
 				.name("Degree count");
 
-		if (includeZeroDegreeVertices) {
+		if (includeZeroDegreeVertices.get()) {
 			vertexDegrees = input.getVertices()
 				.leftOuterJoin(vertexDegrees)
 				.where(0)

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
index 1541abd..75f2369 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
@@ -20,11 +20,12 @@ package org.apache.flink.graph.asm.degree.annotate.directed;
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.DegreeCount;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexWithVertexDegree;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToTargetId;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Preconditions;
 
@@ -38,10 +39,10 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class VertexInDegree<K, VV, EV>
-implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
+extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 
 	// Optional configuration
-	private boolean includeZeroDegreeVertices = false;
+	private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(false, true);
 
 	private int parallelism = PARALLELISM_DEFAULT;
 
@@ -55,7 +56,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
 	 * @return this
 	 */
 	public VertexInDegree<K, VV, EV> setIncludeZeroDegreeVertices(boolean includeZeroDegreeVertices) {
-		this.includeZeroDegreeVertices = includeZeroDegreeVertices;
+		this.includeZeroDegreeVertices.set(includeZeroDegreeVertices);
 
 		return this;
 	}
@@ -76,7 +77,36 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
 	}
 
 	@Override
-	public DataSet<Vertex<K, LongValue>> run(Graph<K, VV, EV> input)
+	protected String getAlgorithmName() {
+		return VertexInDegree.class.getName();
+	}
+
+	@Override
+	protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+		Preconditions.checkNotNull(other);
+
+		if (! VertexInDegree.class.isAssignableFrom(other.getClass())) {
+			return false;
+		}
+
+		VertexInDegree rhs = (VertexInDegree) other;
+
+		// verify that configurations can be merged
+
+		if (includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices)) {
+			return false;
+		}
+
+		// merge configurations
+
+		includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
+		parallelism = Math.min(parallelism, rhs.parallelism);
+
+		return true;
+	}
+
+	@Override
+	public DataSet<Vertex<K, LongValue>> runInternal(Graph<K, VV, EV> input)
 			throws Exception {
 		// t
 		DataSet<Vertex<K, LongValue>> targetIds = input
@@ -92,7 +122,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
 				.setParallelism(parallelism)
 				.name("Degree count");
 
-		if (includeZeroDegreeVertices) {
+		if (includeZeroDegreeVertices.get()) {
 			targetDegree = input.getVertices()
 				.leftOuterJoin(targetDegree)
 				.where(0)

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
index 22c0a67..b0576f8 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
@@ -20,11 +20,12 @@ package org.apache.flink.graph.asm.degree.annotate.directed;
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.DegreeCount;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexWithVertexDegree;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToSourceId;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Preconditions;
 
@@ -38,10 +39,10 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class VertexOutDegree<K, VV, EV>
-implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
+extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 
 	// Optional configuration
-	private boolean includeZeroDegreeVertices = false;
+	private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(false, true);
 
 	private int parallelism = PARALLELISM_DEFAULT;
 
@@ -55,7 +56,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
 	 * @return this
 	 */
 	public VertexOutDegree<K, VV, EV> setIncludeZeroDegreeVertices(boolean includeZeroDegreeVertices) {
-		this.includeZeroDegreeVertices = includeZeroDegreeVertices;
+		this.includeZeroDegreeVertices.set(includeZeroDegreeVertices);
 
 		return this;
 	}
@@ -76,7 +77,36 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
 	}
 
 	@Override
-	public DataSet<Vertex<K, LongValue>> run(Graph<K, VV, EV> input)
+	protected String getAlgorithmName() {
+		return VertexOutDegree.class.getName();
+	}
+
+	@Override
+	protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+		Preconditions.checkNotNull(other);
+
+		if (! VertexOutDegree.class.isAssignableFrom(other.getClass())) {
+			return false;
+		}
+
+		VertexOutDegree rhs = (VertexOutDegree) other;
+
+		// verify that configurations can be merged
+
+		if (includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices)) {
+			return false;
+		}
+
+		// merge configurations
+
+		includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
+		parallelism = Math.min(parallelism, rhs.parallelism);
+
+		return true;
+	}
+
+	@Override
+	public DataSet<Vertex<K, LongValue>> runInternal(Graph<K, VV, EV> input)
 			throws Exception {
 		// s
 		DataSet<Vertex<K, LongValue>> sourceIds = input
@@ -92,7 +122,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
 				.setParallelism(parallelism)
 				.name("Degree count");
 
-		if (includeZeroDegreeVertices) {
+		if (includeZeroDegreeVertices.get()) {
 			sourceDegree = input.getVertices()
 				.leftOuterJoin(sourceDegree)
 				.where(0)

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
index f27ea54..1f78566 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
@@ -24,9 +24,10 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Preconditions;
 
@@ -41,10 +42,10 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class EdgeDegreePair<K, VV, EV>
-implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>>> {
+extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple3<EV, LongValue, LongValue>>> {
 
 	// Optional configuration
-	protected boolean reduceOnTargetId = false;
+	private OptionalBoolean reduceOnTargetId = new OptionalBoolean(false, false);
 
 	private int parallelism = PARALLELISM_DEFAULT;
 
@@ -58,7 +59,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple3<EV, LongValue, LongV
 	 * @return this
 	 */
 	public EdgeDegreePair<K, VV, EV> setReduceOnTargetId(boolean reduceOnTargetId) {
-		this.reduceOnTargetId = reduceOnTargetId;
+		this.reduceOnTargetId.set(reduceOnTargetId);
 
 		return this;
 	}
@@ -79,18 +80,39 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple3<EV, LongValue, LongV
 	}
 
 	@Override
-	public DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>> run(Graph<K, VV, EV> input)
+	protected String getAlgorithmName() {
+		return EdgeDegreePair.class.getName();
+	}
+
+	@Override
+	protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+		Preconditions.checkNotNull(other);
+
+		if (! EdgeSourceDegree.class.isAssignableFrom(other.getClass())) {
+			return false;
+		}
+
+		EdgeDegreePair rhs = (EdgeDegreePair) other;
+
+		reduceOnTargetId.mergeWith(rhs.reduceOnTargetId);
+		parallelism = Math.min(parallelism, rhs.parallelism);
+
+		return true;
+	}
+
+	@Override
+	public DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>> runInternal(Graph<K, VV, EV> input)
 			throws Exception {
 		// s, t, d(s)
 		DataSet<Edge<K, Tuple2<EV, LongValue>>> edgeSourceDegrees = input
 			.run(new EdgeSourceDegree<K, VV, EV>()
-				.setReduceOnTargetId(reduceOnTargetId)
+				.setReduceOnTargetId(reduceOnTargetId.get())
 				.setParallelism(parallelism));
 
 		// t, d(t)
 		DataSet<Vertex<K, LongValue>> vertexDegrees = input
 			.run(new VertexDegree<K, VV, EV>()
-				.setReduceOnTargetId(reduceOnTargetId)
+				.setReduceOnTargetId(reduceOnTargetId.get())
 				.setParallelism(parallelism));
 
 		// s, t, (d(s), d(t))

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
index 2bba645..520723c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
@@ -23,9 +23,10 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Preconditions;
 
@@ -39,10 +40,10 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class EdgeSourceDegree<K, VV, EV>
-implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, LongValue>>>> {
+extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>> {
 
 	// Optional configuration
-	private boolean reduceOnTargetId = false;
+	private OptionalBoolean reduceOnTargetId = new OptionalBoolean(false, false);
 
 	private int parallelism = PARALLELISM_DEFAULT;
 
@@ -56,7 +57,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, LongValue>>>> {
 	 * @return this
 	 */
 	public EdgeSourceDegree<K, VV, EV> setReduceOnTargetId(boolean reduceOnTargetId) {
-		this.reduceOnTargetId = reduceOnTargetId;
+		this.reduceOnTargetId.set(reduceOnTargetId);
 
 		return this;
 	}
@@ -77,12 +78,33 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, LongValue>>>> {
 	}
 
 	@Override
-	public DataSet<Edge<K, Tuple2<EV, LongValue>>> run(Graph<K, VV, EV> input)
+	protected String getAlgorithmName() {
+		return EdgeSourceDegree.class.getName();
+	}
+
+	@Override
+	protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+		Preconditions.checkNotNull(other);
+
+		if (! EdgeSourceDegree.class.isAssignableFrom(other.getClass())) {
+			return false;
+		}
+
+		EdgeSourceDegree rhs = (EdgeSourceDegree) other;
+
+		reduceOnTargetId.mergeWith(rhs.reduceOnTargetId);
+		parallelism = Math.min(parallelism, rhs.parallelism);
+
+		return true;
+	}
+
+	@Override
+	public DataSet<Edge<K, Tuple2<EV, LongValue>>> runInternal(Graph<K, VV, EV> input)
 			throws Exception {
 		// s, d(s)
 		DataSet<Vertex<K, LongValue>> vertexDegrees = input
 			.run(new VertexDegree<K, VV, EV>()
-				.setReduceOnTargetId(reduceOnTargetId)
+				.setReduceOnTargetId(reduceOnTargetId.get())
 				.setParallelism(parallelism));
 
 		// s, t, d(s)

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
index 6edaf17..123c1dc 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
@@ -23,9 +23,10 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Preconditions;
 
@@ -39,10 +40,10 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class EdgeTargetDegree<K, VV, EV>
-implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, LongValue>>>> {
+extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>> {
 
 	// Optional configuration
-	private boolean reduceOnSourceId = false;
+	private OptionalBoolean reduceOnSourceId = new OptionalBoolean(false, false);
 
 	private int parallelism = PARALLELISM_DEFAULT;
 
@@ -56,7 +57,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, LongValue>>>> {
 	 * @return this
 	 */
 	public EdgeTargetDegree<K, VV, EV> setReduceOnSourceId(boolean reduceOnSourceId) {
-		this.reduceOnSourceId = reduceOnSourceId;
+		this.reduceOnSourceId.set(reduceOnSourceId);
 
 		return this;
 	}
@@ -77,12 +78,33 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, LongValue>>>> {
 	}
 
 	@Override
-	public DataSet<Edge<K, Tuple2<EV, LongValue>>> run(Graph<K, VV, EV> input)
+	protected String getAlgorithmName() {
+		return EdgeTargetDegree.class.getName();
+	}
+
+	@Override
+	protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+		Preconditions.checkNotNull(other);
+
+		if (! EdgeSourceDegree.class.isAssignableFrom(other.getClass())) {
+			return false;
+		}
+
+		EdgeTargetDegree rhs = (EdgeTargetDegree) other;
+
+		reduceOnSourceId.mergeWith(rhs.reduceOnSourceId);
+		parallelism = Math.min(parallelism, rhs.parallelism);
+
+		return true;
+	}
+
+	@Override
+	public DataSet<Edge<K, Tuple2<EV, LongValue>>> runInternal(Graph<K, VV, EV> input)
 			throws Exception {
 		// t, d(t)
 		DataSet<Vertex<K, LongValue>> vertexDegrees = input
 			.run(new VertexDegree<K, VV, EV>()
-				.setReduceOnTargetId(!reduceOnSourceId)
+				.setReduceOnTargetId(!reduceOnSourceId.get())
 				.setParallelism(parallelism));
 
 		// s, t, d(t)

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
index a2c8e03..ec72222 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
@@ -20,10 +20,11 @@ package org.apache.flink.graph.asm.degree.annotate.undirected;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.DegreeCount;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexWithVertexDegree;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToSourceId;
@@ -41,12 +42,12 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class VertexDegree<K, VV, EV>
-implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
+extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 
 	// Optional configuration
-	private boolean includeZeroDegreeVertices = false;
+	private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(false, true);
 
-	private boolean reduceOnTargetId = false;
+	private OptionalBoolean reduceOnTargetId = new OptionalBoolean(false, false);
 
 	private int parallelism = PARALLELISM_DEFAULT;
 
@@ -60,7 +61,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
 	 * @return this
 	 */
 	public VertexDegree<K, VV, EV> setIncludeZeroDegreeVertices(boolean includeZeroDegreeVertices) {
-		this.includeZeroDegreeVertices = includeZeroDegreeVertices;
+		this.includeZeroDegreeVertices.set(includeZeroDegreeVertices);
 
 		return this;
 	}
@@ -75,7 +76,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
 	 * @return this
 	 */
 	public VertexDegree<K, VV, EV> setReduceOnTargetId(boolean reduceOnTargetId) {
-		this.reduceOnTargetId = reduceOnTargetId;
+		this.reduceOnTargetId.set(reduceOnTargetId);
 
 		return this;
 	}
@@ -96,9 +97,39 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
 	}
 
 	@Override
-	public DataSet<Vertex<K, LongValue>> run(Graph<K, VV, EV> input)
+	protected String getAlgorithmName() {
+		return VertexDegree.class.getName();
+	}
+
+	@Override
+	protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+		Preconditions.checkNotNull(other);
+
+		if (! VertexDegree.class.isAssignableFrom(other.getClass())) {
+			return false;
+		}
+
+		VertexDegree rhs = (VertexDegree) other;
+
+		// verify that configurations can be merged
+
+		if (includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices)) {
+			return false;
+		}
+
+		// merge configurations
+
+		includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
+		reduceOnTargetId.mergeWith(rhs.reduceOnTargetId);
+		parallelism = Math.min(parallelism, rhs.parallelism);
+
+		return true;
+	}
+
+	@Override
+	public DataSet<Vertex<K, LongValue>> runInternal(Graph<K, VV, EV> input)
 			throws Exception {
-		MapFunction<Edge<K, EV>, Vertex<K, LongValue>> mapEdgeToId = reduceOnTargetId ?
+		MapFunction<Edge<K, EV>, Vertex<K, LongValue>> mapEdgeToId = reduceOnTargetId.get() ?
 			new MapEdgeToTargetId<K, EV>() : new MapEdgeToSourceId<K, EV>();
 
 		// v
@@ -115,8 +146,9 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
 				.setParallelism(parallelism)
 				.name("Degree count");
 
-		if (includeZeroDegreeVertices) {
-			degree = input.getVertices()
+		if (includeZeroDegreeVertices.get()) {
+			degree = input
+				.getVertices()
 				.leftOuterJoin(degree)
 				.where(0)
 				.equalTo(0)

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
index e7d78bb..f9cfae9 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
@@ -27,9 +27,10 @@ import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFir
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph;
+import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
@@ -46,15 +47,15 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class MaximumDegree<K, VV, EV>
-implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
+extends GraphAlgorithmDelegatingGraph<K, VV, EV, K, VV, EV> {
 
 	// Required configuration
 	private long maximumDegree;
 
 	// Optional configuration
-	private boolean reduceOnTargetId = false;
+	private OptionalBoolean reduceOnTargetId = new OptionalBoolean(false, false);
 
-	private boolean broadcastHighDegreeVertices = false;
+	private OptionalBoolean broadcastHighDegreeVertices = new OptionalBoolean(false, false);
 
 	private int parallelism = PARALLELISM_DEFAULT;
 
@@ -79,7 +80,7 @@ implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
 	 * @return this
 	 */
 	public MaximumDegree<K, VV, EV> setReduceOnTargetId(boolean reduceOnTargetId) {
-		this.reduceOnTargetId = reduceOnTargetId;
+		this.reduceOnTargetId.set(reduceOnTargetId);
 
 		return this;
 	}
@@ -96,7 +97,7 @@ implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
 	 * @return this
 	 */
 	public MaximumDegree<K, VV, EV> setBroadcastHighDegreeVertices(boolean broadcastHighDegreeVertices) {
-		this.broadcastHighDegreeVertices = broadcastHighDegreeVertices;
+		this.broadcastHighDegreeVertices.set(broadcastHighDegreeVertices);
 
 		return this;
 	}
@@ -113,6 +114,36 @@ implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
 		return this;
 	}
 
+	@Override
+	protected String getAlgorithmName() {
+		return MaximumDegree.class.getName();
+	}
+
+	@Override
+	protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) {
+		Preconditions.checkNotNull(other);
+
+		if (! MaximumDegree.class.isAssignableFrom(other.getClass())) {
+			return false;
+		}
+
+		MaximumDegree rhs = (MaximumDegree) other;
+
+		// verify that configurations can be merged
+
+		if (maximumDegree != rhs.maximumDegree) {
+			return false;
+		}
+
+		// merge configurations
+
+		reduceOnTargetId.mergeWith(rhs.reduceOnTargetId);
+		broadcastHighDegreeVertices.mergeWith(rhs.broadcastHighDegreeVertices);
+		parallelism = Math.min(parallelism, rhs.parallelism);
+
+		return true;
+	}
+
 	/*
 	 * Implementation notes:
 	 *
@@ -121,12 +152,12 @@ implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
 	 */
 
 	@Override
-	public Graph<K, VV, EV> run(Graph<K, VV, EV> input)
+	public Graph<K, VV, EV> runInternal(Graph<K, VV, EV> input)
 			throws Exception {
 		// u, d(u)
 		DataSet<Vertex<K, LongValue>> vertexDegree = input
 			.run(new VertexDegree<K, VV, EV>()
-				.setReduceOnTargetId(reduceOnTargetId)
+				.setReduceOnTargetId(reduceOnTargetId.get())
 				.setParallelism(parallelism));
 
 		// u, d(u) if d(u) > maximumDegree
@@ -135,7 +166,7 @@ implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
 				.setParallelism(parallelism)
 				.name("Filter high-degree vertices");
 
-		JoinHint joinHint = broadcastHighDegreeVertices ? JoinHint.BROADCAST_HASH_SECOND : JoinHint.REPARTITION_HASH_SECOND;
+		JoinHint joinHint = broadcastHighDegreeVertices.get() ? JoinHint.BROADCAST_HASH_SECOND : JoinHint.REPARTITION_HASH_SECOND;
 
 		// Vertices
 		DataSet<Vertex<K, VV>> vertices = input
@@ -151,17 +182,17 @@ implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
 		DataSet<Edge<K, EV>> edges = input
 			.getEdges()
 			.leftOuterJoin(highDegreeVertices, joinHint)
-			.where(reduceOnTargetId ? 1 : 0)
+			.where(reduceOnTargetId.get() ? 1 : 0)
 			.equalTo(0)
 				.with(new ProjectEdge<K, EV>())
 				.setParallelism(parallelism)
-				.name("Project low-degree edges by " + (reduceOnTargetId ? "target" : "source"))
+				.name("Project low-degree edges by " + (reduceOnTargetId.get() ? "target" : "source"))
 			.leftOuterJoin(highDegreeVertices, joinHint)
-			.where(reduceOnTargetId ? 0 : 1)
+			.where(reduceOnTargetId.get() ? 0 : 1)
 			.equalTo(0)
 			.with(new ProjectEdge<K, EV>())
 				.setParallelism(parallelism)
-				.name("Project low-degree edges by " + (reduceOnTargetId ? "source" : "target"));
+				.name("Project low-degree edges by " + (reduceOnTargetId.get() ? "source" : "target"));
 
 		// Graph
 		return Graph.fromDataSet(vertices, edges, input.getContext());

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
index 7362a3e..a7fa9b6 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.util.Preconditions;
 
@@ -38,7 +38,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class Simplify<K extends Comparable<K> & CopyableValue<K>, VV, EV>
-implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
+extends GraphAlgorithmDelegatingGraph<K, VV, EV, K, VV, EV> {
 
 	// Optional configuration
 	private int parallelism = PARALLELISM_DEFAULT;
@@ -59,7 +59,27 @@ implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
 	}
 
 	@Override
-	public Graph<K, VV, EV> run(Graph<K, VV, EV> input)
+	protected String getAlgorithmName() {
+		return Simplify.class.getName();
+	}
+
+	@Override
+	protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) {
+		Preconditions.checkNotNull(other);
+
+		if (! Simplify.class.isAssignableFrom(other.getClass())) {
+			return false;
+		}
+
+		Simplify rhs = (Simplify) other;
+
+		parallelism = Math.min(parallelism, rhs.parallelism);
+
+		return true;
+	}
+
+	@Override
+	public Graph<K, VV, EV> runInternal(Graph<K, VV, EV> input)
 			throws Exception {
 		// Edges
 		DataSet<Edge<K, EV>> edges = input

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
index 13ac470..d006756 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
@@ -24,7 +24,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
@@ -40,7 +40,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class Simplify<K extends Comparable<K> & CopyableValue<K>, VV, EV>
-implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
+extends GraphAlgorithmDelegatingGraph<K, VV, EV, K, VV, EV> {
 
 	// Required configuration
 	private boolean clipAndFlip;
@@ -77,7 +77,35 @@ implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
 	}
 
 	@Override
-	public Graph<K, VV, EV> run(Graph<K, VV, EV> input)
+	protected String getAlgorithmName() {
+		return Simplify.class.getName();
+	}
+
+	@Override
+	protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) {
+		Preconditions.checkNotNull(other);
+
+		if (! Simplify.class.isAssignableFrom(other.getClass())) {
+			return false;
+		}
+
+		Simplify rhs = (Simplify) other;
+
+		// verify that configurations can be merged
+
+		if (clipAndFlip != rhs.clipAndFlip) {
+			return false;
+		}
+
+		// merge configurations
+
+		parallelism = Math.min(parallelism, rhs.parallelism);
+
+		return true;
+	}
+
+	@Override
+	public Graph<K, VV, EV> runInternal(Graph<K, VV, EV> input)
 			throws Exception {
 		// Edges
 		DataSet<Edge<K, EV>> edges = input

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
index 47ec077..6003c9a 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
@@ -21,7 +21,7 @@ package org.apache.flink.graph.asm.translate;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph;
 import org.apache.flink.util.Preconditions;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
@@ -36,7 +36,7 @@ import static org.apache.flink.graph.asm.translate.Translate.translateEdgeValues
  * @param <NEW> new edge value type
  */
 public class TranslateEdgeValues<K, VV, OLD, NEW>
-implements GraphAlgorithm<K, VV, OLD, Graph<K, VV, NEW>> {
+extends GraphAlgorithmDelegatingGraph<K, VV, OLD, K, VV, NEW> {
 
 	// Required configuration
 	private TranslateFunction<OLD,NEW> translator;
@@ -71,7 +71,36 @@ implements GraphAlgorithm<K, VV, OLD, Graph<K, VV, NEW>> {
 	}
 
 	@Override
-	public Graph<K, VV, NEW> run(Graph<K, VV, OLD> input) throws Exception {
+	protected String getAlgorithmName() {
+		return TranslateEdgeValues.class.getName();
+	}
+
+	@Override
+	protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) {
+		Preconditions.checkNotNull(other);
+
+		if (! TranslateEdgeValues.class.isAssignableFrom(other.getClass())) {
+			return false;
+		}
+
+		TranslateEdgeValues rhs = (TranslateEdgeValues) other;
+
+		// verify that configurations can be merged
+
+		if (translator != rhs.translator) {
+			return false;
+		}
+
+		// merge configurations
+
+		parallelism = Math.min(parallelism, rhs.parallelism);
+
+		return true;
+	}
+
+	@Override
+	public Graph<K, VV, NEW> runInternal(Graph<K, VV, OLD> input)
+			throws Exception {
 		DataSet<Edge<K, NEW>> translatedEdges = translateEdgeValues(input.getEdges(), translator, parallelism);
 
 		return Graph.fromDataSet(input.getVertices(), translatedEdges, input.getContext());

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
index 6a06feb..6ea56eb 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
@@ -21,8 +21,8 @@ package org.apache.flink.graph.asm.translate;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph;
 import org.apache.flink.util.Preconditions;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
@@ -38,7 +38,7 @@ import static org.apache.flink.graph.asm.translate.Translate.translateVertexIds;
  * @param <EV> edge value type
  */
 public class TranslateGraphIds<OLD, NEW, VV, EV>
-implements GraphAlgorithm<OLD, VV, EV, Graph<NEW, VV, EV>> {
+extends GraphAlgorithmDelegatingGraph<OLD, VV, EV, NEW, VV, EV> {
 
 	// Required configuration
 	private TranslateFunction<OLD,NEW> translator;
@@ -73,7 +73,36 @@ implements GraphAlgorithm<OLD, VV, EV, Graph<NEW, VV, EV>> {
 	}
 
 	@Override
-	public Graph<NEW, VV, EV> run(Graph<OLD, VV, EV> input) throws Exception {
+	protected String getAlgorithmName() {
+		return TranslateGraphIds.class.getName();
+	}
+
+	@Override
+	protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) {
+		Preconditions.checkNotNull(other);
+
+		if (! TranslateGraphIds.class.isAssignableFrom(other.getClass())) {
+			return false;
+		}
+
+		TranslateGraphIds rhs = (TranslateGraphIds) other;
+
+		// verify that configurations can be merged
+
+		if (translator != rhs.translator) {
+			return false;
+		}
+
+		// merge configurations
+
+		parallelism = Math.min(parallelism, rhs.parallelism);
+
+		return true;
+	}
+
+	@Override
+	public Graph<NEW, VV, EV> runInternal(Graph<OLD, VV, EV> input)
+			throws Exception {
 		// Vertices
 		DataSet<Vertex<NEW, VV>> translatedVertices = translateVertexIds(input.getVertices(), translator, parallelism);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
index 3d0133a..3a49324 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
@@ -20,8 +20,8 @@ package org.apache.flink.graph.asm.translate;
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingGraph;
 import org.apache.flink.util.Preconditions;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
@@ -36,7 +36,7 @@ import static org.apache.flink.graph.asm.translate.Translate.translateVertexValu
  * @param <EV> edge value type
  */
 public class TranslateVertexValues<K, OLD, NEW, EV>
-implements GraphAlgorithm<K, OLD, EV, Graph<K, NEW, EV>> {
+extends GraphAlgorithmDelegatingGraph<K, OLD, EV, K, NEW, EV> {
 
 	// Required configuration
 	private TranslateFunction<OLD, NEW> translator;
@@ -71,7 +71,36 @@ implements GraphAlgorithm<K, OLD, EV, Graph<K, NEW, EV>> {
 	}
 
 	@Override
-	public Graph<K, NEW, EV> run(Graph<K, OLD, EV> input) throws Exception {
+	protected String getAlgorithmName() {
+		return TranslateVertexValues.class.getName();
+	}
+
+	@Override
+	protected boolean mergeConfiguration(GraphAlgorithmDelegatingGraph other) {
+		Preconditions.checkNotNull(other);
+
+		if (! TranslateVertexValues.class.isAssignableFrom(other.getClass())) {
+			return false;
+		}
+
+		TranslateVertexValues rhs = (TranslateVertexValues) other;
+
+		// verify that configurations can be merged
+
+		if (translator != rhs.translator) {
+			return false;
+		}
+
+		// merge configurations
+
+		parallelism = Math.min(parallelism, rhs.parallelism);
+
+		return true;
+	}
+
+	@Override
+	public Graph<K, NEW, EV> runInternal(Graph<K, OLD, EV> input)
+			throws Exception {
 		DataSet<Vertex<K, NEW>> translatedVertices = translateVertexValues(input.getVertices(), translator, parallelism);
 
 		return Graph.fromDataSet(translatedVertices, input.getEdges(), input.getContext());

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
index bbc167e..537ad0f 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
@@ -25,12 +25,12 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.functions.FunctionAnnotation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
 import org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result;
 import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
@@ -55,7 +55,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class LocalClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV>
-implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
+extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
 
 	// Optional configuration
 	private int littleParallelism = PARALLELISM_DEFAULT;
@@ -74,6 +74,25 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
 
 		return this;
 	}
+	@Override
+	protected String getAlgorithmName() {
+		return LocalClusteringCoefficient.class.getName();
+	}
+
+	@Override
+	protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+		Preconditions.checkNotNull(other);
+
+		if (! LocalClusteringCoefficient.class.isAssignableFrom(other.getClass())) {
+			return false;
+		}
+
+		LocalClusteringCoefficient rhs = (LocalClusteringCoefficient) other;
+
+		littleParallelism = Math.min(littleParallelism, rhs.littleParallelism);
+
+		return true;
+	}
 
 	/*
 	 * Implementation notes:
@@ -86,12 +105,11 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
 	 */
 
 	@Override
-	public DataSet<Result<K>> run(Graph<K, VV, EV> input)
+	public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
 			throws Exception {
 		// u, v, w, bitmask
 		DataSet<TriangleListing.Result<K>> triangles = input
 			.run(new TriangleListing<K,VV,EV>()
-				.setSortTriangleVertices(false)
 				.setLittleParallelism(littleParallelism));
 
 		// u, edge count

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
index 5c364f5..14c731a 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
@@ -33,9 +33,11 @@ import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.EdgeOrder;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.asm.degree.annotate.directed.EdgeDegreesPair;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.graph.library.clustering.directed.TriangleListing.Result;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.ByteValue;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.util.Collector;
@@ -60,10 +62,10 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class TriangleListing<K extends Comparable<K> & CopyableValue<K>, VV, EV>
-implements GraphAlgorithm<K, VV, EV, DataSet<TriangleListing.Result<K>>> {
+extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
 
 	// Optional configuration
-	private boolean sortTriangleVertices = false;
+	private OptionalBoolean sortTriangleVertices = new OptionalBoolean(false, false);
 
 	private int littleParallelism = PARALLELISM_DEFAULT;
 
@@ -75,7 +77,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<TriangleListing.Result<K>>> {
 	 * @return this
 	 */
 	public TriangleListing<K, VV, EV> setSortTriangleVertices(boolean sortTriangleVertices) {
-		this.sortTriangleVertices = sortTriangleVertices;
+		this.sortTriangleVertices.set(sortTriangleVertices);
 
 		return this;
 	}
@@ -95,6 +97,27 @@ implements GraphAlgorithm<K, VV, EV, DataSet<TriangleListing.Result<K>>> {
 		return this;
 	}
 
+	@Override
+	protected String getAlgorithmName() {
+		return TriangleListing.class.getName();
+	}
+
+	@Override
+	protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+		Preconditions.checkNotNull(other);
+
+		if (! TriangleListing.class.isAssignableFrom(other.getClass())) {
+			return false;
+		}
+
+		TriangleListing rhs = (TriangleListing) other;
+
+		sortTriangleVertices.mergeWith(rhs.sortTriangleVertices);
+		littleParallelism = Math.min(littleParallelism, rhs.littleParallelism);
+
+		return true;
+	}
+
 	/*
 	 * Implementation notes:
 	 *
@@ -106,7 +129,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<TriangleListing.Result<K>>> {
 	 */
 
 	@Override
-	public DataSet<Result<K>> run(Graph<K, VV, EV> input)
+	public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
 			throws Exception {
 		// u, v, bitmask where u < v
 		DataSet<Tuple3<K, K, ByteValue>> filteredByID = input
@@ -151,7 +174,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<TriangleListing.Result<K>>> {
 				.setParallelism(littleParallelism)
 				.name("Triangle listing");
 
-		if (sortTriangleVertices) {
+		if (sortTriangleVertices.get()) {
 			triangles = triangles
 				.map(new SortTriangleVertices<K>())
 					.name("Sort triangle vertices");

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
index 6858818..8f707fd 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
@@ -26,11 +26,11 @@ import org.apache.flink.api.java.functions.FunctionAnnotation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
 import org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result;
 import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
@@ -55,7 +55,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class LocalClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV>
-implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
+extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
 
 	// Optional configuration
 	private int littleParallelism = PARALLELISM_DEFAULT;
@@ -75,6 +75,26 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
 		return this;
 	}
 
+	@Override
+	protected String getAlgorithmName() {
+		return LocalClusteringCoefficient.class.getName();
+	}
+
+	@Override
+	protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+		Preconditions.checkNotNull(other);
+
+		if (! LocalClusteringCoefficient.class.isAssignableFrom(other.getClass())) {
+			return false;
+		}
+
+		LocalClusteringCoefficient rhs = (LocalClusteringCoefficient) other;
+
+		littleParallelism = Math.min(littleParallelism, rhs.littleParallelism);
+
+		return true;
+	}
+
 	/*
 	 * Implementation notes:
 	 *
@@ -86,12 +106,11 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
 	 */
 
 	@Override
-	public DataSet<Result<K>> run(Graph<K, VV, EV> input)
+	public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
 			throws Exception {
 		// u, v, w
 		DataSet<Tuple3<K,K,K>> triangles = input
 			.run(new TriangleListing<K,VV,EV>()
-				.setSortTriangleVertices(false)
 				.setLittleParallelism(littleParallelism));
 
 		// u, 1

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
index 4f8ce7a..89b86fe 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
@@ -32,8 +32,9 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeDegreePair;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
@@ -62,10 +63,10 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class TriangleListing<K extends Comparable<K> & CopyableValue<K>, VV, EV>
-implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K, K, K>>> {
+extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Tuple3<K, K, K>> {
 
 	// Optional configuration
-	private boolean sortTriangleVertices = false;
+	private OptionalBoolean sortTriangleVertices = new OptionalBoolean(false, false);
 
 	private int littleParallelism = PARALLELISM_DEFAULT;
 
@@ -77,7 +78,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K, K, K>>> {
 	 * @return this
 	 */
 	public TriangleListing<K, VV, EV> setSortTriangleVertices(boolean sortTriangleVertices) {
-		this.sortTriangleVertices = sortTriangleVertices;
+		this.sortTriangleVertices.set(sortTriangleVertices);
 
 		return this;
 	}
@@ -97,6 +98,27 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K, K, K>>> {
 		return this;
 	}
 
+	@Override
+	protected String getAlgorithmName() {
+		return TriangleListing.class.getName();
+	}
+
+	@Override
+	protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+		Preconditions.checkNotNull(other);
+
+		if (! TriangleListing.class.isAssignableFrom(other.getClass())) {
+			return false;
+		}
+
+		TriangleListing rhs = (TriangleListing) other;
+
+		sortTriangleVertices.mergeWith(rhs.sortTriangleVertices);
+		littleParallelism = Math.min(littleParallelism, rhs.littleParallelism);
+
+		return true;
+	}
+
 	/*
 	 * Implementation notes:
 	 *
@@ -108,7 +130,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K, K, K>>> {
 	 */
 
 	@Override
-	public DataSet<Tuple3<K, K, K>> run(Graph<K, VV, EV> input)
+	public DataSet<Tuple3<K, K, K>> runInternal(Graph<K, VV, EV> input)
 			throws Exception {
 		// u, v where u < v
 		DataSet<Tuple2<K, K>> filteredByID = input
@@ -145,7 +167,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K, K, K>>> {
 				.setParallelism(littleParallelism)
 				.name("Triangle listing");
 
-		if (sortTriangleVertices) {
+		if (sortTriangleVertices.get()) {
 			triangles = triangles
 				.map(new SortTriangleVertices<K>())
 					.name("Sort triangle vertices");

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
index 00035e4..b88badb 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
@@ -36,9 +36,10 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.library.link_analysis.HITS.Result;
 import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
 import org.apache.flink.types.DoubleValue;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
@@ -62,7 +63,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class HITS<K, VV, EV>
-implements GraphAlgorithm<K, VV, EV, DataSet<HITS.Result<K>>> {
+extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
 
 	private static final String CHANGE_IN_SCORES = "change in scores";
 
@@ -128,7 +129,32 @@ implements GraphAlgorithm<K, VV, EV, DataSet<HITS.Result<K>>> {
 	}
 
 	@Override
-	public DataSet<Result<K>> run(Graph<K, VV, EV> input)
+	protected String getAlgorithmName() {
+		return HITS.class.getName();
+	}
+
+	@Override
+	protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+		Preconditions.checkNotNull(other);
+
+		if (! HITS.class.isAssignableFrom(other.getClass())) {
+			return false;
+		}
+
+		HITS rhs = (HITS) other;
+
+		// merge configurations
+
+		maxIterations = Math.max(maxIterations, rhs.maxIterations);
+		convergenceThreshold = Math.min(convergenceThreshold, rhs.convergenceThreshold);
+		parallelism = Math.min(parallelism, rhs.parallelism);
+
+		return true;
+	}
+
+
+	@Override
+	public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
 			throws Exception {
 		DataSet<Tuple2<K, K>> edges = input
 			.getEdges()

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
index a164a5f..512a7a0 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
@@ -33,11 +33,11 @@ import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
 import org.apache.flink.graph.library.similarity.AdamicAdar.Result;
 import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.types.FloatValue;
 import org.apache.flink.types.IntValue;
@@ -71,7 +71,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class AdamicAdar<K extends CopyableValue<K>, VV, EV>
-implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
+extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
 
 	private static final int GROUP_SIZE = 64;
 
@@ -127,6 +127,35 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
 		return this;
 	}
 
+	@Override
+	protected String getAlgorithmName() {
+		return AdamicAdar.class.getName();
+	}
+
+	@Override
+	protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+		Preconditions.checkNotNull(other);
+
+		if (! AdamicAdar.class.isAssignableFrom(other.getClass())) {
+			return false;
+		}
+
+		AdamicAdar rhs = (AdamicAdar) other;
+
+		// verify that configurations can be merged
+
+		if (minimumRatio != rhs.minimumRatio ||
+			minimumScore != rhs.minimumScore) {
+			return false;
+		}
+
+		// merge configurations
+
+		littleParallelism = Math.min(littleParallelism, rhs.littleParallelism);
+
+		return true;
+	}
+
 	/*
 	 * Implementation notes:
 	 *
@@ -136,7 +165,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
 	 */
 
 	@Override
-	public DataSet<Result<K>> run(Graph<K, VV, EV> input)
+	public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
 			throws Exception {
 		// s, d(s), 1/log(d(s))
 		DataSet<Tuple3<K, LongValue, FloatValue>> inverseLogDegree = input

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
index c731984..7783e6b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
@@ -28,10 +28,10 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeTargetDegree;
 import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
 import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.LongValue;
@@ -61,7 +61,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class JaccardIndex<K extends CopyableValue<K>, VV, EV>
-implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
+extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
 
 	public static final int DEFAULT_GROUP_SIZE = 64;
 
@@ -153,6 +153,39 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
 		return this;
 	}
 
+	@Override
+	protected String getAlgorithmName() {
+		return JaccardIndex.class.getName();
+	}
+
+	@Override
+	protected boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other) {
+		Preconditions.checkNotNull(other);
+
+		if (! JaccardIndex.class.isAssignableFrom(other.getClass())) {
+			return false;
+		}
+
+		JaccardIndex rhs = (JaccardIndex) other;
+
+		// verify that configurations can be merged
+
+		if (unboundedScores != rhs.unboundedScores ||
+			minimumScoreNumerator != rhs.minimumScoreNumerator ||
+			minimumScoreDenominator != rhs.minimumScoreDenominator ||
+			maximumScoreNumerator != rhs.maximumScoreNumerator ||
+			maximumScoreDenominator != rhs.maximumScoreDenominator) {
+			return false;
+		}
+
+		// merge configurations
+
+		groupSize = Math.max(groupSize, rhs.groupSize);
+		littleParallelism = Math.min(littleParallelism, rhs.littleParallelism);
+
+		return true;
+	}
+
 	/*
 	 * Implementation notes:
 	 *
@@ -162,7 +195,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
 	 */
 
 	@Override
-	public DataSet<Result<K>> run(Graph<K, VV, EV> input)
+	public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
 			throws Exception {
 		// s, t, d(t)
 		DataSet<Edge<K, Tuple2<EV, LongValue>>> neighborDegree = input

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/Delegate.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/Delegate.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/Delegate.java
new file mode 100644
index 0000000..164125c
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/Delegate.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils.proxy;
+
+import javassist.util.proxy.MethodFilter;
+import javassist.util.proxy.MethodHandler;
+import javassist.util.proxy.ProxyFactory;
+import javassist.util.proxy.ProxyObject;
+import org.objenesis.ObjenesisStd;
+
+import java.lang.reflect.Method;
+
+/**
+ * Wraps an object with a proxy delegate whose method handler invokes all
+ * method calls on the wrapped object. This object can be later replaced.
+ *
+ * @param <X>
+ */
+public class Delegate<X> {
+	private X obj;
+
+	private X proxy = null;
+
+	/**
+	 * Set the initial delegated object.
+	 *
+	 * @param obj delegated object
+	 */
+	public Delegate(X obj) {
+		setObject(obj);
+	}
+
+	/**
+	 * Change the delegated object.
+	 *
+	 * @param obj delegated object
+	 */
+	public void setObject(X obj) {
+		this.obj = obj;
+	}
+
+	/**
+	 * Instantiates and returns a proxy object which subclasses the
+	 * delegated object. The proxy's method handler invokes all methods
+	 * on the delegated object that is set at the time of invocation.
+	 *
+	 * @return delegating proxy
+	 */
+	@SuppressWarnings("unchecked")
+	public X getProxy() {
+		if (proxy != null) {
+			return proxy;
+		}
+
+		ProxyFactory factory = new ProxyFactory();
+		factory.setSuperclass(obj.getClass());
+
+		// create the class and instantiate an instance without calling a constructor
+		Class<? extends X> proxyClass = factory.createClass(new MethodFilter() {
+			@Override
+			public boolean isHandled(Method method) {
+				return true;
+			}
+		});
+		proxy = new ObjenesisStd().newInstance(proxyClass);
+
+		// create and set a handler to invoke all method calls on the delegated object
+		((ProxyObject) proxy).setHandler(new MethodHandler() {
+			@Override
+			public Object invoke(Object self, Method thisMethod, Method proceed, Object[] args) throws Throwable {
+				// method visibility may be restricted
+				thisMethod.setAccessible(true);
+				return thisMethod.invoke(obj, args);
+			}
+		});
+
+		return proxy;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/149e7a01/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingDataSet.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingDataSet.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingDataSet.java
new file mode 100644
index 0000000..8e796e6
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmDelegatingDataSet.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils.proxy;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link GraphAlgorithm} transforms an input {@link Graph} into an output of
+ * type {@code T}. A {@code GraphAlgorithmDelegatingDataSet} wraps the resultant
+ * {@link DataSet} with a delegating proxy object. The delegated object can be
+ * replaced when the same algorithm is run on the same input with a mergeable
+ * configuration. This allows algorithms to be composed of implicitly reusable
+ * algorithms without publicly sharing intermediate {@link DataSet}s.
+ *
+ * @param <K> ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ * @param <T> output type
+ */
+public abstract class GraphAlgorithmDelegatingDataSet<K, VV, EV, T>
+implements GraphAlgorithm<K, VV, EV, DataSet<T>> {
+
+	// each algorithm and input pair may map to multiple configurations
+	private static Map<GraphAlgorithmDelegatingDataSet, List<GraphAlgorithmDelegatingDataSet>> cache =
+		Collections.synchronizedMap(new HashMap<GraphAlgorithmDelegatingDataSet, List<GraphAlgorithmDelegatingDataSet>>());
+
+	private Graph<K,VV,EV> input;
+
+	private Delegate<DataSet<T>> delegate;
+
+	/**
+	 * Algorithms are identified by name rather than by class to allow subclassing.
+	 *
+	 * @return name of the algorithm, which may be shared by multiple classes
+	 *		 implementing the same algorithm and generating the same output
+	 */
+	protected abstract String getAlgorithmName();
+
+	/**
+	 * An algorithm must first test whether the configurations can be merged
+	 * before merging individual fields.
+	 *
+	 * @param other the algorithm with which to compare and merge
+	 * @return true if and only if configuration has been merged and the
+	 *          algorithm's output can be reused
+	 */
+	protected abstract boolean mergeConfiguration(GraphAlgorithmDelegatingDataSet other);
+
+	/**
+	 * The implementation of the algorithm, renamed from {@link GraphAlgorithm#run(Graph)}.
+	 *
+	 * @param input the input graph
+	 * @return the algorithm's output
+	 * @throws Exception
+	 */
+	protected abstract DataSet<T> runInternal(Graph<K, VV, EV> input) throws Exception;
+
+	@Override
+	public final int hashCode() {
+		return new HashCodeBuilder(17, 37)
+			.append(input)
+			.append(getAlgorithmName())
+			.toHashCode();
+	}
+
+	@Override
+	public final boolean equals(Object obj) {
+		if (obj == null) {
+			return false;
+		}
+
+		if (obj == this) {
+			return true;
+		}
+
+		if (! GraphAlgorithmDelegatingDataSet.class.isAssignableFrom(obj.getClass())) {
+			return false;
+		}
+
+		GraphAlgorithmDelegatingDataSet rhs = (GraphAlgorithmDelegatingDataSet) obj;
+
+		return new EqualsBuilder()
+			.append(input, rhs.input)
+			.append(getAlgorithmName(), rhs.getAlgorithmName())
+			.isEquals();
+	}
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public final DataSet<T> run(Graph<K, VV, EV> input)
+			throws Exception {
+		this.input = input;
+
+		if (cache.containsKey(this)) {
+			for (GraphAlgorithmDelegatingDataSet<K, VV, EV, T> other : cache.get(this)) {
+				if (mergeConfiguration(other)) {
+					// configuration has been merged so generate new output
+					DataSet<T> output = runInternal(input);
+
+					// update delegatee object and reuse delegate
+					other.delegate.setObject(output);
+					delegate = other.delegate;
+
+					return delegate.getProxy();
+				}
+			}
+		}
+
+		// no mergeable configuration found so generate new output
+		DataSet<T> output = runInternal(input);
+
+		// create a new delegate to wrap the algorithm output
+		delegate = new Delegate<>(output);
+
+		// cache this result
+		if (cache.containsKey(this)) {
+			cache.get(this).add(this);
+		} else {
+			cache.put(this, new ArrayList(Collections.singletonList(this)));
+		}
+
+		return delegate.getProxy();
+	}
+}


Mime
View raw message