flink-commits mailing list archives

From g...@apache.org
Subject [2/2] flink git commit: [FLINK-4204] [gelly] Clean up gelly-examples
Date Mon, 24 Oct 2016 16:32:47 GMT
[FLINK-4204] [gelly] Clean up gelly-examples

Moves drivers into separate package. Adds default main class to print
usage listing included classes. Includes documentation for running
Gelly examples.

This closes #2670


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c4f9f0d7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c4f9f0d7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c4f9f0d7

Branch: refs/heads/master
Commit: c4f9f0d78d65a7ce7290820ae9fb2919c9116e57
Parents: c4783c8
Author: Greg Hogan <code@greghogan.com>
Authored: Wed Aug 24 11:32:43 2016 -0400
Committer: Greg Hogan <code@greghogan.com>
Committed: Mon Oct 24 12:14:05 2016 -0400

----------------------------------------------------------------------
 docs/dev/libs/gelly/index.md                    |  61 +++-
 flink-libraries/flink-gelly-examples/pom.xml    |  13 +
 .../main/java/org/apache/flink/graph/Usage.java |  60 ++++
 .../apache/flink/graph/driver/GraphMetrics.java | 232 -------------
 .../graph/drivers/ClusteringCoefficient.java    | 343 +++++++++++++++++++
 .../apache/flink/graph/drivers/Graph500.java    | 141 ++++++++
 .../flink/graph/drivers/GraphMetrics.java       | 234 +++++++++++++
 .../org/apache/flink/graph/drivers/HITS.java    | 187 ++++++++++
 .../flink/graph/drivers/JaccardIndex.java       | 222 ++++++++++++
 .../flink/graph/drivers/TriangleListing.java    | 251 ++++++++++++++
 .../graph/examples/ClusteringCoefficient.java   | 326 ------------------
 .../apache/flink/graph/examples/Graph500.java   | 129 -------
 .../flink/graph/examples/GraphMetrics.java      | 171 ---------
 .../org/apache/flink/graph/examples/HITS.java   | 185 ----------
 .../flink/graph/examples/JaccardIndex.java      | 208 -----------
 .../flink/graph/examples/TriangleListing.java   | 230 -------------
 .../graph/examples/utils/ExampleUtils.java      | 162 ---------
 .../graph/scala/examples/GraphMetrics.scala     | 129 -------
 .../library/metric/directed/EdgeMetrics.java    |  10 +
 .../library/metric/directed/VertexMetrics.java  |  10 +
 .../library/metric/undirected/EdgeMetrics.java  |  10 +
 .../metric/undirected/VertexMetrics.java        |  10 +
 22 files changed, 1551 insertions(+), 1773 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/docs/dev/libs/gelly/index.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/gelly/index.md b/docs/dev/libs/gelly/index.md
index 2eeec2c..db7073f 100644
--- a/docs/dev/libs/gelly/index.md
+++ b/docs/dev/libs/gelly/index.md
@@ -64,6 +64,65 @@ Add the following dependency to your `pom.xml` to use Gelly.
 
 Note that Gelly is currently not part of the binary distribution. See linking with it for cluster execution [here]({{ site.baseurl }}/dev/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution).
 
-The remaining sections provide a description of available methods and present several examples of how to use Gelly and how to mix it with the Flink DataSet API. After reading this guide, you might also want to check the {% gh_link /flink-libraries/flink-gelly-examples/ "Gelly examples" %}.
+The remaining sections provide a description of available methods and present several examples of how to use Gelly and how to mix it with the Flink DataSet API.
+
+Running Gelly Examples
+----------------------
+
+The Gelly library and examples jars are provided in the [Flink distribution](https://flink.apache.org/downloads.html "Apache Flink: Downloads")
+in the folder **opt/lib/gelly** (for versions older than Flink 1.2 these can be manually downloaded from
+[Maven Central](http://search.maven.org/#search|ga|1|flink%20gelly)).
+
+To run the Gelly examples, the **flink-gelly** (for Java) or **flink-gelly-scala** (for Scala) jar must be copied to
+Flink's **lib** directory.
+
+~~~bash
+cp opt/lib/gelly/flink-gelly_*.jar lib/
+cp opt/lib/gelly/flink-gelly-scala_*.jar lib/
+~~~
+
+Gelly's examples jar includes both drivers for the library methods and additional example algorithms. After
+configuring and starting the cluster, list the available algorithm classes:
+
+~~~bash
+./bin/start-cluster.sh
+./bin/flink run opt/lib/gelly/flink-gelly-examples_*.jar
+~~~
+
+The Gelly drivers can generate [RMat](http://www.cs.cmu.edu/~christos/PUBLICATIONS/siam04.pdf) graph data or read the
+edge list from a CSV file. Each node in a cluster must have access to the input file. Calculate graph metrics on a
+directed generated graph:
+
+~~~bash
+./bin/flink run -c org.apache.flink.graph.drivers.GraphMetrics opt/lib/gelly/flink-gelly-examples_*.jar \
+    --directed true --input rmat
+~~~
+
+The size of the graph is adjusted by the *\-\-scale* and *\-\-edge_factor* parameters. The
+[library generator](./graph_generators.html#rmat-graph) provides access to additional configuration to adjust the
+power-law skew and random noise.
+
+Sample social network data is provided by the [Stanford Network Analysis Project](http://snap.stanford.edu/data/index.html).
+The [com-lj](http://snap.stanford.edu/data/bigdata/communities/com-lj.ungraph.txt.gz) data set is a good starter size.
+Run a few algorithms and monitor the job progress in Flink's Web UI:
+
+~~~bash
+wget -O - http://snap.stanford.edu/data/bigdata/communities/com-lj.ungraph.txt.gz | gunzip -c > com-lj.ungraph.txt
+
+./bin/flink run -q -c org.apache.flink.graph.drivers.GraphMetrics opt/lib/gelly/flink-gelly-examples_*.jar \
+    --directed true --input csv --type integer --input_filename com-lj.ungraph.txt --input_field_delimiter '\t'
+
+./bin/flink run -q -c org.apache.flink.graph.drivers.ClusteringCoefficient opt/lib/gelly/flink-gelly-examples_*.jar \
+    --directed true --input csv --type integer --input_filename com-lj.ungraph.txt --input_field_delimiter '\t' \
+    --output hash
+
+./bin/flink run -q -c org.apache.flink.graph.drivers.JaccardIndex opt/lib/gelly/flink-gelly-examples_*.jar \
+    --input csv --type integer --simplify true --input_filename com-lj.ungraph.txt --input_field_delimiter '\t' \
+    --output hash
+~~~
+
+Please submit feature requests and report issues on the user [mailing list](https://flink.apache.org/community.html#mailing-lists)
+or [Flink Jira](https://issues.apache.org/jira/browse/FLINK). We welcome suggestions for new algorithms and features as
+well as [code contributions](https://flink.apache.org/contribute-code.html).
 
 {% top %}

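A note on the `--scale` and `--edge_factor` parameters documented above: in the drivers in this commit the RMat size is computed as `long vertexCount = 1L << scale; long edgeCount = vertexCount * edgeFactor;`, so the generated graph has 2^scale vertices and edge_factor edges per vertex before `Simplify` runs (which can lower the final edge count). The Java sketch below restates that arithmetic. The class `RMatSize` and its methods are hypothetical helpers, not part of Gelly, and the range check on `scale` is an added assumption the drivers do not perform. `fitsUnsignedInt` mirrors the drivers' `scale > 32` branch, which keeps `LongValue` IDs for larger graphs and otherwise translates them to `IntValue` with `LongValueToUnsignedIntValue`.

```java
/** Hypothetical helper mirroring the RMat sizing arithmetic in the Gelly drivers. */
final class RMatSize {

    // Defaults used by the drivers when --scale / --edge_factor are omitted.
    static final int DEFAULT_SCALE = 10;
    static final int DEFAULT_EDGE_FACTOR = 16;

    private RMatSize() {}

    /** 2^scale vertices; scale is limited to [0, 62] (an assumption) so the shift stays positive. */
    static long vertexCount(int scale) {
        if (scale < 0 || scale > 62) {
            throw new IllegalArgumentException("scale out of range: " + scale);
        }
        return 1L << scale;
    }

    /** Edges generated before simplification: edge_factor edges per vertex. */
    static long edgeCount(int scale, int edgeFactor) {
        return Math.multiplyExact(vertexCount(scale), (long) edgeFactor);
    }

    /** Vertex IDs 0 .. 2^scale - 1 fit in an unsigned 32-bit int when scale <= 32. */
    static boolean fitsUnsignedInt(int scale) {
        return scale <= 32;
    }

    public static void main(String[] args) {
        System.out.println(vertexCount(DEFAULT_SCALE));
        System.out.println(edgeCount(DEFAULT_SCALE, DEFAULT_EDGE_FACTOR));
    }
}
```

With the defaults (scale 10, edge factor 16) this gives 1,024 vertices and 16,384 generated edges; each increment of `--scale` doubles both.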
http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/pom.xml
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/pom.xml b/flink-libraries/flink-gelly-examples/pom.xml
index 80da0ff..9b90b04 100644
--- a/flink-libraries/flink-gelly-examples/pom.xml
+++ b/flink-libraries/flink-gelly-examples/pom.xml
@@ -143,6 +143,19 @@
 				</configuration>
 			</plugin>
 
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-jar-plugin</artifactId>
+				<version>2.5</version>
+				<configuration>
+					<archive>
+						<manifestEntries>
+							<Main-Class>org.apache.flink.graph.Usage</Main-Class>
+						</manifestEntries>
+					</archive>
+				</configuration>
+			</plugin>
+
 			<!-- Adding scala source directories to build path -->
 			<plugin>
 				<groupId>org.codehaus.mojo</groupId>

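The `<Main-Class>` entry added to the `maven-jar-plugin` configuration is what the docs rely on when they run `./bin/flink run opt/lib/gelly/flink-gelly-examples_*.jar` without `-c`: with no entry class given on the command line, the Flink client uses the class named in the jar manifest, here `org.apache.flink.graph.Usage`. To confirm what a built jar declares, its manifest can be read with the JDK's `java.util.jar` API. The sketch below is illustrative only; `ManifestMainClass` and its methods are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.jar.Attributes;
import java.util.jar.JarFile;
import java.util.jar.Manifest;

/** Hypothetical helper for inspecting the Main-Class entry of a jar manifest. */
final class ManifestMainClass {

    private ManifestMainClass() {}

    /** Returns the Main-Class declared by the jar at jarPath, or null if it has none. */
    static String mainClassOf(String jarPath) throws IOException {
        try (JarFile jar = new JarFile(jarPath)) {
            Manifest manifest = jar.getManifest();
            return manifest == null
                ? null
                : manifest.getMainAttributes().getValue(Attributes.Name.MAIN_CLASS);
        }
    }

    /**
     * Builds a manifest like the one the manifestEntries block produces,
     * serializes it, parses it back, and returns the Main-Class value.
     */
    static String roundTripMainClass(String mainClass) {
        Manifest manifest = new Manifest();
        Attributes attrs = manifest.getMainAttributes();
        // Manifest.write expects Manifest-Version to be set in the main attributes.
        attrs.put(Attributes.Name.MANIFEST_VERSION, "1.0");
        attrs.put(Attributes.Name.MAIN_CLASS, mainClass);
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            manifest.write(out);
            Manifest parsed = new Manifest(new ByteArrayInputStream(out.toByteArray()));
            return parsed.getMainAttributes().getValue(Attributes.Name.MAIN_CLASS);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        // Usage: java ManifestMainClass path/to/flink-gelly-examples.jar
        if (args.length == 1) {
            System.out.println(mainClassOf(args[0]));
        } else {
            System.out.println(roundTripMainClass("org.apache.flink.graph.Usage"));
        }
    }
}
```

Passing the path of the built examples jar prints the class that `flink run` would launch when `-c` is omitted.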
http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
new file mode 100644
index 0000000..9d8f116
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Usage.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+/**
+ * This default main class prints a usage message listing the available classes.
+ */
+public class Usage {
+
+	private static final Class<?>[] DRIVERS = new Class<?>[]{
+		org.apache.flink.graph.drivers.ClusteringCoefficient.class,
+		org.apache.flink.graph.drivers.Graph500.class,
+		org.apache.flink.graph.drivers.GraphMetrics.class,
+		org.apache.flink.graph.drivers.HITS.class,
+		org.apache.flink.graph.drivers.JaccardIndex.class,
+		org.apache.flink.graph.drivers.TriangleListing.class,
+	};
+
+	private static final Class<?>[] EXAMPLES = new Class<?>[]{
+		org.apache.flink.graph.examples.ConnectedComponents.class,
+		org.apache.flink.graph.examples.EuclideanGraphWeighing.class,
+		org.apache.flink.graph.examples.GSASingleSourceShortestPaths.class,
+		org.apache.flink.graph.examples.IncrementalSSSP.class,
+		org.apache.flink.graph.examples.MusicProfiles.class,
+		org.apache.flink.graph.examples.PregelSSSP.class,
+		org.apache.flink.graph.examples.SingleSourceShortestPaths.class,
+		org.apache.flink.graph.scala.examples.ConnectedComponents.class,
+		org.apache.flink.graph.scala.examples.GSASingleSourceShortestPaths.class,
+		org.apache.flink.graph.scala.examples.SingleSourceShortestPaths.class,
+	};
+
+	public static void main(String[] args) throws Exception {
+		System.out.println("Driver classes call algorithms from the Gelly library:");
+		for (Class<?> cls : DRIVERS) {
+			System.out.println("  " + cls.getName());
+		}
+
+		System.out.println();
+		System.out.println("Example classes illustrate Gelly APIs or alternative algorithms:");
+		for (Class<?> cls : EXAMPLES) {
+			System.out.println("  " + cls.getName());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/driver/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/driver/GraphMetrics.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/driver/GraphMetrics.java
deleted file mode 100644
index 79c5f80..0000000
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/driver/GraphMetrics.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.driver;
-
-import org.apache.commons.lang3.StringEscapeUtils;
-import org.apache.commons.lang3.text.WordUtils;
-import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvOutputFormat;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAnalytic;
-import org.apache.flink.graph.GraphCsvReader;
-import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
-import org.apache.flink.graph.asm.translate.TranslateGraphIds;
-import org.apache.flink.graph.generator.RMatGraph;
-import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
-import org.apache.flink.graph.generator.random.RandomGenerableFactory;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.StringValue;
-
-import java.text.NumberFormat;
-
-/**
- * Computes vertex and edge metrics on a directed or undirected graph.
- *
- * @see org.apache.flink.graph.library.metric.directed.EdgeMetrics
- * @see org.apache.flink.graph.library.metric.directed.VertexMetrics
- * @see org.apache.flink.graph.library.metric.undirected.EdgeMetrics
- * @see org.apache.flink.graph.library.metric.undirected.VertexMetrics
- */
-public class GraphMetrics {
-
-	public static final int DEFAULT_SCALE = 10;
-
-	public static final int DEFAULT_EDGE_FACTOR = 16;
-
-	public static final boolean DEFAULT_CLIP_AND_FLIP = true;
-
-	private static void printUsage() {
-		System.out.println(WordUtils.wrap("Computes vertex and edge metrics on a directed or undirected graph.", 80));
-		System.out.println();
-		System.out.println("usage: GraphMetrics --directed <true | false> --input <csv | rmat [options]>");
-		System.out.println();
-		System.out.println("options:");
-		System.out.println("  --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]");
-		System.out.println("  --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]");
-	}
-
-	public static void main(String[] args) throws Exception {
-		// Set up the execution environment
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.getConfig().enableObjectReuse();
-
-		ParameterTool parameters = ParameterTool.fromArgs(args);
-		if (! parameters.has("directed")) {
-			printUsage();
-			return;
-		}
-		boolean directedAlgorithm = parameters.getBoolean("directed");
-
-		GraphAnalytic vm;
-		GraphAnalytic em;
-
-		switch (parameters.get("input", "")) {
-			case "csv": {
-				String lineDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-				String fieldDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-				GraphCsvReader reader = Graph
-					.fromCsvReader(parameters.get("input_filename"), env)
-						.ignoreCommentsEdges("#")
-						.lineDelimiterEdges(lineDelimiter)
-						.fieldDelimiterEdges(fieldDelimiter);
-
-				switch (parameters.get("type", "")) {
-					case "integer": {
-						Graph<LongValue, NullValue, NullValue> graph = reader
-							.keyType(LongValue.class);
-
-						if (directedAlgorithm) {
-							if (parameters.getBoolean("simplify", false)) {
-								graph = graph
-									.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());
-							}
-
-							vm = graph
-								.run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<LongValue, NullValue, NullValue>());
-							em = graph
-								.run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<LongValue, NullValue, NullValue>());
-						} else {
-							if (parameters.getBoolean("simplify", false)) {
-								graph = graph
-									.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false));
-							}
-
-							vm = graph
-								.run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<LongValue, NullValue, NullValue>());
-							em = graph
-								.run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<LongValue, NullValue, NullValue>());
-						}
-					} break;
-
-					case "string": {
-						Graph<StringValue, NullValue, NullValue> graph = reader
-							.keyType(StringValue.class);
-
-						if (directedAlgorithm) {
-							if (parameters.getBoolean("simplify", false)) {
-								graph = graph
-									.run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>());
-							}
-
-							vm = graph
-								.run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<StringValue, NullValue, NullValue>());
-							em = graph
-								.run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<StringValue, NullValue, NullValue>());
-						} else {
-							if (parameters.getBoolean("simplify", false)) {
-								graph = graph
-									.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false));
-							}
-
-							vm = graph
-								.run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<StringValue, NullValue, NullValue>());
-							em = graph
-								.run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<StringValue, NullValue, NullValue>());
-						}
-					} break;
-
-					default:
-						printUsage();
-						return;
-				}
-				} break;
-
-			case "rmat": {
-				int scale = parameters.getInt("scale", DEFAULT_SCALE);
-				int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
-
-				RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
-
-				long vertexCount = 1L << scale;
-				long edgeCount = vertexCount * edgeFactor;
-
-
-				Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
-					.generate();
-
-				if (directedAlgorithm) {
-					if (scale > 32) {
-						Graph<LongValue, NullValue, NullValue> newGraph = graph
-							.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());
-
-						vm = newGraph
-							.run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<LongValue, NullValue, NullValue>());
-						em = newGraph
-							.run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<LongValue, NullValue, NullValue>());
-					} else {
-						Graph<IntValue, NullValue, NullValue> newGraph = graph
-							.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()))
-							.run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>());
-
-						vm = newGraph
-							.run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<IntValue, NullValue, NullValue>());
-						em = newGraph
-							.run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<IntValue, NullValue, NullValue>());
-					}
-				} else {
-					boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
-
-					if (scale > 32) {
-						Graph<LongValue, NullValue, NullValue> newGraph = graph
-							.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
-
-						vm = newGraph
-							.run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<LongValue, NullValue, NullValue>());
-						em = newGraph
-							.run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<LongValue, NullValue, NullValue>());
-					} else {
-						Graph<IntValue, NullValue, NullValue> newGraph = graph
-							.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()))
-							.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip));
-
-						vm = newGraph
-							.run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<IntValue, NullValue, NullValue>());
-						em = newGraph
-							.run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<IntValue, NullValue, NullValue>());
-					}
-				}
-				} break;
-
-			default:
-				printUsage();
-				return;
-		}
-
-		env.execute("Graph Metrics");
-
-		System.out.print("Vertex metrics:\n  ");
-		System.out.println(vm.getResult().toString().replace(";", "\n "));
-		System.out.print("\nEdge metrics:\n  ");
-		System.out.println(em.getResult().toString().replace(";", "\n "));
-
-		JobExecutionResult result = env.getLastJobExecutionResult();
-
-		NumberFormat nf = NumberFormat.getInstance();
-		System.out.println("\nExecution runtime: " + nf.format(result.getNetRuntime()) + " ms");
-	}
-}

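The `ClusteringCoefficient` driver that follows describes its output as tuples of vertex ID, vertex degree, and number of edges between the vertex's neighbors, with scores from 0.0 to 1.0. For an undirected simple graph, the local score is those neighbor edges divided by the number of neighbor pairs, d(d - 1)/2, and the average clustering coefficient is the mean of the local scores. The sketch below computes both from plain (degree, neighbor edges) pairs. `ClusteringScores` is a hypothetical helper, not the Gelly implementation; it covers only the undirected case, and scoring vertices of degree below two as 0.0 (and including them in the mean) is an assumed convention that Gelly's own classes may not share.

```java
/**
 * Hypothetical helper: local and average clustering coefficient for an
 * undirected simple graph, from per-vertex (degree, edges between neighbors).
 */
final class ClusteringScores {

    private ClusteringScores() {}

    /**
     * Edges among the vertex's neighbors divided by the number of neighbor
     * pairs d * (d - 1) / 2. Degree < 2 means no pairs; scored 0.0 by assumption.
     */
    static double local(long degree, long neighborEdges) {
        long neighborPairs = degree * (degree - 1) / 2;
        if (neighborPairs == 0) {
            return 0.0;
        }
        return (double) neighborEdges / neighborPairs;
    }

    /** Mean of the local scores; each row is {degree, edgesBetweenNeighbors}. */
    static double average(long[][] degreeAndNeighborEdges) {
        if (degreeAndNeighborEdges.length == 0) {
            return 0.0;
        }
        double sum = 0.0;
        for (long[] vertex : degreeAndNeighborEdges) {
            sum += local(vertex[0], vertex[1]);
        }
        return sum / degreeAndNeighborEdges.length;
    }

    public static void main(String[] args) {
        // Triangle {0, 1, 2} plus pendant vertex 3 attached to vertex 0.
        long[][] stats = {{3, 1}, {2, 1}, {2, 1}, {1, 0}};
        System.out.println(average(stats));
    }
}
```

For a triangle with one extra pendant vertex, the two degree-2 triangle vertices score 1.0, the degree-3 vertex scores 1/3, and the pendant scores 0.0 under this convention, giving an average of 7/12.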
http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
new file mode 100644
index 0000000..18b0406
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.text.StrBuilder;
+import org.apache.commons.lang3.text.WordUtils;
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvOutputFormat;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAnalytic;
+import org.apache.flink.graph.GraphCsvReader;
+import org.apache.flink.graph.asm.translate.TranslateGraphIds;
+import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
+import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.types.StringValue;
+
+import java.text.NumberFormat;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * Driver for the library implementations of Global, Average, and Local Clustering Coefficient.
+ *
+ * This example reads a simple directed or undirected graph from a CSV file or
+ * generates an RMat graph with the given scale and edge factor, then calculates
+ * the local clustering coefficient for each vertex and the global and average
+ * clustering coefficients for the graph.
+ *
+ * @see org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient
+ * @see org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient
+ * @see org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient
+ * @see org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient
+ * @see org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient
+ * @see org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient
+ */
+public class ClusteringCoefficient {
+
+	private static final int DEFAULT_SCALE = 10;
+
+	private static final int DEFAULT_EDGE_FACTOR = 16;
+
+	private static final boolean DEFAULT_CLIP_AND_FLIP = true;
+
+	private static String getUsage(String message) {
+		return new StrBuilder()
+			.appendNewLine()
+			.appendln(WordUtils.wrap("The local clustering coefficient measures the connectedness of each" +
+				" vertex's neighborhood and the global clustering coefficient measures the connectedness of the graph." +
+				" Scores range from 0.0 (no edges between neighbors or vertices) to 1.0 (neighborhood or graph" +
+				" is a clique).", 80))
+			.appendNewLine()
+			.appendln(WordUtils.wrap("This algorithm returns tuples containing the vertex ID, the degree of" +
+				" the vertex, and the number of edges between vertex neighbors.", 80))
+			.appendNewLine()
+			.appendln("usage: ClusteringCoefficient --directed <true | false> --input <csv | rmat [options]> --output <print | hash | csv [options]>")
+			.appendNewLine()
+			.appendln("options:")
+			.appendln("  --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
+			.appendln("  --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]")
+			.appendNewLine()
+			.appendln("  --output print")
+			.appendln("  --output hash")
+			.appendln("  --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]")
+			.appendNewLine()
+			.appendln("Usage error: " + message)
+			.toString();
+	}
+
+	public static void main(String[] args) throws Exception {
+		// Set up the execution environment
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().enableObjectReuse();
+
+		ParameterTool parameters = ParameterTool.fromArgs(args);
+
+		if (! parameters.has("directed")) {
+			throw new ProgramParametrizationException(getUsage("must declare execution mode as '--directed true' or '--directed false'"));
+		}
+		boolean directedAlgorithm = parameters.getBoolean("directed");
+
+		int little_parallelism = parameters.getInt("little_parallelism", PARALLELISM_DEFAULT);
+
+		// global, average, and local clustering coefficient results
+		GraphAnalytic gcc;
+		GraphAnalytic acc;
+		DataSet lcc;
+
+		switch (parameters.get("input", "")) {
+			case "csv": {
+				String lineDelimiter = StringEscapeUtils.unescapeJava(
+					parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
+
+				String fieldDelimiter = StringEscapeUtils.unescapeJava(
+					parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
+
+				GraphCsvReader reader = Graph
+					.fromCsvReader(parameters.get("input_filename"), env)
+						.ignoreCommentsEdges("#")
+						.lineDelimiterEdges(lineDelimiter)
+						.fieldDelimiterEdges(fieldDelimiter);
+
+				switch (parameters.get("type", "")) {
+					case "integer": {
+						Graph<LongValue, NullValue, NullValue> graph = reader
+							.keyType(LongValue.class);
+
+						if (directedAlgorithm) {
+							if (parameters.getBoolean("simplify", false)) {
+								graph = graph
+									.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());
+							}
+
+							gcc = graph
+								.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
+							acc = graph
+								.run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
+							lcc = graph
+								.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
+						} else {
+							if (parameters.getBoolean("simplify", false)) {
+								graph = graph
+									.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false));
+							}
+
+							gcc = graph
+								.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
+							acc = graph
+								.run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
+							lcc = graph
+								.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
+						}
+					} break;
+
+					case "string": {
+						Graph<StringValue, NullValue, NullValue> graph = reader
+							.keyType(StringValue.class);
+
+						if (directedAlgorithm) {
+							if (parameters.getBoolean("simplify", false)) {
+								graph = graph
+									.run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>());
+							}
+
+							gcc = graph
+								.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<StringValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
+							acc = graph
+								.run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<StringValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
+							lcc = graph
+								.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<StringValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
+						} else {
+							if (parameters.getBoolean("simplify", false)) {
+								graph = graph
+									.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false));
+							}
+
+							gcc = graph
+								.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<StringValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
+							acc = graph
+								.run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<StringValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
+							lcc = graph
+								.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<StringValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
+						}
+					} break;
+
+					default:
+						throw new ProgramParametrizationException(getUsage("invalid CSV type"));
+				}
+			} break;
+
+			case "rmat": {
+				int scale = parameters.getInt("scale", DEFAULT_SCALE);
+				int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
+
+				RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
+
+				long vertexCount = 1L << scale;
+				long edgeCount = vertexCount * edgeFactor;
+
+				Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
+					.setParallelism(little_parallelism)
+					.generate();
+
+				if (directedAlgorithm) {
+					if (scale > 32) {
+						Graph<LongValue, NullValue, NullValue> newGraph = graph
+							.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>()
+								.setParallelism(little_parallelism));
+
+						gcc = newGraph
+							.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
+								.setLittleParallelism(little_parallelism));
+						acc = newGraph
+							.run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+								.setLittleParallelism(little_parallelism));
+						lcc = newGraph
+							.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
+								.setIncludeZeroDegreeVertices(false)
+								.setLittleParallelism(little_parallelism));
+					} else {
+						Graph<IntValue, NullValue, NullValue> newGraph = graph
+							.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue())
+								.setParallelism(little_parallelism))
+							.run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>()
+								.setParallelism(little_parallelism));
+
+						gcc = newGraph
+							.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>()
+								.setLittleParallelism(little_parallelism));
+						acc = newGraph
+							.run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<IntValue, NullValue, NullValue>()
+								.setLittleParallelism(little_parallelism));
+						lcc = newGraph
+							.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<IntValue, NullValue, NullValue>()
+								.setIncludeZeroDegreeVertices(false)
+								.setLittleParallelism(little_parallelism));
+					}
+				} else {
+					boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
+
+					if (scale > 32) {
+						Graph<LongValue, NullValue, NullValue> newGraph = graph
+							.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip)
+								.setParallelism(little_parallelism));
+
+						gcc = newGraph
+							.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
+								.setLittleParallelism(little_parallelism));
+						acc = newGraph
+							.run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+								.setLittleParallelism(little_parallelism));
+						lcc = newGraph
+							.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
+								.setIncludeZeroDegreeVertices(false)
+								.setLittleParallelism(little_parallelism));
+					} else {
+						Graph<IntValue, NullValue, NullValue> newGraph = graph
+							.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue())
+								.setParallelism(little_parallelism))
+							.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip)
+								.setParallelism(little_parallelism));
+
+						gcc = newGraph
+							.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>()
+								.setLittleParallelism(little_parallelism));
+						acc = newGraph
+							.run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<IntValue, NullValue, NullValue>()
+								.setLittleParallelism(little_parallelism));
+						lcc = newGraph
+							.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<IntValue, NullValue, NullValue>()
+								.setIncludeZeroDegreeVertices(false)
+								.setLittleParallelism(little_parallelism));
+					}
+				}
+			} break;
+
+			default:
+				throw new ProgramParametrizationException(getUsage("invalid input type"));
+		}
+
+		switch (parameters.get("output", "")) {
+			case "print":
+				if (directedAlgorithm) {
+					for (Object e: lcc.collect()) {
+						org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result result =
+							(org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result)e;
+						System.out.println(result.toVerboseString());
+					}
+				} else {
+					for (Object e: lcc.collect()) {
+						org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result result =
+							(org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result)e;
+						System.out.println(result.toVerboseString());
+					}
+				}
+				break;
+
+			case "hash":
+				System.out.println(DataSetUtils.checksumHashCode(lcc));
+				break;
+
+			case "csv":
+				String filename = parameters.get("output_filename");
+
+				String lineDelimiter = StringEscapeUtils.unescapeJava(
+					parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
+
+				String fieldDelimiter = StringEscapeUtils.unescapeJava(
+					parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
+
+				lcc.writeAsCsv(filename, lineDelimiter, fieldDelimiter);
+
+				env.execute("Clustering Coefficient");
+				break;
+
+			default:
+				throw new ProgramParametrizationException(getUsage("invalid output type"));
+		}
+
+		System.out.println(gcc.getResult());
+		System.out.println(acc.getResult());
+
+		JobExecutionResult result = env.getLastJobExecutionResult();
+
+		NumberFormat nf = NumberFormat.getInstance();
+		System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
+	}
+}
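The ClusteringCoefficient driver above runs three Gelly analytics over the same simplified graph: global, average, and local clustering coefficient. To show what those three numbers mean, here is a minimal plain-Java sketch for a small undirected graph held in memory. It is not the Gelly implementation and uses none of its API, and the class and method names are hypothetical. It assumes the common textbook definitions. The local score of a vertex is the fraction of its neighbor pairs that are connected. The average score is the mean of the local scores over all vertices; implementations differ on whether vertices of degree below two are included, and this sketch includes them with a score of 0. The global score is closed triplets divided by all triplets.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical illustration of the three clustering coefficient measures
// on a small in-memory undirected graph (not the Gelly implementation).
class ClusteringCoefficientSketch {

    // Symmetric adjacency sets; self-loops and duplicate edges are dropped,
    // which is roughly what simplifying the input graph is meant to guarantee.
    static Map<Integer, Set<Integer>> adjacency(int[][] edges) {
        Map<Integer, Set<Integer>> adj = new TreeMap<>();
        for (int[] e : edges) {
            if (e[0] == e[1]) {
                continue;
            }
            adj.computeIfAbsent(e[0], k -> new TreeSet<>()).add(e[1]);
            adj.computeIfAbsent(e[1], k -> new TreeSet<>()).add(e[0]);
        }
        return adj;
    }

    // Number of edges between neighbors of v, i.e. closed triplets centered on v.
    static long closedTriplets(Map<Integer, Set<Integer>> adj, int v) {
        List<Integer> nbrs = new ArrayList<>(adj.get(v));
        long links = 0;
        for (int i = 0; i < nbrs.size(); i++) {
            for (int j = i + 1; j < nbrs.size(); j++) {
                if (adj.get(nbrs.get(i)).contains(nbrs.get(j))) {
                    links++;
                }
            }
        }
        return links;
    }

    // Local score: connected neighbor pairs divided by all neighbor pairs.
    static double local(Map<Integer, Set<Integer>> adj, int v) {
        long d = adj.get(v).size();
        return d < 2 ? 0.0 : closedTriplets(adj, v) / (d * (d - 1) / 2.0);
    }

    // Average score: mean of local scores over every vertex in the map.
    static double average(Map<Integer, Set<Integer>> adj) {
        double sum = 0.0;
        for (int v : adj.keySet()) {
            sum += local(adj, v);
        }
        return adj.isEmpty() ? 0.0 : sum / adj.size();
    }

    // Global score: closed triplets over all triplets (equivalently
    // 3 * triangles / triplets, since each triangle closes three triplets).
    static double global(Map<Integer, Set<Integer>> adj) {
        long closed = 0;
        long triplets = 0;
        for (Map.Entry<Integer, Set<Integer>> entry : adj.entrySet()) {
            long d = entry.getValue().size();
            triplets += d * (d - 1) / 2;
            closed += closedTriplets(adj, entry.getKey());
        }
        return triplets == 0 ? 0.0 : (double) closed / triplets;
    }

    public static void main(String[] args) {
        // A triangle 0-1-2 with a pendant edge 2-3.
        Map<Integer, Set<Integer>> adj = adjacency(new int[][] {{0, 1}, {1, 2}, {2, 0}, {2, 3}});
        for (int v : adj.keySet()) {
            System.out.printf("vertex %d: local %.4f%n", v, local(adj, v));
        }
        System.out.printf("average %.4f, global %.4f%n", average(adj), global(adj));
    }
}
```

For the triangle with a pendant edge, vertices 0 and 1 score 1.0, vertex 2 scores 1/3, and vertex 3 scores 0. This gives an average of 7/12 and a global score of 3/5. The global and average measures can disagree because the global score weights each vertex by its number of triplets.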

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
new file mode 100644
index 0000000..8f9a54a
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/Graph500.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.text.StrBuilder;
+import org.apache.commons.lang3.text.WordUtils;
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.simple.undirected.Simplify;
+import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import java.text.NumberFormat;
+
+/**
+ * Generate an RMat graph for Graph 500.
+ *
+ * Note that this does not yet implement permutation of vertex labels or edges.
+ *
+ * @see <a href="http://www.graph500.org/specifications">Graph 500</a>
+ */
+public class Graph500 {
+
+	private static final int DEFAULT_SCALE = 10;
+
+	private static final int DEFAULT_EDGE_FACTOR = 16;
+
+	private static final boolean DEFAULT_SIMPLIFY = false;
+
+	private static final boolean DEFAULT_CLIP_AND_FLIP = true;
+
+	private static String getUsage(String message) {
+		return new StrBuilder()
+			.appendNewLine()
+			.appendln("A Graph500 generator using the Recursive Matrix (RMat) graph generator.")
+			.appendNewLine()
+			.appendln(WordUtils.wrap("The graph matrix contains 2^scale vertices, although not every vertex will" +
+				" be represented in an edge. The number of edges is edge_factor * 2^scale, although" +
+				" some edges may be duplicates.", 80))
+			.appendNewLine()
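+			.appendln("Note: this does not yet implement permutation of vertex labels or edges.")
+			.appendNewLine()
+			.appendln("usage: Graph500 [--scale SCALE] [--edge_factor EDGE_FACTOR] [--simplify <true | false>] [--clip_and_flip <true | false>] --output <print | hash | csv [options]>")
+			.appendNewLine()
+			.appendln("options:")
+			.appendln("  --output print")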
+			.appendln("  --output hash")
+			.appendln("  --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]")
+			.appendNewLine()
+			.appendln("Usage error: " + message)
+			.toString();
+	}
+
+	public static void main(String[] args) throws Exception {
+		// Set up the execution environment
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().enableObjectReuse();
+
+		ParameterTool parameters = ParameterTool.fromArgs(args);
+
+		// Generate RMat graph
+		int scale = parameters.getInt("scale", DEFAULT_SCALE);
+		int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
+
+		RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
+
+		long vertexCount = 1L << scale;
+		long edgeCount = vertexCount * edgeFactor;
+
+		boolean simplify = parameters.getBoolean("simplify", DEFAULT_SIMPLIFY);
+		boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
+
+		Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
+			.generate();
+
+		if (simplify) {
+			graph = graph.run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
+		}
+
+		DataSet<Tuple2<LongValue, LongValue>> edges = graph
+			.getEdges()
+			.project(0, 1);
+
+		// Print, hash, or write RMat graph to disk
+		switch (parameters.get("output", "")) {
+		case "print":
+			edges.print();
+			break;
+
+		case "hash":
+			System.out.println(DataSetUtils.checksumHashCode(edges));
+			break;
+
+		case "csv":
+			String filename = parameters.get("output_filename");
+
+			String lineDelimiter = StringEscapeUtils.unescapeJava(
+				parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
+
+			String fieldDelimiter = StringEscapeUtils.unescapeJava(
+				parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
+
+			edges.writeAsCsv(filename, lineDelimiter, fieldDelimiter);
+
+			env.execute("Graph500");
+			break;
+		default:
+			throw new ProgramParametrizationException(getUsage("invalid output type"));
+		}
+
+		JobExecutionResult result = env.getLastJobExecutionResult();
+
+		NumberFormat nf = NumberFormat.getInstance();
+		System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
+	}
+}
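Graph500, like the RMat branches of the other drivers in this commit, sizes the generated graph as 2^scale vertices and edge_factor * 2^scale edges. The other drivers also translate vertex IDs from LongValue to IntValue with LongValueToUnsignedIntValue unless scale > 32. The plain-Java sketch below reproduces that arithmetic and shows why 32 is the cutoff: IDs range from 0 to 2^scale - 1, which fits in an unsigned 32-bit value exactly when scale <= 32. The class name is hypothetical. The narrowing done with Integer.toUnsignedLong is our assumption of what an unsigned translation amounts to, not Gelly's translator code.

```java
// Hypothetical sketch of the RMat sizing arithmetic used by the drivers.
class RMatSizing {

    // 2^scale vertices, as in "long vertexCount = 1L << scale;".
    static long vertexCount(int scale) {
        return 1L << scale;
    }

    // edge_factor edges per vertex (duplicates may occur before simplification).
    static long edgeCount(int scale, int edgeFactor) {
        return vertexCount(scale) * edgeFactor;
    }

    // The largest ID is 2^scale - 1; it fits in an unsigned 32-bit value
    // exactly when scale <= 32, which is why the drivers branch on scale > 32.
    static boolean fitsUnsignedInt(int scale) {
        return vertexCount(scale) - 1 <= 0xFFFFFFFFL;
    }

    // Narrow a long ID to 32 bits, then widen it back without sign extension.
    static long roundTrip(long id) {
        int narrowed = (int) id; // keeps the low 32 bits
        return Integer.toUnsignedLong(narrowed);
    }

    public static void main(String[] args) {
        int scale = 10;      // DEFAULT_SCALE in the drivers
        int edgeFactor = 16; // DEFAULT_EDGE_FACTOR in the drivers
        System.out.println(vertexCount(scale));           // 1024
        System.out.println(edgeCount(scale, edgeFactor)); // 16384
        System.out.println(fitsUnsignedInt(32));          // true
        System.out.println(fitsUnsignedInt(33));          // false
        System.out.println(roundTrip(4_294_967_295L));    // 4294967295
    }
}
```

Using 32-bit IDs where possible halves the key size the downstream operators have to serialize and compare. This is presumably the motivation for the translation, though the commit does not state it.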

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
new file mode 100644
index 0000000..4fb11c3
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.text.StrBuilder;
+import org.apache.commons.lang3.text.WordUtils;
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvOutputFormat;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAnalytic;
+import org.apache.flink.graph.GraphCsvReader;
+import org.apache.flink.graph.asm.translate.TranslateGraphIds;
+import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
+import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.types.StringValue;
+
+import java.text.NumberFormat;
+
+/**
+ * Computes vertex and edge metrics on a directed or undirected graph.
+ *
+ * @see org.apache.flink.graph.library.metric.directed.EdgeMetrics
+ * @see org.apache.flink.graph.library.metric.directed.VertexMetrics
+ * @see org.apache.flink.graph.library.metric.undirected.EdgeMetrics
+ * @see org.apache.flink.graph.library.metric.undirected.VertexMetrics
+ */
+public class GraphMetrics {
+
+	private static final int DEFAULT_SCALE = 10;
+
+	private static final int DEFAULT_EDGE_FACTOR = 16;
+
+	private static final boolean DEFAULT_CLIP_AND_FLIP = true;
+
+	private static String getUsage(String message) {
+		return new StrBuilder()
+			.appendNewLine()
+			.appendln(WordUtils.wrap("Computes vertex and edge metrics on a directed or undirected graph.", 80))
+			.appendNewLine()
+			.appendln("usage: GraphMetrics --directed <true | false> --input <csv | rmat [options]>")
+			.appendNewLine()
+			.appendln("options:")
+			.appendln("  --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
+			.appendln("  --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]")
+			.appendNewLine()
+			.appendln("Usage error: " + message)
+			.toString();
+	}
+
+	public static void main(String[] args) throws Exception {
+		// Set up the execution environment
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().enableObjectReuse();
+
+		ParameterTool parameters = ParameterTool.fromArgs(args);
+		if (! parameters.has("directed")) {
+			throw new ProgramParametrizationException(getUsage("must declare execution mode as '--directed true' or '--directed false'"));
+		}
+		boolean directedAlgorithm = parameters.getBoolean("directed");
+
+		GraphAnalytic vm;
+		GraphAnalytic em;
+
+		switch (parameters.get("input", "")) {
+			case "csv": {
+				String lineDelimiter = StringEscapeUtils.unescapeJava(
+					parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
+
+				String fieldDelimiter = StringEscapeUtils.unescapeJava(
+					parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
+
+				GraphCsvReader reader = Graph
+					.fromCsvReader(parameters.get("input_filename"), env)
+						.ignoreCommentsEdges("#")
+						.lineDelimiterEdges(lineDelimiter)
+						.fieldDelimiterEdges(fieldDelimiter);
+
+				switch (parameters.get("type", "")) {
+					case "integer": {
+						Graph<LongValue, NullValue, NullValue> graph = reader
+							.keyType(LongValue.class);
+
+						if (directedAlgorithm) {
+							if (parameters.getBoolean("simplify", false)) {
+								graph = graph
+									.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());
+							}
+
+							vm = graph
+								.run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<LongValue, NullValue, NullValue>());
+							em = graph
+								.run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<LongValue, NullValue, NullValue>());
+						} else {
+							if (parameters.getBoolean("simplify", false)) {
+								graph = graph
+									.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false));
+							}
+
+							vm = graph
+								.run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<LongValue, NullValue, NullValue>());
+							em = graph
+								.run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<LongValue, NullValue, NullValue>());
+						}
+					} break;
+
+					case "string": {
+						Graph<StringValue, NullValue, NullValue> graph = reader
+							.keyType(StringValue.class);
+
+						if (directedAlgorithm) {
+							if (parameters.getBoolean("simplify", false)) {
+								graph = graph
+									.run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>());
+							}
+
+							vm = graph
+								.run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<StringValue, NullValue, NullValue>());
+							em = graph
+								.run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<StringValue, NullValue, NullValue>());
+						} else {
+							if (parameters.getBoolean("simplify", false)) {
+								graph = graph
+									.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false));
+							}
+
+							vm = graph
+								.run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<StringValue, NullValue, NullValue>());
+							em = graph
+								.run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<StringValue, NullValue, NullValue>());
+						}
+					} break;
+
+					default:
+						throw new ProgramParametrizationException(getUsage("invalid CSV type"));
+				}
+			} break;
+
+			case "rmat": {
+				int scale = parameters.getInt("scale", DEFAULT_SCALE);
+				int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
+
+				RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
+
+				long vertexCount = 1L << scale;
+				long edgeCount = vertexCount * edgeFactor;
+
+				Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
+					.generate();
+
+				if (directedAlgorithm) {
+					if (scale > 32) {
+						Graph<LongValue, NullValue, NullValue> newGraph = graph
+							.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());
+
+						vm = newGraph
+							.run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<LongValue, NullValue, NullValue>());
+						em = newGraph
+							.run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<LongValue, NullValue, NullValue>());
+					} else {
+						Graph<IntValue, NullValue, NullValue> newGraph = graph
+							.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()))
+							.run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>());
+
+						vm = newGraph
+							.run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<IntValue, NullValue, NullValue>());
+						em = newGraph
+							.run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<IntValue, NullValue, NullValue>());
+					}
+				} else {
+					boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
+
+					if (scale > 32) {
+						Graph<LongValue, NullValue, NullValue> newGraph = graph
+							.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
+
+						vm = newGraph
+							.run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<LongValue, NullValue, NullValue>());
+						em = newGraph
+							.run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<LongValue, NullValue, NullValue>());
+					} else {
+						Graph<IntValue, NullValue, NullValue> newGraph = graph
+							.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()))
+							.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip));
+
+						vm = newGraph
+							.run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<IntValue, NullValue, NullValue>());
+						em = newGraph
+							.run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<IntValue, NullValue, NullValue>());
+					}
+				}
+			} break;
+
+			default:
+				throw new ProgramParametrizationException(getUsage("invalid input type"));
+		}
+
+		env.execute("Graph Metrics");
+
+		System.out.print("Vertex metrics:\n  ");
+		System.out.println(vm.getResult().toString().replace(";", "\n "));
+		System.out.print("\nEdge metrics:\n  ");
+		System.out.println(em.getResult().toString().replace(";", "\n "));
+
+		JobExecutionResult result = env.getLastJobExecutionResult();
+
+		NumberFormat nf = NumberFormat.getInstance();
+		System.out.println("\nExecution runtime: " + nf.format(result.getNetRuntime()) + " ms");
+	}
+}
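GraphMetrics prints the results of Gelly's VertexMetrics and EdgeMetrics analytics, replacing each ';' separator with a line break. The plain-Java sketch below illustrates a few quantities of that kind for an undirected graph: vertex count, edge count, maximum degree, and triplet count. The class name and the choice of fields are assumptions for illustration. The exact set of values Gelly reports may differ, and this sketch only sees vertices that appear in at least one edge.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of simple undirected graph metrics (not Gelly's VertexMetrics).
class UndirectedMetricsSketch {

    // Returns {vertexCount, edgeCount, maximumDegree, tripletCount}.
    // Self-loops and duplicate edges (in either direction) are ignored.
    static long[] metrics(int[][] edges) {
        Map<Integer, Set<Integer>> adj = new HashMap<>();
        for (int[] e : edges) {
            if (e[0] == e[1]) {
                continue;
            }
            adj.computeIfAbsent(e[0], k -> new HashSet<>()).add(e[1]);
            adj.computeIfAbsent(e[1], k -> new HashSet<>()).add(e[0]);
        }
        long degreeSum = 0;
        long maxDegree = 0;
        long triplets = 0;
        for (Set<Integer> nbrs : adj.values()) {
            long d = nbrs.size();
            degreeSum += d;
            maxDegree = Math.max(maxDegree, d);
            triplets += d * (d - 1) / 2; // paths of length two centered on this vertex
        }
        // Each undirected edge contributes two to the degree sum.
        return new long[] {adj.size(), degreeSum / 2, maxDegree, triplets};
    }

    public static void main(String[] args) {
        // Triangle 0-1-2, pendant edge 2-3, and a duplicate of edge 0-1.
        long[] m = metrics(new int[][] {{0, 1}, {1, 2}, {2, 0}, {2, 3}, {1, 0}});
        System.out.println("vertex count: " + m[0]);                   // 4
        System.out.println("edge count: " + m[1]);                     // 4
        System.out.println("maximum degree: " + m[2]);                 // 3
        System.out.println("triplet count: " + m[3]);                  // 5
        System.out.println("average degree: " + (2.0 * m[1] / m[0]));  // 2.0
    }
}
```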

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
new file mode 100644
index 0000000..e0a233a
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.text.StrBuilder;
+import org.apache.commons.lang3.text.WordUtils;
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvOutputFormat;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphCsvReader;
+import org.apache.flink.graph.asm.simple.directed.Simplify;
+import org.apache.flink.graph.asm.translate.TranslateGraphIds;
+import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
+import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.graph.library.link_analysis.HITS.Result;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.types.StringValue;
+
+import java.text.NumberFormat;
+
+/**
+ * Driver for the library implementation of HITS (Hubs and Authorities).
+ *
+ * This example reads a simple, directed graph from a CSV file or generates
+ * a directed RMat graph with the given scale and edge factor, then calculates
+ * hub and authority scores for each vertex.
+ *
+ * @see org.apache.flink.graph.library.link_analysis.HITS
+ */
+public class HITS {
+
+	private static final int DEFAULT_ITERATIONS = 10;
+
+	private static final int DEFAULT_SCALE = 10;
+
+	private static final int DEFAULT_EDGE_FACTOR = 16;
+
+	private static String getUsage(String message) {
+		return new StrBuilder()
+			.appendNewLine()
+			.appendln(WordUtils.wrap("Hyperlink-Induced Topic Search computes two interdependent" +
+				" scores for every vertex in a directed graph. A good \"hub\" links to good \"authorities\"" +
+				" and good \"authorities\" are linked from good \"hubs\".", 80))
+			.appendNewLine()
+			.appendln("usage: HITS --input <csv | rmat [options]> [--iterations ITERATIONS] --output <print | hash | csv [options]>")
+			.appendNewLine()
+			.appendln("options:")
+			.appendln("  --input csv --type <integer | string> --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
+			.appendln("  --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]")
+			.appendNewLine()
+			.appendln("  --output print")
+			.appendln("  --output hash")
+			.appendln("  --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]")
+			.appendNewLine()
+			.appendln("Usage error: " + message)
+			.toString();
+	}
+
+	public static void main(String[] args) throws Exception {
+		// Set up the execution environment
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().enableObjectReuse();
+
+		ParameterTool parameters = ParameterTool.fromArgs(args);
+		int iterations = parameters.getInt("iterations", DEFAULT_ITERATIONS);
+
+		DataSet hits;
+
+		switch (parameters.get("input", "")) {
+			case "csv": {
+				String lineDelimiter = StringEscapeUtils.unescapeJava(
+					parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
+
+				String fieldDelimiter = StringEscapeUtils.unescapeJava(
+					parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
+
+				GraphCsvReader reader = Graph
+					.fromCsvReader(parameters.get("input_filename"), env)
+						.ignoreCommentsEdges("#")
+						.lineDelimiterEdges(lineDelimiter)
+						.fieldDelimiterEdges(fieldDelimiter);
+
+				switch (parameters.get("type", "")) {
+					case "integer": {
+						hits = reader
+							.keyType(LongValue.class)
+							.run(new org.apache.flink.graph.library.link_analysis.HITS<LongValue, NullValue, NullValue>(iterations));
+					} break;
+
+					case "string": {
+						hits = reader
+							.keyType(StringValue.class)
+							.run(new org.apache.flink.graph.library.link_analysis.HITS<StringValue, NullValue, NullValue>(iterations));
+					} break;
+
+					default:
+						throw new ProgramParametrizationException(getUsage("invalid CSV type"));
+				}
+			} break;
+
+			case "rmat": {
+				int scale = parameters.getInt("scale", DEFAULT_SCALE);
+				int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
+
+				RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
+
+				long vertexCount = 1L << scale;
+				long edgeCount = vertexCount * edgeFactor;
+
+				Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
+					.generate();
+
+				if (scale > 32) {
+					hits = graph
+						.run(new Simplify<LongValue, NullValue, NullValue>())
+						.run(new org.apache.flink.graph.library.link_analysis.HITS<LongValue, NullValue, NullValue>(iterations));
+				} else {
+					hits = graph
+						.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()))
+						.run(new Simplify<IntValue, NullValue, NullValue>())
+						.run(new org.apache.flink.graph.library.link_analysis.HITS<IntValue, NullValue, NullValue>(iterations));
+				}
+			} break;
+
+			default:
+				throw new ProgramParametrizationException(getUsage("invalid input type"));
+		}
+
+		switch (parameters.get("output", "")) {
+			case "print":
+				for (Object e: hits.collect()) {
+					System.out.println(((Result)e).toVerboseString());
+				}
+				break;
+
+			case "hash":
+				System.out.println(DataSetUtils.checksumHashCode(hits));
+				break;
+
+			case "csv":
+				String filename = parameters.get("output_filename");
+
+				String lineDelimiter = StringEscapeUtils.unescapeJava(
+					parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
+
+				String fieldDelimiter = StringEscapeUtils.unescapeJava(
+					parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
+
+				hits.writeAsCsv(filename, lineDelimiter, fieldDelimiter);
+
+				env.execute("HITS");
+				break;
+			default:
+				throw new ProgramParametrizationException(getUsage("invalid output type"));
+		}
+
+		JobExecutionResult result = env.getLastJobExecutionResult();
+
+		NumberFormat nf = NumberFormat.getInstance();
+		System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
+	}
+}
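The HITS driver delegates to Gelly's library HITS with a fixed number of iterations (default 10). The plain-Java sketch below shows the textbook power-iteration form of the algorithm. Each iteration sets a vertex's authority score to the sum of the hub scores of the vertices linking to it. It then sets each hub score to the sum of the authority scores of the vertices it links to, and rescales both vectors. The class name is hypothetical and the unit-length (L2) normalization is an assumption. Gelly's implementation is distributed and may scale or order the updates differently.

```java
import java.util.Arrays;

// Hypothetical power-iteration sketch of HITS on a small directed graph.
class HitsSketch {

    // Runs a fixed number of iterations on vertices 0..n-1 with directed edges
    // {source, target}. Returns {hubs, authorities}, each of unit length.
    static double[][] hits(int n, int[][] edges, int iterations) {
        double[] hub = new double[n];
        double[] auth = new double[n];
        Arrays.fill(hub, 1.0);
        for (int it = 0; it < iterations; it++) {
            // Authority: sum of hub scores of vertices that link to this vertex.
            Arrays.fill(auth, 0.0);
            for (int[] e : edges) {
                auth[e[1]] += hub[e[0]];
            }
            normalize(auth);
            // Hub: sum of authority scores of vertices this vertex links to.
            Arrays.fill(hub, 0.0);
            for (int[] e : edges) {
                hub[e[0]] += auth[e[1]];
            }
            normalize(hub);
        }
        return new double[][] {hub, auth};
    }

    // Scales v to unit Euclidean length; leaves an all-zero vector unchanged.
    static void normalize(double[] v) {
        double sumOfSquares = 0.0;
        for (double x : v) {
            sumOfSquares += x * x;
        }
        double norm = Math.sqrt(sumOfSquares);
        if (norm == 0.0) {
            return;
        }
        for (int i = 0; i < v.length; i++) {
            v[i] /= norm;
        }
    }

    public static void main(String[] args) {
        // Vertices 0 and 1 both link to 2 and 3; vertex 2 also links to 3.
        int[][] edges = {{0, 2}, {0, 3}, {1, 2}, {1, 3}, {2, 3}};
        double[][] scores = hits(4, edges, 10);
        for (int v = 0; v < 4; v++) {
            System.out.printf("vertex %d: hub %.4f, authority %.4f%n", v, scores[0][v], scores[1][v]);
        }
    }
}
```

In this example vertices 0 and 1 end up as the strongest hubs and vertex 3 as the strongest authority. Vertex 3 has no outgoing links, so its hub score stays at zero.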

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
new file mode 100644
index 0000000..5c173e0
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
@@ -0,0 +1,222 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.text.StrBuilder;
+import org.apache.commons.lang3.text.WordUtils;
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvOutputFormat;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphCsvReader;
+import org.apache.flink.graph.asm.simple.undirected.Simplify;
+import org.apache.flink.graph.asm.translate.TranslateGraphIds;
+import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
+import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.types.StringValue;
+
+import java.text.NumberFormat;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * Driver for the library implementation of Jaccard Index.
+ *
+ * This example reads a simple, undirected graph from a CSV file or generates
+ * an undirected RMat graph with the given scale and edge factor, then
+ * calculates all non-zero Jaccard Index similarity scores between vertices.
+ *
+ * @see org.apache.flink.graph.library.similarity.JaccardIndex
+ */
+public class JaccardIndex {
+
+	private static final int DEFAULT_SCALE = 10;
+
+	private static final int DEFAULT_EDGE_FACTOR = 16;
+
+	private static final boolean DEFAULT_CLIP_AND_FLIP = true;
+
+	private static String getUsage(String message) {
+		return new StrBuilder()
+			.appendNewLine()
+			.appendln(WordUtils.wrap("The Jaccard Index measures the similarity between vertex" +
+				" neighborhoods and is computed as the number of shared neighbors divided by the number of" +
+				" distinct neighbors. Scores range from 0.0 (no shared neighbors) to 1.0 (all neighbors are" +
+				" shared).", 80))
+			.appendNewLine()
+			.appendln(WordUtils.wrap("This algorithm returns 4-tuples containing two vertex IDs, the" +
+				" number of shared neighbors, and the number of distinct neighbors.", 80))
+			.appendNewLine()
+			.appendln("usage: JaccardIndex --input <csv | rmat [options]> --output <print | hash | csv [options]>")
+			.appendNewLine()
+			.appendln("options:")
+			.appendln("  --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
+			.appendln("  --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR] [--clip_and_flip <true | false>]")
+			.appendNewLine()
+			.appendln("  --output print")
+			.appendln("  --output hash")
+			.appendln("  --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]")
+			.appendNewLine()
+			.appendln("Usage error: " + message)
+			.toString();
+	}
+
+	public static void main(String[] args) throws Exception {
+		// Set up the execution environment
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().enableObjectReuse();
+
+		ParameterTool parameters = ParameterTool.fromArgs(args);
+
+		int little_parallelism = parameters.getInt("little_parallelism", PARALLELISM_DEFAULT);
+
+		DataSet ji;
+
+		switch (parameters.get("input", "")) {
+			case "csv": {
+				String lineDelimiter = StringEscapeUtils.unescapeJava(
+					parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
+
+				String fieldDelimiter = StringEscapeUtils.unescapeJava(
+					parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
+
+				GraphCsvReader reader = Graph
+					.fromCsvReader(parameters.get("input_filename"), env)
+						.ignoreCommentsEdges("#")
+						.lineDelimiterEdges(lineDelimiter)
+						.fieldDelimiterEdges(fieldDelimiter);
+
+				switch (parameters.get("type", "")) {
+					case "integer": {
+						Graph<LongValue, NullValue, NullValue> graph = reader
+							.keyType(LongValue.class);
+
+						if (parameters.getBoolean("simplify", false)) {
+							graph = graph
+								.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false));
+						}
+
+						ji = graph
+							.run(new org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, NullValue>()
+								.setLittleParallelism(little_parallelism));
+					} break;
+
+					case "string": {
+						Graph<StringValue, NullValue, NullValue> graph = reader
+							.keyType(StringValue.class);
+
+						if (parameters.getBoolean("simplify", false)) {
+							graph = graph
+								.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false));
+						}
+
+						ji = graph
+							.run(new org.apache.flink.graph.library.similarity.JaccardIndex<StringValue, NullValue, NullValue>()
+								.setLittleParallelism(little_parallelism));
+					} break;
+
+					default:
+						throw new ProgramParametrizationException(getUsage("invalid CSV type"));
+				}
+			} break;
+
+			case "rmat": {
+				int scale = parameters.getInt("scale", DEFAULT_SCALE);
+				int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
+
+				RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
+
+				long vertexCount = 1L << scale;
+				long edgeCount = vertexCount * edgeFactor;
+
+				Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
+					.setParallelism(little_parallelism)
+					.generate();
+
+				boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
+
+				if (scale > 32) {
+					ji = graph
+						.run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip)
+							.setParallelism(little_parallelism))
+						.run(new org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, NullValue>()
+							.setLittleParallelism(little_parallelism));
+				} else {
+					ji = graph
+						.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue())
+							.setParallelism(little_parallelism))
+						.run(new Simplify<IntValue, NullValue, NullValue>(clipAndFlip)
+							.setParallelism(little_parallelism))
+						.run(new org.apache.flink.graph.library.similarity.JaccardIndex<IntValue, NullValue, NullValue>()
+							.setLittleParallelism(little_parallelism));
+				}
+			} break;
+
+			default:
+				throw new ProgramParametrizationException(getUsage("invalid input type"));
+		}
+
+		switch (parameters.get("output", "")) {
+			case "print":
+				for (Object e: ji.collect()) {
+					Result result = (Result)e;
+					System.out.println(result.toVerboseString());
+				}
+				break;
+
+			case "hash":
+				System.out.println(DataSetUtils.checksumHashCode(ji));
+				break;
+
+			case "csv":
+				String filename = parameters.get("output_filename");
+
+				String lineDelimiter = StringEscapeUtils.unescapeJava(
+					parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
+
+				String fieldDelimiter = StringEscapeUtils.unescapeJava(
+					parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
+
+				ji.writeAsCsv(filename, lineDelimiter, fieldDelimiter);
+
+				env.execute("Jaccard Index");
+				break;
+
+			default:
+				throw new ProgramParametrizationException(getUsage("invalid output type"));
+		}
+
+		JobExecutionResult result = env.getLastJobExecutionResult();
+
+		NumberFormat nf = NumberFormat.getInstance();
+		System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
+	}
+}

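The JaccardIndex usage text defines the score as the number of shared neighbors divided by the number of distinct neighbors. A minimal sequential sketch of that ratio for a single vertex pair, using plain `java.util` sets; `JaccardSketch` is a hypothetical name, and returning 0.0 when both neighborhoods are empty is an assumption of this sketch. The library algorithm computes scores for all vertex pairs in parallel and emits only non-zero results.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/**
 * Hypothetical sequential sketch of the Jaccard Index for one vertex pair:
 * |N(u) ∩ N(v)| / |N(u) ∪ N(v)|.
 */
public class JaccardSketch {

    /** Number of neighbors present in both sets (intersection size). */
    static int shared(Set<Long> a, Set<Long> b) {
        Set<Long> intersection = new HashSet<>(a);
        intersection.retainAll(b);
        return intersection.size();
    }

    /** Number of neighbors present in either set (union size). */
    static int distinct(Set<Long> a, Set<Long> b) {
        Set<Long> union = new HashSet<>(a);
        union.addAll(b);
        return union.size();
    }

    /** Score in [0.0, 1.0]; 0.0 when both neighborhoods are empty (sketch assumption). */
    static double score(Set<Long> a, Set<Long> b) {
        int d = distinct(a, b);
        return d == 0 ? 0.0 : (double) shared(a, b) / d;
    }

    public static void main(String[] args) {
        Set<Long> u = new HashSet<>(Arrays.asList(1L, 2L, 3L));
        Set<Long> v = new HashSet<>(Arrays.asList(2L, 3L, 4L));
        // shared = {2, 3} -> 2, distinct = {1, 2, 3, 4} -> 4
        System.out.println(shared(u, v) + " / " + distinct(u, v) + " = " + score(u, v));
    }
}
```

Running `main` prints `2 / 4 = 0.5`, matching the 4-tuple fields the driver reports (two vertex IDs, shared count, distinct count).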
http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
new file mode 100644
index 0000000..954f732
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.text.StrBuilder;
+import org.apache.commons.lang3.text.WordUtils;
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvOutputFormat;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphCsvReader;
+import org.apache.flink.graph.asm.simple.undirected.Simplify;
+import org.apache.flink.graph.asm.translate.TranslateGraphIds;
+import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
+import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.types.StringValue;
+
+import java.text.NumberFormat;
+
+/**
+ * Driver for the library implementation of Triangle Listing.
+ *
+ * This example reads a simple directed or undirected graph from a CSV file or
+ * generates an RMat graph with the given scale and edge factor, then lists
+ * all triangles.
+ *
+ * @see org.apache.flink.graph.library.clustering.directed.TriangleListing
+ * @see org.apache.flink.graph.library.clustering.undirected.TriangleListing
+ */
+public class TriangleListing {
+
+	private static final int DEFAULT_SCALE = 10;
+
+	private static final int DEFAULT_EDGE_FACTOR = 16;
+
+	private static final boolean DEFAULT_CLIP_AND_FLIP = true;
+
+	private static String getUsage(String message) {
+		return new StrBuilder()
+			.appendNewLine()
+			.appendln(WordUtils.wrap("Lists all triangles in a graph.", 80))
+			.appendNewLine()
+			.appendln(WordUtils.wrap("This algorithm returns tuples containing the vertex IDs for each triangle and," +
+				" for directed graphs, a bitmask indicating the presence of the six potential connecting edges.", 80))
+			.appendNewLine()
+			.appendln("usage: TriangleListing --directed <true | false> --input <csv | rmat [options]> --output <print | hash | csv [options]>")
+			.appendNewLine()
+			.appendln("options:")
+			.appendln("  --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
+			.appendln("  --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR] [--clip_and_flip <true | false>]")
+			.appendNewLine()
+			.appendln("  --output print")
+			.appendln("  --output hash")
+			.appendln("  --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]")
+			.appendNewLine()
+			.appendln("Usage error: " + message)
+			.toString();
+	}
+
+	public static void main(String[] args) throws Exception {
+		// Set up the execution environment
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().enableObjectReuse();
+
+		ParameterTool parameters = ParameterTool.fromArgs(args);
+		if (!parameters.has("directed")) {
+			throw new ProgramParametrizationException(getUsage("must declare execution mode as '--directed true' or '--directed false'"));
+		}
+		boolean directedAlgorithm = parameters.getBoolean("directed");
+
+		DataSet tl;
+
+		switch (parameters.get("input", "")) {
+			case "csv": {
+				String lineDelimiter = StringEscapeUtils.unescapeJava(
+					parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
+
+				String fieldDelimiter = StringEscapeUtils.unescapeJava(
+					parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
+
+				GraphCsvReader reader = Graph
+					.fromCsvReader(parameters.get("input_filename"), env)
+						.ignoreCommentsEdges("#")
+						.lineDelimiterEdges(lineDelimiter)
+						.fieldDelimiterEdges(fieldDelimiter);
+
+				switch (parameters.get("type", "")) {
+					case "integer": {
+						Graph<LongValue, NullValue, NullValue> graph = reader
+							.keyType(LongValue.class);
+
+						if (directedAlgorithm) {
+							if (parameters.getBoolean("simplify", false)) {
+								graph = graph
+									.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());
+							}
+
+							tl = graph
+								.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, NullValue, NullValue>());
+						} else {
+							if (parameters.getBoolean("simplify", false)) {
+								graph = graph
+									.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false));
+							}
+
+							tl = graph
+								.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, NullValue, NullValue>());
+						}
+					} break;
+
+					case "string": {
+						Graph<StringValue, NullValue, NullValue> graph = reader
+							.keyType(StringValue.class);
+
+						if (directedAlgorithm) {
+							if (parameters.getBoolean("simplify", false)) {
+								graph = graph
+									.run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>());
+							}
+
+							tl = graph
+								.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<StringValue, NullValue, NullValue>());
+						} else {
+							if (parameters.getBoolean("simplify", false)) {
+								graph = graph
+									.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false));
+							}
+
+							tl = graph
+								.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<StringValue, NullValue, NullValue>());
+						}
+					} break;
+
+					default:
+						throw new ProgramParametrizationException(getUsage("invalid CSV type"));
+				}
+
+			} break;
+
+			case "rmat": {
+				int scale = parameters.getInt("scale", DEFAULT_SCALE);
+				int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
+
+				RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
+
+				long vertexCount = 1L << scale;
+				long edgeCount = vertexCount * edgeFactor;
+
+				Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
+					.generate();
+
+				if (directedAlgorithm) {
+					if (scale > 32) {
+						tl = graph
+							.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>())
+							.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, NullValue, NullValue>());
+					} else {
+						tl = graph
+							.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()))
+							.run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>())
+							.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<IntValue, NullValue, NullValue>());
+					}
+				} else {
+					boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
+
+					graph = graph
+						.run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
+
+					if (scale > 32) {
+						tl = graph
+							.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip))
+							.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, NullValue, NullValue>());
+					} else {
+						tl = graph
+							.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()))
+							.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip))
+							.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<IntValue, NullValue, NullValue>());
+					}
+				}
+			} break;
+
+			default:
+				throw new ProgramParametrizationException(getUsage("invalid input type"));
+		}
+
+		switch (parameters.get("output", "")) {
+			case "print":
+				if (directedAlgorithm) {
+					for (Object e: tl.collect()) {
+						org.apache.flink.graph.library.clustering.directed.TriangleListing.Result result =
+							(org.apache.flink.graph.library.clustering.directed.TriangleListing.Result) e;
+						System.out.println(result.toVerboseString());
+					}
+				} else {
+					tl.print();
+				}
+				break;
+
+			case "hash":
+				System.out.println(DataSetUtils.checksumHashCode(tl));
+				break;
+
+			case "csv":
+				String filename = parameters.get("output_filename");
+
+				String lineDelimiter = StringEscapeUtils.unescapeJava(
+					parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
+
+				String fieldDelimiter = StringEscapeUtils.unescapeJava(
+					parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
+
+				tl.writeAsCsv(filename, lineDelimiter, fieldDelimiter);
+
+				env.execute();
+				break;
+			default:
+				throw new ProgramParametrizationException(getUsage("invalid output type"));
+		}
+
+		JobExecutionResult result = env.getLastJobExecutionResult();
+
+		NumberFormat nf = NumberFormat.getInstance();
+		System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
+	}
+}

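For undirected input, TriangleListing emits each triangle by its three vertex IDs. A small sequential sketch on a simple graph lists each triangle exactly once as (a, b, c) with a < b < c by intersecting neighborhoods along each edge. This is a textbook approach chosen for illustration; `TriangleSketch` is hypothetical and does not reflect how Gelly partitions the work across a cluster.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical sequential triangle listing for a simple, undirected graph. */
public class TriangleSketch {

    /** Builds a symmetric adjacency map from undirected edges given as {u, v} pairs. */
    static Map<Long, Set<Long>> adjacency(long[][] edges) {
        Map<Long, Set<Long>> adj = new TreeMap<>();
        for (long[] e : edges) {
            adj.computeIfAbsent(e[0], k -> new TreeSet<>()).add(e[1]);
            adj.computeIfAbsent(e[1], k -> new TreeSet<>()).add(e[0]);
        }
        return adj;
    }

    /** For each edge (a, b) with a < b, emits every common neighbor c > b. */
    static List<long[]> list(long[][] edges) {
        Map<Long, Set<Long>> adj = adjacency(edges);
        List<long[]> triangles = new ArrayList<>();
        for (Map.Entry<Long, Set<Long>> entry : adj.entrySet()) {
            long a = entry.getKey();
            Set<Long> neighborsOfA = entry.getValue();
            for (long b : neighborsOfA) {
                if (b <= a) {
                    continue; // visit each edge once, from its smaller endpoint
                }
                for (long c : adj.get(b)) {
                    if (c > b && neighborsOfA.contains(c)) {
                        triangles.add(new long[] {a, b, c});
                    }
                }
            }
        }
        return triangles;
    }

    public static void main(String[] args) {
        // Two triangles sharing the edge (1, 2): {0, 1, 2} and {1, 2, 3}.
        long[][] edges = {{0, 1}, {0, 2}, {1, 2}, {1, 3}, {2, 3}};
        for (long[] t : list(edges)) {
            System.out.println(t[0] + " " + t[1] + " " + t[2]);
        }
    }
}
```

The ordering constraint a < b < c is what prevents the same triangle from being reported three times, once per edge.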

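The `rmat` input used by the JaccardIndex and TriangleListing drivers sizes the generator as `vertexCount = 1L << scale` and `edgeCount = vertexCount * edgeFactor`, and the drivers translate `LongValue` IDs to `IntValue` only when `scale <= 32`. The arithmetic behind that cutoff, sketched under the assumption that generated IDs fall in the range 0 to vertexCount - 1; `RMatSizing` is a hypothetical helper, not part of Gelly.

```java
/**
 * Hypothetical helper reproducing the RMat sizing arithmetic from the drivers.
 * Assumes 0 <= scale < 63 so that 1L << scale does not overflow.
 */
public class RMatSizing {

    static long vertexCount(int scale) {
        return 1L << scale;
    }

    static long edgeCount(int scale, int edgeFactor) {
        return vertexCount(scale) * edgeFactor;
    }

    /** True when the largest vertex ID (vertexCount - 1) fits in 32 unsigned bits. */
    static boolean fitsUnsignedInt(int scale) {
        return vertexCount(scale) - 1 <= 0xFFFFFFFFL;
    }

    public static void main(String[] args) {
        int scale = 10;      // DEFAULT_SCALE in the drivers
        int edgeFactor = 16; // DEFAULT_EDGE_FACTOR in the drivers
        System.out.println(vertexCount(scale) + " vertices, "
            + edgeCount(scale, edgeFactor) + " edges, fits in int: " + fitsUnsignedInt(scale));
    }
}
```

With the drivers' defaults this prints `1024 vertices, 16384 edges, fits in int: true`. Scale 32 is the largest value for which the check holds, which lines up with the `scale > 32` branch keeping `LongValue` IDs.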