flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [6/6] flink git commit: [FLINK-5913] [gelly] Example drivers
Date Fri, 31 Mar 2017 16:35:29 GMT
[FLINK-5913] [gelly] Example drivers

Replace existing and create new algorithm Driver implementations for
each of the library methods.

This closes #3635


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a48357db
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a48357db
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a48357db

Branch: refs/heads/master
Commit: a48357db8c4187fd08f3b17880899ebbcb5d3b5e
Parents: ded25be
Author: Greg Hogan <code@greghogan.com>
Authored: Wed Oct 26 15:18:50 2016 -0400
Committer: Greg Hogan <code@greghogan.com>
Committed: Fri Mar 31 11:17:26 2017 -0400

----------------------------------------------------------------------
 .../main/java/org/apache/flink/graph/Usage.java |   2 -
 .../apache/flink/graph/drivers/AdamicAdar.java  |  71 ++++
 .../graph/drivers/ClusteringCoefficient.java    | 378 +++++--------------
 .../graph/drivers/ConnectedComponents.java      | 105 ++++++
 .../apache/flink/graph/drivers/EdgeList.java    |  92 +++++
 .../apache/flink/graph/drivers/Graph500.java    | 165 --------
 .../flink/graph/drivers/GraphMetrics.java       | 265 ++++---------
 .../org/apache/flink/graph/drivers/HITS.java    | 188 ++-------
 .../flink/graph/drivers/JaccardIndex.java       | 224 ++---------
 .../apache/flink/graph/drivers/PageRank.java    |  74 ++++
 .../flink/graph/drivers/SimpleDriver.java       |  65 ++++
 .../flink/graph/drivers/TriangleListing.java    | 362 +++++-------------
 .../drivers/parameter/IterationConvergence.java |  89 +++++
 .../graph/examples/ConnectedComponents.java     | 141 -------
 .../examples/GSASingleSourceShortestPaths.java  |   4 +-
 .../parameter/IterationConvergenceTest.java     |  66 ++++
 .../examples/ConnectedComponentsITCase.java     |  72 ----
 .../main/java/org/apache/flink/graph/Graph.java |  16 +-
 .../graph/library/ConnectedComponents.java      |   5 +-
 .../graph/library/GSAConnectedComponents.java   |   8 +-
 .../flink/graph/library/LabelPropagation.java   |   5 +-
 .../clustering/directed/TriangleListing.java    |   2 +-
 .../undirected/LocalClusteringCoefficient.java  |   2 +-
 .../graph/library/link_analysis/PageRank.java   |   8 +-
 .../graph/library/similarity/AdamicAdar.java    |   2 +-
 .../graph/library/similarity/JaccardIndex.java  |   2 +-
 .../apache/flink/graph/utils/GraphUtils.java    |  10 +-
 .../flink/graph/utils/NullValueEdgeMapper.java  |  32 --
 28 files changed, 919 insertions(+), 1536 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
index d923bf0..642fe5b 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
@@ -28,7 +28,6 @@ public class Usage {
 
 	private static final Class[] DRIVERS = new Class[]{
 		org.apache.flink.graph.drivers.ClusteringCoefficient.class,
-		org.apache.flink.graph.drivers.Graph500.class,
 		org.apache.flink.graph.drivers.GraphMetrics.class,
 		org.apache.flink.graph.drivers.HITS.class,
 		org.apache.flink.graph.drivers.JaccardIndex.class,
@@ -36,7 +35,6 @@ public class Usage {
 	};
 
 	private static final Class[] EXAMPLES = new Class[]{
-		org.apache.flink.graph.examples.ConnectedComponents.class,
 		org.apache.flink.graph.examples.EuclideanGraphWeighing.class,
 		org.apache.flink.graph.examples.GSASingleSourceShortestPaths.class,
 		org.apache.flink.graph.examples.IncrementalSSSP.class,

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java
new file mode 100644
index 0000000..742c1de
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.commons.lang3.text.StrBuilder;
+import org.apache.commons.lang3.text.WordUtils;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.output.CSV;
+import org.apache.flink.graph.drivers.output.Print;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
+import org.apache.flink.graph.library.similarity.AdamicAdar.Result;
+import org.apache.flink.types.CopyableValue;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * Driver for {@link org.apache.flink.graph.library.similarity.AdamicAdar}.
+ */
+public class AdamicAdar<K extends CopyableValue<K>, VV, EV>
+extends SimpleDriver<Result<K>>
+implements Driver<K, VV, EV>, CSV, Print {
+
+	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
+		.setDefaultValue(PARALLELISM_DEFAULT);
+
+	@Override
+	public String getName() {
+		return this.getClass().getSimpleName();
+	}
+
+	@Override
+	public String getShortDescription() {
+		return "similarity score weighted by centerpoint degree";
+	}
+
+	@Override
+	public String getLongDescription() {
+		return WordUtils.wrap(new StrBuilder()
+			.appendln("Adamic-Adar measures the similarity between vertex neighborhoods and is " +
+				"computed as the sum of the inverse logarithm of centerpoint degree over shared " +
+				"neighbors.")
+			.appendNewLine()
+			.append("The algorithm result contains two vertex IDs and the similarity score.")
+			.toString(), 80);
+	}
+
+	@Override
+	public void plan(Graph<K, VV, EV> graph) throws Exception {
+		int lp = littleParallelism.getValue().intValue();
+
+		result = graph
+			.run(new org.apache.flink.graph.library.similarity.AdamicAdar<K, VV, EV>()
+				.setLittleParallelism(lp));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
index 004390d..c463c0a 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
@@ -18,333 +18,127 @@
 
 package org.apache.flink.graph.drivers;
 
-import org.apache.commons.lang3.StringEscapeUtils;
 import org.apache.commons.lang3.text.StrBuilder;
 import org.apache.commons.lang3.text.WordUtils;
-import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvOutputFormat;
-import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.client.program.ProgramParametrizationException;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAnalytic;
-import org.apache.flink.graph.GraphCsvReader;
-import org.apache.flink.graph.asm.translate.TranslateGraphIds;
-import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
-import org.apache.flink.graph.generator.RMatGraph;
-import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
-import org.apache.flink.graph.generator.random.RandomGenerableFactory;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.StringValue;
-
-import java.text.NumberFormat;
+import org.apache.flink.graph.asm.result.PrintableResult;
+import org.apache.flink.graph.drivers.output.CSV;
+import org.apache.flink.graph.drivers.output.Hash;
+import org.apache.flink.graph.drivers.output.Print;
+import org.apache.flink.graph.drivers.parameter.ChoiceParameter;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
+import org.apache.flink.types.CopyableValue;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
- * Driver for the library implementations of Global and Local Clustering Coefficient.
- *
- * This example reads a simple directed or undirected graph from a CSV file or
- * generates an RMat graph with the given scale and edge factor then calculates
- * the local clustering coefficient for each vertex and the global clustering
- * coefficient for the graph.
+ * Driver for directed and undirected clustering coefficient algorithm and analytics.
  *
+ * @see org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient
  * @see org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient
  * @see org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient
+ * @see org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient
  * @see org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient
  * @see org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient
  */
-public class ClusteringCoefficient {
-
-	private static final int DEFAULT_SCALE = 10;
-
-	private static final int DEFAULT_EDGE_FACTOR = 16;
-
-	private static final boolean DEFAULT_CLIP_AND_FLIP = true;
-
-	private static String getUsage(String message) {
-		return new StrBuilder()
-			.appendNewLine()
-			.appendln(WordUtils.wrap("The local clustering coefficient measures the connectedness of each" +
-				" vertex's neighborhood and the global clustering coefficient measures the connectedness of the graph." +
-				" Scores range from 0.0 (no edges between neighbors or vertices) to 1.0 (neighborhood or graph" +
-				" is a clique).", 80))
-			.appendNewLine()
-			.appendln(WordUtils.wrap("This algorithm returns tuples containing the vertex ID, the degree of" +
-				" the vertex, and the number of edges between vertex neighbors.", 80))
-			.appendNewLine()
-			.appendln("usage: ClusteringCoefficient --directed <true | false> --input <csv | rmat> --output <print | hash | csv>")
-			.appendNewLine()
-			.appendln("options:")
-			.appendln("  --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
-			.appendln("  --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]")
-			.appendNewLine()
-			.appendln("  --output print")
-			.appendln("  --output hash")
-			.appendln("  --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]")
-			.appendNewLine()
-			.appendln("Usage error: " + message)
-			.toString();
-	}
-
-	public static void main(String[] args) throws Exception {
-		// Set up the execution environment
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.getConfig().enableObjectReuse();
-
-		ParameterTool parameters = ParameterTool.fromArgs(args);
-		env.getConfig().setGlobalJobParameters(parameters);
-
-		if (! parameters.has("directed")) {
-			throw new ProgramParametrizationException(getUsage("must declare execution mode as '--directed true' or '--directed false'"));
-		}
-		boolean directedAlgorithm = parameters.getBoolean("directed");
-
-		int little_parallelism = parameters.getInt("little_parallelism", PARALLELISM_DEFAULT);
-
-		// global and local clustering coefficient results
-		GraphAnalytic gcc;
-		GraphAnalytic acc;
-		DataSet lcc;
-
-		switch (parameters.get("input", "")) {
-			case "csv": {
-				String lineDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-				String fieldDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-				GraphCsvReader reader = Graph
-					.fromCsvReader(parameters.get("input_filename"), env)
-						.ignoreCommentsEdges("#")
-						.lineDelimiterEdges(lineDelimiter)
-						.fieldDelimiterEdges(fieldDelimiter);
-
-				switch (parameters.get("type", "")) {
-					case "integer": {
-						Graph<LongValue, NullValue, NullValue> graph = reader
-							.keyType(LongValue.class);
+public class ClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+extends SimpleDriver<PrintableResult>
+implements Driver<K, VV, EV>, CSV, Hash, Print {
 
-						if (directedAlgorithm) {
-							if (parameters.getBoolean("simplify", false)) {
-								graph = graph
-									.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>()
-										.setParallelism(little_parallelism));
-							}
+	private static final String DIRECTED = "directed";
 
-							gcc = graph
-								.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-							acc = graph
-								.run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-							lcc = graph
-								.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-						} else {
-							if (parameters.getBoolean("simplify", false)) {
-								graph = graph
-									.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false)
-										.setParallelism(little_parallelism));
-							}
+	private static final String UNDIRECTED = "undirected";
 
-							gcc = graph
-								.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-							acc = graph
-								.run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-							lcc = graph
-								.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-						}
-					} break;
+	private ChoiceParameter order = new ChoiceParameter(this, "order")
+		.addChoices(DIRECTED, UNDIRECTED);
 
-					case "string": {
-						Graph<StringValue, NullValue, NullValue> graph = reader
-							.keyType(StringValue.class);
+	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
+		.setDefaultValue(PARALLELISM_DEFAULT);
 
-						if (directedAlgorithm) {
-							if (parameters.getBoolean("simplify", false)) {
-								graph = graph
-									.run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>()
-										.setParallelism(little_parallelism));
-							}
+	private GraphAnalytic<K, VV, EV, ? extends PrintableResult> globalClusteringCoefficient;
 
-							gcc = graph
-								.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<StringValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-							acc = graph
-								.run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<StringValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-							lcc = graph
-								.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<StringValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-						} else {
-							if (parameters.getBoolean("simplify", false)) {
-								graph = graph
-									.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false)
-										.setParallelism(little_parallelism));
-							}
+	private GraphAnalytic<K, VV, EV, ? extends PrintableResult> averageClusteringCoefficient;
 
-							gcc = graph
-								.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<StringValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-							acc = graph
-								.run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<StringValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-							lcc = graph
-								.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<StringValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-						}
-					} break;
-
-					default:
-						throw new ProgramParametrizationException(getUsage("invalid CSV type"));
-				}
-			} break;
-
-			case "rmat": {
-				int scale = parameters.getInt("scale", DEFAULT_SCALE);
-				int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
-
-				RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
-
-				long vertexCount = 1L << scale;
-				long edgeCount = vertexCount * edgeFactor;
-
-				Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
-					.setParallelism(little_parallelism)
-					.generate();
-
-				if (directedAlgorithm) {
-					if (scale > 32) {
-						Graph<LongValue, NullValue, NullValue> newGraph = graph
-							.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>()
-								.setParallelism(little_parallelism));
-
-						gcc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
-								.setLittleParallelism(little_parallelism));
-						acc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
-								.setLittleParallelism(little_parallelism));
-						lcc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
-								.setIncludeZeroDegreeVertices(false)
-								.setLittleParallelism(little_parallelism));
-					} else {
-						Graph<IntValue, NullValue, NullValue> newGraph = graph
-							.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue())
-								.setParallelism(little_parallelism))
-							.run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>()
-								.setParallelism(little_parallelism));
-
-						gcc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>()
-								.setLittleParallelism(little_parallelism));
-						acc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<IntValue, NullValue, NullValue>()
-								.setLittleParallelism(little_parallelism));
-						lcc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<IntValue, NullValue, NullValue>()
-								.setIncludeZeroDegreeVertices(false)
-								.setLittleParallelism(little_parallelism));
-					}
-				} else {
-					boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
+	@Override
+	public String getName() {
+		return this.getClass().getSimpleName();
+	}
 
-					if (scale > 32) {
-						Graph<LongValue, NullValue, NullValue> newGraph = graph
-							.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip)
-								.setParallelism(little_parallelism));
+	@Override
+	public String getShortDescription() {
+		return "measure the connectedness of vertex neighborhoods";
+	}
 
-						gcc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
-								.setLittleParallelism(little_parallelism));
-						acc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
-								.setLittleParallelism(little_parallelism));
-						lcc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
-								.setIncludeZeroDegreeVertices(false)
-								.setLittleParallelism(little_parallelism));
-					} else {
-						Graph<IntValue, NullValue, NullValue> newGraph = graph
-							.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue())
-								.setParallelism(little_parallelism))
-							.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip)
-								.setParallelism(little_parallelism));
+	@Override
+	public String getLongDescription() {
+		return WordUtils.wrap(new StrBuilder()
+			.appendln("The local clustering coefficient measures the connectedness of each " +
+				"vertex's neighborhood. The global clustering coefficient measures the " +
+				"connected of the graph. The average clustering coefficient is the mean local " +
+				"clustering coefficient. Each score ranges from 0.0 (no edges between vertex " +
+				"neighbors) to 1.0 (neighborhood or graph is a clique).")
+			.appendNewLine()
+			.append("The algorithm result contains the vertex ID, degree, and number of edges " +
+				"connecting neighbors.")
+			.toString(), 80);
+	}
 
-						gcc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>()
-								.setLittleParallelism(little_parallelism));
-						acc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<IntValue, NullValue, NullValue>()
-								.setLittleParallelism(little_parallelism));
-						lcc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<IntValue, NullValue, NullValue>()
-								.setIncludeZeroDegreeVertices(false)
-								.setLittleParallelism(little_parallelism));
-					}
-				}
-			} break;
+	@Override
+	public void plan(Graph<K, VV, EV> graph) throws Exception {
+		int lp = littleParallelism.getValue().intValue();
 
-			default:
-				throw new ProgramParametrizationException(getUsage("invalid input type"));
-		}
+		switch (order.getValue()) {
+			case DIRECTED:
+				result = graph
+					.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<K, VV, EV>()
+						.setLittleParallelism(lp));
 
-		switch (parameters.get("output", "")) {
-			case "print":
-				if (directedAlgorithm) {
-					for (Object e: lcc.collect()) {
-						org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result result =
-							(org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result)e;
-						System.out.println(result.toPrintableString());
-					}
-				} else {
-					for (Object e: lcc.collect()) {
-						org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result result =
-							(org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result)e;
-						System.out.println(result.toPrintableString());
-					}
-				}
-				break;
+				globalClusteringCoefficient = graph
+					.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<K, VV, EV>()
+						.setLittleParallelism(lp));
 
-			case "hash":
-				System.out.println(DataSetUtils.checksumHashCode(lcc));
+				averageClusteringCoefficient = graph
+					.run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<K, VV, EV>()
+						.setLittleParallelism(lp));
 				break;
 
-			case "csv":
-				String filename = parameters.get("output_filename");
+			case UNDIRECTED:
+				result = graph
+					.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<K, VV, EV>()
+						.setLittleParallelism(lp));
 
-				String lineDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
+				globalClusteringCoefficient = graph
+					.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<K, VV, EV>()
+						.setLittleParallelism(lp));
 
-				String fieldDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-				lcc.writeAsCsv(filename, lineDelimiter, fieldDelimiter);
-
-				env.execute("Clustering Coefficient");
+				averageClusteringCoefficient = graph
+					.run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<K, VV, EV>()
+						.setLittleParallelism(lp));
 				break;
-
-			default:
-				throw new ProgramParametrizationException(getUsage("invalid output type"));
 		}
+	}
 
-		System.out.println(gcc.getResult());
-		System.out.println(acc.getResult());
+	@Override
+	public void hash(String executionName) throws Exception {
+		super.hash(executionName);
+		printAnalytics();
+	}
 
-		JobExecutionResult result = env.getLastJobExecutionResult();
+	@Override
+	public void print(String executionName) throws Exception {
+		super.print(executionName);
+		printAnalytics();
+	}
+
+	@Override
+	public void writeCSV(String filename, String lineDelimiter, String fieldDelimiter) {
+		super.writeCSV(filename, lineDelimiter, fieldDelimiter);
+		printAnalytics();
+	}
 
-		NumberFormat nf = NumberFormat.getInstance();
-		System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
+	private void printAnalytics() {
+		System.out.println(globalClusteringCoefficient.getResult().toPrintableString());
+		System.out.println(averageClusteringCoefficient.getResult().toPrintableString());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java
new file mode 100644
index 0000000..32263cf
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
+import org.apache.flink.graph.asm.dataset.Collect;
+import org.apache.flink.graph.drivers.output.CSV;
+import org.apache.flink.graph.drivers.output.Hash;
+import org.apache.flink.graph.drivers.output.Print;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.graph.library.GSAConnectedComponents;
+
+import java.util.List;
+
+/**
+ * Driver for {@link org.apache.flink.graph.library.GSAConnectedComponents}.
+ *
+ * The gather-sum-apply implementation is used because scatter-gather does not
+ * handle object reuse (see FLINK-5891).
+ */
+public class ConnectedComponents<K extends Comparable<K>, VV, EV>
+extends ParameterizedBase
+implements Driver<K, VV, EV>, CSV, Hash, Print {
+
+	private DataSet<Vertex<K, K>> components;
+
+	@Override
+	public String getName() {
+		return this.getClass().getSimpleName();
+	}
+
+	@Override
+	public String getShortDescription() {
+		return "ConnectedComponents";
+	}
+
+	@Override
+	public String getLongDescription() {
+		return "ConnectedComponents";
+	}
+
+	@Override
+	public void plan(Graph<K, VV, EV> graph) throws Exception {
+		components = graph
+			.mapVertices(new MapVertices<K, VV>())
+			.run(new GSAConnectedComponents<K, K, EV>(Integer.MAX_VALUE));
+	}
+
+	@Override
+	public void hash(String executionName) throws Exception {
+		Checksum checksum = new ChecksumHashCode<Vertex<K, K>>()
+			.run(components)
+			.execute(executionName);
+
+		System.out.println(checksum);
+	}
+
+	@Override
+	public void print(String executionName) throws Exception {
+		Collect<Vertex<K, K>> collector = new Collect<>();
+
+		// Refactored due to openjdk7 compile error: https://travis-ci.org/greghogan/flink/builds/200487761
+		List<Vertex<K, K>> records = collector.run(components).execute(executionName);
+
+		for (Vertex<K, K> result : records) {
+			System.out.println(result);
+		}
+	}
+
+	@Override
+	public void writeCSV(String filename, String lineDelimiter, String fieldDelimiter) {
+		components
+			.writeAsCsv(filename, lineDelimiter, fieldDelimiter)
+				.name("CSV: " + filename);
+	}
+
+	private static final class MapVertices<T, VT>
+	implements MapFunction<Vertex<T, VT>, T> {
+		@Override
+		public T map(Vertex<T, VT> value) throws Exception {
+			return value.f0;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java
new file mode 100644
index 0000000..85f32c3
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
+import org.apache.flink.graph.asm.dataset.Collect;
+import org.apache.flink.graph.drivers.output.CSV;
+import org.apache.flink.graph.drivers.output.Hash;
+import org.apache.flink.graph.drivers.output.Print;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+
+import java.util.List;
+
+/**
+ * Convert a {@link Graph} to the {@link DataSet} of {@link Edge}s.
+ */
+public class EdgeList<K, VV, EV>
+extends ParameterizedBase
+implements Driver<K, VV, EV>, CSV, Hash, Print {
+
+	// Set by plan(): the input graph's edge set, emitted unmodified.
+	private DataSet<Edge<K, EV>> edges;
+
+	@Override
+	public String getName() {
+		return this.getClass().getSimpleName();
+	}
+
+	@Override
+	public String getShortDescription() {
+		return "the edge list";
+	}
+
+	@Override
+	public String getLongDescription() {
+		return "Pass-through of the graph's edge list.";
+	}
+
+	@Override
+	public void plan(Graph<K, VV, EV> graph) throws Exception {
+		// No transformation: the driver simply exposes the edges as-is.
+		edges = graph
+			.getEdges();
+	}
+
+	@Override
+	public void hash(String executionName) throws Exception {
+		// Execute the job and print a checksum of the edges instead of the
+		// full listing.
+		Checksum checksum = new ChecksumHashCode<Edge<K, EV>>()
+			.run(edges)
+			.execute(executionName);
+
+		System.out.println(checksum);
+	}
+
+	@Override
+	public void print(String executionName) throws Exception {
+		// Collect the edge set to the client and print each edge.
+		Collect<Edge<K, EV>> collector = new Collect<>();
+
+		// Refactored due to openjdk7 compile error: https://travis-ci.org/greghogan/flink/builds/200487761
+		List<Edge<K, EV>> records = collector.run(edges).execute(executionName);
+
+		for (Edge<K, EV> result : records) {
+			System.out.println(result);
+		}
+
+	}
+
+	@Override
+	public void writeCSV(String filename, String lineDelimiter, String fieldDelimiter) {
+		// Sink only; execution is triggered later by the driver framework.
+		edges
+			.writeAsCsv(filename, lineDelimiter, fieldDelimiter)
+				.name("CSV: " + filename);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
deleted file mode 100644
index c2abbf7..0000000
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.drivers;
-
-import org.apache.commons.lang3.StringEscapeUtils;
-import org.apache.commons.lang3.text.StrBuilder;
-import org.apache.commons.lang3.text.WordUtils;
-import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.client.program.ProgramParametrizationException;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.generator.RMatGraph;
-import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
-import org.apache.flink.graph.generator.random.RandomGenerableFactory;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-
-import java.text.NumberFormat;
-
-/**
- * Generate an RMat graph for Graph 500.
- *
- * Note that this does not yet implement permutation of vertex labels or edges.
- *
- * @see <a href="http://www.graph500.org/specifications">Graph 500</a>
- */
-public class Graph500 {
-
-	private static final int DEFAULT_SCALE = 10;
-
-	private static final int DEFAULT_EDGE_FACTOR = 16;
-
-	private static final boolean DEFAULT_CLIP_AND_FLIP = true;
-
-	private static String getUsage(String message) {
-		return new StrBuilder()
-			.appendNewLine()
-			.appendln("A Graph500 generator using the Recursive Matrix (RMat) graph generator.")
-			.appendNewLine()
-			.appendln(WordUtils.wrap("The graph matrix contains 2^scale vertices although not every vertex will" +
-				" be represented in an edge. The number of edges is edge_factor * 2^scale edges" +
-				" although some edges may be duplicates.", 80))
-			.appendNewLine()
-			.appendln("Note: this does not yet implement permutation of vertex labels or edges.")
-			.appendNewLine()
-			.appendln("usage: Graph500 --directed <true | false> --simplify <true | false> --output <print | hash | csv [options]>")
-			.appendNewLine()
-			.appendln("options:")
-			.appendln("  --output print")
-			.appendln("  --output hash")
-			.appendln("  --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]")
-			.appendNewLine()
-			.appendln("Usage error: " + message)
-			.toString();
-	}
-
-	public static void main(String[] args) throws Exception {
-		// Set up the execution environment
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.getConfig().enableObjectReuse();
-
-		ParameterTool parameters = ParameterTool.fromArgs(args);
-		env.getConfig().setGlobalJobParameters(parameters);
-
-		if (! parameters.has("directed")) {
-			throw new ProgramParametrizationException(getUsage("must declare execution mode as '--directed true' or '--directed false'"));
-		}
-		boolean directed = parameters.getBoolean("directed");
-
-		if (! parameters.has("simplify")) {
-			throw new ProgramParametrizationException(getUsage("must declare '--simplify true' or '--simplify false'"));
-		}
-		boolean simplify = parameters.getBoolean("simplify");
-
-
-		// Generate RMat graph
-		int scale = parameters.getInt("scale", DEFAULT_SCALE);
-		int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
-
-		RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
-
-		long vertexCount = 1L << scale;
-		long edgeCount = vertexCount * edgeFactor;
-
-		boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
-
-		Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
-			.generate();
-
-		if (directed) {
-			if (simplify) {
-				graph = graph
-					.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());
-			}
-		} else {
-			if (simplify) {
-				graph = graph
-					.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
-			} else {
-				graph = graph.getUndirected();
-			}
-		}
-
-		DataSet<Tuple2<LongValue, LongValue>> edges = graph
-			.getEdges()
-			.project(0, 1);
-
-		// Print, hash, or write RMat graph to disk
-		switch (parameters.get("output", "")) {
-		case "print":
-			System.out.println();
-			edges.print();
-			break;
-
-		case "hash":
-			System.out.println();
-			System.out.println(DataSetUtils.checksumHashCode(edges));
-			break;
-
-		case "csv":
-			String filename = parameters.getRequired("output_filename");
-
-			String lineDelimiter = StringEscapeUtils.unescapeJava(
-				parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-			String fieldDelimiter = StringEscapeUtils.unescapeJava(
-				parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-			edges.writeAsCsv(filename, lineDelimiter, fieldDelimiter);
-
-			env.execute("Graph500");
-			break;
-		default:
-			throw new ProgramParametrizationException(getUsage("invalid output type"));
-		}
-
-		JobExecutionResult result = env.getLastJobExecutionResult();
-
-		NumberFormat nf = NumberFormat.getInstance();
-		System.out.println();
-		System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
index 9b246df..cc5a894 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
@@ -18,224 +18,109 @@
 
 package org.apache.flink.graph.drivers;
 
-import org.apache.commons.lang3.StringEscapeUtils;
 import org.apache.commons.lang3.text.StrBuilder;
-import org.apache.commons.lang3.text.WordUtils;
-import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvOutputFormat;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.client.program.ProgramParametrizationException;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAnalytic;
-import org.apache.flink.graph.GraphCsvReader;
-import org.apache.flink.graph.asm.translate.TranslateGraphIds;
-import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
-import org.apache.flink.graph.generator.RMatGraph;
-import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
-import org.apache.flink.graph.generator.random.RandomGenerableFactory;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.StringValue;
-
-import java.text.NumberFormat;
+import org.apache.flink.graph.asm.result.PrintableResult;
+import org.apache.flink.graph.drivers.output.Hash;
+import org.apache.flink.graph.drivers.output.Print;
+import org.apache.flink.graph.drivers.parameter.ChoiceParameter;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.types.CopyableValue;
 
 /**
- * Computes vertex and edge metrics on a directed or undirected graph.
+ * Driver for directed and undirected graph metrics analytics.
  *
  * @see org.apache.flink.graph.library.metric.directed.EdgeMetrics
  * @see org.apache.flink.graph.library.metric.directed.VertexMetrics
  * @see org.apache.flink.graph.library.metric.undirected.EdgeMetrics
  * @see org.apache.flink.graph.library.metric.undirected.VertexMetrics
  */
-public class GraphMetrics {
+public class GraphMetrics<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+extends ParameterizedBase
+implements Driver<K, VV, EV>, Hash, Print {
 
-	private static final int DEFAULT_SCALE = 10;
+	private static final String DIRECTED = "directed";
 
-	private static final int DEFAULT_EDGE_FACTOR = 16;
+	private static final String UNDIRECTED = "undirected";
 
-	private static final boolean DEFAULT_CLIP_AND_FLIP = true;
+	private ChoiceParameter order = new ChoiceParameter(this, "order")
+		.addChoices(DIRECTED, UNDIRECTED);
 
-	private static String getUsage(String message) {
+	private GraphAnalytic<K, VV, EV, ? extends PrintableResult> vertexMetrics;
+
+	private GraphAnalytic<K, VV, EV, ? extends PrintableResult> edgeMetrics;
+
+	@Override
+	public String getName() {
+		return this.getClass().getSimpleName();
+	}
+
+	@Override
+	public String getShortDescription() {
+		return "compute vertex and edge metrics";
+	}
+
+	@Override
+	public String getLongDescription() {
 		return new StrBuilder()
+			.appendln("Computes metrics on a directed or undirected graph.")
 			.appendNewLine()
-			.appendln(WordUtils.wrap("Computes vertex and edge metrics on a directed or undirected graph.", 80))
-			.appendNewLine()
-			.appendln("usage: GraphMetrics --directed <true | false> --input <csv | rmat>")
+			.appendln("Vertex metrics:")
+			.appendln("- number of vertices")
+			.appendln("- number of edges")
+			.appendln("- number of unidirectional edges (directed only)")
+			.appendln("- number of bidirectional edges (directed only)")
+			.appendln("- average degree")
+			.appendln("- number of triplets")
+			.appendln("- maximum degree")
+			.appendln("- maximum out degree (directed only)")
+			.appendln("- maximum in degree (directed only)")
+			.appendln("- maximum number of triplets")
 			.appendNewLine()
-			.appendln("options:")
-			.appendln("  --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
-			.appendln("  --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]")
-			.appendNewLine()
-			.appendln("Usage error: " + message)
+			.appendln("Edge metrics:")
+			.appendln("- number of triangle triplets")
+			.appendln("- number of rectangle triplets")
+			.appendln("- maximum number of triangle triplets")
+			.append("- maximum number of rectangle triplets")
 			.toString();
 	}
 
-	public static void main(String[] args) throws Exception {
-		// Set up the execution environment
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.getConfig().enableObjectReuse();
+	@Override
+	public void plan(Graph<K, VV, EV> graph) throws Exception {
+		switch (order.getValue()) {
+			case DIRECTED:
+				vertexMetrics = graph
+					.run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<K, VV, EV>());
 
-		ParameterTool parameters = ParameterTool.fromArgs(args);
-		env.getConfig().setGlobalJobParameters(parameters);
+				edgeMetrics = graph
+					.run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<K, VV, EV>());
+				break;
 
-		if (! parameters.has("directed")) {
-			throw new ProgramParametrizationException(getUsage("must declare execution mode as '--directed true' or '--directed false'"));
-		}
-		boolean directedAlgorithm = parameters.getBoolean("directed");
-
-		GraphAnalytic vm;
-		GraphAnalytic em;
-
-		switch (parameters.get("input", "")) {
-			case "csv": {
-				String lineDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-				String fieldDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-				GraphCsvReader reader = Graph
-					.fromCsvReader(parameters.getRequired("input_filename"), env)
-						.ignoreCommentsEdges("#")
-						.lineDelimiterEdges(lineDelimiter)
-						.fieldDelimiterEdges(fieldDelimiter);
-
-				switch (parameters.get("type", "")) {
-					case "integer": {
-						Graph<LongValue, NullValue, NullValue> graph = reader
-							.keyType(LongValue.class);
-
-						if (directedAlgorithm) {
-							if (parameters.getBoolean("simplify", false)) {
-								graph = graph
-									.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());
-							}
-
-							vm = graph
-								.run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<LongValue, NullValue, NullValue>());
-							em = graph
-								.run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<LongValue, NullValue, NullValue>());
-						} else {
-							if (parameters.getBoolean("simplify", false)) {
-								graph = graph
-									.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false));
-							}
-
-							vm = graph
-								.run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<LongValue, NullValue, NullValue>());
-							em = graph
-								.run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<LongValue, NullValue, NullValue>());
-						}
-					} break;
-
-					case "string": {
-						Graph<StringValue, NullValue, NullValue> graph = reader
-							.keyType(StringValue.class);
-
-						if (directedAlgorithm) {
-							if (parameters.getBoolean("simplify", false)) {
-								graph = graph
-									.run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>());
-							}
-
-							vm = graph
-								.run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<StringValue, NullValue, NullValue>());
-							em = graph
-								.run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<StringValue, NullValue, NullValue>());
-						} else {
-							if (parameters.getBoolean("simplify", false)) {
-								graph = graph
-									.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false));
-							}
-
-							vm = graph
-								.run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<StringValue, NullValue, NullValue>());
-							em = graph
-								.run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<StringValue, NullValue, NullValue>());
-						}
-					} break;
-
-					default:
-						throw new ProgramParametrizationException(getUsage("invalid CSV type"));
-				}
-				} break;
-
-			case "rmat": {
-				int scale = parameters.getInt("scale", DEFAULT_SCALE);
-				int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
-
-				RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
-
-				long vertexCount = 1L << scale;
-				long edgeCount = vertexCount * edgeFactor;
-
-
-				Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
-					.generate();
-
-				if (directedAlgorithm) {
-					if (scale > 32) {
-						Graph<LongValue, NullValue, NullValue> newGraph = graph
-							.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());
-
-						vm = newGraph
-							.run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<LongValue, NullValue, NullValue>());
-						em = newGraph
-							.run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<LongValue, NullValue, NullValue>());
-					} else {
-						Graph<IntValue, NullValue, NullValue> newGraph = graph
-							.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()))
-							.run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>());
-
-						vm = newGraph
-							.run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<IntValue, NullValue, NullValue>());
-						em = newGraph
-							.run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<IntValue, NullValue, NullValue>());
-					}
-				} else {
-					boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
-
-					if (scale > 32) {
-						Graph<LongValue, NullValue, NullValue> newGraph = graph
-							.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
-
-						vm = newGraph
-							.run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<LongValue, NullValue, NullValue>());
-						em = newGraph
-							.run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<LongValue, NullValue, NullValue>());
-					} else {
-						Graph<IntValue, NullValue, NullValue> newGraph = graph
-							.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()))
-							.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip));
-
-						vm = newGraph
-							.run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<IntValue, NullValue, NullValue>());
-						em = newGraph
-							.run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<IntValue, NullValue, NullValue>());
-					}
-				}
-				} break;
-
-			default:
-				throw new ProgramParametrizationException(getUsage("invalid input type"));
+			case UNDIRECTED:
+				vertexMetrics = graph
+					.run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<K, VV, EV>());
+
+				edgeMetrics = graph
+					.run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<K, VV, EV>());
+				break;
 		}
+	}
 
-		env.execute("Graph Metrics");
+	@Override
+	public void hash(String executionName) throws Exception {
+		print(executionName);
+	}
 
-		System.out.println();
-		System.out.print("Vertex metrics:\n  ");
-		System.out.println(vm.getResult().toString().replace(";", "\n "));
-		System.out.println();
-		System.out.print("Edge metrics:\n  ");
-		System.out.println(em.getResult().toString().replace(";", "\n "));
+	@Override
+	public void print(String executionName) throws Exception {
+		vertexMetrics.execute(executionName);
 
-		JobExecutionResult result = env.getLastJobExecutionResult();
+		System.out.print("Vertex metrics:\n  ");
+		System.out.println(vertexMetrics.getResult().toPrintableString().replace(";", "\n "));
 
-		NumberFormat nf = NumberFormat.getInstance();
 		System.out.println();
-		System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
+		System.out.print("Edge metrics:\n  ");
+		System.out.println(edgeMetrics.getResult().toPrintableString().replace(";", "\n "));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
index db27f0e..6081fea 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
@@ -18,177 +18,51 @@
 
 package org.apache.flink.graph.drivers;
 
-import org.apache.commons.lang3.StringEscapeUtils;
 import org.apache.commons.lang3.text.StrBuilder;
 import org.apache.commons.lang3.text.WordUtils;
-import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvOutputFormat;
-import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.client.program.ProgramParametrizationException;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphCsvReader;
-import org.apache.flink.graph.asm.simple.directed.Simplify;
-import org.apache.flink.graph.asm.translate.TranslateGraphIds;
-import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
-import org.apache.flink.graph.generator.RMatGraph;
-import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
-import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.graph.drivers.output.CSV;
+import org.apache.flink.graph.drivers.output.Print;
+import org.apache.flink.graph.drivers.parameter.IterationConvergence;
 import org.apache.flink.graph.library.link_analysis.HITS.Result;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.StringValue;
-
-import java.text.NumberFormat;
 
 /**
- * Driver for the library implementation of HITS (Hubs and Authorities).
- *
- * This example reads a simple, undirected graph from a CSV file or generates
- * an undirected RMat graph with the given scale and edge factor then calculates
- * hub and authority scores for each vertex.
- *
- * @see org.apache.flink.graph.library.link_analysis.HITS
+ * Driver for {@link org.apache.flink.graph.library.link_analysis.HITS}.
  */
-public class HITS {
+public class HITS<K, VV, EV>
+extends SimpleDriver<Result<K>>
+implements Driver<K, VV, EV>, CSV, Print {
 
 	private static final int DEFAULT_ITERATIONS = 10;
 
-	private static final int DEFAULT_SCALE = 10;
+	private IterationConvergence iterationConvergence = new IterationConvergence(this, DEFAULT_ITERATIONS);
 
-	private static final int DEFAULT_EDGE_FACTOR = 16;
-
-	private static String getUsage(String message) {
-		return new StrBuilder()
-			.appendNewLine()
-			.appendln(WordUtils.wrap("Hyperlink-Induced Topic Search computes two interdependent" +
-				" scores for every vertex in a directed graph. A good \"hub\" links to good \"authorities\"" +
-				" and good \"authorities\" are linked from good \"hubs\".", 80))
-			.appendNewLine()
-			.appendln("usage: HITS --input <csv | rmat> --output <print | hash | csv>")
-			.appendNewLine()
-			.appendln("options:")
-			.appendln("  --input csv --type <integer | string> --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
-			.appendln("  --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]")
-			.appendNewLine()
-			.appendln("  --output print")
-			.appendln("  --output hash")
-			.appendln("  --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]")
-			.appendNewLine()
-			.appendln("Usage error: " + message)
-			.toString();
+	@Override
+	public String getName() {
+		return this.getClass().getSimpleName();
 	}
 
-	public static void main(String[] args) throws Exception {
-		// Set up the execution environment
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.getConfig().enableObjectReuse();
-
-		ParameterTool parameters = ParameterTool.fromArgs(args);
-		env.getConfig().setGlobalJobParameters(parameters);
-
-		int iterations = parameters.getInt("iterations", DEFAULT_ITERATIONS);
-
-		DataSet hits;
-
-		switch (parameters.get("input", "")) {
-			case "csv": {
-				String lineDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-				String fieldDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-				GraphCsvReader reader = Graph
-					.fromCsvReader(parameters.getRequired("input_filename"), env)
-						.ignoreCommentsEdges("#")
-						.lineDelimiterEdges(lineDelimiter)
-						.fieldDelimiterEdges(fieldDelimiter);
-
-				switch (parameters.get("type", "")) {
-					case "integer": {
-						hits = reader
-							.keyType(LongValue.class)
-							.run(new org.apache.flink.graph.library.link_analysis.HITS<LongValue, NullValue, NullValue>(iterations));
-					} break;
-
-					case "string": {
-						hits = reader
-							.keyType(StringValue.class)
-							.run(new org.apache.flink.graph.library.link_analysis.HITS<StringValue, NullValue, NullValue>(iterations));
-					} break;
-
-					default:
-						throw new ProgramParametrizationException(getUsage("invalid CSV type"));
-				}
-				} break;
-
-			case "rmat": {
-				int scale = parameters.getInt("scale", DEFAULT_SCALE);
-				int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
-
-				RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
-
-				long vertexCount = 1L << scale;
-				long edgeCount = vertexCount * edgeFactor;
-
-				Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
-					.generate();
-
-				if (scale > 32) {
-					hits = graph
-						.run(new Simplify<LongValue, NullValue, NullValue>())
-						.run(new org.apache.flink.graph.library.link_analysis.HITS<LongValue, NullValue, NullValue>(iterations));
-				} else {
-					hits = graph
-						.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()))
-						.run(new Simplify<IntValue, NullValue, NullValue>())
-						.run(new org.apache.flink.graph.library.link_analysis.HITS<IntValue, NullValue, NullValue>(iterations));
-				}
-				} break;
-
-			default:
-				throw new ProgramParametrizationException(getUsage("invalid input type"));
-		}
-
-		switch (parameters.get("output", "")) {
-			case "print":
-				System.out.println();
-				for (Object e: hits.collect()) {
-					System.out.println(((Result)e).toPrintableString());
-				}
-				break;
-
-			case "hash":
-				System.out.println();
-				System.out.println(DataSetUtils.checksumHashCode(hits));
-				break;
-
-			case "csv":
-				String filename = parameters.getRequired("output_filename");
-
-				String lineDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-				String fieldDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-				hits.writeAsCsv(filename, lineDelimiter, fieldDelimiter);
-
-				env.execute("HITS");
-				break;
-			default:
-				throw new ProgramParametrizationException(getUsage("invalid output type"));
-		}
+	@Override
+	public String getShortDescription() {
+		return "score vertices as hubs and authorities";
+	}
 
-		JobExecutionResult result = env.getLastJobExecutionResult();
+	@Override
+	public String getLongDescription() {
+		return WordUtils.wrap(new StrBuilder()
+			.appendln("Hyperlink-Induced Topic Search computes two interdependent scores for " +
+				"each vertex in a directed graph. A good \"hub\" links to good \"authorities\" " +
+				"and good \"authorities\" are linked to from good \"hubs\".")
+			.appendNewLine()
+			.append("The result contains the vertex ID, hub score, and authority score.")
+			.toString(), 80);
+	}
 
-		NumberFormat nf = NumberFormat.getInstance();
-		System.out.println();
-		System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
+	@Override
+	public void plan(Graph<K, VV, EV> graph) throws Exception {
+		result = graph
+			.run(new org.apache.flink.graph.library.link_analysis.HITS<K, VV, EV>(
+				iterationConvergence.getValue().iterations,
+				iterationConvergence.getValue().convergenceThreshold));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
index 09479a6..1c836ea 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
@@ -18,211 +18,57 @@
 
 package org.apache.flink.graph.drivers;
 
-import org.apache.commons.lang3.StringEscapeUtils;
 import org.apache.commons.lang3.text.StrBuilder;
 import org.apache.commons.lang3.text.WordUtils;
-import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvOutputFormat;
-import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.client.program.ProgramParametrizationException;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphCsvReader;
-import org.apache.flink.graph.asm.simple.undirected.Simplify;
-import org.apache.flink.graph.asm.translate.TranslateGraphIds;
-import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
-import org.apache.flink.graph.generator.RMatGraph;
-import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
-import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.graph.drivers.output.CSV;
+import org.apache.flink.graph.drivers.output.Hash;
+import org.apache.flink.graph.drivers.output.Print;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
 import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.StringValue;
-
-import java.text.NumberFormat;
+import org.apache.flink.types.CopyableValue;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
- * Driver for the library implementation of Jaccard Index.
- *
- * This example reads a simple, undirected graph from a CSV file or generates
- * an undirected RMat graph with the given scale and edge factor then calculates
- * all non-zero Jaccard Index similarity scores between vertices.
- *
- * @see org.apache.flink.graph.library.similarity.JaccardIndex
+ * Driver for {@link org.apache.flink.graph.library.similarity.JaccardIndex}.
  */
-public class JaccardIndex {
+public class JaccardIndex<K extends CopyableValue<K>, VV, EV>
+extends SimpleDriver<Result<K>>
+implements Driver<K, VV, EV>, CSV, Hash, Print {
 
-	private static final int DEFAULT_SCALE = 10;
+	// operator parallelism forwarded to setLittleParallelism() in plan();
+	// default is the PARALLELISM_DEFAULT sentinel from ExecutionConfig
+	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
+		.setDefaultValue(PARALLELISM_DEFAULT);
 
-	private static final int DEFAULT_EDGE_FACTOR = 16;
+	/** Algorithm name shown in the driver CLI: the simple class name. */
+	@Override
+	public String getName() {
+		return this.getClass().getSimpleName();
+	}
 
-	private static final boolean DEFAULT_CLIP_AND_FLIP = true;
+	/** One-line summary shown in the driver's algorithm listing. */
+	@Override
+	public String getShortDescription() {
+		return "similarity score as fraction of common neighbors";
+	}
 
-	private static String getUsage(String message) {
-		return new StrBuilder()
-			.appendNewLine()
-			.appendln(WordUtils.wrap("The Jaccard Index measures the similarity between vertex" +
-				" neighborhoods and is computed as the number of shared neighbors divided by the number of" +
-				" distinct neighbors. Scores range from 0.0 (no shared neighbors) to 1.0 (all neighbors are" +
-				" shared).", 80))
-			.appendNewLine()
-			.appendln(WordUtils.wrap("This algorithm returns 4-tuples containing two vertex IDs, the" +
-				" number of shared neighbors, and the number of distinct neighbors.", 80))
-			.appendNewLine()
-			.appendln("usage: JaccardIndex --input <csv | rmat> --output <print | hash | csv>")
-			.appendNewLine()
-			.appendln("options:")
-			.appendln("  --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
-			.appendln("  --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]")
-			.appendNewLine()
-			.appendln("  --output print")
-			.appendln("  --output hash")
-			.appendln("  --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]")
+	/**
+	 * Detailed description of the Jaccard Index, wrapped to 80 columns for
+	 * terminal display.
+	 */
+	@Override
+	public String getLongDescription() {
+		return WordUtils.wrap(new StrBuilder()
+			.appendln("Jaccard Index measures the similarity between vertex neighborhoods and " +
+				"is computed as the number of shared neighbors divided by the number of " +
+				"distinct neighbors. Scores range from 0.0 (no shared neighbors) to 1.0 (all " +
+				"neighbors are shared).")
 			.appendNewLine()
-			.appendln("Usage error: " + message)
-			.toString();
+			.append("The result contains two vertex IDs, the number of shared neighbors, and " +
+				"the number of distinct neighbors.")
+			// wrap the assembled text to 80 columns for terminal display
+			.toString(), 80);
 	}
 
-	public static void main(String[] args) throws Exception {
-		// Set up the execution environment
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.getConfig().enableObjectReuse();
-
-		ParameterTool parameters = ParameterTool.fromArgs(args);
-		env.getConfig().setGlobalJobParameters(parameters);
-
-		int little_parallelism = parameters.getInt("little_parallelism", PARALLELISM_DEFAULT);
-
-		DataSet ji;
-
-		switch (parameters.get("input", "")) {
-			case "csv": {
-				String lineDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-				String fieldDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-				GraphCsvReader reader = Graph
-					.fromCsvReader(parameters.getRequired("input_filename"), env)
-						.ignoreCommentsEdges("#")
-						.lineDelimiterEdges(lineDelimiter)
-						.fieldDelimiterEdges(fieldDelimiter);
-
-				switch (parameters.get("type", "")) {
-					case "integer": {
-						Graph<LongValue, NullValue, NullValue> graph = reader
-							.keyType(LongValue.class);
-
-						if (parameters.getBoolean("simplify", false)) {
-							graph = graph
-								.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false)
-									.setParallelism(little_parallelism));
-						}
-
-						ji = graph
-							.run(new org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, NullValue>()
-								.setLittleParallelism(little_parallelism));
-					} break;
-
-					case "string": {
-						Graph<StringValue, NullValue, NullValue> graph = reader
-							.keyType(StringValue.class);
-
-						if (parameters.getBoolean("simplify", false)) {
-							graph = graph
-								.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false)
-									.setParallelism(little_parallelism));
-						}
-
-						ji = graph
-							.run(new org.apache.flink.graph.library.similarity.JaccardIndex<StringValue, NullValue, NullValue>()
-								.setLittleParallelism(little_parallelism));
-					} break;
-
-					default:
-						throw new ProgramParametrizationException(getUsage("invalid CSV type"));
-				}
-				} break;
-
-			case "rmat": {
-				int scale = parameters.getInt("scale", DEFAULT_SCALE);
-				int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
-
-				RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
-
-				long vertexCount = 1L << scale;
-				long edgeCount = vertexCount * edgeFactor;
-
-				Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
-					.setParallelism(little_parallelism)
-					.generate();
-
-				boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
-
-				if (scale > 32) {
-					ji = graph
-						.run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip)
-							.setParallelism(little_parallelism))
-						.run(new org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, NullValue>()
-							.setLittleParallelism(little_parallelism));
-				} else {
-					ji = graph
-						.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue())
-							.setParallelism(little_parallelism))
-						.run(new Simplify<IntValue, NullValue, NullValue>(clipAndFlip)
-							.setParallelism(little_parallelism))
-						.run(new org.apache.flink.graph.library.similarity.JaccardIndex<IntValue, NullValue, NullValue>()
-							.setLittleParallelism(little_parallelism));
-				}
-				} break;
-
-			default:
-				throw new ProgramParametrizationException(getUsage("invalid input type"));
-		}
-
-		switch (parameters.get("output", "")) {
-			case "print":
-				System.out.println();
-				for (Object e: ji.collect()) {
-					Result result = (Result)e;
-					System.out.println(result.toPrintableString());
-				}
-				break;
-
-			case "hash":
-				System.out.println();
-				System.out.println(DataSetUtils.checksumHashCode(ji));
-				break;
-
-			case "csv":
-				String filename = parameters.getRequired("output_filename");
-
-				String lineDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-				String fieldDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-				ji.writeAsCsv(filename, lineDelimiter, fieldDelimiter);
-
-				env.execute("Jaccard Index");
-				break;
-
-			default:
-				throw new ProgramParametrizationException(getUsage("invalid output type"));
-		}
-
-		JobExecutionResult result = env.getLastJobExecutionResult();
+	/**
+	 * Runs the Jaccard Index library algorithm; the result DataSet is
+	 * stored on the SimpleDriver base class for later output.
+	 */
+	@Override
+	public void plan(Graph<K, VV, EV> graph) throws Exception {
+		// operator parallelism forwarded to the algorithm's "little" operators
+		int lp = littleParallelism.getValue().intValue();
 
-		NumberFormat nf = NumberFormat.getInstance();
-		System.out.println();
-		System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
+		result = graph
+			.run(new org.apache.flink.graph.library.similarity.JaccardIndex<K, VV, EV>()
+				.setLittleParallelism(lp));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
new file mode 100644
index 0000000..8cef077
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.commons.lang3.text.StrBuilder;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.output.CSV;
+import org.apache.flink.graph.drivers.output.Print;
+import org.apache.flink.graph.drivers.parameter.DoubleParameter;
+import org.apache.flink.graph.drivers.parameter.IterationConvergence;
+import org.apache.flink.graph.library.link_analysis.PageRank.Result;
+
+/**
+ * Driver for {@link org.apache.flink.graph.library.link_analysis.PageRank}.
+ *
+ * <p>Scores vertices by the number and quality of incoming links.
+ */
+public class PageRank<K, VV, EV>
+extends SimpleDriver<Result<K>>
+implements Driver<K, VV, EV>, CSV, Print {
+
+	// default iteration count handed to IterationConvergence below
+	private static final int DEFAULT_ITERATIONS = 10;
+
+	// PageRank damping factor, default 0.85, constrained to (0, 1);
+	// the boolean argument to setMinimum/MaximumValue presumably marks
+	// the bound as exclusive — TODO confirm against DoubleParameter
+	private DoubleParameter dampingFactor = new DoubleParameter(this, "damping_factor")
+		.setDefaultValue(0.85)
+		.setMinimumValue(0.0, false)
+		.setMaximumValue(1.0, false);
+
+	// composite parameter: fixed iteration count and/or convergence threshold
+	private IterationConvergence iterationConvergence = new IterationConvergence(this, DEFAULT_ITERATIONS);
+
+	/** Algorithm name shown in the driver CLI: the simple class name. */
+	@Override
+	public String getName() {
+		return this.getClass().getSimpleName();
+	}
+
+	/** One-line summary shown in the driver's algorithm listing. */
+	@Override
+	public String getShortDescription() {
+		return "score vertices by the number and quality of incoming links";
+	}
+
+	// NOTE(review): unlike the HITS and JaccardIndex drivers this text is
+	// not passed through WordUtils.wrap(..., 80) — confirm whether the
+	// unwrapped form is intentional
+	@Override
+	public String getLongDescription() {
+		return new StrBuilder()
+			.appendln("PageRank computes a per-vertex score which is the sum of PageRank scores " +
+				"transmitted over in-edges. Each vertex's score is divided evenly among " +
+				"out-edges. High-scoring vertices are linked to by other high-scoring vertices.")
+			.appendNewLine()
+			.append("The result contains the vertex ID and PageRank score.")
+			.toString();
+	}
+
+	/**
+	 * Runs the PageRank library algorithm; the result DataSet is stored on
+	 * the SimpleDriver base class for later output.
+	 */
+	@Override
+	public void plan(Graph<K, VV, EV> graph) throws Exception {
+		result = graph
+			.run(new org.apache.flink.graph.library.link_analysis.PageRank<K, VV, EV>(
+				dampingFactor.getValue(),
+				iterationConvergence.getValue().iterations,
+				iterationConvergence.getValue().convergenceThreshold));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a48357db/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java
new file mode 100644
index 0000000..98bdfc5
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
+import org.apache.flink.graph.asm.dataset.Collect;
+import org.apache.flink.graph.asm.result.PrintableResult;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+
+import java.util.List;
+
+/**
+ * A base driver storing a single result {@link DataSet} with values
+ * implementing {@link PrintableResult}.
+ *
+ * @param <R> algorithm's result type
+ */
+public abstract class SimpleDriver<R extends PrintableResult>
+extends ParameterizedBase {
+
+	// populated by the subclass's plan(Graph); read by the output methods below
+	protected DataSet<? extends R> result;
+
+	/**
+	 * Execute the program and print a checksum-hash of the result.
+	 *
+	 * @param executionName job name
+	 * @throws Exception on job failure
+	 */
+	@SuppressWarnings("unchecked")
+	public void hash(String executionName) throws Exception {
+		// cast is safe: the DataSet is only read, so widening
+		// DataSet<? extends R> to DataSet<R> cannot violate the element type
+		Checksum checksum = new ChecksumHashCode<R>()
+			.run((DataSet<R>) result)
+			.execute(executionName);
+
+		System.out.println(checksum);
+	}
+
+	/**
+	 * Execute the program and print each result record on its own line.
+	 *
+	 * @param executionName job name
+	 * @throws Exception on job failure
+	 */
+	@SuppressWarnings("unchecked")
+	public void print(String executionName) throws Exception {
+		Collect<R> collector = new Collect<>();
+
+		// Refactored due to openjdk7 compile error: https://travis-ci.org/greghogan/flink/builds/200487761
+		List<R> records = collector.run((DataSet<R>) result).execute(executionName);
+
+		// loop variable renamed from 'result' to avoid shadowing the field above
+		for (R record : records) {
+			System.out.println(record.toPrintableString());
+		}
+	}
+
+	/**
+	 * Add a CSV sink for the result; the caller is responsible for
+	 * executing the program.
+	 *
+	 * @param filename output path
+	 * @param lineDelimiter delimiter between records
+	 * @param fieldDelimiter delimiter between fields within a record
+	 */
+	public void writeCSV(String filename, String lineDelimiter, String fieldDelimiter) {
+		result
+			.writeAsCsv(filename, lineDelimiter, fieldDelimiter)
+				.name("CSV: " + filename);
+	}
+}


Mime
View raw message