flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [6/7] flink git commit: [FLINK-7019] [gelly] Rework parallelism in Gelly algorithms and examples
Date Tue, 11 Jul 2017 13:57:24 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
index e444b5c..a9f1752 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
@@ -25,7 +25,6 @@ import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph;
 import org.apache.flink.util.Preconditions;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 import static org.apache.flink.graph.asm.translate.Translate.translateEdgeValues;
 
 /**
@@ -42,9 +41,6 @@ extends GraphAlgorithmWrappingGraph<K, VV, OLD, K, VV, NEW> {
 	// Required configuration
 	private TranslateFunction<OLD, NEW> translator;
 
-	// Optional configuration
-	private int parallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * Translate {@link Edge} values using the given {@link TranslateFunction}.
 	 *
@@ -56,43 +52,15 @@ extends GraphAlgorithmWrappingGraph<K, VV, OLD, K, VV, NEW> {
 		this.translator = translator;
 	}
 
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public TranslateEdgeValues<K, VV, OLD, NEW> setParallelism(int parallelism) {
-		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
-			"The parallelism must be greater than zero.");
-
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
 	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!TranslateEdgeValues.class.isAssignableFrom(other.getClass())) {
+	protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
+		if (!super.canMergeConfigurationWith(other)) {
 			return false;
 		}
 
 		TranslateEdgeValues rhs = (TranslateEdgeValues) other;
 
-		// verify that configurations can be merged
-
-		if (translator != rhs.translator) {
-			return false;
-		}
-
-		// merge configurations
-
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
+		return translator == rhs.translator;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
index 3eb5113..5568775 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
@@ -26,7 +26,6 @@ import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph;
 import org.apache.flink.util.Preconditions;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 import static org.apache.flink.graph.asm.translate.Translate.translateEdgeIds;
 import static org.apache.flink.graph.asm.translate.Translate.translateVertexIds;
 
@@ -44,9 +43,6 @@ extends GraphAlgorithmWrappingGraph<OLD, VV, EV, NEW, VV, EV> {
 	// Required configuration
 	private TranslateFunction<OLD, NEW> translator;
 
-	// Optional configuration
-	private int parallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * Translate {@link Vertex} and {@link Edge} IDs of a {@link Graph} using the given {@link TranslateFunction}.
 	 *
@@ -58,43 +54,13 @@ extends GraphAlgorithmWrappingGraph<OLD, VV, EV, NEW, VV, EV> {
 		this.translator = translator;
 	}
 
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public TranslateGraphIds<OLD, NEW, VV, EV> setParallelism(int parallelism) {
-		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
-			"The parallelism must be greater than zero.");
-
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
 	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!TranslateGraphIds.class.isAssignableFrom(other.getClass())) {
-			return false;
-		}
+	protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
+		super.mergeConfiguration(other);
 
 		TranslateGraphIds rhs = (TranslateGraphIds) other;
 
-		// verify that configurations can be merged
-
-		if (translator != rhs.translator) {
-			return false;
-		}
-
-		// merge configurations
-
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
+		return translator == rhs.translator;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
index 38f415c..c9c94d7 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
@@ -25,7 +25,6 @@ import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph;
 import org.apache.flink.util.Preconditions;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 import static org.apache.flink.graph.asm.translate.Translate.translateVertexValues;
 
 /**
@@ -42,9 +41,6 @@ extends GraphAlgorithmWrappingGraph<K, OLD, EV, K, NEW, EV> {
 	// Required configuration
 	private TranslateFunction<OLD, NEW> translator;
 
-	// Optional configuration
-	private int parallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * Translate {@link Vertex} values using the given {@link TranslateFunction}.
 	 *
@@ -56,43 +52,13 @@ extends GraphAlgorithmWrappingGraph<K, OLD, EV, K, NEW, EV> {
 		this.translator = translator;
 	}
 
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public TranslateVertexValues<K, OLD, NEW, EV> setParallelism(int parallelism) {
-		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
-			"The parallelism must be greater than zero.");
-
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
 	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!TranslateVertexValues.class.isAssignableFrom(other.getClass())) {
-			return false;
-		}
+	protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
+		super.mergeConfiguration(other);
 
 		TranslateVertexValues rhs = (TranslateVertexValues) other;
 
-		// verify that configurations can be merged
-
-		if (translator != rhs.translator) {
-			return false;
-		}
-
-		// merge configurations
-
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
+		return translator == rhs.translator;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorBase.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorBase.java
index 1c4d097..a1906e1 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorBase.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorBase.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.graph.generator;
 
+import org.apache.flink.util.Preconditions;
+
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
@@ -35,6 +37,9 @@ implements GraphGenerator<K, VV, EV> {
 
 	@Override
 	public GraphGenerator<K, VV, EV> setParallelism(int parallelism) {
+		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
+			"The parallelism must be greater than zero.");
+
 		this.parallelism = parallelism;
 
 		return this;

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/TriangleListingBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/TriangleListingBase.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/TriangleListingBase.java
index 7357fbc..9e26dfe 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/TriangleListingBase.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/TriangleListingBase.java
@@ -22,9 +22,6 @@ package org.apache.flink.graph.library.clustering;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
-import org.apache.flink.util.Preconditions;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * Common configuration for directed and undirected Triangle Listing algorithms.
@@ -40,8 +37,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, R> {
 	// Optional configuration
 	protected OptionalBoolean sortTriangleVertices = new OptionalBoolean(false, true);
 
-	protected int littleParallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * Normalize the triangle listing such that for each result (K0, K1, K2)
 	 * the vertex IDs are sorted K0 < K1 < K2.
@@ -55,37 +50,12 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, R> {
 		return this;
 	}
 
-	/**
-	 * Override the parallelism of operators processing small amounts of data.
-	 *
-	 * @param littleParallelism operator parallelism
-	 * @return this
-	 */
-	public TriangleListingBase<K, VV, EV, R> setLittleParallelism(int littleParallelism) {
-		Preconditions.checkArgument(littleParallelism > 0 || littleParallelism == PARALLELISM_DEFAULT,
-			"The parallelism must be greater than zero.");
-
-		this.littleParallelism = littleParallelism;
-
-		return this;
-	}
-
 	@Override
-	protected final boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!TriangleListingBase.class.isAssignableFrom(other.getClass())) {
-			return false;
-		}
+	protected final void mergeConfiguration(GraphAlgorithmWrappingBase other) {
+		super.mergeConfiguration(other);
 
 		TriangleListingBase rhs = (TriangleListingBase) other;
 
-		// merge configurations
-
 		sortTriangleVertices.mergeWith(rhs.sortTriangleVertices);
-		littleParallelism = (littleParallelism == PARALLELISM_DEFAULT) ? rhs.littleParallelism :
-			((rhs.littleParallelism == PARALLELISM_DEFAULT) ? littleParallelism : Math.min(littleParallelism, rhs.littleParallelism));
-
-		return true;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
index bfeb3d5..2ff5dc2 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
@@ -33,8 +33,6 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
 
 import java.io.IOException;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * The average clustering coefficient measures the mean connectedness of a
  * graph. Scores range from 0.0 (no triangles) to 1.0 (complete graph).
@@ -52,21 +50,6 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 
 	private AverageClusteringCoefficientHelper<K> averageClusteringCoefficientHelper;
 
-	// Optional configuration
-	private int littleParallelism = PARALLELISM_DEFAULT;
-
-	/**
-	 * Override the parallelism of operators processing small amounts of data.
-	 *
-	 * @param littleParallelism operator parallelism
-	 * @return this
-	 */
-	public AverageClusteringCoefficient<K, VV, EV> setLittleParallelism(int littleParallelism) {
-		this.littleParallelism = littleParallelism;
-
-		return this;
-	}
-
 	/*
 	 * Implementation notes:
 	 *
@@ -81,7 +64,7 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 
 		DataSet<LocalClusteringCoefficient.Result<K>> localClusteringCoefficient = input
 			.run(new LocalClusteringCoefficient<K, VV, EV>()
-				.setLittleParallelism(littleParallelism));
+				.setParallelism(parallelism));
 
 		averageClusteringCoefficientHelper = new AverageClusteringCoefficientHelper<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java
index 03f06b1..a6b0baa 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java
@@ -20,6 +20,7 @@ package org.apache.flink.graph.library.clustering.directed;
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAnalytic;
 import org.apache.flink.graph.GraphAnalyticBase;
 import org.apache.flink.graph.asm.dataset.Count;
 import org.apache.flink.graph.asm.result.PrintableResult;
@@ -30,8 +31,6 @@ import org.apache.flink.types.CopyableValue;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * The global clustering coefficient measures the connectedness of a graph.
  * Scores range from 0.0 (no triangles) to 1.0 (complete graph).
@@ -45,22 +44,7 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 
 	private Count<TriangleListing.Result<K>> triangleCount;
 
-	private VertexMetrics<K, VV, EV> vertexMetrics;
-
-	// Optional configuration
-	private int littleParallelism = PARALLELISM_DEFAULT;
-
-	/**
-	 * Override the parallelism of operators processing small amounts of data.
-	 *
-	 * @param littleParallelism operator parallelism
-	 * @return this
-	 */
-	public GlobalClusteringCoefficient<K, VV, EV> setLittleParallelism(int littleParallelism) {
-		this.littleParallelism = littleParallelism;
-
-		return this;
-	}
+	private GraphAnalytic<K, VV, EV, VertexMetrics.Result> vertexMetrics;
 
 	/*
 	 * Implementation notes:
@@ -79,12 +63,12 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 		DataSet<TriangleListing.Result<K>> triangles = input
 			.run(new TriangleListing<K, VV, EV>()
 				.setSortTriangleVertices(false)
-				.setLittleParallelism(littleParallelism));
+				.setParallelism(parallelism));
 
 		triangleCount.run(triangles);
 
 		vertexMetrics = new VertexMetrics<K, VV, EV>()
-			.setParallelism(littleParallelism);
+			.setParallelism(parallelism);
 
 		input.run(vertexMetrics);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
index 03c8808..981110f 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
@@ -41,9 +41,6 @@ import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.Preconditions;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * The local clustering coefficient measures the connectedness of each vertex's
@@ -67,8 +64,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	// Optional configuration
 	private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(true, true);
 
-	private int littleParallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * By default the vertex set is checked for zero degree vertices. When this
 	 * flag is disabled only clustering coefficient scores for vertices with
@@ -84,45 +79,24 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		return this;
 	}
 
-	/**
-	 * Override the parallelism of operators processing small amounts of data.
-	 *
-	 * @param littleParallelism operator parallelism
-	 * @return this
-	 */
-	public LocalClusteringCoefficient<K, VV, EV> setLittleParallelism(int littleParallelism) {
-		Preconditions.checkArgument(littleParallelism > 0 || littleParallelism == PARALLELISM_DEFAULT,
-			"The parallelism must be greater than zero.");
-
-		this.littleParallelism = littleParallelism;
-
-		return this;
-	}
-
 	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!LocalClusteringCoefficient.class.isAssignableFrom(other.getClass())) {
+	protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
+		if (!super.canMergeConfigurationWith(other)) {
 			return false;
 		}
 
 		LocalClusteringCoefficient rhs = (LocalClusteringCoefficient) other;
 
-		// verify that configurations can be merged
+		return !includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices);
+	}
 
-		if (includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices)) {
-			return false;
-		}
+	@Override
+	protected void mergeConfiguration(GraphAlgorithmWrappingBase other) {
+		super.mergeConfiguration(other);
 
-		// merge configurations
+		LocalClusteringCoefficient rhs = (LocalClusteringCoefficient) other;
 
 		includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
-
-		littleParallelism = (littleParallelism == PARALLELISM_DEFAULT) ? rhs.littleParallelism :
-			((rhs.littleParallelism == PARALLELISM_DEFAULT) ? littleParallelism : Math.min(littleParallelism, rhs.littleParallelism));
-
-		return true;
 	}
 
 	/*
@@ -141,7 +115,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		// u, v, w, bitmask
 		DataSet<TriangleListing.Result<K>> triangles = input
 			.run(new TriangleListing<K, VV, EV>()
-				.setLittleParallelism(littleParallelism));
+				.setParallelism(parallelism));
 
 		// u, edge count
 		DataSet<Tuple2<K, LongValue>> triangleVertices = triangles
@@ -159,7 +133,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		DataSet<Vertex<K, Degrees>> vertexDegree = input
 			.run(new VertexDegrees<K, VV, EV>()
 				.setIncludeZeroDegreeVertices(includeZeroDegreeVertices.get())
-				.setParallelism(littleParallelism));
+				.setParallelism(parallelism));
 
 		// u, deg(u), triangle count
 		return vertexDegree
@@ -167,7 +141,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 			.where(0)
 			.equalTo(0)
 			.with(new JoinVertexDegreeWithTriangleCount<K>())
-				.setParallelism(littleParallelism)
+				.setParallelism(parallelism)
 				.name("Clustering coefficient");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
index 6ec9f0f..93eadc5 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
@@ -37,8 +37,6 @@ import java.io.IOException;
 import java.math.BigInteger;
 import java.text.NumberFormat;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * A triad is formed by three connected or unconnected vertices in a graph.
  * The triadic census counts the occurrences of each type of triad.
@@ -56,21 +54,6 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 
 	private VertexDegreesHelper<K> vertexDegreesHelper;
 
-	// Optional configuration
-	private int littleParallelism = PARALLELISM_DEFAULT;
-
-	/**
-	 * Override the parallelism of operators processing small amounts of data.
-	 *
-	 * @param littleParallelism operator parallelism
-	 * @return this
-	 */
-	public TriadicCensus<K, VV, EV> setLittleParallelism(int littleParallelism) {
-		this.littleParallelism = littleParallelism;
-
-		return this;
-	}
-
 	@Override
 	public TriadicCensus<K, VV, EV> run(Graph<K, VV, EV> input)
 			throws Exception {
@@ -80,7 +63,7 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 
 		input
 			.run(new TriangleListing<K, VV, EV>()
-				.setLittleParallelism(littleParallelism))
+				.setParallelism(parallelism))
 			.output(triangleListingHelper)
 				.name("Triangle counts");
 
@@ -88,7 +71,7 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 
 		input
 			.run(new VertexDegrees<K, VV, EV>()
-				.setParallelism(littleParallelism))
+				.setParallelism(parallelism))
 			.output(vertexDegreesHelper)
 				.name("Edge and triplet counts");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
index 23e88d1..beddd24 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
@@ -84,26 +84,26 @@ extends TriangleListingBase<K, VV, EV, Result<K>> {
 		DataSet<Tuple3<K, K, ByteValue>> filteredByID = input
 			.getEdges()
 			.map(new OrderByID<K, EV>())
-				.setParallelism(littleParallelism)
+				.setParallelism(parallelism)
 				.name("Order by ID")
 			.groupBy(0, 1)
 			.reduceGroup(new ReduceBitmask<K>())
-				.setParallelism(littleParallelism)
+				.setParallelism(parallelism)
 				.name("Flatten by ID");
 
 		// u, v, (deg(u), deg(v))
 		DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> pairDegrees = input
 			.run(new EdgeDegreesPair<K, VV, EV>()
-				.setParallelism(littleParallelism));
+				.setParallelism(parallelism));
 
 		// u, v, bitmask where deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
 		DataSet<Tuple3<K, K, ByteValue>> filteredByDegree = pairDegrees
 			.map(new OrderByDegree<K, EV>())
-				.setParallelism(littleParallelism)
+				.setParallelism(parallelism)
 				.name("Order by degree")
 			.groupBy(0, 1)
 			.reduceGroup(new ReduceBitmask<K>())
-				.setParallelism(littleParallelism)
+				.setParallelism(parallelism)
 				.name("Flatten by degree");
 
 		// u, v, w, bitmask where (u, v) and (u, w) are edges in graph

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
index c2c2ad2..c0ddb05 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
@@ -33,8 +33,6 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
 
 import java.io.IOException;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * The average clustering coefficient measures the mean connectedness of a
  * graph. Scores range from 0.0 (no triangles) to 1.0 (complete graph).
@@ -52,21 +50,6 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 
 	private AverageClusteringCoefficientHelper<K> averageClusteringCoefficientHelper;
 
-	// Optional configuration
-	private int littleParallelism = PARALLELISM_DEFAULT;
-
-	/**
-	 * Override the parallelism of operators processing small amounts of data.
-	 *
-	 * @param littleParallelism operator parallelism
-	 * @return this
-	 */
-	public AverageClusteringCoefficient<K, VV, EV> setLittleParallelism(int littleParallelism) {
-		this.littleParallelism = littleParallelism;
-
-		return this;
-	}
-
 	/*
 	 * Implementation notes:
 	 *
@@ -81,7 +64,7 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 
 		DataSet<LocalClusteringCoefficient.Result<K>> localClusteringCoefficient = input
 			.run(new LocalClusteringCoefficient<K, VV, EV>()
-				.setLittleParallelism(littleParallelism));
+				.setParallelism(parallelism));
 
 		averageClusteringCoefficientHelper = new AverageClusteringCoefficientHelper<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
index 5377c1f..ef212ae 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
@@ -20,6 +20,7 @@ package org.apache.flink.graph.library.clustering.undirected;
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAnalytic;
 import org.apache.flink.graph.GraphAnalyticBase;
 import org.apache.flink.graph.asm.dataset.Count;
 import org.apache.flink.graph.asm.result.PrintableResult;
@@ -30,8 +31,6 @@ import org.apache.flink.types.CopyableValue;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * The global clustering coefficient measures the connectedness of a graph.
  * Scores range from 0.0 (no triangles) to 1.0 (complete graph).
@@ -45,22 +44,7 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 
 	private Count<TriangleListing.Result<K>> triangleCount;
 
-	private VertexMetrics<K, VV, EV> vertexMetrics;
-
-	// Optional configuration
-	private int littleParallelism = PARALLELISM_DEFAULT;
-
-	/**
-	 * Override the parallelism of operators processing small amounts of data.
-	 *
-	 * @param littleParallelism operator parallelism
-	 * @return this
-	 */
-	public GlobalClusteringCoefficient<K, VV, EV> setLittleParallelism(int littleParallelism) {
-		this.littleParallelism = littleParallelism;
-
-		return this;
-	}
+	private GraphAnalytic<K, VV, EV, VertexMetrics.Result> vertexMetrics;
 
 	/*
 	 * Implementation notes:
@@ -79,12 +63,12 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 		DataSet<TriangleListing.Result<K>> triangles = input
 			.run(new TriangleListing<K, VV, EV>()
 				.setSortTriangleVertices(false)
-				.setLittleParallelism(littleParallelism));
+				.setParallelism(parallelism));
 
 		triangleCount.run(triangles);
 
 		vertexMetrics = new VertexMetrics<K, VV, EV>()
-			.setParallelism(littleParallelism);
+			.setParallelism(parallelism);
 
 		input.run(vertexMetrics);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
index 3cb4b87..0beb989 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
@@ -40,9 +40,6 @@ import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.Preconditions;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * The local clustering coefficient measures the connectedness of each vertex's
@@ -66,8 +63,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	// Optional configuration
 	private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(true, true);
 
-	private int littleParallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * By default the vertex set is checked for zero degree vertices. When this
 	 * flag is disabled only clustering coefficient scores for vertices with
@@ -83,44 +78,24 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		return this;
 	}
 
-	/**
-	 * Override the parallelism of operators processing small amounts of data.
-	 *
-	 * @param littleParallelism operator parallelism
-	 * @return this
-	 */
-	public LocalClusteringCoefficient<K, VV, EV> setLittleParallelism(int littleParallelism) {
-		Preconditions.checkArgument(littleParallelism > 0 || littleParallelism == PARALLELISM_DEFAULT,
-			"The parallelism must be greater than zero.");
-
-		this.littleParallelism = littleParallelism;
-
-		return this;
-	}
-
 	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!LocalClusteringCoefficient.class.isAssignableFrom(other.getClass())) {
+	protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
+		if (!super.canMergeConfigurationWith(other)) {
 			return false;
 		}
 
 		LocalClusteringCoefficient rhs = (LocalClusteringCoefficient) other;
 
-		// verify that configurations can be merged
+		return !includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices);
+	}
 
-		if (includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices)) {
-			return false;
-		}
+	@Override
+	protected void mergeConfiguration(GraphAlgorithmWrappingBase other) {
+		super.mergeConfiguration(other);
 
-		// merge configurations
+		LocalClusteringCoefficient rhs = (LocalClusteringCoefficient) other;
 
 		includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
-		littleParallelism = (littleParallelism == PARALLELISM_DEFAULT) ? rhs.littleParallelism :
-			((rhs.littleParallelism == PARALLELISM_DEFAULT) ? littleParallelism : Math.min(littleParallelism, rhs.littleParallelism));
-
-		return true;
 	}
 
 	/*
@@ -139,7 +114,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		// u, v, w
 		DataSet<TriangleListing.Result<K>> triangles = input
 			.run(new TriangleListing<K, VV, EV>()
-				.setLittleParallelism(littleParallelism));
+				.setParallelism(parallelism));
 
 		// u, 1
 		DataSet<Tuple2<K, LongValue>> triangleVertices = triangles
@@ -157,7 +132,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		DataSet<Vertex<K, LongValue>> vertexDegree = input
 			.run(new VertexDegree<K, VV, EV>()
 				.setIncludeZeroDegreeVertices(includeZeroDegreeVertices.get())
-				.setParallelism(littleParallelism));
+				.setParallelism(parallelism));
 
 		// u, deg(u), triangle count
 		return vertexDegree
@@ -165,7 +140,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 			.where(0)
 			.equalTo(0)
 			.with(new JoinVertexDegreeWithTriangleCount<K>())
-				.setParallelism(littleParallelism)
+				.setParallelism(parallelism)
 				.name("Clustering coefficient");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
index c5a323d..f440098 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
@@ -20,6 +20,7 @@ package org.apache.flink.graph.library.clustering.undirected;
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAnalytic;
 import org.apache.flink.graph.GraphAnalyticBase;
 import org.apache.flink.graph.asm.dataset.Count;
 import org.apache.flink.graph.asm.result.PrintableResult;
@@ -34,8 +35,6 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
 import java.math.BigInteger;
 import java.text.NumberFormat;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * A triad is formed by three connected or unconnected vertices in a graph.
  * The triadic census counts the occurrences of each type of triad.
@@ -54,22 +53,7 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 
 	private Count<TriangleListing.Result<K>> triangleCount;
 
-	private VertexMetrics<K, VV, EV> vertexMetrics;
-
-	// Optional configuration
-	private int littleParallelism = PARALLELISM_DEFAULT;
-
-	/**
-	 * Override the parallelism of operators processing small amounts of data.
-	 *
-	 * @param littleParallelism operator parallelism
-	 * @return this
-	 */
-	public TriadicCensus<K, VV, EV> setLittleParallelism(int littleParallelism) {
-		this.littleParallelism = littleParallelism;
-
-		return this;
-	}
+	private GraphAnalytic<K, VV, EV, VertexMetrics.Result> vertexMetrics;
 
 	@Override
 	public TriadicCensus<K, VV, EV> run(Graph<K, VV, EV> input)
@@ -81,12 +65,12 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 		DataSet<TriangleListing.Result<K>> triangles = input
 			.run(new TriangleListing<K, VV, EV>()
 				.setSortTriangleVertices(false)
-				.setLittleParallelism(littleParallelism));
+				.setParallelism(parallelism));
 
 		triangleCount.run(triangles);
 
 		vertexMetrics = new VertexMetrics<K, VV, EV>()
-			.setParallelism(littleParallelism);
+			.setParallelism(parallelism);
 
 		input.run(vertexMetrics);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
index 9f93b4b..c714bed 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
@@ -84,18 +84,18 @@ extends TriangleListingBase<K, VV, EV, Result<K>> {
 		DataSet<Tuple2<K, K>> filteredByID = input
 			.getEdges()
 			.flatMap(new FilterByID<K, EV>())
-				.setParallelism(littleParallelism)
+				.setParallelism(parallelism)
 				.name("Filter by ID");
 
 		// u, v, (edge value, deg(u), deg(v))
 		DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>> pairDegree = input
 			.run(new EdgeDegreePair<K, VV, EV>()
-				.setParallelism(littleParallelism));
+				.setParallelism(parallelism));
 
 		// u, v where deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
 		DataSet<Tuple2<K, K>> filteredByDegree = pairDegree
 			.flatMap(new FilterByDegree<K, EV>())
-				.setParallelism(littleParallelism)
+				.setParallelism(parallelism)
 				.name("Filter by degree");
 
 		// u, v, w where (u, v) and (u, w) are edges in graph, v < w

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java
index 8bc30d3..6b41ee4 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java
@@ -49,8 +49,6 @@ import org.apache.flink.util.Preconditions;
 
 import java.util.Collection;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * Hyperlink-Induced Topic Search computes two interdependent scores for every
  * vertex in a directed graph. A good "hub" links to good "authorities" and
@@ -79,9 +77,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 
 	private double convergenceThreshold;
 
-	// Optional configuration
-	private int parallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * Hyperlink-Induced Topic Search with a fixed number of iterations.
 	 *
@@ -119,36 +114,14 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		this.convergenceThreshold = convergenceThreshold;
 	}
 
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public HITS<K, VV, EV> setParallelism(int parallelism) {
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
 	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!HITS.class.isAssignableFrom(other.getClass())) {
-			return false;
-		}
+	protected void mergeConfiguration(GraphAlgorithmWrappingBase other) {
+		super.mergeConfiguration(other);
 
 		HITS rhs = (HITS) other;
 
-		// merge configurations
-
 		maxIterations = Math.max(maxIterations, rhs.maxIterations);
 		convergenceThreshold = Math.min(convergenceThreshold, rhs.convergenceThreshold);
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
 	}
 
 	@Override
@@ -172,7 +145,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 				.name("Sum");
 
 		IterativeDataSet<Tuple3<K, DoubleValue, DoubleValue>> iterative = initialScores
-			.iterate(maxIterations);
+			.iterate(maxIterations)
+			.setParallelism(parallelism);
 
 		// ID, hubbiness
 		DataSet<Tuple2<K, DoubleValue>> hubbiness = iterative

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
index c82f68c..af56e50 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
@@ -56,8 +56,6 @@ import org.apache.flink.util.Preconditions;
 import java.util.Collection;
 import java.util.Iterator;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * PageRank computes a per-vertex score which is the sum of PageRank scores
  * transmitted over in-edges. Each vertex's score is divided evenly among
@@ -86,9 +84,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 
 	private double convergenceThreshold;
 
-	// Optional configuration
-	private int parallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * PageRank with a fixed number of iterations.
 	 *
@@ -131,36 +126,14 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		this.convergenceThreshold = convergenceThreshold;
 	}
 
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public PageRank<K, VV, EV> setParallelism(int parallelism) {
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
 	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!PageRank.class.isAssignableFrom(other.getClass())) {
-			return false;
-		}
+	protected void mergeConfiguration(GraphAlgorithmWrappingBase other) {
+		super.mergeConfiguration(other);
 
 		PageRank rhs = (PageRank) other;
 
-		// merge configurations
-
 		maxIterations = Math.max(maxIterations, rhs.maxIterations);
 		convergenceThreshold = Math.min(convergenceThreshold, rhs.convergenceThreshold);
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
 	}
 
 	@Override
@@ -197,7 +170,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 				.name("Initialize scores");
 
 		IterativeDataSet<Tuple2<K, DoubleValue>> iterative = initialScores
-			.iterate(maxIterations);
+			.iterate(maxIterations)
+			.setParallelism(parallelism);
 
 		// s, projected pagerank(s)
 		DataSet<Tuple2<K, DoubleValue>> vertexScores = iterative

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
index 30d3563..c88e401 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
@@ -46,8 +46,6 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
 import java.io.IOException;
 import java.text.NumberFormat;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * Compute the following edge metrics in a directed graph.
  *  - number of triangle triplets
@@ -72,20 +70,6 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 
 	private EdgeMetricsHelper<K> edgeMetricsHelper;
 
-	private int parallelism = PARALLELISM_DEFAULT;
-
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public EdgeMetrics<K, VV, EV> setParallelism(int parallelism) {
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
 	/*
 	 * Implementation notes:
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
index 3931f65..97ee6fa 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
@@ -36,8 +36,6 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
 import java.io.IOException;
 import java.text.NumberFormat;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * Compute the following vertex metrics in a directed graph.
  *  - number of vertices
@@ -79,8 +77,6 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 	// Optional configuration
 	private boolean includeZeroDegreeVertices = false;
 
-	private int parallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * By default only the edge set is processed for the computation of degree.
 	 * When this flag is set an additional join is performed against the vertex
@@ -96,18 +92,6 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 		return this;
 	}
 
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public VertexMetrics<K, VV, EV> setParallelism(int parallelism) {
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
 	@Override
 	public VertexMetrics<K, VV, EV> run(Graph<K, VV, EV> input)
 			throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
index d1c7fb7..4c0d654 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
@@ -41,8 +41,6 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
 import java.io.IOException;
 import java.text.NumberFormat;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * Compute the following edge metrics in an undirected graph.
  *  - number of triangle triplets
@@ -70,8 +68,6 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 	// Optional configuration
 	private boolean reduceOnTargetId = false;
 
-	private int parallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * The degree can be counted from either the edge source or target IDs.
 	 * By default the source IDs are counted. Reducing on target IDs may
@@ -87,18 +83,6 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 		return this;
 	}
 
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public EdgeMetrics<K, VV, EV> setParallelism(int parallelism) {
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
 	/*
 	 * Implementation notes:
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
index 000f4e0..1116149 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
@@ -36,8 +36,6 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
 import java.io.IOException;
 import java.text.NumberFormat;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * Compute the following vertex metrics in an undirected graph.
  *  - number of vertices
@@ -71,8 +69,6 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 
 	private boolean reduceOnTargetId = false;
 
-	private int parallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * By default only the edge set is processed for the computation of degree.
 	 * When this flag is set an additional join is performed against the vertex
@@ -103,18 +99,6 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 		return this;
 	}
 
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public VertexMetrics<K, VV, EV> setParallelism(int parallelism) {
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
 	@Override
 	public VertexMetrics<K, VV, EV> run(Graph<K, VV, EV> input)
 			throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
index 5f1ac3c..78dff0b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
@@ -51,8 +51,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * http://social.cs.uiuc.edu/class/cs591kgk/friendsadamic.pdf
  *
@@ -84,8 +82,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 
 	private float minimumRatio = 0.0f;
 
-	private int littleParallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * Filter out Adamic-Adar scores less than the given minimum.
 	 *
@@ -114,44 +110,15 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		return this;
 	}
 
-	/**
-	 * Override the parallelism of operators processing small amounts of data.
-	 *
-	 * @param littleParallelism operator parallelism
-	 * @return this
-	 */
-	public AdamicAdar<K, VV, EV> setLittleParallelism(int littleParallelism) {
-		Preconditions.checkArgument(littleParallelism > 0 || littleParallelism == PARALLELISM_DEFAULT,
-			"The parallelism must be greater than zero.");
-
-		this.littleParallelism = littleParallelism;
-
-		return this;
-	}
-
 	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!AdamicAdar.class.isAssignableFrom(other.getClass())) {
+	protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
+		if (!super.canMergeConfigurationWith(other)) {
 			return false;
 		}
 
 		AdamicAdar rhs = (AdamicAdar) other;
 
-		// verify that configurations can be merged
-
-		if (minimumRatio != rhs.minimumRatio ||
-			minimumScore != rhs.minimumScore) {
-			return false;
-		}
-
-		// merge configurations
-
-		littleParallelism = (littleParallelism == PARALLELISM_DEFAULT) ? rhs.littleParallelism :
-			((rhs.littleParallelism == PARALLELISM_DEFAULT) ? littleParallelism : Math.min(littleParallelism, rhs.littleParallelism));
-
-		return true;
+		return minimumRatio == rhs.minimumRatio && minimumScore == rhs.minimumScore;
 	}
 
 	/*
@@ -168,9 +135,9 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		// s, d(s), 1/log(d(s))
 		DataSet<Tuple3<K, LongValue, FloatValue>> inverseLogDegree = input
 			.run(new VertexDegree<K, VV, EV>()
-				.setParallelism(littleParallelism))
+				.setParallelism(parallelism))
 			.map(new VertexInverseLogDegree<K>())
-				.setParallelism(littleParallelism)
+				.setParallelism(parallelism)
 				.name("Vertex score");
 
 		// s, t, 1/log(d(s))
@@ -181,7 +148,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 			.equalTo(0)
 			.projectFirst(0, 1)
 			.<Tuple3<K, K, FloatValue>>projectSecond(2)
-				.setParallelism(littleParallelism)
+				.setParallelism(parallelism)
 				.name("Edge score");
 
 		// group span, s, t, 1/log(d(s))
@@ -189,16 +156,16 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 			.groupBy(0)
 			.sortGroup(1, Order.ASCENDING)
 			.reduceGroup(new GenerateGroupSpans<K>())
-				.setParallelism(littleParallelism)
+				.setParallelism(parallelism)
 				.name("Generate group spans");
 
 		// group, s, t, 1/log(d(s))
 		DataSet<Tuple4<IntValue, K, K, FloatValue>> groups = groupSpans
 			.rebalance()
-				.setParallelism(littleParallelism)
+				.setParallelism(parallelism)
 				.name("Rebalance")
 			.flatMap(new GenerateGroups<K>())
-				.setParallelism(littleParallelism)
+				.setParallelism(parallelism)
 				.name("Generate groups");
 
 		// t, u, 1/log(d(s)) where (s, t) and (s, u) are edges in graph
@@ -218,7 +185,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 			// total score, number of pairs of neighbors
 			DataSet<Tuple2<FloatValue, LongValue>> sumOfScoresAndNumberOfNeighborPairs = inverseLogDegree
 				.map(new ComputeScoreFromVertex<K>())
-					.setParallelism(littleParallelism)
+					.setParallelism(parallelism)
 					.name("Average score")
 				.sum(0)
 				.andSum(1);

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
index 64f325c..8e820ac 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
@@ -45,8 +45,6 @@ import org.apache.flink.util.Preconditions;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * The Jaccard Index measures the similarity between vertex neighborhoods and
  * is computed as the number of shared neighbors divided by the number of
@@ -84,8 +82,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 
 	private boolean mirrorResults;
 
-	private int littleParallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * Override the default group size for the quadratic expansion of neighbor
 	 * pairs. Small groups generate more data whereas large groups distribute
@@ -158,49 +154,29 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		return this;
 	}
 
-	/**
-	 * Override the parallelism of operators processing small amounts of data.
-	 *
-	 * @param littleParallelism operator parallelism
-	 * @return this
-	 */
-	public JaccardIndex<K, VV, EV> setLittleParallelism(int littleParallelism) {
-		Preconditions.checkArgument(littleParallelism > 0 || littleParallelism == PARALLELISM_DEFAULT,
-			"The parallelism must be greater than zero.");
-
-		this.littleParallelism = littleParallelism;
-
-		return this;
-	}
-
 	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!JaccardIndex.class.isAssignableFrom(other.getClass())) {
+	protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
+		if (!super.canMergeConfigurationWith(other)) {
 			return false;
 		}
 
 		JaccardIndex rhs = (JaccardIndex) other;
 
-		// verify that configurations can be merged
+		return unboundedScores == rhs.unboundedScores &&
+			minimumScoreNumerator == rhs.minimumScoreNumerator &&
+			minimumScoreDenominator == rhs.minimumScoreDenominator &&
+			maximumScoreNumerator == rhs.maximumScoreNumerator &&
+			maximumScoreDenominator == rhs.maximumScoreDenominator &&
+			mirrorResults == rhs.mirrorResults;
+	}
 
-		if (unboundedScores != rhs.unboundedScores ||
-			minimumScoreNumerator != rhs.minimumScoreNumerator ||
-			minimumScoreDenominator != rhs.minimumScoreDenominator ||
-			maximumScoreNumerator != rhs.maximumScoreNumerator ||
-			maximumScoreDenominator != rhs.maximumScoreDenominator ||
-			mirrorResults != rhs.mirrorResults) {
-			return false;
-		}
+	@Override
+	protected void mergeConfiguration(GraphAlgorithmWrappingBase other) {
+		super.mergeConfiguration(other);
 
-		// merge configurations
+		JaccardIndex rhs = (JaccardIndex) other;
 
 		groupSize = Math.max(groupSize, rhs.groupSize);
-		littleParallelism = (littleParallelism == PARALLELISM_DEFAULT) ? rhs.littleParallelism :
-			((rhs.littleParallelism == PARALLELISM_DEFAULT) ? littleParallelism : Math.min(littleParallelism, rhs.littleParallelism));
-
-		return true;
 	}
 
 	/*
@@ -217,23 +193,23 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		// s, t, d(t)
 		DataSet<Edge<K, Tuple2<EV, LongValue>>> neighborDegree = input
 			.run(new EdgeTargetDegree<K, VV, EV>()
-				.setParallelism(littleParallelism));
+				.setParallelism(parallelism));
 
 		// group span, s, t, d(t)
 		DataSet<Tuple4<IntValue, K, K, IntValue>> groupSpans = neighborDegree
 			.groupBy(0)
 			.sortGroup(1, Order.ASCENDING)
 			.reduceGroup(new GenerateGroupSpans<K, EV>(groupSize))
-				.setParallelism(littleParallelism)
+				.setParallelism(parallelism)
 				.name("Generate group spans");
 
 		// group, s, t, d(t)
 		DataSet<Tuple4<IntValue, K, K, IntValue>> groups = groupSpans
 			.rebalance()
-				.setParallelism(littleParallelism)
+				.setParallelism(parallelism)
 				.name("Rebalance")
 			.flatMap(new GenerateGroups<K>())
-				.setParallelism(littleParallelism)
+				.setParallelism(parallelism)
 				.name("Generate groups");
 
 		// t, u, d(t)+d(u)

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingBase.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingBase.java
index 21aa971..2f5aa83 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingBase.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingBase.java
@@ -22,6 +22,9 @@ package org.apache.flink.graph.utils.proxy;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * A {@link GraphAlgorithm} transforms an input {@link Graph} into an output of
@@ -39,6 +42,29 @@ import org.apache.flink.graph.GraphAlgorithm;
 public abstract class GraphAlgorithmWrappingBase<K, VV, EV, R>
 implements GraphAlgorithm<K, VV, EV, R> {
 
+	protected int parallelism = PARALLELISM_DEFAULT;
+
+	/**
+	 * Set the parallelism for this algorithm's operators. This parameter is
+	 * necessary because processing a small amount of data with high operator
+	 * parallelism is slow and wasteful with memory and buffers.
+	 *
+	 * <p>Operator parallelism should be set to this given value unless
+	 * processing asymptotically more data, in which case the default job
+	 * parallelism should be inherited.
+	 *
+	 * @param parallelism operator parallelism
+	 * @return this
+	 */
+	public GraphAlgorithmWrappingBase<K, VV, EV, R> setParallelism(int parallelism) {
+		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
+			"The parallelism must be greater than zero.");
+
+		this.parallelism = parallelism;
+
+		return this;
+	}
+
 	/**
 	 * Algorithms are identified by name rather than by class to allow subclassing.
 	 *
@@ -50,12 +76,34 @@ implements GraphAlgorithm<K, VV, EV, R> {
 	}
 
 	/**
-	 * An algorithm must first test whether the configurations can be merged
-	 * before merging individual fields.
+	 * Test whether the algorithm configurations can be merged before the call
+	 * to {@link #mergeConfiguration}; this base check compares classes only.
 	 *
-	 * @param other the algorithm with which to compare and merge
-	 * @return true if and only if configuration has been merged and the
-	 *          algorithm's output can be reused
+	 * @param other the algorithm with which to compare configuration
+	 * @return true if and only if configuration can be merged and the
+	 *         algorithm's output can be reused
+	 *
+	 * @see #mergeConfiguration(GraphAlgorithmWrappingBase)
+	 */
+	protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
+		Preconditions.checkNotNull(other);
+
+		return this.getClass().equals(other.getClass());
+	}
+
+	/**
+	 * Merge the other algorithm's configuration into this one after
+	 * {@link #canMergeConfigurationWith} has checked compatibility. An explicit
+	 * parallelism replaces the default; of two explicit values the lesser wins.
+	 *
+	 * @param other the algorithm from which to merge configuration
+	 *
+	 * @see #canMergeConfigurationWith(GraphAlgorithmWrappingBase)
 	 */
-	protected abstract boolean mergeConfiguration(GraphAlgorithmWrappingBase<K, VV, EV, R> other);
+	protected void mergeConfiguration(GraphAlgorithmWrappingBase other) {
+		Preconditions.checkNotNull(other);
+
+		parallelism = (parallelism == PARALLELISM_DEFAULT) ? other.parallelism :
+			((other.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, other.parallelism));
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java
index b329f1e..78a79ab 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java
@@ -101,7 +101,9 @@ extends GraphAlgorithmWrappingBase<K, VV, EV, DataSet<T>> {
 
 		if (cache.containsKey(this)) {
 			for (GraphAlgorithmWrappingDataSet<K, VV, EV, T> other : cache.get(this)) {
-				if (mergeConfiguration(other)) {
+				if (canMergeConfigurationWith(other)) {
+					mergeConfiguration(other);
+
 					// configuration has been merged so generate new output
 					DataSet<T> output = runInternal(input);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingGraph.java
index 09b5f5e..4f77fb7 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingGraph.java
@@ -106,7 +106,9 @@ extends GraphAlgorithmWrappingBase<IN_K, IN_VV, IN_EV, Graph<OUT_K, OUT_VV, OUT_
 
 		if (cache.containsKey(this)) {
 			for (GraphAlgorithmWrappingGraph<IN_K, IN_VV, IN_EV, OUT_K, OUT_VV, OUT_EV> other : cache.get(this)) {
-				if (mergeConfiguration(other)) {
+				if (canMergeConfigurationWith(other)) {
+					mergeConfiguration(other);
+
 					// configuration has been merged so generate new output
 					Graph<OUT_K, OUT_VV, OUT_EV> output = runInternal(input);
 


Mime
View raw message