flink-commits mailing list archives

From va...@apache.org
Subject [1/4] flink git commit: [FLINK-1694] [gelly] added IterationConfiguration as a way to configure a VertexCentricIteration
Date Fri, 10 Apr 2015 08:29:25 GMT
Repository: flink
Updated Branches:
  refs/heads/master e45f13f53 -> c518df944


[FLINK-1694] [gelly] added IterationConfiguration as a way to configure a VertexCentricIteration


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e98bd853
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e98bd853
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e98bd853

Branch: refs/heads/master
Commit: e98bd85384946f9c6834542a39e0eb63c2c95f15
Parents: e45f13f
Author: vasia <vasilikikalavri@gmail.com>
Authored: Sun Mar 29 23:39:08 2015 +0200
Committer: vasia <vasia@apache.org>
Committed: Fri Apr 10 10:53:02 2015 +0300

----------------------------------------------------------------------
 docs/gelly_guide.md                             |  79 ++++++-
 .../main/java/org/apache/flink/graph/Graph.java |  45 +++-
 .../flink/graph/library/LabelPropagation.java   |   6 +-
 .../apache/flink/graph/library/PageRank.java    |   6 +-
 .../graph/library/SimpleCommunityDetection.java |   8 +-
 .../library/SingleSourceShortestPaths.java      |  10 +-
 .../graph/spargel/IterationConfiguration.java   | 192 +++++++++++++++++
 .../graph/spargel/VertexCentricIteration.java   | 168 ++++-----------
 .../graph/spargel/VertexUpdateFunction.java     |   2 +-
 .../test/CollectionModeSuperstepITCase.java     |   7 +-
 .../test/VertexCentricConfigurationITCase.java  | 213 +++++++++++++++++++
 .../VertexCentricConnectedComponentsITCase.java |   4 +-
 12 files changed, 565 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e98bd853/docs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/gelly_guide.md b/docs/gelly_guide.md
index 9cceafc..203cffa 100644
--- a/docs/gelly_guide.md
+++ b/docs/gelly_guide.md
@@ -347,29 +347,86 @@ Vertex-centric Iterations
 Gelly wraps Flink's [Spargel API](spargel_guide.html) to provide methods for vertex-centric iterations.
 Like in Spargel, the user only needs to implement two functions: a `VertexUpdateFunction`, which defines how a vertex will update its value
 based on the received messages and a `MessagingFunction`, which allows a vertex to send out messages for the next superstep.
-These functions are given as parameters to Gelly's `createVertexCentricIteration`, which returns a `VertexCentricIteration`. 
-The user can configure this iteration (set the name, the parallelism, aggregators, etc.) and then run the computation, using the `runVertexCentricIteration` method:
+These functions and the maximum number of iterations to run are given as parameters to Gelly's `runVertexCentricIteration`.
+This method will execute the vertex-centric iteration on the input Graph and return a new Graph, with updated vertex values:
 
 {% highlight java %}
 Graph<Long, Double, Double> graph = ...
 
-// create the vertex-centric iteration
-VertexCentricIteration<Long, Double, Double, Double> iteration = 
-			graph.createVertexCentricIteration(
+// run Single-Source-Shortest-Paths vertex-centric iteration
+Graph<Long, Double, Double> result = 
+			graph.runVertexCentricIteration(
 			new VertexDistanceUpdater(), new MinDistanceMessenger(), maxIterations);
 
+// user-defined functions
+public static final class VertexDistanceUpdater {...}
+public static final class MinDistanceMessenger {...}
+
+{% endhighlight %}
+
+### Configuring a Vertex-Centric Iteration
+A vertex-centric iteration can be configured using an `IterationConfiguration` object.
+Currently, the following parameters can be specified:
+
+* <strong>Name</strong>: The name for the vertex-centric iteration. The name is displayed in logs and messages 
+and can be specified using the `setName()` method.
+
+* <strong>Parallelism</strong>: The parallelism for the iteration. It can be set using the `setParallelism()` method.
+
+* <strong>Solution set in unmanaged memory</strong>: Defines whether the solution set is kept in managed memory (Flink's internal way of keeping object in serialized form) or as a simple object map. By default, the solution set runs in managed memory. This property can be set using the `setSolutionSetUnmanagedMemory()` method.
+
+* <strong>Aggregators</strong>: Iteration aggregators can be registered using the `registerAggregator()` method. An iteration aggregator combines
+all aggregates globally once per superstep and makes them available in the next superstep. Registered aggregators can be accessed inside the user-defined `VertexUpdateFunction` and `MessagingFunction`.
+
+* <strong>Broadcast Variables</strong>: DataSets can be added as [Broadcast Variables](programming_guide.html#broadcast-variables) to the `VertexUpdateFunction` and `MessagingFunction`, using the `addBroadcastSetForUpdateFunction()` and `addBroadcastSetForMessagingFunction()` methods, respectively.
+
+{% highlight java %}
+
+Graph<Long, Double, Double> graph = ...
+
+// configure the iteration
+IterationConfiguration parameters = new IterationConfiguration();
+
 // set the iteration name
-iteration.setName("Single Source Shortest Paths");
+parameters.setName("Gelly Iteration");
 
 // set the parallelism
-iteration.setParallelism(16);
+parameters.setParallelism(16);
+
+// register an aggregator
+parameters.registerAggregator("sumAggregator", new LongSumAggregator());
 
-// run the computation
-graph.runVertexCentricIteration(iteration);
+// run the vertex-centric iteration, also passing the configuration parameters
+Graph<Long, Double, Double> result = 
+			graph.runVertexCentricIteration(
+			new VertexUpdater(), new Messenger(), maxIterations, parameters);
 
 // user-defined functions
-public static final class VertexDistanceUpdater {...}
-public static final class MinDistanceMessenger {...}
+public static final class VertexUpdater extends VertexUpdateFunction {
+
+	LongSumAggregator aggregator = new LongSumAggregator();
+
+	public void preSuperstep() {
+	
+		// retrieve the Aggregator
+		aggregator = getIterationAggregator("sumAggregator");
+	}
+
+
+	public void updateVertex(Long vertexKey, Long vertexValue, MessageIterator inMessages) {
+		
+		//do some computation
+		Long partialValue = ...
+
+		// aggregate the partial value
+		aggregator.aggregate(partialValue);
+
+		// update the vertex value
+		setNewVertexValue(...);
+	}
+}
+
+public static final class Messenger extends MessagingFunction {...}
 
 {% endhighlight %}
 

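The aggregator behaviour described in the guide text above (aggregates are combined once per superstep and become visible in the next superstep) can be pictured with a small stand-alone model. This is only a sketch under stated assumptions, not Gelly code: `AggregatorSketch`, `SumAggregator`, and `run` are hypothetical names, no Flink classes are involved, and the value 0 observed in the first superstep is an assumption of the model rather than documented Flink behaviour.

```java
import java.util.ArrayList;
import java.util.List;

public class AggregatorSketch {

    /** Toy stand-in for a long sum aggregator (hypothetical, not Flink's LongSumAggregator). */
    static final class SumAggregator {
        private long sum;

        void aggregate(long value) { sum += value; }

        long getAggregate() { return sum; }

        void reset() { sum = 0L; }
    }

    /**
     * Simulates the given number of supersteps. In every superstep each vertex adds its
     * current value to the aggregator and then doubles its value. The returned list holds,
     * per superstep, the aggregate of the PREVIOUS superstep, which is what the vertices
     * would be able to read while that superstep runs.
     */
    static List<Long> run(long[] initialValues, int supersteps) {
        SumAggregator aggregator = new SumAggregator();
        long previousAggregate = 0L; // assumption: nothing aggregated before the first superstep
        long[] values = initialValues.clone();
        List<Long> visiblePerSuperstep = new ArrayList<>();

        for (int s = 0; s < supersteps; s++) {
            visiblePerSuperstep.add(previousAggregate);
            for (int i = 0; i < values.length; i++) {
                aggregator.aggregate(values[i]); // contribute a partial value
                values[i] *= 2;                  // "update the vertex value"
            }
            // combine once per superstep, publish for the next one, then start fresh
            previousAggregate = aggregator.getAggregate();
            aggregator.reset();
        }
        return visiblePerSuperstep;
    }

    public static void main(String[] args) {
        System.out.println(run(new long[] {1, 2, 3}, 3)); // prints [0, 6, 12]
    }
}
```

The point of the model is the one-superstep delay: a value aggregated in superstep n is only readable in superstep n + 1.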
http://git-wip-us.apache.org/repos/asf/flink/blob/e98bd853/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index a73beaf..8280ba9 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -47,6 +47,7 @@ import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.graph.spargel.IterationConfiguration;
 import org.apache.flink.graph.spargel.MessagingFunction;
 import org.apache.flink.graph.spargel.VertexCentricIteration;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
@@ -1149,30 +1150,52 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	}
 
 	/**
-	 * Create a Vertex-Centric iteration on the graph.
-	 * 
+	 * Runs a Vertex-Centric iteration on the graph.
+	 * No configuration options are provided.
+
 	 * @param vertexUpdateFunction the vertex update function
 	 * @param messagingFunction the messaging function
 	 * @param maximumNumberOfIterations maximum number of iterations to perform
-	 * @return
+	 * 
+	 * @return the updated Graph after the vertex-centric iteration has converged or
+	 * after maximumNumberOfIterations.
 	 */
-	public <M> VertexCentricIteration<K, VV, M, EV> createVertexCentricIteration(
+	public <M> Graph<K, VV, EV> runVertexCentricIteration(
 			VertexUpdateFunction<K, VV, M> vertexUpdateFunction,
 			MessagingFunction<K, VV, M, EV> messagingFunction,
 			int maximumNumberOfIterations) {
-		return VertexCentricIteration.withEdges(edges, vertexUpdateFunction,
-				messagingFunction, maximumNumberOfIterations);
+
+		VertexCentricIteration<K, VV, M, EV> iteration = VertexCentricIteration.withEdges(
+				edges, vertexUpdateFunction, messagingFunction, maximumNumberOfIterations);
+
+		DataSet<Vertex<K, VV>> newVertices = vertices.runOperation(iteration);
+
+		return new Graph<K, VV, EV>(newVertices, this.edges, this.context);
 	}
-    
+
 	/**
-	 * Runs a Vertex-Centric iteration on the graph.
+	 * Runs a Vertex-Centric iteration on the graph with configuration options.
+	 * 
+	 * @param vertexUpdateFunction the vertex update function
+	 * @param messagingFunction the messaging function
+	 * @param maximumNumberOfIterations maximum number of iterations to perform
+	 * @param parameters the iteration configuration parameters
 	 * 
-	 * @param iteration the Vertex-Centric iteration to run
-	 * @return
+	 * @return the updated Graph after the vertex-centric iteration has converged or
+	 * after maximumNumberOfIterations.
 	 */
 	public <M> Graph<K, VV, EV> runVertexCentricIteration(
-			VertexCentricIteration<K, VV, M, EV> iteration) {
+			VertexUpdateFunction<K, VV, M> vertexUpdateFunction,
+			MessagingFunction<K, VV, M, EV> messagingFunction,
+			int maximumNumberOfIterations, IterationConfiguration parameters) {
+
+		VertexCentricIteration<K, VV, M, EV> iteration = VertexCentricIteration.withEdges(
+				edges, vertexUpdateFunction, messagingFunction, maximumNumberOfIterations);
+
+		iteration.configure(parameters);
+
 		DataSet<Vertex<K, VV>> newVertices = vertices.runOperation(iteration);
+
 		return new Graph<K, VV, EV>(newVertices, this.edges, this.context);
 	}
 

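The new Javadoc above states that the returned Graph reflects the state "after the vertex-centric iteration has converged or after maximumNumberOfIterations". A minimal stand-alone sketch of that termination rule follows. It is an illustration only: `TerminationSketch` and `propagateMin` are hypothetical names, the graph is assumed to be a simple path stored in an array, and "converged" is modelled as "a superstep changed no vertex value", which is a simplification of how Flink's delta iteration decides to stop.

```java
import java.util.Arrays;

public class TerminationSketch {

    /**
     * Propagates the minimum label along a path graph (vertex i is connected to i-1 and i+1).
     * Stops when a superstep changes nothing or when maxIterations supersteps have run.
     * Updates the array in place and returns the number of supersteps executed.
     */
    static int propagateMin(long[] labels, int maxIterations) {
        int supersteps = 0;
        boolean changed = true;

        while (changed && supersteps < maxIterations) {
            changed = false;
            // read from the old values, write into a copy: messages come from the previous superstep
            long[] next = labels.clone();
            for (int i = 0; i < labels.length; i++) {
                if (i > 0 && labels[i - 1] < next[i]) {
                    next[i] = labels[i - 1];
                    changed = true;
                }
                if (i + 1 < labels.length && labels[i + 1] < next[i]) {
                    next[i] = labels[i + 1];
                    changed = true;
                }
            }
            System.arraycopy(next, 0, labels, 0, labels.length);
            supersteps++;
        }
        return supersteps;
    }

    public static void main(String[] args) {
        long[] labels = {3, 2, 1, 4};
        int supersteps = propagateMin(labels, 10);
        System.out.println(supersteps + " " + Arrays.toString(labels)); // prints 3 [1, 1, 1, 1]
    }
}
```

With a generous limit the loop ends by convergence; with `maxIterations` set to 1 the same input stops early with partially propagated labels, which mirrors the two exit conditions named in the Javadoc.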
http://git-wip-us.apache.org/repos/asf/flink/blob/e98bd853/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
index 33a04e7..ff6fe85 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
@@ -22,7 +22,6 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.spargel.MessageIterator;
 import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexCentricIteration;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
 import org.apache.flink.types.NullValue;
 
@@ -56,9 +55,8 @@ public class LabelPropagation<K extends Comparable<K> & Serializable>
 
 		// iteratively adopt the most frequent label among the neighbors
 		// of each vertex
-		VertexCentricIteration<K, Long, Long, NullValue> iteration = input.createVertexCentricIteration(
-				new UpdateVertexLabel<K>(), new SendNewLabelToNeighbors<K>(), maxIterations);
-		return input.runVertexCentricIteration(iteration);
+		return input.runVertexCentricIteration(new UpdateVertexLabel<K>(), new SendNewLabelToNeighbors<K>(),
+				maxIterations);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/e98bd853/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
index 00ae204..48c9a51 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
@@ -25,7 +25,6 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.spargel.MessageIterator;
 import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexCentricIteration;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
 
 public class PageRank<K extends Comparable<K> & Serializable> implements
@@ -43,11 +42,8 @@ public class PageRank<K extends Comparable<K> & Serializable> implements
 	public Graph<K, Double, Double> run(Graph<K, Double, Double> network) throws Exception {
 
 		final long numberOfVertices = network.numberOfVertices();
-
-		VertexCentricIteration<K, Double, Double, Double> iteration = network.createVertexCentricIteration(
-				new VertexRankUpdater<K>(beta, numberOfVertices), new RankMessenger<K>(numberOfVertices),
+		return network.runVertexCentricIteration(new VertexRankUpdater<K>(beta, numberOfVertices), new RankMessenger<K>(numberOfVertices),
 				maxIterations);
-		return network.runVertexCentricIteration(iteration);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/e98bd853/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java
index fb32781..e3d3e1c 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java
@@ -26,7 +26,6 @@ import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.spargel.MessageIterator;
 import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexCentricIteration;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
 
 import java.util.Map;
@@ -65,11 +64,8 @@ public class SimpleCommunityDetection implements GraphAlgorithm<Long, Long, Doub
 		Graph<Long, Tuple2<Long, Double>, Double> graphWithScoredVertices = undirectedGraph
 				.mapVertices(new AddScoreToVertexValuesMapper());
 
-		VertexCentricIteration<Long, Tuple2<Long, Double>, Tuple2<Long, Double>, Double>
-				iteration = graphWithScoredVertices.createVertexCentricIteration(new VertexLabelUpdater(delta),
-				new LabelMessenger(), maxIterations);
-
-		return graphWithScoredVertices.runVertexCentricIteration(iteration)
+		return graphWithScoredVertices.runVertexCentricIteration(new VertexLabelUpdater(delta),
+				new LabelMessenger(), maxIterations)
 				.mapVertices(new RemoveScoreFromVertexValuesMapper());
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e98bd853/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
index 3e9a29d..262b2c5 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
@@ -25,7 +25,6 @@ import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.spargel.MessageIterator;
 import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexCentricIteration;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
 
 import java.io.Serializable;
@@ -45,12 +44,9 @@ public class SingleSourceShortestPaths<K extends Comparable<K> & Serializable>
 	@Override
 	public Graph<K, Double, Double> run(Graph<K, Double, Double> input) {
 
-		Graph<K, Double, Double> mappedInput = input.mapVertices(new InitVerticesMapper<K>(srcVertexId));
-
-		VertexCentricIteration<K, Double, Double, Double> iteration = mappedInput.createVertexCentricIteration(
-				new VertexDistanceUpdater<K>(), new MinDistanceMessenger<K>(), maxIterations);
-
-		return mappedInput.runVertexCentricIteration(iteration);
+		return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
+				.runVertexCentricIteration(new VertexDistanceUpdater<K>(), new MinDistanceMessenger<K>(),
+				maxIterations);
 	}
 
 	public static final class InitVerticesMapper<K extends Comparable<K> & Serializable>

http://git-wip-us.apache.org/repos/asf/flink/blob/e98bd853/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/IterationConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/IterationConfiguration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/IterationConfiguration.java
new file mode 100644
index 0000000..f161d8d
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/IterationConfiguration.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.spargel;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ * This class is used to configure a vertex-centric iteration.
+ *
+ * An IterationConfiguration object can be used to set the iteration name and
+ * degree of parallelism, to register aggregators and use broadcast sets in
+ * the {@link VertexUpdateFunction} and {@link MessagingFunction}.
+ *
+ * The IterationConfiguration object is passed as an argument to
+ * {@link org.apache.flink.graph.Graph#runVertexCentricIteration(
+ * VertexUpdateFunction, MessagingFunction, int, IterationConfiguration)}.
+ *
+ */
+public class IterationConfiguration {
+
+	/** the iteration name **/
+	private String name;
+
+	/** the iteration parallelism **/
+	private int parallelism = -1;
+
+	/** the iteration aggregators **/
+	private Map<String, Aggregator<?>> aggregators = new HashMap<String, Aggregator<?>>();
+
+	/** the broadcast variables for the update function **/
+	private List<Tuple2<String, DataSet<?>>> bcVarsUpdate = new ArrayList<Tuple2<String,DataSet<?>>>();
+
+	/** the broadcast variables for the messaging function **/
+	private List<Tuple2<String, DataSet<?>>> bcVarsMessaging = new ArrayList<Tuple2<String,DataSet<?>>>();
+
+	/** flag that defines whether the solution set is kept in managed memory **/
+	private boolean unmanagedSolutionSet = false;
+	
+	public IterationConfiguration() {}
+
+
+	/**
+	 * Sets the name for the vertex-centric iteration. The name is displayed in logs and messages.
+	 * 
+	 * @param name The name for the iteration.
+	 */
+	public void setName(String name) {
+		this.name = name;
+	}
+
+	/**
+	 * Gets the name of the vertex-centric iteration.
+	 * @param defaultName 
+	 * 
+	 * @return The name of the iteration.
+	 */
+	public String getName(String defaultName) {
+		if (name != null) {
+			return name;			
+		}
+		else {
+			return defaultName;
+		}
+	}
+
+	/**
+	 * Sets the parallelism for the iteration.
+	 * 
+	 * @param parallelism The parallelism.
+	 */
+	public void setParallelism(int parallelism) {
+		Validate.isTrue(parallelism > 0 || parallelism == -1, "The parallelism must be positive, or -1 (use default).");
+		this.parallelism = parallelism;
+	}
+	
+	/**
+	 * Gets the iteration's parallelism.
+	 * 
+	 * @return The iterations parallelism, or -1, if not set.
+	 */
+	public int getParallelism() {
+		return parallelism;
+	}
+
+	/**
+	 * Defines whether the solution set is kept in managed memory (Flink's internal way of keeping object
+	 * in serialized form) or as a simple object map.
+	 * By default, the solution set runs in managed memory.
+	 * 
+	 * @param unmanaged True, to keep the solution set in unmanaged memory, false otherwise.
+	 */
+	public void setSolutionSetUnmanagedMemory(boolean unmanaged) {
+		this.unmanagedSolutionSet = unmanaged;
+	}
+	
+	/**
+	 * Gets whether the solution set is kept in managed memory (Flink's internal way of keeping object
+	 * in serialized form) or as a simple object map.
+	 * By default, the solution set runs in managed memory.
+	 * 
+	 * @return True, if the solution set is in unmanaged memory, false otherwise.
+	 */
+	public boolean isSolutionSetUnmanagedMemory() {
+		return this.unmanagedSolutionSet;
+	}
+
+	/**
+	 * Registers a new aggregator. Aggregators registered here are available during the execution of the vertex updates
+	 * via {@link VertexUpdateFunction#getIterationAggregator(String)} and
+	 * {@link VertexUpdateFunction#getPreviousIterationAggregate(String)}.
+	 * 
+	 * @param name The name of the aggregator, used to retrieve it and its aggregates during execution. 
+	 * @param aggregator The aggregator.
+	 */
+	public void registerAggregator(String name, Aggregator<?> aggregator) {
+		this.aggregators.put(name, aggregator);
+	}
+	
+	/**
+	 * Adds a data set as a broadcast set to the messaging function.
+	 * 
+	 * @param name The name under which the broadcast data is available in the messaging function.
+	 * @param data The data set to be broadcasted.
+	 */
+	public void addBroadcastSetForMessagingFunction(String name, DataSet<?> data) {
+		this.bcVarsMessaging.add(new Tuple2<String, DataSet<?>>(name, data));
+	}
+
+	/**
+	 * Adds a data set as a broadcast set to the vertex update function.
+	 * 
+	 * @param name The name under which the broadcast data is available in the vertex update function.
+	 * @param data The data set to be broadcasted.
+	 */
+	public void addBroadcastSetForUpdateFunction(String name, DataSet<?> data) {
+		this.bcVarsUpdate.add(new Tuple2<String, DataSet<?>>(name, data));
+	}
+
+	/**
+	 * Gets the set of aggregators that are registered for this vertex-centric iteration.
+	 *
+	 * @return a Map of the registered aggregators, where the key is the aggregator name
+	 * and the value is the Aggregator object
+	 */
+	public Map<String, Aggregator<?>> getAggregators() {
+		return this.aggregators;
+	}
+
+	/**
+	 * Get the broadcast variables of the VertexUpdateFunction.
+	 *
+	 * @return a List of Tuple2, where the first field is the broadcast variable name
+	 * and the second field is the broadcast DataSet.
+	 */
+	public List<Tuple2<String, DataSet<?>>> getUpdateBcastVars() {
+		return this.bcVarsUpdate;
+	}
+
+	/**
+	 * Get the broadcast variables of the MessagingFunction.
+	 *
+	 * @return a List of Tuple2, where the first field is the broadcast variable name
+	 * and the second field is the broadcast DataSet.
+	 */
+	public List<Tuple2<String, DataSet<?>>> getMessagingBcastVars() {
+		return this.bcVarsMessaging;
+	}
+}

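Two behaviours of the new `IterationConfiguration` class shown above are easy to miss in the diff: `getName(defaultName)` falls back to the supplied default when no name was set, and `setParallelism` accepts only positive values or the sentinel -1. The following stand-alone sketch mirrors just those two rules. `ConfigDefaultsSketch` and `MiniConfiguration` are hypothetical names introduced for illustration, and a plain `IllegalArgumentException` is thrown directly instead of going through the `Validate.isTrue` call used in the commit.

```java
public class ConfigDefaultsSketch {

    /** Reduced stand-in for the configuration object; only name and parallelism are modelled. */
    static final class MiniConfiguration {
        private String name;          // null means "not set"
        private int parallelism = -1; // -1 means "use default"

        void setName(String name) {
            this.name = name;
        }

        /** Returns the configured name, or defaultName if none was set. */
        String getName(String defaultName) {
            return name != null ? name : defaultName;
        }

        /** Accepts positive values or -1; rejects everything else without changing state. */
        void setParallelism(int parallelism) {
            if (!(parallelism > 0 || parallelism == -1)) {
                throw new IllegalArgumentException(
                        "The parallelism must be positive, or -1 (use default).");
            }
            this.parallelism = parallelism;
        }

        int getParallelism() {
            return parallelism;
        }
    }

    public static void main(String[] args) {
        MiniConfiguration parameters = new MiniConfiguration();
        System.out.println(parameters.getName("Vertex-centric iteration")); // default is used
        parameters.setName("Gelly Iteration");
        parameters.setParallelism(16);
        System.out.println(parameters.getName("Vertex-centric iteration") + " / "
                + parameters.getParallelism());
    }
}
```

Passing the default name into the getter keeps the configuration object free of any knowledge about how the caller builds that default string.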
http://git-wip-us.apache.org/repos/asf/flink/blob/e98bd853/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
index c54ee0c..ca66521 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
@@ -19,10 +19,7 @@
 package org.apache.flink.graph.spargel;
 
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.lang3.Validate;
@@ -82,23 +79,13 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Se
 	
 	private final DataSet<Edge<VertexKey, EdgeValue>> edgesWithValue;
 	
-	private final Map<String, Aggregator<?>> aggregators;
-	
 	private final int maximumNumberOfIterations;
 	
-	private final List<Tuple2<String, DataSet<?>>> bcVarsUpdate = new ArrayList<Tuple2<String,DataSet<?>>>(4);
-	
-	private final List<Tuple2<String, DataSet<?>>> bcVarsMessaging = new ArrayList<Tuple2<String,DataSet<?>>>(4);
-	
 	private final TypeInformation<Message> messageType;
 	
 	private DataSet<Vertex<VertexKey, VertexValue>> initialVertices;
-	
-	private String name;
-	
-	private int parallelism = -1;
-	
-	private boolean unmanagedSolutionSet;
+
+	private IterationConfiguration configuration;
 	
 	// ----------------------------------------------------------------------------------
 	
@@ -115,8 +102,7 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Se
 		this.updateFunction = uf;
 		this.messagingFunction = mf;
 		this.edgesWithValue = edgesWithValue;
-		this.maximumNumberOfIterations = maximumNumberOfIterations;
-		this.aggregators = new HashMap<String, Aggregator<?>>();		
+		this.maximumNumberOfIterations = maximumNumberOfIterations;		
 		this.messageType = getMessageType(mf);
 	}
 	
@@ -124,97 +110,6 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Se
 		return TypeExtractor.createTypeInfo(MessagingFunction.class, mf.getClass(), 2, null, null);
 	}
 	
-	/**
-	 * Registers a new aggregator. Aggregators registered here are available during the execution of the vertex updates
-	 * via {@link VertexUpdateFunction#getIterationAggregator(String)} and
-	 * {@link VertexUpdateFunction#getPreviousIterationAggregate(String)}.
-	 * 
-	 * @param name The name of the aggregator, used to retrieve it and its aggregates during execution. 
-	 * @param aggregator The aggregator.
-	 */
-	public void registerAggregator(String name, Aggregator<?> aggregator) {
-		this.aggregators.put(name, aggregator);
-	}
-	
-	/**
-	 * Adds a data set as a broadcast set to the messaging function.
-	 * 
-	 * @param name The name under which the broadcast data is available in the messaging function.
-	 * @param data The data set to be broadcasted.
-	 */
-	public void addBroadcastSetForMessagingFunction(String name, DataSet<?> data) {
-		this.bcVarsMessaging.add(new Tuple2<String, DataSet<?>>(name, data));
-	}
-
-	/**
-	 * Adds a data set as a broadcast set to the vertex update function.
-	 * 
-	 * @param name The name under which the broadcast data is available in the vertex update function.
-	 * @param data The data set to be broadcasted.
-	 */
-	public void addBroadcastSetForUpdateFunction(String name, DataSet<?> data) {
-		this.bcVarsUpdate.add(new Tuple2<String, DataSet<?>>(name, data));
-	}
-	
-	/**
-	 * Sets the name for the vertex-centric iteration. The name is displayed in logs and messages.
-	 * 
-	 * @param name The name for the iteration.
-	 */
-	public void setName(String name) {
-		this.name = name;
-	}
-	
-	/**
-	 * Gets the name from this vertex-centric iteration.
-	 * 
-	 * @return The name of the iteration.
-	 */
-	public String getName() {
-		return name;
-	}
-	
-	/**
-	 * Sets the parallelism for the iteration.
-	 * 
-	 * @param parallelism The parallelism.
-	 */
-	public void setParallelism(int parallelism) {
-		Validate.isTrue(parallelism > 0 || parallelism == -1, "The parallelism must be positive, or -1 (use default).");
-		this.parallelism = parallelism;
-	}
-	
-	/**
-	 * Gets the iteration's parallelism.
-	 * 
-	 * @return The iterations parallelism, or -1, if not set.
-	 */
-	public int getParallelism() {
-		return parallelism;
-	}
-	
-	/**
-	 * Defines whether the solution set is kept in managed memory (Flink's internal way of keeping object
-	 * in serialized form) or as a simple object map.
-	 * By default, the solution set runs in managed memory.
-	 * 
-	 * @param unmanaged True, to keep the solution set in unmanaged memory, false otherwise.
-	 */
-	public void setSolutionSetUnmanagedMemory(boolean unmanaged) {
-		this.unmanagedSolutionSet = unmanaged;
-	}
-	
-	/**
-	 * Gets whether the solution set is kept in managed memory (Flink's internal way of keeping object
-	 * in serialized form) or as a simple object map.
-	 * By default, the solution set runs in managed memory.
-	 * 
-	 * @return True, if the solution set is in unmanaged memory, false otherwise.
-	 */
-	public boolean isSolutionSetUnmanagedMemory() {
-		return this.unmanagedSolutionSet;
-	}
-	
 	// --------------------------------------------------------------------------------------------
 	//  Custom Operator behavior
 	// --------------------------------------------------------------------------------------------
@@ -249,20 +144,27 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Se
 		TypeInformation<VertexKey> keyType = ((TupleTypeInfo<?>) initialVertices.getType()).getTypeAt(0);
 		TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo = new TupleTypeInfo<Tuple2<VertexKey,Message>>(keyType, messageType);
 
-		// set up the iteration operator
-		final String name = (this.name != null) ? this.name :
-			"Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")";
 		final int[] zeroKeyPos = new int[] {0};
 	
 		final DeltaIteration<Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>> iteration =
 			this.initialVertices.iterateDelta(this.initialVertices, this.maximumNumberOfIterations, zeroKeyPos);
-		iteration.name(name);
-		iteration.parallelism(parallelism);
-		iteration.setSolutionSetUnManaged(unmanagedSolutionSet);
-		
-		// register all aggregators
-		for (Map.Entry<String, Aggregator<?>> entry : this.aggregators.entrySet()) {
-			iteration.registerAggregator(entry.getKey(), entry.getValue());
+
+		// set up the iteration operator
+		if (this.configuration != null) {
+
+			iteration.name(this.configuration.getName(
+					"Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")"));
+			iteration.parallelism(this.configuration.getParallelism());
+			iteration.setSolutionSetUnManaged(this.configuration.isSolutionSetUnmanagedMemory());
+
+			// register all aggregators
+			for (Map.Entry<String, Aggregator<?>> entry : this.configuration.getAggregators().entrySet()) {
+				iteration.registerAggregator(entry.getKey(), entry.getValue());
+			}
+		}
+		else {
+			// no configuration provided; set default name
+			iteration.name("Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")");
 		}
 		
 		// build the messaging function (co group)
@@ -272,8 +174,11 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Se
 		
 		// configure coGroup message function with name and broadcast variables
 		messages = messages.name("Messaging");
-		for (Tuple2<String, DataSet<?>> e : this.bcVarsMessaging) {
-			messages = messages.withBroadcastSet(e.f1, e.f0);
+
+		if (this.configuration != null) {
+			for (Tuple2<String, DataSet<?>> e : this.configuration.getMessagingBcastVars()) {
+				messages = messages.withBroadcastSet(e.f1, e.f0);
+			}			
 		}
 		
 		VertexUpdateUdf<VertexKey, VertexValue, Message> updateUdf = new VertexUpdateUdf<VertexKey, VertexValue, Message>(updateFunction, vertexTypes);
@@ -284,8 +189,11 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Se
 		
 		// configure coGroup update function with name and broadcast variables
 		updates = updates.name("Vertex State Updates");
-		for (Tuple2<String, DataSet<?>> e : this.bcVarsUpdate) {
-			updates = updates.withBroadcastSet(e.f1, e.f0);
+
+		if (this.configuration != null) {
+			for (Tuple2<String, DataSet<?>> e : this.configuration.getUpdateBcastVars()) {
+				updates = updates.withBroadcastSet(e.f1, e.f0);
+			}			
 		}
 
 		// let the operator know that we preserve the key field
@@ -452,4 +360,20 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Se
 			return this.resultType;
 		}
 	}
+
+	/**
+	 * Configures this vertex-centric iteration with the provided parameters.
+	 *
+	 * @param parameters the configuration parameters
+	 */
+	public void configure(IterationConfiguration parameters) {
+		this.configuration = parameters;
+	}
+
+	/**
+	 * @return the configuration parameters of this vertex-centric iteration
+	 */
+	public IterationConfiguration getIterationConfiguration() {
+		return this.configuration;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e98bd853/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
index 1157a18..5a7cd5c 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
@@ -92,7 +92,7 @@ public abstract class VertexUpdateFunction<VertexKey extends Comparable<VertexKe
 	}
 	
 	/**
-	 * Gets the iteration aggregator registered under the given name. The iteration aggregator is combines
+	 * Gets the iteration aggregator registered under the given name. The iteration aggregator combines
 	 * all aggregates globally once per superstep and makes them available in the next superstep.
 	 * 
 	 * @param name The name of the aggregator.

http://git-wip-us.apache.org/repos/asf/flink/blob/e98bd853/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
index ffe91d9..d84952a 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
@@ -26,7 +26,6 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.spargel.MessageIterator;
 import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexCentricIteration;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
 import org.apache.flink.graph.utils.VertexToTuple2Map;
 import org.junit.Assert;
@@ -48,9 +47,8 @@ public class CollectionModeSuperstepITCase {
 		Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
 				TestGraphUtils.getLongLongEdges(), env).mapVertices(new AssignOneMapper());
 		
-		VertexCentricIteration<Long, Long, Long, Long> iteration = 
-				graph.createVertexCentricIteration(new UpdateFunction(), new MessageFunction(), 10);
-		Graph<Long, Long, Long> result = graph.runVertexCentricIteration(iteration);
+		Graph<Long, Long, Long> result = graph.runVertexCentricIteration(
+				new UpdateFunction(), new MessageFunction(), 10);
 
 		result.getVertices().map(
 				new VertexToTuple2Map<Long, Long>()).output(
@@ -83,5 +81,4 @@ public class CollectionModeSuperstepITCase {
 			return 1l;
 		}
 	}
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e98bd853/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
new file mode 100644
index 0000000..b497070
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
+
+import java.util.List;
+
+import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.IterationConfiguration;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexCentricIteration;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.utils.VertexToTuple2Map;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.LongValue;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
+
+	public VertexCentricConfigurationITCase(TestExecutionMode mode){
+		super(mode);
+	}
+
+    private String resultPath;
+    private String expectedResult;
+
+    @Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
+	}
+
+	@After
+	public void after() throws Exception{
+		compareResultsByLinesInMemory(expectedResult, resultPath);
+	}
+
+	@Test
+	public void testRunWithConfiguration() throws Exception {
+		/*
+		 * Test Graph's runVertexCentricIteration when configuration parameters are provided
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
+				TestGraphUtils.getLongLongEdges(), env).mapVertices(new AssignOneMapper());
+
+		// create the configuration object
+		IterationConfiguration parameters = new IterationConfiguration();
+
+		parameters.addBroadcastSetForUpdateFunction("updateBcastSet", env.fromElements(1, 2, 3));
+		parameters.addBroadcastSetForMessagingFunction("messagingBcastSet", env.fromElements(4, 5, 6));
+		parameters.registerAggregator("superstepAggregator", new LongSumAggregator());
+
+		Graph<Long, Long, Long> result = graph.runVertexCentricIteration(
+				new UpdateFunction(), new MessageFunction(), 10, parameters);
+
+		result.getVertices().map(new VertexToTuple2Map<Long, Long>()).writeAsCsv(resultPath, "\n", "\t");
+		env.execute();
+		expectedResult = "1	11\n" +
+						"2	11\n" +
+						"3	11\n" +
+						"4	11\n" +
+						"5	11";
+	}
+
+	@Test
+	public void testIterationConfiguration() throws Exception {
+
+		/*
+		 * Test name, parallelism and solutionSetUnmanaged parameters
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		VertexCentricIteration<Long, Long, Long, Long> iteration = VertexCentricIteration
+				.withEdges(TestGraphUtils.getLongLongEdgeData(env), new DummyUpdateFunction(), 
+						new DummyMessageFunction(), 10);
+		
+		IterationConfiguration parameters = new IterationConfiguration();
+		parameters.setName("gelly iteration");
+		parameters.setParallelism(2);
+		parameters.setSolutionSetUnmanagedMemory(true);
+		
+		iteration.configure(parameters);
+		
+		Assert.assertEquals("gelly iteration", iteration.getIterationConfiguration().getName(""));
+		Assert.assertEquals(2, iteration.getIterationConfiguration().getParallelism());
+		Assert.assertEquals(true, iteration.getIterationConfiguration().isSolutionSetUnmanagedMemory());
+
+		DataSet<Vertex<Long, Long>> result = TestGraphUtils.getLongLongVertexData(env).runOperation(iteration);
+		
+		result.map(new VertexToTuple2Map<Long, Long>()).writeAsCsv(resultPath, "\n", "\t");
+		env.execute();
+		expectedResult = "1	11\n" +
+						"2	12\n" +
+						"3	13\n" +
+						"4	14\n" +
+						"5	15";
+	}
+
+	@SuppressWarnings("serial")
+	public static final class UpdateFunction extends VertexUpdateFunction<Long, Long, Long> {
+
+		LongSumAggregator aggregator = new LongSumAggregator();
+
+		@Override
+		public void preSuperstep() {
+			
+			// test bcast variable
+			@SuppressWarnings("unchecked")
+			List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("updateBcastSet");
+			Assert.assertEquals(1, bcastSet.get(0));
+			Assert.assertEquals(2, bcastSet.get(1));
+			Assert.assertEquals(3, bcastSet.get(2));
+			
+			// test aggregator
+			aggregator = getIterationAggregator("superstepAggregator");
+		}
+
+		@Override
+		public void updateVertex(Long vertexKey, Long vertexValue, MessageIterator<Long> inMessages) {
+			long superstep = getSuperstepNumber();
+			aggregator.aggregate(superstep);
+			setNewVertexValue(vertexValue + 1);
+		}
+	}
+	
+	@SuppressWarnings("serial")
+	public static final class MessageFunction extends MessagingFunction<Long, Long, Long, Long> {
+
+		@Override
+		public void preSuperstep() {
+			
+			// test bcast variable
+			@SuppressWarnings("unchecked")
+			List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("messagingBcastSet");
+			Assert.assertEquals(4, bcastSet.get(0));
+			Assert.assertEquals(5, bcastSet.get(1));
+			Assert.assertEquals(6, bcastSet.get(2));
+			
+			// test aggregator
+			if (getSuperstepNumber() == 2) {
+				long aggrValue = ((LongValue)getPreviousIterationAggregate("superstepAggregator")).getValue();
+				Assert.assertEquals(5, aggrValue);
+			}
+		}
+
+		@Override
+		public void sendMessages(Long vertexId, Long vertexValue) {
+			//send message to keep vertices active
+			sendMessageToAllNeighbors(vertexValue);
+		}
+	}
+
+	@SuppressWarnings("serial")
+	public static final class DummyUpdateFunction extends VertexUpdateFunction<Long, Long, Long> {
+
+		@Override
+		public void updateVertex(Long vertexKey, Long vertexValue, MessageIterator<Long> inMessages) {
+			setNewVertexValue(vertexValue + 1);
+		}
+	}
+	
+	@SuppressWarnings("serial")
+	public static final class DummyMessageFunction extends MessagingFunction<Long, Long, Long, Long> {
+
+		@Override
+		public void sendMessages(Long vertexId, Long vertexValue) {
+			//send message to keep vertices active
+			sendMessageToAllNeighbors(vertexValue);
+		}
+	}
+
+	@SuppressWarnings("serial")
+	public static final class AssignOneMapper implements MapFunction<Vertex<Long, Long>, Long> {
+
+		public Long map(Vertex<Long, Long> value) {
+			return 1l;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e98bd853/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConnectedComponentsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConnectedComponentsITCase.java
index 8fb9a11..380e027 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConnectedComponentsITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConnectedComponentsITCase.java
@@ -30,7 +30,6 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.spargel.MessageIterator;
 import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexCentricIteration;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
 import org.apache.flink.test.testdata.ConnectedComponentsData;
 import org.apache.flink.test.util.JavaProgramTestBase;
@@ -65,8 +64,7 @@ public class VertexCentricConnectedComponentsITCase extends JavaProgramTestBase
 		DataSet<Vertex<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
 		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
 		
-		VertexCentricIteration<Long, Long, Long, NullValue> iteration = graph.createVertexCentricIteration(new CCUpdater(), new CCMessager(), 100);
-		Graph<Long, Long, NullValue> result = graph.runVertexCentricIteration(iteration);
+		Graph<Long, Long, NullValue> result = graph.runVertexCentricIteration(new CCUpdater(), new CCMessager(), 100);
 		
 		result.getVertices().writeAsCsv(resultPath, "\n", " ");
 		env.execute();

