flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject flink git commit: [FLINK-6989] [gelly] Refactor examples with Output interface
Date Thu, 29 Jun 2017 17:36:16 GMT
Repository: flink
Updated Branches:
  refs/heads/master 901a9caf4 -> f8842f697


[FLINK-6989] [gelly] Refactor examples with Output interface

The current organization of the Gelly examples retains full flexibility
by handing the Graph input to the algorithm Driver and having the
Driver overload interfaces for the various output types. The outputs
must be made independent in order to support Transforms which are
applied between the Driver and Output (and also between the Input and
Driver).

This closes #4179


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f8842f69
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f8842f69
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f8842f69

Branch: refs/heads/master
Commit: f8842f6974d524967dfa563d914f4ab03d66aeba
Parents: 901a9ca
Author: Greg Hogan <code@greghogan.com>
Authored: Fri Jun 23 11:29:03 2017 -0400
Committer: Greg Hogan <code@greghogan.com>
Committed: Thu Jun 29 06:45:42 2017 -0400

----------------------------------------------------------------------
 .../java/org/apache/flink/graph/Runner.java     | 113 +++++++++----------
 .../apache/flink/graph/drivers/AdamicAdar.java  |  13 +--
 .../graph/drivers/ClusteringCoefficient.java    |  38 ++-----
 .../graph/drivers/ConnectedComponents.java      |  55 ++-------
 .../org/apache/flink/graph/drivers/Driver.java  |  21 +++-
 .../apache/flink/graph/drivers/DriverBase.java  |  45 ++++++++
 .../apache/flink/graph/drivers/EdgeList.java    |  61 +---------
 .../flink/graph/drivers/GraphMetrics.java       |  35 ++----
 .../org/apache/flink/graph/drivers/HITS.java    |  13 +--
 .../flink/graph/drivers/JaccardIndex.java       |  18 +--
 .../apache/flink/graph/drivers/PageRank.java    |  13 +--
 .../flink/graph/drivers/SimpleDriver.java       | 110 ------------------
 .../flink/graph/drivers/TriangleListing.java    |  38 ++-----
 .../apache/flink/graph/drivers/input/CSV.java   |   9 +-
 .../graph/drivers/input/CirculantGraph.java     |   7 +-
 .../graph/drivers/input/CompleteGraph.java      |   5 -
 .../flink/graph/drivers/input/CycleGraph.java   |   5 -
 .../flink/graph/drivers/input/EchoGraph.java    |   5 -
 .../flink/graph/drivers/input/EmptyGraph.java   |   5 -
 .../graph/drivers/input/GeneratedGraph.java     |   4 +-
 .../flink/graph/drivers/input/GridGraph.java    |   7 +-
 .../graph/drivers/input/HypercubeGraph.java     |   5 -
 .../flink/graph/drivers/input/InputBase.java    |  39 +++++++
 .../flink/graph/drivers/input/PathGraph.java    |   5 -
 .../flink/graph/drivers/input/RMatGraph.java    |   5 -
 .../graph/drivers/input/SingletonEdgeGraph.java |   5 -
 .../flink/graph/drivers/input/StarGraph.java    |   5 -
 .../apache/flink/graph/drivers/output/CSV.java  |  35 ++++--
 .../apache/flink/graph/drivers/output/Hash.java |  35 ++++--
 .../flink/graph/drivers/output/Output.java      |  41 +++++++
 .../flink/graph/drivers/output/OutputBase.java  |  37 ++++++
 .../flink/graph/drivers/output/Print.java       |  48 ++++++--
 .../drivers/parameter/IterationConvergence.java |   7 +-
 .../graph/drivers/parameter/Parameter.java      |   9 ++
 .../drivers/parameter/ParameterizedBase.java    |   8 +-
 .../drivers/parameter/SimpleParameter.java      |   5 +
 .../flink/graph/drivers/parameter/Simplify.java |   7 +-
 .../flink/graph/drivers/EdgeListITCase.java     |  72 ++++++------
 38 files changed, 445 insertions(+), 543 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f8842f69/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java
index e468a58..af2a11c 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java
@@ -19,8 +19,8 @@
 package org.apache.flink.graph;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvOutputFormat;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.client.program.ProgramParametrizationException;
 import org.apache.flink.graph.drivers.AdamicAdar;
@@ -46,11 +46,11 @@ import org.apache.flink.graph.drivers.input.RMatGraph;
 import org.apache.flink.graph.drivers.input.SingletonEdgeGraph;
 import org.apache.flink.graph.drivers.input.StarGraph;
 import org.apache.flink.graph.drivers.output.Hash;
+import org.apache.flink.graph.drivers.output.Output;
 import org.apache.flink.graph.drivers.output.Print;
 import org.apache.flink.graph.drivers.parameter.Parameterized;
 import org.apache.flink.util.InstantiationUtil;
 
-import org.apache.commons.lang3.StringEscapeUtils;
 import org.apache.commons.lang3.text.StrBuilder;
 
 import java.util.ArrayList;
@@ -103,6 +103,11 @@ public class Runner {
 		.addClass(PageRank.class)
 		.addClass(TriangleListing.class);
 
+	private static ParameterizedFactory<Output> outputFactory = new ParameterizedFactory<Output>()
+		.addClass(org.apache.flink.graph.drivers.output.CSV.class)
+		.addClass(Hash.class)
+		.addClass(Print.class);
+
 	/**
 	 * List available algorithms. This is displayed to the user when no valid
 	 * algorithm is given in the program parameterization.
@@ -174,16 +179,12 @@ public class Runner {
 			.appendNewLine()
 			.appendln("Available outputs:");
 
-		if (algorithm instanceof org.apache.flink.graph.drivers.output.CSV) {
-			strBuilder.appendln("  --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]");
-		}
-
-		if (algorithm instanceof Hash) {
-			strBuilder.appendln("  --output hash");
-		}
-
-		if (algorithm instanceof Print) {
-			strBuilder.appendln("  --output print");
+		for (Output output : outputFactory) {
+			strBuilder
+				.append("  --output ")
+				.append(output.getName())
+				.append(" ")
+				.appendln(output.getUsage());
 		}
 
 		return strBuilder
@@ -211,8 +212,11 @@ public class Runner {
 			config.enableObjectReuse();
 		}
 
-		// Usage
+		// ----------------------------------------------------------------------------------------
+		// Usage and configuration
+		// ----------------------------------------------------------------------------------------
 
+		// algorithm and usage
 		if (!parameters.has(ALGORITHM)) {
 			throw new ProgramParametrizationException(getAlgorithmsListing());
 		}
@@ -224,6 +228,7 @@ public class Runner {
 			throw new ProgramParametrizationException("Unknown algorithm name: " + algorithmName);
 		}
 
+		// input and usage
 		if (!parameters.has(INPUT)) {
 			if (!parameters.has(OUTPUT)) {
 				// if neither input nor output is given then print algorithm usage
@@ -232,6 +237,12 @@ public class Runner {
 			throw new ProgramParametrizationException("No input given");
 		}
 
+		try {
+			algorithm.configure(parameters);
+		} catch (RuntimeException ex) {
+			throw new ProgramParametrizationException(ex.getMessage());
+		}
+
 		String inputName = parameters.get(INPUT);
 		Input input = inputFactory.get(inputName);
 
@@ -239,72 +250,52 @@ public class Runner {
 			throw new ProgramParametrizationException("Unknown input type: " + inputName);
 		}
 
-		// Input
-
 		try {
 			input.configure(parameters);
 		} catch (RuntimeException ex) {
 			throw new ProgramParametrizationException(ex.getMessage());
 		}
 
-		Graph graph = input.create(env);
-
-		// Algorithm
-
-		algorithm.configure(parameters);
-		algorithm.plan(graph);
-
-		// Output
+		// output and usage
 		if (!parameters.has(OUTPUT)) {
 			throw new ProgramParametrizationException("No output given");
 		}
 
 		String outputName = parameters.get(OUTPUT);
-		String executionNamePrefix = input.getIdentity() + " -> " + algorithmName + " -> ";
+		Output output = outputFactory.get(outputName);
 
-		System.out.println();
+		if (output == null) {
+			throw new ProgramParametrizationException("Unknown output type: " + outputName);
+		}
+
+		try {
+			output.configure(parameters);
+		} catch (RuntimeException ex) {
+			throw new ProgramParametrizationException(ex.getMessage());
+		}
 
-		switch (outputName.toLowerCase()) {
-			case "csv":
-				if (algorithm instanceof org.apache.flink.graph.drivers.output.CSV) {
-					String filename = parameters.getRequired("output_filename");
+		// ----------------------------------------------------------------------------------------
+		// Execute
+		// ----------------------------------------------------------------------------------------
 
-					String lineDelimiter = StringEscapeUtils.unescapeJava(
-						parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
+		// Create input
+		Graph graph = input.create(env);
 
-					String fieldDelimiter = StringEscapeUtils.unescapeJava(
-						parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
+		// Run algorithm
+		DataSet results = algorithm.plan(graph);
 
-					org.apache.flink.graph.drivers.output.CSV c = (org.apache.flink.graph.drivers.output.CSV) algorithm;
-					c.writeCSV(filename, lineDelimiter, fieldDelimiter);
+		// Output
+		String executionName = input.getIdentity() + " ⇨ " + algorithmName + " ⇨ " + output.getName();
 
-					env.execute(executionNamePrefix + "CSV");
-				} else {
-					throw new ProgramParametrizationException("Algorithm does not support output type 'CSV'");
-				}
-				break;
-
-			case "hash":
-				if (algorithm instanceof Hash) {
-					Hash h = (Hash) algorithm;
-					h.hash(executionNamePrefix + "Hash");
-				} else {
-					throw new ProgramParametrizationException("Algorithm does not support output type 'hash'");
-				}
-				break;
-
-			case "print":
-				if (algorithm instanceof Print) {
-					Print h = (Print) algorithm;
-					h.print(executionNamePrefix + "Print");
-				} else {
-					throw new ProgramParametrizationException("Algorithm does not support output type 'print'");
-				}
-				break;
+		System.out.println();
 
-			default:
-				throw new ProgramParametrizationException("Unknown output type: " + outputName);
+		if (results == null) {
+			env.execute(executionName);
+		} else {
+			output.write(executionName, System.out, results);
 		}
+
+		algorithm.printAnalytics(System.out);
 	}
 
 	/**
@@ -336,7 +327,7 @@ public class Runner {
 		 */
 		public T get(String name) {
 			for (T instance : this) {
-				if (name.equals(instance.getName())) {
+				if (name.equalsIgnoreCase(instance.getName())) {
 					return instance;
 				}
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/f8842f69/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java
index c5867ed..e439ccd 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java
@@ -20,11 +20,8 @@ package org.apache.flink.graph.drivers;
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.drivers.output.CSV;
-import org.apache.flink.graph.drivers.output.Print;
 import org.apache.flink.graph.drivers.parameter.DoubleParameter;
 import org.apache.flink.graph.drivers.parameter.LongParameter;
-import org.apache.flink.graph.library.similarity.AdamicAdar.Result;
 import org.apache.flink.types.CopyableValue;
 
 import org.apache.commons.lang3.text.StrBuilder;
@@ -36,8 +33,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * Driver for {@link org.apache.flink.graph.library.similarity.AdamicAdar}.
  */
 public class AdamicAdar<K extends CopyableValue<K>, VV, EV>
-extends SimpleDriver<K, VV, EV, Result<K>>
-implements CSV, Print {
+extends DriverBase<K, VV, EV> {
 
 	private DoubleParameter minRatio = new DoubleParameter(this, "minimum_ratio")
 		.setDefaultValue(0.0)
@@ -51,11 +47,6 @@ implements CSV, Print {
 		.setDefaultValue(PARALLELISM_DEFAULT);
 
 	@Override
-	public String getName() {
-		return this.getClass().getSimpleName();
-	}
-
-	@Override
 	public String getShortDescription() {
 		return "similarity score weighted by centerpoint degree";
 	}
@@ -72,7 +63,7 @@ implements CSV, Print {
 	}
 
 	@Override
-	protected DataSet<Result<K>> simplePlan(Graph<K, VV, EV> graph) throws Exception {
+	public DataSet plan(Graph<K, VV, EV> graph) throws Exception {
 		int lp = littleParallelism.getValue().intValue();
 
 		return graph

http://git-wip-us.apache.org/repos/asf/flink/blob/f8842f69/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
index bcd8ec4..14e953a 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
@@ -22,9 +22,6 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAnalytic;
 import org.apache.flink.graph.asm.result.PrintableResult;
-import org.apache.flink.graph.drivers.output.CSV;
-import org.apache.flink.graph.drivers.output.Hash;
-import org.apache.flink.graph.drivers.output.Print;
 import org.apache.flink.graph.drivers.parameter.ChoiceParameter;
 import org.apache.flink.graph.drivers.parameter.LongParameter;
 import org.apache.flink.types.CopyableValue;
@@ -32,6 +29,8 @@ import org.apache.flink.types.CopyableValue;
 import org.apache.commons.lang3.text.StrBuilder;
 import org.apache.commons.lang3.text.WordUtils;
 
+import java.io.PrintStream;
+
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
@@ -45,8 +44,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @see org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient
  */
 public class ClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV>
-extends SimpleDriver<K, VV, EV, PrintableResult>
-implements CSV, Hash, Print {
+extends DriverBase<K, VV, EV> {
 
 	private static final String DIRECTED = "directed";
 
@@ -63,11 +61,6 @@ implements CSV, Hash, Print {
 	private GraphAnalytic<K, VV, EV, ? extends PrintableResult> averageClusteringCoefficient;
 
 	@Override
-	public String getName() {
-		return this.getClass().getSimpleName();
-	}
-
-	@Override
 	public String getShortDescription() {
 		return "measure the connectedness of vertex neighborhoods";
 	}
@@ -87,7 +80,7 @@ implements CSV, Hash, Print {
 	}
 
 	@Override
-	protected DataSet<PrintableResult> simplePlan(Graph<K, VV, EV> graph) throws Exception {
+	public DataSet plan(Graph<K, VV, EV> graph) throws Exception {
 		int lp = littleParallelism.getValue().intValue();
 
 		switch (order.getValue()) {
@@ -127,25 +120,8 @@ implements CSV, Hash, Print {
 	}
 
 	@Override
-	public void hash(String executionName) throws Exception {
-		super.hash(executionName);
-		printAnalytics();
-	}
-
-	@Override
-	public void print(String executionName) throws Exception {
-		super.print(executionName);
-		printAnalytics();
-	}
-
-	@Override
-	public void writeCSV(String filename, String lineDelimiter, String fieldDelimiter) {
-		super.writeCSV(filename, lineDelimiter, fieldDelimiter);
-		printAnalytics();
-	}
-
-	private void printAnalytics() {
-		System.out.println(globalClusteringCoefficient.getResult().toPrintableString());
-		System.out.println(averageClusteringCoefficient.getResult().toPrintableString());
+	public void printAnalytics(PrintStream out) {
+		out.println(globalClusteringCoefficient.getResult().toPrintableString());
+		out.println(averageClusteringCoefficient.getResult().toPrintableString());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f8842f69/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java
index 8a158a2..32f94c1 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ConnectedComponents.java
@@ -22,17 +22,8 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
-import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
-import org.apache.flink.graph.asm.dataset.Collect;
-import org.apache.flink.graph.drivers.output.CSV;
-import org.apache.flink.graph.drivers.output.Hash;
-import org.apache.flink.graph.drivers.output.Print;
-import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
 import org.apache.flink.graph.library.GSAConnectedComponents;
 
-import java.util.List;
-
 /**
  * Driver for {@link org.apache.flink.graph.library.GSAConnectedComponents}.
  *
@@ -40,15 +31,7 @@ import java.util.List;
  * handle object reuse (see FLINK-5891).
  */
 public class ConnectedComponents<K extends Comparable<K>, VV, EV>
-extends ParameterizedBase
-implements Driver<K, VV, EV>, CSV, Hash, Print {
-
-	private DataSet<Vertex<K, K>> components;
-
-	@Override
-	public String getName() {
-		return this.getClass().getSimpleName();
-	}
+extends DriverBase<K, VV, EV> {
 
 	@Override
 	public String getShortDescription() {
@@ -61,37 +44,19 @@ implements Driver<K, VV, EV>, CSV, Hash, Print {
 	}
 
 	@Override
-	public void plan(Graph<K, VV, EV> graph) throws Exception {
-		components = graph
+	public DataSet plan(Graph<K, VV, EV> graph) throws Exception {
+		return graph
 			.mapVertices(new MapVertices<K, VV>())
 			.run(new GSAConnectedComponents<K, K, EV>(Integer.MAX_VALUE));
 	}
 
-	@Override
-	public void hash(String executionName) throws Exception {
-		Checksum checksum = new ChecksumHashCode<Vertex<K, K>>()
-			.run(components)
-			.execute(executionName);
-
-		System.out.println(checksum);
-	}
-
-	@Override
-	public void print(String executionName) throws Exception {
-		List<Vertex<K, K>> results = new Collect<Vertex<K, K>>().run(components).execute(executionName);
-
-		for (Vertex<K, K> result : results) {
-			System.out.println(result);
-		}
-	}
-
-	@Override
-	public void writeCSV(String filename, String lineDelimiter, String fieldDelimiter) {
-		components
-			.writeAsCsv(filename, lineDelimiter, fieldDelimiter)
-				.name("CSV: " + filename);
-	}
-
+	/**
+	 * Initialize vertices into separate components by setting each vertex
+	 * value to the vertex ID.
+	 *
+	 * @param <T> vertex ID type
+	 * @param <VT> vertex value type
+	 */
 	private static final class MapVertices<T, VT>
 	implements MapFunction<Vertex<T, VT>, T> {
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/f8842f69/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Driver.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Driver.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Driver.java
index fda9079..4ec7653 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Driver.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Driver.java
@@ -18,14 +18,17 @@
 
 package org.apache.flink.graph.drivers;
 
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.GraphAnalytic;
 import org.apache.flink.graph.drivers.parameter.Parameterized;
 
+import java.io.PrintStream;
+
 /**
- * A driver for one or more {@link GraphAlgorithm}s and/or
- * {@link GraphAnalytic}s.
+ * A driver for one or more {@link GraphAlgorithm} and/or
+ * {@link GraphAnalytic}.
  *
  * <p>It is preferable to include multiple, overlapping algorithms/analytics in
  * the same driver both for simplicity and since this examples module
@@ -59,8 +62,20 @@ extends Parameterized {
 	 * <p>Drivers are first configured, next planned, and finally the chosen
 	 * output method is called.
 	 *
+	 * <p>A {@code null} value should be returned when the {@link Driver} does
+	 * not execute a {@link GraphAlgorithm} but only executes a
+	 * {@link GraphAnalytic}.
+	 *
 	 * @param graph input graph
 	 * @throws Exception on error
 	 */
-	void plan(Graph<K, VV, EV> graph) throws Exception;
+	DataSet plan(Graph<K, VV, EV> graph) throws Exception;
+
+	/**
+	 * Analytic results are summaries so are always printed to the console
+	 * irrespective of the chosen {@code Output}.
+	 *
+	 * @param out output stream for printing results
+	 */
+	void printAnalytics(PrintStream out);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f8842f69/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/DriverBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/DriverBase.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/DriverBase.java
new file mode 100644
index 0000000..38e4ea84
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/DriverBase.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+
+import java.io.PrintStream;
+
+/**
+ * Base class for example drivers.
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public abstract class DriverBase<K, VV, EV>
+extends ParameterizedBase
+implements Driver<K, VV, EV> {
+
+	@Override
+	public String getName() {
+		return this.getClass().getSimpleName();
+	}
+
+	@Override
+	public void printAnalytics(PrintStream out) {
+		// analytics are optionally executed by drivers overriding this method
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f8842f69/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java
index 5da0284..287e222 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java
@@ -25,18 +25,9 @@ import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
-import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
-import org.apache.flink.graph.asm.dataset.Collect;
-import org.apache.flink.graph.drivers.output.CSV;
-import org.apache.flink.graph.drivers.output.Hash;
-import org.apache.flink.graph.drivers.output.Print;
-import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
 import org.apache.flink.graph.utils.EdgeToTuple2Map;
 import org.apache.flink.types.NullValue;
 
-import java.util.List;
-
 /**
  * Convert a {@link Graph} to the {@link DataSet} of {@link Edge}.
  *
@@ -45,15 +36,7 @@ import java.util.List;
  * @param <EV> edge value type
  */
 public class EdgeList<K, VV, EV>
-extends ParameterizedBase
-implements Driver<K, VV, EV>, CSV, Hash, Print {
-
-	private DataSet<Edge<K, EV>> edges;
-
-	@Override
-	public String getName() {
-		return this.getClass().getSimpleName();
-	}
+extends DriverBase<K, VV, EV> {
 
 	@Override
 	public String getShortDescription() {
@@ -66,47 +49,15 @@ implements Driver<K, VV, EV>, CSV, Hash, Print {
 	}
 
 	@Override
-	public void plan(Graph<K, VV, EV> graph) throws Exception {
-		edges = graph
-			.getEdges();
-	}
-
-	@Override
-	public void hash(String executionName) throws Exception {
-		Checksum checksum = new ChecksumHashCode<Edge<K, EV>>()
-			.run(edges)
-			.execute(executionName);
-
-		System.out.println(checksum);
-	}
-
-	@Override
-	public void print(String executionName) throws Exception {
-		List<Edge<K, EV>> records = new Collect<Edge<K, EV>>().run(edges).execute(executionName);
+	public DataSet plan(Graph<K, VV, EV> graph) throws Exception {
+		DataSet<Edge<K, EV>> edges = graph.getEdges();
 
 		if (hasNullValueEdges(edges)) {
-			for (Edge<K, EV> result : records) {
-				System.out.println("(" + result.f0 + "," + result.f1 + ")");
-			}
-		} else {
-			for (Edge<K, EV> result : records) {
-				System.out.println(result);
-			}
-		}
-	}
-
-	@Override
-	public void writeCSV(String filename, String lineDelimiter, String fieldDelimiter) {
-		if (hasNullValueEdges(edges)) {
-			edges
+			return edges
 				.map(new EdgeToTuple2Map<K, EV>())
-					.name("Edge to Tuple2")
-				.writeAsCsv(filename, lineDelimiter, fieldDelimiter)
-					.name("CSV: " + filename);
+				.name("Edge to Tuple2");
 		} else {
-			edges
-				.writeAsCsv(filename, lineDelimiter, fieldDelimiter)
-					.name("CSV: " + filename);
+			return edges;
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f8842f69/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
index ea02225..f4da13c 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
@@ -18,16 +18,16 @@
 
 package org.apache.flink.graph.drivers;
 
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAnalytic;
 import org.apache.flink.graph.asm.result.PrintableResult;
-import org.apache.flink.graph.drivers.output.Hash;
-import org.apache.flink.graph.drivers.output.Print;
 import org.apache.flink.graph.drivers.parameter.ChoiceParameter;
-import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
 
 import org.apache.commons.lang3.text.StrBuilder;
 
+import java.io.PrintStream;
+
 /**
  * Driver for directed and undirected graph metrics analytics.
  *
@@ -37,8 +37,7 @@ import org.apache.commons.lang3.text.StrBuilder;
  * @see org.apache.flink.graph.library.metric.undirected.VertexMetrics
  */
 public class GraphMetrics<K extends Comparable<K>, VV, EV>
-extends ParameterizedBase
-implements Driver<K, VV, EV>, Hash, Print {
+extends DriverBase<K, VV, EV> {
 
 	private static final String DIRECTED = "directed";
 
@@ -52,11 +51,6 @@ implements Driver<K, VV, EV>, Hash, Print {
 	private GraphAnalytic<K, VV, EV, ? extends PrintableResult> edgeMetrics;
 
 	@Override
-	public String getName() {
-		return this.getClass().getSimpleName();
-	}
-
-	@Override
 	public String getShortDescription() {
 		return "compute vertex and edge metrics";
 	}
@@ -87,7 +81,7 @@ implements Driver<K, VV, EV>, Hash, Print {
 	}
 
 	@Override
-	public void plan(Graph<K, VV, EV> graph) throws Exception {
+	public DataSet plan(Graph<K, VV, EV> graph) throws Exception {
 		switch (order.getValue()) {
 			case DIRECTED:
 				vertexMetrics = graph
@@ -105,22 +99,17 @@ implements Driver<K, VV, EV>, Hash, Print {
 					.run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<K, VV, EV>());
 				break;
 		}
-	}
 
-	@Override
-	public void hash(String executionName) throws Exception {
-		print(executionName);
+		return null;
 	}
 
 	@Override
-	public void print(String executionName) throws Exception {
-		vertexMetrics.execute(executionName);
-
-		System.out.print("Vertex metrics:\n  ");
-		System.out.println(vertexMetrics.getResult().toPrintableString().replace(";", "\n "));
+	public void printAnalytics(PrintStream out) {
+		out.print("Vertex metrics:\n  ");
+		out.println(vertexMetrics.getResult().toPrintableString().replace(";", "\n "));
 
-		System.out.println();
-		System.out.print("Edge metrics:\n  ");
-		System.out.println(edgeMetrics.getResult().toPrintableString().replace(";", "\n "));
+		out.println();
+		out.print("Edge metrics:\n  ");
+		out.println(edgeMetrics.getResult().toPrintableString().replace(";", "\n "));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f8842f69/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
index 6f24c09..1987421 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
@@ -20,10 +20,7 @@ package org.apache.flink.graph.drivers;
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.drivers.output.CSV;
-import org.apache.flink.graph.drivers.output.Print;
 import org.apache.flink.graph.drivers.parameter.IterationConvergence;
-import org.apache.flink.graph.library.linkanalysis.HITS.Result;
 
 import org.apache.commons.lang3.text.StrBuilder;
 import org.apache.commons.lang3.text.WordUtils;
@@ -32,19 +29,13 @@ import org.apache.commons.lang3.text.WordUtils;
  * Driver for {@link org.apache.flink.graph.library.linkanalysis.HITS}.
  */
 public class HITS<K, VV, EV>
-extends SimpleDriver<K, VV, EV, Result<K>>
-implements CSV, Print {
+extends DriverBase<K, VV, EV> {
 
 	private static final int DEFAULT_ITERATIONS = 10;
 
 	private IterationConvergence iterationConvergence = new IterationConvergence(this, DEFAULT_ITERATIONS);
 
 	@Override
-	public String getName() {
-		return this.getClass().getSimpleName();
-	}
-
-	@Override
 	public String getShortDescription() {
 		return "score vertices as hubs and authorities";
 	}
@@ -61,7 +52,7 @@ implements CSV, Print {
 	}
 
 	@Override
-	protected DataSet<Result<K>> simplePlan(Graph<K, VV, EV> graph) throws Exception {
+	public DataSet plan(Graph<K, VV, EV> graph) throws Exception {
 		return graph
 			.run(new org.apache.flink.graph.library.linkanalysis.HITS<K, VV, EV>(
 				iterationConvergence.getValue().iterations,

http://git-wip-us.apache.org/repos/asf/flink/blob/f8842f69/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
index f6e10f0..8f6cfb7 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
@@ -20,12 +20,8 @@ package org.apache.flink.graph.drivers;
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.drivers.output.CSV;
-import org.apache.flink.graph.drivers.output.Hash;
-import org.apache.flink.graph.drivers.output.Print;
 import org.apache.flink.graph.drivers.parameter.BooleanParameter;
 import org.apache.flink.graph.drivers.parameter.LongParameter;
-import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
 import org.apache.flink.types.CopyableValue;
 
 import org.apache.commons.lang3.text.StrBuilder;
@@ -37,8 +33,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * Driver for {@link org.apache.flink.graph.library.similarity.JaccardIndex}.
  */
 public class JaccardIndex<K extends CopyableValue<K>, VV, EV>
-extends SimpleDriver<K, VV, EV, Result<K>>
-implements CSV, Hash, Print {
+extends DriverBase<K, VV, EV> {
 
 	private LongParameter minNumerator = new LongParameter(this, "minimum_numerator")
 		.setDefaultValue(0)
@@ -56,15 +51,10 @@ implements CSV, Hash, Print {
 		.setDefaultValue(1)
 		.setMinimumValue(1);
 
-	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
-		.setDefaultValue(PARALLELISM_DEFAULT);
-
 	private BooleanParameter mirrorResults = new BooleanParameter(this, "mirror_results");
 
-	@Override
-	public String getName() {
-		return this.getClass().getSimpleName();
-	}
+	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
+		.setDefaultValue(PARALLELISM_DEFAULT);
 
 	@Override
 	public String getShortDescription() {
@@ -85,7 +75,7 @@ implements CSV, Hash, Print {
 	}
 
 	@Override
-	protected DataSet<Result<K>> simplePlan(Graph<K, VV, EV> graph) throws Exception {
+	public DataSet plan(Graph<K, VV, EV> graph) throws Exception {
 		int lp = littleParallelism.getValue().intValue();
 
 		return graph

http://git-wip-us.apache.org/repos/asf/flink/blob/f8842f69/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
index b2602b9..224dea8 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
@@ -20,11 +20,8 @@ package org.apache.flink.graph.drivers;
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.drivers.output.CSV;
-import org.apache.flink.graph.drivers.output.Print;
 import org.apache.flink.graph.drivers.parameter.DoubleParameter;
 import org.apache.flink.graph.drivers.parameter.IterationConvergence;
-import org.apache.flink.graph.library.linkanalysis.PageRank.Result;
 
 import org.apache.commons.lang3.text.StrBuilder;
 
@@ -32,8 +29,7 @@ import org.apache.commons.lang3.text.StrBuilder;
  * @see org.apache.flink.graph.library.linkanalysis.PageRank
  */
 public class PageRank<K, VV, EV>
-extends SimpleDriver<K, VV, EV, Result<K>>
-implements CSV, Print {
+extends DriverBase<K, VV, EV> {
 
 	private static final int DEFAULT_ITERATIONS = 10;
 
@@ -45,11 +41,6 @@ implements CSV, Print {
 	private IterationConvergence iterationConvergence = new IterationConvergence(this, DEFAULT_ITERATIONS);
 
 	@Override
-	public String getName() {
-		return this.getClass().getSimpleName();
-	}
-
-	@Override
 	public String getShortDescription() {
 		return "score vertices by the number and quality of incoming links";
 	}
@@ -66,7 +57,7 @@ implements CSV, Print {
 	}
 
 	@Override
-	protected DataSet<Result<K>> simplePlan(Graph<K, VV, EV> graph) throws Exception {
+	public DataSet plan(Graph<K, VV, EV> graph) throws Exception {
 		return graph
 			.run(new org.apache.flink.graph.library.linkanalysis.PageRank<K, VV, EV>(
 				dampingFactor.getValue(),

http://git-wip-us.apache.org/repos/asf/flink/blob/f8842f69/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java
deleted file mode 100644
index a5ace26..0000000
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/SimpleDriver.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.drivers;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
-import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
-import org.apache.flink.graph.asm.dataset.Collect;
-import org.apache.flink.graph.asm.result.PrintableResult;
-import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
-
-import java.util.List;
-
-/**
- * A base driver storing a single result {@link DataSet} with values
- * implementing {@link PrintableResult}.
- *
- * @param <R> algorithm's result type
- */
-public abstract class SimpleDriver<K, VV, EV, R extends PrintableResult>
-extends ParameterizedBase
-implements Driver<K, VV, EV> {
-
-	private DataSet<R> result;
-
-	protected DataSet<R> getResult() {
-		return result;
-	}
-
-	/**
-	 * Plan the algorithm and return the result {@link DataSet}.
-	 *
-	 * @param graph input graph
-	 * @return driver output
-	 * @throws Exception on error
-	 */
-	protected abstract DataSet<R> simplePlan(Graph<K, VV, EV> graph) throws Exception;
-
-	@Override
-	public void plan(Graph<K, VV, EV> graph) throws Exception {
-		result = simplePlan(graph);
-	}
-
-	/**
-	 * Print hash of execution results.
-	 *
-	 * <p>Does *not* implement/override {@code Hash} since {@link Driver}
-	 * implementations designate the appropriate outputs.
-	 *
-	 * @param executionName job name
-	 * @throws Exception on error
-	 */
-	public void hash(String executionName) throws Exception {
-		Checksum checksum = new ChecksumHashCode<R>()
-			.run(result)
-			.execute(executionName);
-
-		System.out.println(checksum);
-	}
-
-	/**
-	 * Print execution results.
-	 *
-	 * <p>Does *not* implement/override {@code Print} since {@link Driver}
-	 * implementations designate the appropriate outputs.
-	 *
-	 * @param executionName job name
-	 * @throws Exception on error
-	 */
-	public void print(String executionName) throws Exception {
-		List<R> results = new Collect<R>().run(result).execute(executionName);
-
-		for (R result : results) {
-			System.out.println(result.toPrintableString());
-		}
-	}
-
-	/**
-	 * Write execution results to file using CSV format.
-	 *
-	 * <p>Does *not* implement/override {@code CSV} since {@link Driver}
-	 * implementations designate the appropriate outputs.
-	 *
-	 * @param filename output filename
-	 * @param lineDelimiter CSV delimiter between lines
-	 * @param fieldDelimiter CSV delimiter between fields
-	 */
-	public void writeCSV(String filename, String lineDelimiter, String fieldDelimiter) {
-		result
-			.writeAsCsv(filename, lineDelimiter, fieldDelimiter)
-				.name("CSV: " + filename);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/f8842f69/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
index 1c9bdc5..86a61d5 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
@@ -22,9 +22,6 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAnalytic;
 import org.apache.flink.graph.asm.result.PrintableResult;
-import org.apache.flink.graph.drivers.output.CSV;
-import org.apache.flink.graph.drivers.output.Hash;
-import org.apache.flink.graph.drivers.output.Print;
 import org.apache.flink.graph.drivers.parameter.BooleanParameter;
 import org.apache.flink.graph.drivers.parameter.ChoiceParameter;
 import org.apache.flink.graph.drivers.parameter.LongParameter;
@@ -33,6 +30,8 @@ import org.apache.flink.types.CopyableValue;
 import org.apache.commons.lang3.text.StrBuilder;
 import org.apache.commons.lang3.text.WordUtils;
 
+import java.io.PrintStream;
+
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
@@ -44,8 +43,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @see org.apache.flink.graph.library.clustering.undirected.TriadicCensus
  */
 public class TriangleListing<K extends Comparable<K> & CopyableValue<K>, VV, EV>
-extends SimpleDriver<K, VV, EV, PrintableResult>
-implements CSV, Hash, Print {
+extends DriverBase<K, VV, EV> {
 
 	private static final String DIRECTED = "directed";
 
@@ -64,11 +62,6 @@ implements CSV, Hash, Print {
 	private GraphAnalytic<K, VV, EV, ? extends PrintableResult> triadicCensus;
 
 	@Override
-	public String getName() {
-		return this.getClass().getSimpleName();
-	}
-
-	@Override
 	public String getShortDescription() {
 		return "list triangles";
 	}
@@ -85,7 +78,7 @@ implements CSV, Hash, Print {
 	}
 
 	@Override
-	protected DataSet<PrintableResult> simplePlan(Graph<K, VV, EV> graph) throws Exception {
+	public DataSet plan(Graph<K, VV, EV> graph) throws Exception {
 		int lp = littleParallelism.getValue().intValue();
 
 		switch (order.getValue()) {
@@ -123,27 +116,10 @@ implements CSV, Hash, Print {
 	}
 
 	@Override
-	public void hash(String executionName) throws Exception {
-		super.hash(executionName);
-		printAnalytics();
-	}
-
-	@Override
-	public void print(String executionName) throws Exception {
-		super.print(executionName);
-		printAnalytics();
-	}
-
-	@Override
-	public void writeCSV(String filename, String lineDelimiter, String fieldDelimiter) {
-		super.writeCSV(filename, lineDelimiter, fieldDelimiter);
-		printAnalytics();
-	}
-
-	private void printAnalytics() {
+	public void printAnalytics(PrintStream out) {
 		if (computeTriadicCensus.getValue()) {
-			System.out.print("Triadic census:\n  ");
-			System.out.println(triadicCensus.getResult().toPrintableString().replace(";", "\n "));
+			out.print("Triadic census:\n  ");
+			out.println(triadicCensus.getResult().toPrintableString().replace(";", "\n "));
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f8842f69/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CSV.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CSV.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CSV.java
index b3f88f6..697da97 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CSV.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CSV.java
@@ -24,7 +24,6 @@ import org.apache.flink.client.program.ProgramParametrizationException;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphCsvReader;
 import org.apache.flink.graph.drivers.parameter.ChoiceParameter;
-import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
 import org.apache.flink.graph.drivers.parameter.Simplify;
 import org.apache.flink.graph.drivers.parameter.StringParameter;
 import org.apache.flink.types.IntValue;
@@ -41,8 +40,7 @@ import org.apache.commons.lang3.text.WordUtils;
  * @param <K> key type
  */
 public class CSV<K extends Comparable<K>>
-extends ParameterizedBase
-implements Input<K, NullValue, NullValue> {
+extends InputBase<K, NullValue, NullValue> {
 
 	private static final String INTEGER = "integer";
 
@@ -68,11 +66,6 @@ implements Input<K, NullValue, NullValue> {
 	private Simplify simplify = new Simplify(this);
 
 	@Override
-	public String getName() {
-		return CSV.class.getSimpleName();
-	}
-
-	@Override
 	public String getIdentity() {
 		return WordUtils.capitalize(getName()) + WordUtils.capitalize(type.getValue()) + " (" + inputFilename + ")";
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f8842f69/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CirculantGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CirculantGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CirculantGraph.java
index a5a2540..a9d05a2 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CirculantGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CirculantGraph.java
@@ -52,13 +52,8 @@ extends GeneratedGraph<LongValue> {
 	private List<OffsetRange> offsetRanges = new ArrayList<>();
 
 	@Override
-	public String getName() {
-		return CirculantGraph.class.getSimpleName();
-	}
-
-	@Override
 	public String getUsage() {
-		return "--" + PREFIX + "0 offset:length [--" + PREFIX + "1 offset:length [--" + PREFIX + "2 ...]]"
+		return "--" + PREFIX + "0 offset:length [--" + PREFIX + "1 offset:length [--" + PREFIX + "2 ...]] "
 			+ super.getUsage();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f8842f69/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java
index dc85df4..64bae73 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java
@@ -40,11 +40,6 @@ extends GeneratedGraph<LongValue> {
 		.setDefaultValue(PARALLELISM_DEFAULT);
 
 	@Override
-	public String getName() {
-		return CompleteGraph.class.getSimpleName();
-	}
-
-	@Override
 	public String getIdentity() {
 		return getTypeName() + " " + getName() + " (" + vertexCount.getValue() + ")";
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f8842f69/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java
index 9ef67c3..d84cfca 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java
@@ -40,11 +40,6 @@ extends GeneratedGraph<LongValue> {
 		.setDefaultValue(PARALLELISM_DEFAULT);
 
 	@Override
-	public String getName() {
-		return CycleGraph.class.getSimpleName();
-	}
-
-	@Override
 	public String getIdentity() {
 		return getTypeName() + " " + getName() + " (" + vertexCount + ")";
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f8842f69/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EchoGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EchoGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EchoGraph.java
index c9b0874..5ca2f2f 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EchoGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EchoGraph.java
@@ -44,11 +44,6 @@ extends GeneratedGraph<LongValue> {
 		.setDefaultValue(PARALLELISM_DEFAULT);
 
 	@Override
-	public String getName() {
-		return EchoGraph.class.getSimpleName();
-	}
-
-	@Override
 	public String getIdentity() {
 		return getTypeName() + " " + getName() + " (" + vertexCount.getValue() + ":" + vertexDegree.getValue() + ")";
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f8842f69/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java
index e7b5942..6feb3c8 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java
@@ -36,11 +36,6 @@ extends GeneratedGraph<LongValue> {
 		.setMinimumValue(MINIMUM_VERTEX_COUNT);
 
 	@Override
-	public String getName() {
-		return EmptyGraph.class.getSimpleName();
-	}
-
-	@Override
 	public String getIdentity() {
 		return getTypeName() + " " + getName() + " (" + vertexCount + ")";
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f8842f69/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GeneratedGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GeneratedGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GeneratedGraph.java
index d4467df..a0446ee 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GeneratedGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GeneratedGraph.java
@@ -26,7 +26,6 @@ import org.apache.flink.graph.asm.translate.TranslateGraphIds;
 import org.apache.flink.graph.asm.translate.translators.LongValueToStringValue;
 import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
 import org.apache.flink.graph.drivers.parameter.ChoiceParameter;
-import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
 import org.apache.flink.types.ByteValue;
 import org.apache.flink.types.CharValue;
 import org.apache.flink.types.LongValue;
@@ -41,8 +40,7 @@ import org.apache.commons.lang3.text.WordUtils;
  * @param <K> graph ID type
  */
 public abstract class GeneratedGraph<K>
-extends ParameterizedBase
-implements Input<K, NullValue, NullValue> {
+extends InputBase<K, NullValue, NullValue> {
 
 	private static final String BYTE = "byte";
 	private static final String NATIVE_BYTE = "nativeByte";

http://git-wip-us.apache.org/repos/asf/flink/blob/f8842f69/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
index a3aabd9..1b6bac1 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
@@ -48,13 +48,8 @@ extends GeneratedGraph<LongValue> {
 		.setDefaultValue(PARALLELISM_DEFAULT);
 
 	@Override
-	public String getName() {
-		return GridGraph.class.getSimpleName();
-	}
-
-	@Override
 	public String getUsage() {
-		return "--" + PREFIX + "0 size:wrap_endpoints [--" + PREFIX + " size:wrap_endpoints [--" + PREFIX + " ...]]"
+		return "--" + PREFIX + "0 size:wrap_endpoints [--" + PREFIX + " size:wrap_endpoints [--" + PREFIX + " ...]] "
 			+ super.getUsage();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f8842f69/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java
index 8d1e8b1..1be65bd 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java
@@ -41,11 +41,6 @@ extends GeneratedGraph<LongValue> {
 		.setDefaultValue(PARALLELISM_DEFAULT);
 
 	@Override
-	public String getName() {
-		return HypercubeGraph.class.getSimpleName();
-	}
-
-	@Override
 	public String getIdentity() {
 		return getTypeName() + " " + getName() + " (" + dimensions + ")";
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f8842f69/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/InputBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/InputBase.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/InputBase.java
new file mode 100644
index 0000000..60b5217
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/InputBase.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+
+/**
+ * Base class for inputs.
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public abstract class InputBase<K, VV, EV>
+extends ParameterizedBase
+implements Input<K, VV, EV> {
+
+	@Override
+	public String getName() {
+		return this.getClass().getSimpleName();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f8842f69/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java
index 9e02056..7f3a3e5 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java
@@ -40,11 +40,6 @@ extends GeneratedGraph<LongValue> {
 		.setDefaultValue(PARALLELISM_DEFAULT);
 
 	@Override
-	public String getName() {
-		return PathGraph.class.getSimpleName();
-	}
-
-	@Override
 	public String getIdentity() {
 		return getTypeName() + " " + getName() + " (" + vertexCount + ")";
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f8842f69/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java
index 3b75089..66ba888 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java
@@ -83,11 +83,6 @@ extends GeneratedMultiGraph<LongValue> {
 		.setDefaultValue(PARALLELISM_DEFAULT);
 
 	@Override
-	public String getName() {
-		return this.getClass().getSimpleName();
-	}
-
-	@Override
 	public String getIdentity() {
 		return getTypeName() + " " + getName() +
 			" (s" + scale + "e" + edgeFactor + getSimplifyShortString() + ")";

http://git-wip-us.apache.org/repos/asf/flink/blob/f8842f69/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java
index 65e0196..44da3f3 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java
@@ -41,11 +41,6 @@ extends GeneratedGraph<LongValue> {
 		.setDefaultValue(PARALLELISM_DEFAULT);
 
 	@Override
-	public String getName() {
-		return SingletonEdgeGraph.class.getSimpleName();
-	}
-
-	@Override
 	public String getIdentity() {
 		return getTypeName() + " " + getName() + " (" + vertexPairCount + ")";
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f8842f69/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java
index b37dc49..d488b59 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java
@@ -40,11 +40,6 @@ extends GeneratedGraph<LongValue> {
 		.setDefaultValue(PARALLELISM_DEFAULT);
 
 	@Override
-	public String getName() {
-		return StarGraph.class.getSimpleName();
-	}
-
-	@Override
 	public String getIdentity() {
 		return getTypeName() + " " + getName() + " (" + vertexCount + ")";
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f8842f69/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/CSV.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/CSV.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/CSV.java
index 5d1faeb..9b6aae6 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/CSV.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/CSV.java
@@ -18,17 +18,32 @@
 
 package org.apache.flink.graph.drivers.output;
 
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.io.CsvOutputFormat;
+import org.apache.flink.graph.drivers.parameter.StringParameter;
+
+import java.io.PrintStream;
+
 /**
  * Write algorithm output to file using CSV format.
+ *
+ * @param <T> result Type
  */
-public interface CSV {
-
-	/**
-	 * Write execution results to file using CSV format.
-	 *
-	 * @param filename output filename
-	 * @param lineDelimiter CSV delimiter between lines
-	 * @param fieldDelimiter CSV delimiter between fields
-	 */
-	void writeCSV(String filename, String lineDelimiter, String fieldDelimiter);
+public class CSV<T>
+extends OutputBase<T> {
+
+	private StringParameter filename = new StringParameter(this, "output_filename");
+
+	private StringParameter lineDelimiter = new StringParameter(this, "output_line_delimiter")
+		.setDefaultValue(CsvOutputFormat.DEFAULT_LINE_DELIMITER);
+
+	private StringParameter fieldDelimiter = new StringParameter(this, "output_field_delimiter")
+		.setDefaultValue(CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
+
+	@Override
+	public void write(String executionName, PrintStream out, DataSet<T> data) throws Exception {
+		data
+			.writeAsCsv(filename.getValue(), lineDelimiter.getValue(), fieldDelimiter.getValue())
+				.name("CSV: " + filename.getValue());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f8842f69/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Hash.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Hash.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Hash.java
index e1c399e..a853339 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Hash.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Hash.java
@@ -18,16 +18,33 @@
 
 package org.apache.flink.graph.drivers.output;
 
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
+import org.apache.flink.graph.drivers.parameter.BooleanParameter;
+
+import java.io.PrintStream;
+
 /**
  * Print hash of algorithm output.
+ *
+ * @param <T> result Type
  */
-public interface Hash {
-
-	/**
-	 * Print hash of execution results.
-	 *
-	 * @param executionName job name
-	 * @throws Exception on error
-	 */
-	void hash(String executionName) throws Exception;
+public class Hash<T>
+extends OutputBase<T> {
+
+	private BooleanParameter printExecutionPlan = new BooleanParameter(this, "__print_execution_plan");
+
+	/** Executes the job named {@code executionName} and prints the checksum of {@code data} to {@code out}. */
+	@Override
+	public void write(String executionName, PrintStream out, DataSet<T> data) throws Exception {
+		ChecksumHashCode<T> hasher = new ChecksumHashCode<T>().run(data);
+
+		// the plan is printed after run(data) and before execute(...)
+		if (printExecutionPlan.getValue()) {
+			System.out.println(data.getExecutionEnvironment().getExecutionPlan());
+		}
+
+		ChecksumHashCode.Checksum result = hasher.execute(executionName);
+		out.println(result);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f8842f69/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Output.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Output.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Output.java
new file mode 100644
index 0000000..710adce
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Output.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.output;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.drivers.parameter.Parameterized;
+
+import java.io.PrintStream;
+
+/**
+ * Output writer for the {@link DataSet} result of a {@link GraphAlgorithm}.
+ */
+public interface Output<T>
+extends Parameterized {
+
+	/**
+	 * Write the output {@link DataSet}.
+	 *
+	 * @param executionName job name, for implementations which execute the job
+	 * @param out print stream, for implementations which emit text output
+	 * @param data the result to be written
+	 */
+	void write(String executionName, PrintStream out, DataSet<T> data) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f8842f69/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/OutputBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/OutputBase.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/OutputBase.java
new file mode 100644
index 0000000..cbeff30
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/OutputBase.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.drivers.output;
+
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+
+/**
+ * Base class for outputs; each output is named after its concrete class.
+ *
+ * @param <T> result Type
+ */
+public abstract class OutputBase<T>
+extends ParameterizedBase
+implements Output<T> {
+
+	@Override
+	public String getName() {
+		return getClass().getSimpleName();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f8842f69/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Print.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Print.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Print.java
index be421b0..623fed4 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Print.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/output/Print.java
@@ -18,16 +18,46 @@
 
 package org.apache.flink.graph.drivers.output;
 
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.asm.dataset.Collect;
+import org.apache.flink.graph.asm.result.PrintableResult;
+import org.apache.flink.graph.drivers.parameter.BooleanParameter;
+
+import java.io.PrintStream;
+import java.util.List;
+
 /**
  * Print algorithm output.
+ *
+ * @param <T> result Type
  */
-public interface Print {
-
-	/**
-	 * Print execution results.
-	 *
-	 * @param executionName job name
-	 * @throws Exception on error
-	 */
-	void print(String executionName) throws Exception;
+public class Print<T>
+extends OutputBase<T> {
+
+	private BooleanParameter printExecutionPlan = new BooleanParameter(this, "__print_execution_plan");
+
+	/**
+	 * Executes the job named {@code executionName} and prints one collected result per line
+	 * to {@code out}; {@link PrintableResult} elements are printed in their printable form.
+	 */
+	@Override
+	public void write(String executionName, PrintStream out, DataSet<T> data) throws Exception {
+		Collect<T> collector = new Collect<T>().run(data);
+
+		if (printExecutionPlan.getValue()) {
+			System.out.println(data.getExecutionEnvironment().getExecutionPlan());
+		}
+
+		List<T> results = collector.execute(executionName);
+
+		if (results.isEmpty()) {
+			return;
+		}
+
+		// the first element selects the format used for every result
+		boolean printable = results.get(0) instanceof PrintableResult;
+		for (T result : results) {
+			out.println(printable ? ((PrintableResult) result).toPrintableString() : result);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/f8842f69/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/IterationConvergence.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/IterationConvergence.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/IterationConvergence.java
index f02c536..6730987 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/IterationConvergence.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/IterationConvergence.java
@@ -53,7 +53,12 @@ implements Parameter<Value> {
 
 	@Override
 	public String getUsage() {
-		return "[--iterations ITERATIONS] [--convergence_threshold CONVERGENCE_THRESHOLD]";
+		return "[--iterations ITERATIONS] [--convergence_threshold CONVERGENCE_THRESHOLD] ";
+	}
+
+	@Override
+	public boolean isHidden() {
+		return false;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/f8842f69/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Parameter.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Parameter.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Parameter.java
index 9dbac4b..a0fc0cf 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Parameter.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Parameter.java
@@ -40,6 +40,15 @@ public interface Parameter<T> {
 	String getUsage();
 
 	/**
+	 * A hidden parameter is parsed from the command-line configuration but is
+	 * not printed in the usage string. This can be used for power-user options
+	 * not displayed to the general user.
+	 *
+	 * @return whether this parameter should be hidden from standard usage
+	 */
+	boolean isHidden();
+
+	/**
 	 * Read and parse the parameter value from command-line arguments.
 	 *
 	 * @param parameterTool parameter parser

http://git-wip-us.apache.org/repos/asf/flink/blob/f8842f69/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ParameterizedBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ParameterizedBase.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ParameterizedBase.java
index a3991cf..ba6f6af 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ParameterizedBase.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ParameterizedBase.java
@@ -51,9 +51,11 @@ implements Parameterized {
 
 		// print parameters as ordered list
 		for (Parameter<?> parameter : parameters) {
-			strBuilder
-				.append(parameter.getUsage())
-				.append(" ");
+			if (!parameter.isHidden()) {
+				strBuilder
+					.append(parameter.getUsage())
+					.append(" ");
+			}
 		}
 
 		return strBuilder.toString();

http://git-wip-us.apache.org/repos/asf/flink/blob/f8842f69/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/SimpleParameter.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/SimpleParameter.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/SimpleParameter.java
index 93469ac..170e691 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/SimpleParameter.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/SimpleParameter.java
@@ -66,6 +66,11 @@ implements Parameter<T> {
 	}
 
 	@Override
+	public boolean isHidden() {
+		return name.startsWith("__");
+	}
+
+	@Override
 	public T getValue() {
 		return value;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/f8842f69/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Simplify.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Simplify.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Simplify.java
index 4d9e481..9fc937c 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Simplify.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/Simplify.java
@@ -67,7 +67,12 @@ implements Parameter<Ordering> {
 
 	@Override
 	public String getUsage() {
-		return "[--simplify <directed | undirected [--clip_and_flip]>]";
+		return "[--simplify <directed | undirected [--clip_and_flip]>] ";
+	}
+
+	@Override
+	public boolean isHidden() {
+		return false;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/f8842f69/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java
index 15f7293..a56a19b 100644
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java
@@ -60,16 +60,16 @@ extends DriverBaseITCase {
 			case "integer":
 			case "nativeInteger":
 			case "nativeLong":
-				checksum = 0x0000000000344448L;
+				checksum = 0x000000000001ae80L;
 				break;
 
 			case "long":
-				checksum = 0x0000000000a19d48L;
+				checksum = 0x0000000000053580L;
 				break;
 
 			case "string":
 			case "nativeString":
-				checksum = 0x000000000c47ca48L;
+				checksum = 0x0000000000656880L;
 				break;
 
 			default:
@@ -114,16 +114,16 @@ extends DriverBaseITCase {
 			case "integer":
 			case "nativeInteger":
 			case "nativeLong":
-				checksum = 0x000000000217bbe2L;
+				checksum = 0x0000000000113ca0L;
 				break;
 
 			case "long":
-				checksum = 0x0000000006788c22L;
+				checksum = 0x0000000000356460L;
 				break;
 
 			case "string":
 			case "nativeString":
-				checksum = 0x000000007ddfd962L;
+				checksum = 0x00000000040f6f20L;
 				break;
 
 			default:
@@ -158,16 +158,16 @@ extends DriverBaseITCase {
 			case "integer":
 			case "nativeInteger":
 			case "nativeLong":
-				checksum = 0x00000000001a2224L;
+				checksum = 0x000000000000d740L;
 				break;
 
 			case "long":
-				checksum = 0x000000000050cea4L;
+				checksum = 0x0000000000029ac0L;
 				break;
 
 			case "string":
 			case "nativeString":
-				checksum = 0x000000000623e524L;
+				checksum = 0x000000000032b440L;
 				break;
 
 			default:
@@ -202,16 +202,16 @@ extends DriverBaseITCase {
 			case "integer":
 			case "nativeInteger":
 			case "nativeLong":
-				checksum = 0x0000000000a9ddeaL;
+				checksum = 0x0000000000057720L;
 				break;
 
 			case "long":
-				checksum = 0x00000000020d3f2aL;
+				checksum = 0x000000000010ede0L;
 				break;
 
 			case "string":
 			case "nativeString":
-				checksum = 0x0000000027e9516aL;
+				checksum = 0x00000000014993a0L;
 				break;
 
 			default:
@@ -253,16 +253,16 @@ extends DriverBaseITCase {
 			case "integer":
 			case "nativeInteger":
 			case "nativeLong":
-				checksum = 0x00000000001ca34aL;
+				checksum = 0x000000000000eba0L;
 				break;
 
 			case "long":
-				checksum = 0x000000000071408aL;
+				checksum = 0x000000000003a660L;
 				break;
 
 			case "string":
 			case "nativeString":
-				checksum = 0x00000000081ee80aL;
+				checksum = 0x0000000000430ee0L;
 				break;
 
 			default:
@@ -297,16 +297,16 @@ extends DriverBaseITCase {
 			case "integer":
 			case "nativeInteger":
 			case "nativeLong":
-				checksum = 0x00000000035df180L;
+				checksum = 0x00000000001bc800L;
 				break;
 
 			case "long":
-				checksum = 0x0000000005a52180L;
+				checksum = 0x00000000002e9800L;
 				break;
 
 			case "string":
 			case "nativeString":
-				checksum = 0x0000000273474480L;
+				checksum = 0x00000000143c1500L;
 				break;
 
 			default:
@@ -341,16 +341,16 @@ extends DriverBaseITCase {
 			case "integer":
 			case "nativeInteger":
 			case "nativeLong":
-				checksum = 0x00000000001982daL;
+				checksum = 0x000000000000d220L;
 				break;
 
 			case "long":
-				checksum = 0x00000000004ee21aL;
+				checksum = 0x0000000000028ae0L;
 				break;
 
 			case "string":
 			case "nativeString":
-				checksum = 0x00000000060a065aL;
+				checksum = 0x000000000031dea0L;
 				break;
 
 			default:
@@ -385,16 +385,16 @@ extends DriverBaseITCase {
 			case "integer":
 			case "nativeInteger":
 			case "nativeLong":
-				checksum = 0x0000000003bf67f7L;
+				checksum = 0x00000000001ee529L;
 				break;
 
 			case "long":
-				checksum = 0x0000000008f467f7L;
+				checksum = 0x000000000049e529L;
 				break;
 
 			case "string":
 			case "nativeString":
-				checksum = 0x00000001660861bdL;
+				checksum = 0x000000000b8c9aa3L;
 				break;
 
 			default:
@@ -429,16 +429,16 @@ extends DriverBaseITCase {
 			case "integer":
 			case "nativeInteger":
 			case "nativeLong":
-				checksum = 0x00000000029aafb3L;
+				checksum = 0x00000000001579bdL;
 				break;
 
 			case "long":
-				checksum = 0x000000000592e9b3L;
+				checksum = 0x00000000002dffbdL;
 				break;
 
 			case "string":
 			case "nativeString":
-				checksum = 0x000000011b079691L;
+				checksum = 0x0000000009213f1fL;
 				break;
 
 			default:
@@ -473,16 +473,16 @@ extends DriverBaseITCase {
 			case "integer":
 			case "nativeInteger":
 			case "nativeLong":
-				checksum = 0x0000000004627ab6L;
+				checksum = 0x0000000000242920L;
 				break;
 
 			case "long":
-				checksum = 0x0000000009193576L;
+				checksum = 0x00000000004b1660L;
 				break;
 
 			case "string":
 			case "nativeString":
-				checksum = 0x00000001e9adcf56L;
+				checksum = 0x000000000fcbc080L;
 				break;
 
 			default:
@@ -517,16 +517,16 @@ extends DriverBaseITCase {
 			case "integer":
 			case "nativeInteger":
 			case "nativeLong":
-				checksum = 0x000000000034d5a4L;
+				checksum = 0x000000000001b3c0L;
 				break;
 
 			case "long":
-				checksum = 0x00000000006b8224L;
+				checksum = 0x0000000000037740L;
 				break;
 
 			case "string":
 			case "nativeString":
-				checksum = 0x000000000757c6a4L;
+				checksum = 0x00000000003ca2c0L;
 				break;
 
 			default:
@@ -561,16 +561,16 @@ extends DriverBaseITCase {
 			case "integer":
 			case "nativeInteger":
 			case "nativeLong":
-				checksum = 0x00000000000d195aL;
+				checksum = 0x0000000000006ba0L;
 				break;
 
 			case "long":
-				checksum = 0x000000000042789aL;
+				checksum = 0x0000000000022460L;
 				break;
 
 			case "string":
 			case "nativeString":
-				checksum = 0x00000000032f0adaL;
+				checksum = 0x00000000001a4a20L;
 				break;
 
 			default:


Mime
View raw message