flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [1/2] flink git commit: [FLINK-4204] [gelly] Clean up gelly-examples
Date Mon, 24 Oct 2016 16:32:46 GMT
Repository: flink
Updated Branches:
  refs/heads/master c4783c856 -> c4f9f0d78


http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
deleted file mode 100644
index 615d765..0000000
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
+++ /dev/null
@@ -1,326 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.examples;
-
-import org.apache.commons.lang3.StringEscapeUtils;
-import org.apache.commons.lang3.text.WordUtils;
-import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvOutputFormat;
-import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAnalytic;
-import org.apache.flink.graph.GraphCsvReader;
-import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
-import org.apache.flink.graph.asm.translate.TranslateGraphIds;
-import org.apache.flink.graph.generator.RMatGraph;
-import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
-import org.apache.flink.graph.generator.random.RandomGenerableFactory;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.StringValue;
-
-import java.text.NumberFormat;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
-/**
- * Driver for the library implementations of Global and Local Clustering Coefficient.
- *
- * This example reads a simple directed or undirected graph from a CSV file or
- * generates an RMat graph with the given scale and edge factor then calculates
- * the local clustering coefficient for each vertex and the global clustering
- * coefficient for the graph.
- *
- * @see org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient
- * @see org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient
- * @see org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient
- * @see org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient
- */
-public class ClusteringCoefficient {
-
-	public static final int DEFAULT_SCALE = 10;
-
-	public static final int DEFAULT_EDGE_FACTOR = 16;
-
-	public static final boolean DEFAULT_CLIP_AND_FLIP = true;
-
-	private static void printUsage() {
-		System.out.println(WordUtils.wrap("The local clustering coefficient measures the connectedness of each" +
-			" vertex's neighborhood and the global clustering coefficient measures the connectedness of the graph." +
-			" Scores range from 0.0 (no edges between neighbors or vertices) to 1.0 (neighborhood or graph" +
-			" is a clique).", 80));
-		System.out.println();
-		System.out.println(WordUtils.wrap("This algorithm returns tuples containing the vertex ID, the degree of" +
-			" the vertex, and the number of edges between vertex neighbors.", 80));
-		System.out.println();
-		System.out.println("usage: ClusteringCoefficient --directed <true | false> --input <csv | rmat [options]> --output <print | hash | csv [options]>");
-		System.out.println();
-		System.out.println("options:");
-		System.out.println("  --input csv --type <integer | string> --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]");
-		System.out.println("  --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]");
-		System.out.println();
-		System.out.println("  --output print");
-		System.out.println("  --output hash");
-		System.out.println("  --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]");
-	}
-
-	public static void main(String[] args) throws Exception {
-		// Set up the execution environment
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.getConfig().enableObjectReuse();
-
-		ParameterTool parameters = ParameterTool.fromArgs(args);
-
-		if (! parameters.has("directed")) {
-			printUsage();
-			return;
-		}
-		boolean directedAlgorithm = parameters.getBoolean("directed");
-
-		int little_parallelism = parameters.getInt("little_parallelism", PARALLELISM_DEFAULT);
-
-		// global and local clustering coefficient results
-		GraphAnalytic gcc;
-		GraphAnalytic acc;
-		DataSet lcc;
-
-		switch (parameters.get("input", "")) {
-			case "csv": {
-				String lineDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-				String fieldDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-				GraphCsvReader reader = Graph
-					.fromCsvReader(parameters.get("input_filename"), env)
-						.ignoreCommentsEdges("#")
-						.lineDelimiterEdges(lineDelimiter)
-						.fieldDelimiterEdges(fieldDelimiter);
-
-				switch (parameters.get("type", "")) {
-					case "integer": {
-						Graph<LongValue, NullValue, NullValue> graph = reader
-							.keyType(LongValue.class);
-
-						if (directedAlgorithm) {
-							gcc = graph
-								.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-							acc = graph
-								.run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-							lcc = graph
-								.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-						} else {
-							gcc = graph
-								.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-							acc = graph
-								.run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-							lcc = graph
-								.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-						}
-					} break;
-
-					case "string": {
-						Graph<StringValue, NullValue, NullValue> graph = reader
-							.keyType(StringValue.class);
-
-						if (directedAlgorithm) {
-							gcc = graph
-								.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<StringValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-							acc = graph
-								.run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<StringValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-							lcc = graph
-								.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<StringValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-						} else {
-							gcc = graph
-								.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<StringValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-							acc = graph
-								.run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<StringValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-							lcc = graph
-								.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<StringValue, NullValue, NullValue>()
-									.setLittleParallelism(little_parallelism));
-						}
-					} break;
-
-					default:
-						printUsage();
-						return;
-				}
-			} break;
-
-			case "rmat": {
-				int scale = parameters.getInt("scale", DEFAULT_SCALE);
-				int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
-
-				RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
-
-				long vertexCount = 1L << scale;
-				long edgeCount = vertexCount * edgeFactor;
-
-				Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
-					.setParallelism(little_parallelism)
-					.generate();
-
-				if (directedAlgorithm) {
-					if (scale > 32) {
-						Graph<LongValue, NullValue, NullValue> newGraph = graph
-							.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>()
-								.setParallelism(little_parallelism));
-
-						gcc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
-								.setLittleParallelism(little_parallelism));
-						acc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
-								.setLittleParallelism(little_parallelism));
-						lcc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
-								.setIncludeZeroDegreeVertices(false)
-								.setLittleParallelism(little_parallelism));
-					} else {
-						Graph<IntValue, NullValue, NullValue> newGraph = graph
-							.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue())
-								.setParallelism(little_parallelism))
-							.run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>()
-								.setParallelism(little_parallelism));
-
-						gcc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>()
-								.setLittleParallelism(little_parallelism));
-						acc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<IntValue, NullValue, NullValue>()
-								.setLittleParallelism(little_parallelism));
-						lcc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<IntValue, NullValue, NullValue>()
-								.setIncludeZeroDegreeVertices(false)
-								.setLittleParallelism(little_parallelism));
-					}
-				} else {
-					boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
-
-					if (scale > 32) {
-						Graph<LongValue, NullValue, NullValue> newGraph = graph
-							.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip)
-								.setParallelism(little_parallelism));
-
-						gcc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
-								.setLittleParallelism(little_parallelism));
-						acc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
-								.setLittleParallelism(little_parallelism));
-						lcc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
-								.setIncludeZeroDegreeVertices(false)
-								.setLittleParallelism(little_parallelism));
-					} else {
-						Graph<IntValue, NullValue, NullValue> newGraph = graph
-							.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue())
-								.setParallelism(little_parallelism))
-							.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip)
-								.setParallelism(little_parallelism));
-
-						gcc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>()
-								.setLittleParallelism(little_parallelism));
-						acc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<IntValue, NullValue, NullValue>()
-								.setLittleParallelism(little_parallelism));
-						lcc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<IntValue, NullValue, NullValue>()
-								.setIncludeZeroDegreeVertices(false)
-								.setLittleParallelism(little_parallelism));
-					}
-				}
-			} break;
-
-			default:
-				printUsage();
-				return;
-		}
-
-		switch (parameters.get("output", "")) {
-			case "print":
-				if (directedAlgorithm) {
-					for (Object e: lcc.collect()) {
-						org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result result =
-							(org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result)e;
-						System.out.println(result.toVerboseString());
-					}
-				} else {
-					for (Object e: lcc.collect()) {
-						org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result result =
-							(org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result)e;
-						System.out.println(result.toVerboseString());
-					}
-				}
-				System.out.println(gcc.getResult());
-				System.out.println(acc.getResult());
-				break;
-
-			case "hash":
-				System.out.println(DataSetUtils.checksumHashCode(lcc));
-				System.out.println(gcc.getResult());
-				System.out.println(acc.getResult());
-				break;
-
-			case "csv":
-				String filename = parameters.get("output_filename");
-
-				String lineDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-				String fieldDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-				lcc.writeAsCsv(filename, lineDelimiter, fieldDelimiter);
-
-				env.execute("Clustering Coefficient");
-
-				System.out.println(gcc.getResult());
-				System.out.println(acc.getResult());
-				break;
-
-			default:
-				printUsage();
-				return;
-		}
-
-		JobExecutionResult result = env.getLastJobExecutionResult();
-
-		NumberFormat nf = NumberFormat.getInstance();
-		System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java
deleted file mode 100644
index 73bba2c..0000000
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.examples;
-
-import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvOutputFormat;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.asm.simple.undirected.Simplify;
-import org.apache.flink.graph.generator.RMatGraph;
-import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
-import org.apache.flink.graph.generator.random.RandomGenerableFactory;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-
-import java.text.NumberFormat;
-
-/**
- * Generate an RMat graph for Graph 500.
- *
- * Note that this does not yet implement permutation of vertex labels or edges.
- *
- * @see <a href="http://www.graph500.org/specifications">Graph 500</a>
- */
-public class Graph500 {
-
-	public static final int DEFAULT_SCALE = 10;
-
-	public static final int DEFAULT_EDGE_FACTOR = 16;
-
-	public static final boolean DEFAULT_SIMPLIFY = false;
-
-	public static final boolean DEFAULT_CLIP_AND_FLIP = true;
-
-	public static void main(String[] args) throws Exception {
-		// Set up the execution environment
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.getConfig().enableObjectReuse();
-
-		ParameterTool parameters = ParameterTool.fromArgs(args);
-
-		// Generate RMat graph
-		int scale = parameters.getInt("scale", DEFAULT_SCALE);
-		int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
-
-		RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
-
-		long vertexCount = 1L << scale;
-		long edgeCount = vertexCount * edgeFactor;
-
-		boolean simplify = parameters.getBoolean("simplify", DEFAULT_SIMPLIFY);
-		boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
-
-		Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
-			.generate();
-
-		if (simplify) {
-			graph = graph.run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
-		}
-
-		DataSet<Tuple2<LongValue,LongValue>> edges = graph
-			.getEdges()
-			.project(0, 1);
-
-		// Print, hash, or write RMat graph to disk
-		switch (parameters.get("output", "")) {
-		case "print":
-			edges.print();
-			break;
-
-		case "hash":
-			System.out.println(DataSetUtils.checksumHashCode(edges));
-			break;
-
-		case "csv":
-			String filename = parameters.get("filename");
-
-			String row_delimiter = parameters.get("row_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER);
-			String field_delimiter = parameters.get("field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER);
-
-			edges.writeAsCsv(filename, row_delimiter, field_delimiter);
-
-			env.execute();
-			break;
-		default:
-			System.out.println("A Graph500 generator using the Recursive Matrix (RMat) graph generator.");
-			System.out.println();
-			System.out.println("The graph matrix contains 2^scale vertices although not every vertex will");
-			System.out.println("be represented in an edge. The number of edges is edge_factor * 2^scale edges");
-			System.out.println("although some edges may be duplicates.");
-			System.out.println();
-			System.out.println("Note: this does not yet implement permutation of vertex labels or edges.");
-			System.out.println();
-			System.out.println("usage:");
-			System.out.println("  Graph500 [--scale SCALE] [--edge_factor EDGE_FACTOR] --output print");
-			System.out.println("  Graph500 [--scale SCALE] [--edge_factor EDGE_FACTOR] --output hash");
-			System.out.println("  Graph500 [--scale SCALE] [--edge_factor EDGE_FACTOR] --output csv" +
-					" --filename FILENAME [--row_delimiter ROW_DELIMITER] [--field_delimiter FIELD_DELIMITER]");
-
-			return;
-		}
-
-		JobExecutionResult result = env.getLastJobExecutionResult();
-
-		NumberFormat nf = NumberFormat.getInstance();
-		System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GraphMetrics.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GraphMetrics.java
deleted file mode 100644
index e7b47bf..0000000
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GraphMetrics.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.examples;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.aggregation.Aggregations;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.examples.utils.ExampleUtils;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-
-/**
- * This example illustrates how to use Gelly metrics methods and get simple statistics
- * from the input graph.  
- * 
- * The program creates a random graph and computes and prints
- * the following metrics:
- * - number of vertices
- * - number of edges
- * - average node degree
- * - the vertex ids with the max/min in- and out-degrees
- *
- * The input file is expected to contain one edge per line,
- * with long IDs and no values, in the following format:
- * "&lt;sourceVertexID&gt;\t&lt;targetVertexID&gt;".
- * If no arguments are provided, the example runs with a random graph of 100 vertices.
- *
- */
-public class GraphMetrics implements ProgramDescription {
-
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		/** create the graph **/
-		Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(getEdgesDataSet(env), env);
-		
-		/** get the number of vertices **/
-		long numVertices = graph.numberOfVertices();
-		
-		/** get the number of edges **/
-		long numEdges = graph.numberOfEdges();
-		
-		/** compute the average node degree **/
-		DataSet<Tuple2<Long, LongValue>> verticesWithDegrees = graph.getDegrees();
-
-		DataSet<Double> avgNodeDegree = verticesWithDegrees
-				.aggregate(Aggregations.SUM, 1).map(new AvgNodeDegreeMapper(numVertices));
-		
-		/** find the vertex with the maximum in-degree **/
-		DataSet<Long> maxInDegreeVertex = graph.inDegrees().maxBy(1).map(new ProjectVertexId());
-
-		/** find the vertex with the minimum in-degree **/
-		DataSet<Long> minInDegreeVertex = graph.inDegrees().minBy(1).map(new ProjectVertexId());
-
-		/** find the vertex with the maximum out-degree **/
-		DataSet<Long> maxOutDegreeVertex = graph.outDegrees().maxBy(1).map(new ProjectVertexId());
-
-		/** find the vertex with the minimum out-degree **/
-		DataSet<Long> minOutDegreeVertex = graph.outDegrees().minBy(1).map(new ProjectVertexId());
-		
-		/** print the results **/
-		ExampleUtils.printResult(env.fromElements(numVertices), "Total number of vertices");
-		ExampleUtils.printResult(env.fromElements(numEdges), "Total number of edges");
-		ExampleUtils.printResult(avgNodeDegree, "Average node degree");
-		ExampleUtils.printResult(maxInDegreeVertex, "Vertex with Max in-degree");
-		ExampleUtils.printResult(minInDegreeVertex, "Vertex with Min in-degree");
-		ExampleUtils.printResult(maxOutDegreeVertex, "Vertex with Max out-degree");
-		ExampleUtils.printResult(minOutDegreeVertex, "Vertex with Min out-degree");
-
-		env.execute();
-	}
-
-	@SuppressWarnings("serial")
-	private static final class AvgNodeDegreeMapper implements MapFunction<Tuple2<Long, LongValue>, Double> {
-
-		private long numberOfVertices;
-
-		public AvgNodeDegreeMapper(long numberOfVertices) {
-			this.numberOfVertices = numberOfVertices;
-		}
-
-		public Double map(Tuple2<Long, LongValue> sumTuple) {
-			return (double) (sumTuple.f1.getValue() / numberOfVertices) ;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class ProjectVertexId implements MapFunction<Tuple2<Long, LongValue>, Long> {
-		public Long map(Tuple2<Long, LongValue> value) { return value.f0; }
-	}
-
-	@Override
-	public String getDescription() {
-		return "Graph Metrics Example";
-	}
-
-	// ******************************************************************************************************************
-	// UTIL METHODS
-	// ******************************************************************************************************************
-
-	private static boolean fileOutput = false;
-
-	private static String edgesInputPath = null;
-
-	static final int NUM_VERTICES = 100;
-
-	static final long SEED = 9876;
-
-	private static boolean parseParameters(String[] args) {
-
-		if(args.length > 0) {
-			if(args.length != 1) {
-				System.err.println("Usage: GraphMetrics <input edges>");
-				return false;
-			}
-
-			fileOutput = true;
-			edgesInputPath = args[0];
-		} else {
-			System.out.println("Executing Graph Metrics example with default parameters and built-in default data.");
-			System.out.println("  Provide parameters to read input data from files.");
-			System.out.println("  See the documentation for the correct format of input files.");
-			System.out.println("Usage: GraphMetrics <input edges>");
-		}
-		return true;
-	}
-
-	@SuppressWarnings("serial")
-	private static DataSet<Edge<Long, NullValue>> getEdgesDataSet(ExecutionEnvironment env) {
-		if (fileOutput) {
-			return env.readCsvFile(edgesInputPath)
-					.lineDelimiter("\n").fieldDelimiter("\t")
-					.types(Long.class, Long.class).map(
-							new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
-
-								public Edge<Long, NullValue> map(Tuple2<Long, Long> value) {
-									return new Edge<Long, NullValue>(value.f0, value.f1, 
-											NullValue.getInstance());
-								}
-					});
-		} else {
-			return ExampleUtils.getRandomEdges(env, NUM_VERTICES);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java
deleted file mode 100644
index f70d5dc..0000000
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.examples;
-
-import org.apache.commons.lang3.StringEscapeUtils;
-import org.apache.commons.lang3.text.WordUtils;
-import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvOutputFormat;
-import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphCsvReader;
-import org.apache.flink.graph.asm.simple.directed.Simplify;
-import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
-import org.apache.flink.graph.asm.translate.TranslateGraphIds;
-import org.apache.flink.graph.generator.RMatGraph;
-import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
-import org.apache.flink.graph.generator.random.RandomGenerableFactory;
-import org.apache.flink.graph.library.link_analysis.HITS.Result;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.StringValue;
-
-import java.text.NumberFormat;
-
-/**
- * Driver for the library implementation of HITS (Hubs and Authorities).
- *
- * This example reads a simple, undirected graph from a CSV file or generates
- * an undirected RMat graph with the given scale and edge factor then calculates
- * hub and authority scores for each vertex.
- *
- * @see org.apache.flink.graph.library.link_analysis.HITS
- */
-public class HITS {
-
-	public static final int DEFAULT_ITERATIONS = 10;
-
-	public static final int DEFAULT_SCALE = 10;
-
-	public static final int DEFAULT_EDGE_FACTOR = 16;
-
-	private static void printUsage() {
-		System.out.println(WordUtils.wrap("Hyperlink-Induced Topic Search computes two interdependent" +
-			" scores for every vertex in a directed graph. A good \"hub\" links to good \"authorities\"" +
-			" and good \"authorities\" are linked from good \"hubs\".", 80));
-		System.out.println();
-		System.out.println("usage: HITS --input <csv | rmat [options]> --output <print | hash | csv [options]>");
-		System.out.println();
-		System.out.println("options:");
-		System.out.println("  --input csv --type <integer | string> --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]");
-		System.out.println("  --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]");
-		System.out.println();
-		System.out.println("  --output print");
-		System.out.println("  --output hash");
-		System.out.println("  --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]");
-	}
-
-	public static void main(String[] args) throws Exception {
-		// Set up the execution environment
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.getConfig().enableObjectReuse();
-
-		ParameterTool parameters = ParameterTool.fromArgs(args);
-		int iterations = parameters.getInt("iterations", DEFAULT_ITERATIONS);
-
-		DataSet hits;
-
-		switch (parameters.get("input", "")) {
-			case "csv": {
-				String lineDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-				String fieldDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-				GraphCsvReader reader = Graph
-					.fromCsvReader(parameters.get("input_filename"), env)
-						.ignoreCommentsEdges("#")
-						.lineDelimiterEdges(lineDelimiter)
-						.fieldDelimiterEdges(fieldDelimiter);
-
-				switch (parameters.get("type", "")) {
-					case "integer": {
-						hits = reader
-							.keyType(LongValue.class)
-							.run(new org.apache.flink.graph.library.link_analysis.HITS<LongValue, NullValue, NullValue>(iterations));
-					} break;
-
-					case "string": {
-						hits = reader
-							.keyType(StringValue.class)
-							.run(new org.apache.flink.graph.library.link_analysis.HITS<StringValue, NullValue, NullValue>(iterations));
-					} break;
-
-					default:
-						printUsage();
-						return;
-				}
-				} break;
-
-			case "rmat": {
-				int scale = parameters.getInt("scale", DEFAULT_SCALE);
-				int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
-
-				RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
-
-				long vertexCount = 1L << scale;
-				long edgeCount = vertexCount * edgeFactor;
-
-				Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
-					.generate();
-
-				if (scale > 32) {
-					hits = graph
-						.run(new Simplify<LongValue, NullValue, NullValue>())
-						.run(new org.apache.flink.graph.library.link_analysis.HITS<LongValue, NullValue, NullValue>(iterations));
-				} else {
-					hits = graph
-						.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()))
-						.run(new Simplify<IntValue, NullValue, NullValue>())
-						.run(new org.apache.flink.graph.library.link_analysis.HITS<IntValue, NullValue, NullValue>(iterations));
-				}
-				} break;
-
-			default:
-				printUsage();
-				return;
-		}
-
-		switch (parameters.get("output", "")) {
-			case "print":
-				for (Object e: hits.collect()) {
-					System.out.println(((Result)e).toVerboseString());
-				}
-				break;
-
-			case "hash":
-				System.out.println(DataSetUtils.checksumHashCode(hits));
-				break;
-
-			case "csv":
-				String filename = parameters.get("output_filename");
-
-				String lineDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-				String fieldDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-				hits.writeAsCsv(filename, lineDelimiter, fieldDelimiter);
-
-				env.execute();
-				break;
-			default:
-				printUsage();
-				return;
-		}
-
-		JobExecutionResult result = env.getLastJobExecutionResult();
-
-		NumberFormat nf = NumberFormat.getInstance();
-		System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
deleted file mode 100644
index 2845e2d..0000000
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
+++ /dev/null
@@ -1,208 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.examples;
-
-import org.apache.commons.lang3.StringEscapeUtils;
-import org.apache.commons.lang3.text.StrBuilder;
-import org.apache.commons.lang3.text.WordUtils;
-import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvOutputFormat;
-import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.client.program.ProgramParametrizationException;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphCsvReader;
-import org.apache.flink.graph.asm.simple.undirected.Simplify;
-import org.apache.flink.graph.asm.translate.TranslateGraphIds;
-import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
-import org.apache.flink.graph.generator.RMatGraph;
-import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
-import org.apache.flink.graph.generator.random.RandomGenerableFactory;
-import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.StringValue;
-
-import java.text.NumberFormat;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
-/**
- * Driver for the library implementation of Jaccard Index.
- *
- * This example reads a simple, undirected graph from a CSV file or generates
- * an undirected RMat graph with the given scale and edge factor then calculates
- * all non-zero Jaccard Index similarity scores between vertices.
- *
- * @see org.apache.flink.graph.library.similarity.JaccardIndex
- */
-public class JaccardIndex {
-
-	public static final int DEFAULT_SCALE = 10;
-
-	public static final int DEFAULT_EDGE_FACTOR = 16;
-
-	public static final boolean DEFAULT_CLIP_AND_FLIP = true;
-
-	private static String getUsage(String message) {
-		return new StrBuilder()
-			.appendNewLine()
-			.appendln(WordUtils.wrap("The Jaccard Index measures the similarity between vertex" +
-				" neighborhoods and is computed as the number of shared neighbors divided by the number of" +
-				" distinct neighbors. Scores range from 0.0 (no shared neighbors) to 1.0 (all neighbors are" +
-				" shared).", 80))
-			.appendNewLine()
-			.appendln(WordUtils.wrap("This algorithm returns 4-tuples containing two vertex IDs, the" +
-				" number of shared neighbors, and the number of distinct neighbors.", 80))
-			.appendNewLine()
-			.appendln("usage: JaccardIndex --input <csv | rmat [options]> --output <print | hash | csv [options]>")
-			.appendNewLine()
-			.appendln("options:")
-			.appendln("  --input csv --type <integer | string> --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
-			.appendln("  --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]")
-			.appendNewLine()
-			.appendln("  --output print")
-			.appendln("  --output hash")
-			.appendln("  --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]")
-			.appendNewLine()
-			.appendln("Usage error: " + message)
-			.toString();
-	}
-
-	public static void main(String[] args) throws Exception {
-		// Set up the execution environment
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.getConfig().enableObjectReuse();
-
-		ParameterTool parameters = ParameterTool.fromArgs(args);
-
-		int little_parallelism = parameters.getInt("little_parallelism", PARALLELISM_DEFAULT);
-
-		DataSet ji;
-
-		switch (parameters.get("input", "")) {
-			case "csv": {
-				String lineDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-				String fieldDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-				GraphCsvReader reader = Graph
-					.fromCsvReader(parameters.get("input_filename"), env)
-						.ignoreCommentsEdges("#")
-						.lineDelimiterEdges(lineDelimiter)
-						.fieldDelimiterEdges(fieldDelimiter);
-
-				switch (parameters.get("type", "")) {
-					case "integer": {
-						ji = reader
-							.keyType(LongValue.class)
-							.run(new org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, NullValue>()
-								.setLittleParallelism(little_parallelism));
-					} break;
-
-					case "string": {
-						ji = reader
-							.keyType(StringValue.class)
-							.run(new org.apache.flink.graph.library.similarity.JaccardIndex<StringValue, NullValue, NullValue>()
-								.setLittleParallelism(little_parallelism));
-					} break;
-
-					default:
-						throw new ProgramParametrizationException(getUsage("invalid CSV type"));
-				}
-				} break;
-
-			case "rmat": {
-				int scale = parameters.getInt("scale", DEFAULT_SCALE);
-				int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
-
-				RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
-
-				long vertexCount = 1L << scale;
-				long edgeCount = vertexCount * edgeFactor;
-
-				Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
-					.setParallelism(little_parallelism)
-					.generate();
-
-				boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
-
-				if (scale > 32) {
-					ji = graph
-						.run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip)
-							.setParallelism(little_parallelism))
-						.run(new org.apache.flink.graph.library.similarity.JaccardIndex<LongValue, NullValue, NullValue>()
-							.setLittleParallelism(little_parallelism));
-				} else {
-					ji = graph
-						.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue())
-							.setParallelism(little_parallelism))
-						.run(new Simplify<IntValue, NullValue, NullValue>(clipAndFlip)
-							.setParallelism(little_parallelism))
-						.run(new org.apache.flink.graph.library.similarity.JaccardIndex<IntValue, NullValue, NullValue>()
-							.setLittleParallelism(little_parallelism));
-				}
-				} break;
-
-			default:
-				throw new ProgramParametrizationException(getUsage("invalid input type"));
-		}
-
-		switch (parameters.get("output", "")) {
-			case "print":
-				for (Object e: ji.collect()) {
-					Result result = (Result)e;
-					System.out.println(result.toVerboseString());
-				}
-				break;
-
-			case "hash":
-				System.out.println(DataSetUtils.checksumHashCode(ji));
-				break;
-
-			case "csv":
-				String filename = parameters.get("output_filename");
-
-				String lineDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-				String fieldDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-				ji.writeAsCsv(filename, lineDelimiter, fieldDelimiter);
-
-				env.execute();
-				break;
-
-			default:
-				throw new ProgramParametrizationException(getUsage("invalid output type"));
-		}
-
-		JobExecutionResult result = env.getLastJobExecutionResult();
-
-		NumberFormat nf = NumberFormat.getInstance();
-		System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
deleted file mode 100644
index 43c5eba..0000000
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
+++ /dev/null
@@ -1,230 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.examples;
-
-import org.apache.commons.lang3.StringEscapeUtils;
-import org.apache.commons.lang3.text.WordUtils;
-import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.CsvOutputFormat;
-import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphCsvReader;
-import org.apache.flink.graph.asm.simple.undirected.Simplify;
-import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
-import org.apache.flink.graph.asm.translate.TranslateGraphIds;
-import org.apache.flink.graph.generator.RMatGraph;
-import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
-import org.apache.flink.graph.generator.random.RandomGenerableFactory;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.types.StringValue;
-
-import java.text.NumberFormat;
-
-/**
- * Driver for the library implementation of Triangle Listing.
- *
- * This example reads a simple directed or undirected graph from a CSV file or
- * generates an RMat graph with the given scale and edge factor then lists
- * all triangles.
- *
- * @see org.apache.flink.graph.library.clustering.directed.TriangleListing
- * @see org.apache.flink.graph.library.clustering.undirected.TriangleListing
- */
-public class TriangleListing {
-
-	public static final int DEFAULT_SCALE = 10;
-
-	public static final int DEFAULT_EDGE_FACTOR = 16;
-
-	public static final boolean DEFAULT_CLIP_AND_FLIP = true;
-
-	private static void printUsage() {
-		System.out.println(WordUtils.wrap("Lists all triangles in a graph.", 80));
-		System.out.println();
-		System.out.println(WordUtils.wrap("This algorithm returns tuples containing the vertex IDs for each triangle and" +
-			" for directed graphs a bitmask indicating the presence of the six potential connecting edges.", 80));
-		System.out.println();
-		System.out.println("usage: TriangleListing --directed <true | false> --input <csv | rmat [options]> --output <print | hash | csv [options]>");
-		System.out.println();
-		System.out.println("options:");
-		System.out.println("  --input csv --type <integer | string> --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]");
-		System.out.println("  --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]");
-		System.out.println();
-		System.out.println("  --output print");
-		System.out.println("  --output hash");
-		System.out.println("  --output csv --output_filename FILENAME [--output_line_delimiter LINE_DELIMITER] [--output_field_delimiter FIELD_DELIMITER]");
-	}
-
-	public static void main(String[] args) throws Exception {
-		// Set up the execution environment
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-		env.getConfig().enableObjectReuse();
-
-		ParameterTool parameters = ParameterTool.fromArgs(args);
-		if (! parameters.has("directed")) {
-			printUsage();
-			return;
-		}
-		boolean directedAlgorithm = parameters.getBoolean("directed");
-
-		DataSet tl;
-
-		switch (parameters.get("input", "")) {
-			case "csv": {
-				String lineDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-				String fieldDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-				GraphCsvReader reader = Graph
-					.fromCsvReader(parameters.get("input_filename"), env)
-						.ignoreCommentsEdges("#")
-						.lineDelimiterEdges(lineDelimiter)
-						.fieldDelimiterEdges(fieldDelimiter);
-
-				switch (parameters.get("type", "")) {
-					case "integer": {
-						Graph<LongValue, NullValue, NullValue> graph = reader
-							.keyType(LongValue.class);
-
-						if (directedAlgorithm) {
-							tl = graph
-								.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, NullValue, NullValue>());
-						} else {
-							tl = graph
-								.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, NullValue, NullValue>());
-						}
-					} break;
-
-					case "string": {
-						Graph<StringValue, NullValue, NullValue> graph = reader
-							.keyType(StringValue.class);
-
-						if (directedAlgorithm) {
-							tl = graph
-								.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<StringValue, NullValue, NullValue>());
-						} else {
-							tl = graph
-								.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<StringValue, NullValue, NullValue>());
-						}
-					} break;
-
-					default:
-						printUsage();
-						return;
-				}
-
-
-			} break;
-
-			case "rmat": {
-				int scale = parameters.getInt("scale", DEFAULT_SCALE);
-				int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
-
-				RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
-
-				long vertexCount = 1L << scale;
-				long edgeCount = vertexCount * edgeFactor;
-
-				Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
-					.generate();
-
-				if (directedAlgorithm) {
-					if (scale > 32) {
-						tl = graph
-							.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>())
-							.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, NullValue, NullValue>());
-					} else {
-						tl = graph
-							.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()))
-							.run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>())
-							.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<IntValue, NullValue, NullValue>());
-					}
-				} else {
-					boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
-
-					graph = graph
-						.run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
-
-					if (scale > 32) {
-						tl = graph
-							.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip))
-							.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, NullValue, NullValue>());
-					} else {
-						tl = graph
-							.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue()))
-							.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip))
-							.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<IntValue, NullValue, NullValue>());
-					}
-				}
-			} break;
-
-			default:
-				printUsage();
-				return;
-		}
-
-		switch (parameters.get("output", "")) {
-			case "print":
-				if (directedAlgorithm) {
-					for (Object e: tl.collect()) {
-						org.apache.flink.graph.library.clustering.directed.TriangleListing.Result result =
-							(org.apache.flink.graph.library.clustering.directed.TriangleListing.Result) e;
-						System.out.println(result.toVerboseString());
-					}
-				} else {
-					tl.print();
-				}
-				break;
-
-			case "hash":
-				System.out.println(DataSetUtils.checksumHashCode(tl));
-				break;
-
-			case "csv":
-				String filename = parameters.get("output_filename");
-
-				String lineDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("output_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
-
-				String fieldDelimiter = StringEscapeUtils.unescapeJava(
-					parameters.get("output_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
-
-				tl.writeAsCsv(filename, lineDelimiter, fieldDelimiter);
-
-				env.execute();
-				break;
-			default:
-				printUsage();
-				return;
-		}
-
-		JobExecutionResult result = env.getLastJobExecutionResult();
-
-		NumberFormat nf = NumberFormat.getInstance();
-		System.out.println("Execution runtime: " + nf.format(result.getNetRuntime()) + " ms");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/utils/ExampleUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/utils/ExampleUtils.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/utils/ExampleUtils.java
deleted file mode 100644
index b1bc831..0000000
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/utils/ExampleUtils.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.examples.utils;
-
-import java.io.PrintStream;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.io.OutputFormat;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.types.NullValue;
-import org.apache.flink.util.Collector;
-
-public class ExampleUtils {
-
-	@SuppressWarnings({ "serial", "unchecked", "rawtypes" })
-	public static void printResult(DataSet set, String msg) {
-		set.output(new PrintingOutputFormatWithMessage(msg) {
-		});
-	}
-
-	public static class PrintingOutputFormatWithMessage<T> implements
-			OutputFormat<T> {
-
-		private static final long serialVersionUID = 1L;
-
-		private transient PrintStream stream;
-
-		private transient String prefix;
-
-		private String message;
-
-		// --------------------------------------------------------------------------------------------
-
-		/**
-		 * Instantiates a printing output format that prints to standard out.
-		 */
-		public PrintingOutputFormatWithMessage() {
-		}
-
-		public PrintingOutputFormatWithMessage(String msg) {
-			this.message = msg;
-		}
-
-		@Override
-		public void open(int taskNumber, int numTasks) {
-			// get the target stream
-			this.stream = System.out;
-
-			// set the prefix to message
-			this.prefix = message + ": ";
-		}
-
-		@Override
-		public void writeRecord(T record) {
-			if (this.prefix != null) {
-				this.stream.println(this.prefix + record.toString());
-			} else {
-				this.stream.println(record.toString());
-			}
-		}
-
-		@Override
-		public void close() {
-			this.stream = null;
-			this.prefix = null;
-		}
-
-		@Override
-		public String toString() {
-			return "Print to System.out";
-		}
-
-		@Override
-		public void configure(Configuration parameters) {
-		}
-	}
-
-	@SuppressWarnings("serial")
-	public static DataSet<Vertex<Long, NullValue>> getVertexIds(
-			ExecutionEnvironment env, final long numVertices) {
-		return env.generateSequence(1, numVertices).map(
-				new MapFunction<Long, Vertex<Long, NullValue>>() {
-					public Vertex<Long, NullValue> map(Long l) {
-						return new Vertex<Long, NullValue>(l, NullValue
-								.getInstance());
-					}
-				});
-	}
-
-	@SuppressWarnings("serial")
-	public static DataSet<Edge<Long, NullValue>> getRandomEdges(
-			ExecutionEnvironment env, final long numVertices) {
-		return env.generateSequence(1, numVertices).flatMap(
-				new FlatMapFunction<Long, Edge<Long, NullValue>>() {
-					@Override
-					public void flatMap(Long key, Collector<Edge<Long, NullValue>> out) throws Exception {
-						int numOutEdges = (int) (Math.random() * (numVertices / 2));
-						for (int i = 0; i < numOutEdges; i++) {
-							long target = (long) (Math.random() * numVertices) + 1;
-							out.collect(new Edge<Long, NullValue>(key, target,
-									NullValue.getInstance()));
-						}
-					}
-				});
-	}
-
-	public static DataSet<Vertex<Long, Double>> getLongDoubleVertexData(
-			ExecutionEnvironment env) {
-		List<Vertex<Long, Double>> vertices = new ArrayList<Vertex<Long, Double>>();
-		vertices.add(new Vertex<Long, Double>(1L, 1.0));
-		vertices.add(new Vertex<Long, Double>(2L, 2.0));
-		vertices.add(new Vertex<Long, Double>(3L, 3.0));
-		vertices.add(new Vertex<Long, Double>(4L, 4.0));
-		vertices.add(new Vertex<Long, Double>(5L, 5.0));
-
-		return env.fromCollection(vertices);
-	}
-
-	public static DataSet<Edge<Long, Double>> getLongDoubleEdgeData(
-			ExecutionEnvironment env) {
-		List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
-		edges.add(new Edge<Long, Double>(1L, 2L, 12.0));
-		edges.add(new Edge<Long, Double>(1L, 3L, 13.0));
-		edges.add(new Edge<Long, Double>(2L, 3L, 23.0));
-		edges.add(new Edge<Long, Double>(3L, 4L, 34.0));
-		edges.add(new Edge<Long, Double>(3L, 5L, 35.0));
-		edges.add(new Edge<Long, Double>(4L, 5L, 45.0));
-		edges.add(new Edge<Long, Double>(5L, 1L, 51.0));
-
-		return env.fromCollection(edges);
-	}
-
-	/**
-	 * Private constructor to prevent instantiation.
-	 */
-	private ExampleUtils() {
-		throw new RuntimeException();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala
deleted file mode 100644
index ebf43d4..0000000
--- a/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.scala.examples
-
-import org.apache.flink.api.scala._
-import org.apache.flink.graph.scala._
-import org.apache.flink.types.NullValue
-import org.apache.flink.graph.Edge
-import org.apache.flink.util.Collector
-
-/**
- * This example illustrates how to use Gelly metrics methods and get simple statistics
- * from the input graph.  
- * 
- * The program creates a random graph and computes and prints
- * the following metrics:
- * - number of vertices
- * - number of edges
- * - average node degree
- * - the vertex ids with the max/min in- and out-degrees
- *
- * The input file is expected to contain one edge per line,
- * with long IDs and no values, in the following format:
- * {{{
- *   <sourceVertexID>\t<targetVertexID>
- * }}}
- * If no arguments are provided, the example runs with a random graph of 100 vertices.
- *
- */
-object GraphMetrics {
-  def main(args: Array[String]) {
-    if (!parseParameters(args)) {
-      return
-    }
-
-    val env = ExecutionEnvironment.getExecutionEnvironment
-    /** create the graph **/
-    val graph: Graph[Long, NullValue, NullValue] = Graph.fromDataSet(getEdgeDataSet(env), env)
-
-    /** get the number of vertices **/
-    val numVertices = graph.numberOfVertices
-
-    /** get the number of edges **/
-    val numEdges = graph.numberOfEdges
-
-    /** compute the average node degree **/
-    val verticesWithDegrees = graph.getDegrees
-    val avgDegree = verticesWithDegrees.sum(1).map(in => (in._2.getValue / numVertices).toDouble)
-
-    /** find the vertex with the maximum in-degree **/
-    val maxInDegreeVertex = graph.inDegrees.max(1).map(in => in._1)
-
-    /** find the vertex with the minimum in-degree **/
-    val minInDegreeVertex = graph.inDegrees.min(1).map(in => in._1)
-
-    /** find the vertex with the maximum out-degree **/
-    val maxOutDegreeVertex = graph.outDegrees.max(1).map(in => in._1)
-
-    /** find the vertex with the minimum out-degree **/
-    val minOutDegreeVertex = graph.outDegrees.min(1).map(in => in._1)
-
-    /** print the results **/
-    env.fromElements(numVertices).printOnTaskManager("Total number of vertices")
-    env.fromElements(numEdges).printOnTaskManager("Total number of edges")
-    avgDegree.printOnTaskManager("Average node degree")
-    maxInDegreeVertex.printOnTaskManager("Vertex with Max in-degree")
-    minInDegreeVertex.printOnTaskManager("Vertex with Max in-degree")
-    maxOutDegreeVertex.printOnTaskManager("Vertex with Max out-degree")
-    minOutDegreeVertex.printOnTaskManager("Vertex with Max out-degree")
-
-  }
-
-  private def parseParameters(args: Array[String]): Boolean = {
-    if (args.length > 0) {
-      fileOutput = true
-      if (args.length == 1) {
-        edgesPath = args(0)
-        true
-      } else {
-        System.err.println("Usage: GraphMetrics <edges path>")
-        false
-      }
-    } else {
-      System.out.println("Executing GraphMetrics example with built-in default data.")
-      System.out.println("  Provide parameters to read input data from a file.")
-      System.out.println("  Usage: GraphMetrics <edges path>")
-      true
-    }
-  }
-
-  private def getEdgeDataSet(env: ExecutionEnvironment): DataSet[Edge[Long, NullValue]] = {
-    if (fileOutput) {
-      env.readCsvFile[(Long, Long)](
-        edgesPath,
-        fieldDelimiter = "\t").map(
-        in => new Edge[Long, NullValue](in._1, in._2, NullValue.getInstance()))
-    } else {
-      env.generateSequence(1, numVertices).flatMap[Edge[Long, NullValue]](
-        (key: Long, out: Collector[Edge[Long, NullValue]]) => {
-          val numOutEdges: Int = (Math.random() * (numVertices / 2)).toInt
-          for ( i <- 0 to numOutEdges ) {
-            val target: Long = ((Math.random() * numVertices) + 1).toLong
-            new Edge[Long, NullValue](key, target, NullValue.getInstance())
-          }
-      })
-    }
-  }
-
-  private var fileOutput: Boolean = false
-  private var edgesPath: String = null
-  private var outputPath: String = null
-  private val numVertices = 100
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
index 167e31c..b3e1e30 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
@@ -367,6 +367,15 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 		}
 
 		/**
+		 * Get the average degree.
+		 *
+		 * @return average degree
+		 */
+		public float getAverageDegree() {
+			return edgeCount / (float)vertexCount;
+		}
+
+		/**
 		 * Get the number of triangle triplets.
 		 *
 		 * @return number of triangle triplets
@@ -453,6 +462,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
 			return "vertex count: " + nf.format(vertexCount)
 				+ "; edge count: " + nf.format(edgeCount)
+				+ "; average degree: " + nf.format(getAverageDegree())
 				+ "; triangle triplet count: " + nf.format(triangleTripletCount)
 				+ "; rectangle triplet count: " + nf.format(rectangleTripletCount)
 				+ "; triplet count: " + nf.format(tripletCount)

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
index 22f7733..909eea5 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
@@ -228,6 +228,15 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 		}
 
 		/**
+		 * Get the average degree.
+		 *
+		 * @return average degree
+		 */
+		public float getAverageDegree() {
+			return edgeCount / (float)vertexCount;
+		}
+
+		/**
 		 * Get the number of triplets.
 		 *
 		 * @return number of triplets
@@ -278,6 +287,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
 			return "vertex count: " + nf.format(vertexCount)
 				+ "; edge count: " + nf.format(edgeCount)
+				+ "; average degree: " + nf.format(getAverageDegree())
 				+ "; triplet count: " + nf.format(tripletCount)
 				+ "; maximum degree: " + nf.format(maximumDegree)
 				+ "; maximum out degree: " + nf.format(maximumOutDegree)

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
index 1d5b664..6bce42c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
@@ -329,6 +329,15 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 		}
 
 		/**
+		 * Get the average degree.
+		 *
+		 * @return average degree
+		 */
+		public float getAverageDegree() {
+			return edgeCount / (float)vertexCount;
+		}
+
+		/**
 		 * Get the number of triangle triplets.
 		 *
 		 * @return number of triangle triplets
@@ -397,6 +406,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
 			return "vertex count: " + nf.format(vertexCount)
 				+ "; edge count: " + nf.format(edgeCount)
+				+ "; average degree: " + nf.format(getAverageDegree())
 				+ "; triangle triplet count: " + nf.format(triangleTripletCount)
 				+ "; rectangle triplet count: " + nf.format(rectangleTripletCount)
 				+ "; triplet count: " + nf.format(tripletCount)

http://git-wip-us.apache.org/repos/asf/flink/blob/c4f9f0d7/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
index d04fa7b..8012605 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
@@ -230,6 +230,15 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 		}
 
 		/**
+		 * Get the average degree.
+		 *
+		 * @return average degree
+		 */
+		public float getAverageDegree() {
+			return edgeCount / (float)vertexCount;
+		}
+
+		/**
 		 * Get the number of triplets.
 		 *
 		 * @return number of triplets
@@ -262,6 +271,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
 			return "vertex count: " + nf.format(vertexCount)
 				+ "; edge count: " + nf.format(edgeCount)
+				+ "; average degree: " + nf.format(getAverageDegree())
 				+ "; triplet count: " + nf.format(tripletCount)
 				+ "; maximum degree: " + nf.format(maximumDegree)
 				+ "; maximum triplets: " + nf.format(maximumTriplets);


Mime
View raw message