flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject flink git commit: [FLINK-4264] [gelly] New GraphMetrics driver
Date Wed, 24 Aug 2016 14:40:49 GMT
Repository: flink
Updated Branches:
  refs/heads/master ad8e665f0 -> 58850f292


[FLINK-4264] [gelly] New GraphMetrics driver

Updates VertexMetrics analytic, adds directed and undirected
EdgeMetric analytics, and includes a new GraphMetrics driver.

This closes #2295


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/58850f29
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/58850f29
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/58850f29

Branch: refs/heads/master
Commit: 58850f29243559c67bdc07c737bc5772c3f098db
Parents: ad8e665
Author: Greg Hogan <code@greghogan.com>
Authored: Wed Jul 20 15:44:54 2016 -0400
Committer: Greg Hogan <code@greghogan.com>
Committed: Wed Aug 24 10:05:12 2016 -0400

----------------------------------------------------------------------
 .../apache/flink/graph/driver/GraphMetrics.java | 232 +++++++++
 .../graph/examples/ClusteringCoefficient.java   |  14 +-
 .../org/apache/flink/graph/examples/HITS.java   |   2 +-
 .../flink/graph/examples/JaccardIndex.java      |   2 +-
 .../flink/graph/examples/TriangleListing.java   |   2 +-
 .../degree/annotate/directed/VertexDegrees.java |   2 +-
 .../directed/LocalClusteringCoefficient.java    |  32 +-
 .../undirected/LocalClusteringCoefficient.java  |  31 +-
 .../library/metric/directed/EdgeMetrics.java    | 507 +++++++++++++++++++
 .../library/metric/directed/VertexMetrics.java  | 102 +++-
 .../library/metric/undirected/EdgeMetrics.java  | 445 ++++++++++++++++
 .../metric/undirected/VertexMetrics.java        |  63 ++-
 .../metric/directed/EdgeMetricsTest.java        |  90 ++++
 .../metric/directed/VertexMetricsTest.java      |  19 +-
 .../metric/undirected/EdgeMetricsTest.java      |  89 ++++
 .../metric/undirected/VertexMetricsTest.java    |  14 +-
 16 files changed, 1600 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/58850f29/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/driver/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/driver/GraphMetrics.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/driver/GraphMetrics.java
new file mode 100644
index 0000000..cc265bb
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/driver/GraphMetrics.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.driver;
+
+import org.apache.commons.lang3.StringEscapeUtils;
+import org.apache.commons.lang3.text.WordUtils;
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.CsvOutputFormat;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAnalytic;
+import org.apache.flink.graph.GraphCsvReader;
+import org.apache.flink.graph.asm.translate.LongValueToIntValue;
+import org.apache.flink.graph.asm.translate.TranslateGraphIds;
+import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.types.StringValue;
+
+import java.text.NumberFormat;
+
+/**
+ * Computes vertex and edge metrics on a directed or undirected graph.
+ *
+ * @see org.apache.flink.graph.library.metric.directed.EdgeMetrics
+ * @see org.apache.flink.graph.library.metric.directed.VertexMetrics
+ * @see org.apache.flink.graph.library.metric.undirected.EdgeMetrics
+ * @see org.apache.flink.graph.library.metric.undirected.VertexMetrics
+ */
+public class GraphMetrics {
+
+	public static final int DEFAULT_SCALE = 10;
+
+	public static final int DEFAULT_EDGE_FACTOR = 16;
+
+	public static final boolean DEFAULT_CLIP_AND_FLIP = true;
+
+	private static void printUsage() {
+		System.out.println(WordUtils.wrap("Computes vertex and edge metrics on a directed or undirected graph.", 80));
+		System.out.println();
+		System.out.println("usage: GraphMetrics --directed <true | false> --input <csv | rmat [options]>");
+		System.out.println();
+		System.out.println("options:");
+		System.out.println("  --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]");
+		System.out.println("  --input rmat [--scale SCALE] [--edge_factor EDGE_FACTOR]");
+	}
+
+	public static void main(String[] args) throws Exception {
+		// Set up the execution environment
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.getConfig().enableObjectReuse();
+
+		ParameterTool parameters = ParameterTool.fromArgs(args);
+		if (! parameters.has("directed")) {
+			printUsage();
+			return;
+		}
+		boolean directedAlgorithm = parameters.getBoolean("directed");
+
+		GraphAnalytic vm;
+		GraphAnalytic em;
+
+		switch (parameters.get("input", "")) {
+			case "csv": {
+				String lineDelimiter = StringEscapeUtils.unescapeJava(
+					parameters.get("input_line_delimiter", CsvOutputFormat.DEFAULT_LINE_DELIMITER));
+
+				String fieldDelimiter = StringEscapeUtils.unescapeJava(
+					parameters.get("input_field_delimiter", CsvOutputFormat.DEFAULT_FIELD_DELIMITER));
+
+				GraphCsvReader reader = Graph
+					.fromCsvReader(parameters.get("input_filename"), env)
+						.ignoreCommentsEdges("#")
+						.lineDelimiterEdges(lineDelimiter)
+						.fieldDelimiterEdges(fieldDelimiter);
+
+				switch (parameters.get("type", "")) {
+					case "integer": {
+						Graph<LongValue, NullValue, NullValue> graph = reader
+							.keyType(LongValue.class);
+
+						if (directedAlgorithm) {
+							if (parameters.getBoolean("simplify", false)) {
+								graph = graph
+									.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());
+							}
+
+							vm = graph
+								.run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<LongValue, NullValue, NullValue>());
+							em = graph
+								.run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<LongValue, NullValue, NullValue>());
+						} else {
+							if (parameters.getBoolean("simplify", false)) {
+								graph = graph
+									.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false));
+							}
+
+							vm = graph
+								.run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<LongValue, NullValue, NullValue>());
+							em = graph
+								.run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<LongValue, NullValue, NullValue>());
+						}
+					} break;
+
+					case "string": {
+						Graph<StringValue, NullValue, NullValue> graph = reader
+							.keyType(StringValue.class);
+
+						if (directedAlgorithm) {
+							if (parameters.getBoolean("simplify", false)) {
+								graph = graph
+									.run(new org.apache.flink.graph.asm.simple.directed.Simplify<StringValue, NullValue, NullValue>());
+							}
+
+							vm = graph
+								.run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<StringValue, NullValue, NullValue>());
+							em = graph
+								.run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<StringValue, NullValue, NullValue>());
+						} else {
+							if (parameters.getBoolean("simplify", false)) {
+								graph = graph
+									.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<StringValue, NullValue, NullValue>(false));
+							}
+
+							vm = graph
+								.run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<StringValue, NullValue, NullValue>());
+							em = graph
+								.run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<StringValue, NullValue, NullValue>());
+						}
+					} break;
+
+					default:
+						printUsage();
+						return;
+				}
+				} break;
+
+			case "rmat": {
+				int scale = parameters.getInt("scale", DEFAULT_SCALE);
+				int edgeFactor = parameters.getInt("edge_factor", DEFAULT_EDGE_FACTOR);
+
+				RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
+
+				long vertexCount = 1L << scale;
+				long edgeCount = vertexCount * edgeFactor;
+
+
+				Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
+					.generate();
+
+				if (directedAlgorithm) {
+					if (scale > 32) {
+						Graph<LongValue, NullValue, NullValue> newGraph = graph
+							.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());
+
+						vm = newGraph
+							.run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<LongValue, NullValue, NullValue>());
+						em = newGraph
+							.run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<LongValue, NullValue, NullValue>());
+					} else {
+						Graph<IntValue, NullValue, NullValue> newGraph = graph
+							.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue()))
+							.run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>());
+
+						vm = newGraph
+							.run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<IntValue, NullValue, NullValue>());
+						em = newGraph
+							.run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<IntValue, NullValue, NullValue>());
+					}
+				} else {
+					boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
+
+					if (scale > 32) {
+						Graph<LongValue, NullValue, NullValue> newGraph = graph
+							.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
+
+						vm = newGraph
+							.run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<LongValue, NullValue, NullValue>());
+						em = newGraph
+							.run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<LongValue, NullValue, NullValue>());
+					} else {
+						Graph<IntValue, NullValue, NullValue> newGraph = graph
+							.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue()))
+							.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip));
+
+						vm = newGraph
+							.run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<IntValue, NullValue, NullValue>());
+						em = newGraph
+							.run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<IntValue, NullValue, NullValue>());
+					}
+				}
+				} break;
+
+			default:
+				printUsage();
+				return;
+		}
+
+		env.execute("Graph Metrics");
+
+		System.out.print("Vertex metrics:\n  ");
+		System.out.println(vm.getResult().toString().replace(";", "\n "));
+		System.out.print("\nEdge metrics:\n  ");
+		System.out.println(em.getResult().toString().replace(";", "\n "));
+
+		JobExecutionResult result = env.getLastJobExecutionResult();
+
+		NumberFormat nf = NumberFormat.getInstance();
+		System.out.println("\nExecution runtime: " + nf.format(result.getNetRuntime()) + " ms");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58850f29/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
index 8641428..e099e2b 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
@@ -72,7 +72,7 @@ public class ClusteringCoefficient {
 		System.out.println(WordUtils.wrap("This algorithm returns tuples containing the vertex ID, the degree of" +
 			" the vertex, and the number of edges between vertex neighbors.", 80));
 		System.out.println();
-		System.out.println("usage: ClusteringCoefficient --directed <true | false> --input <csv | rmat [options]> --output <print | hash | csv [options]");
+		System.out.println("usage: ClusteringCoefficient --directed <true | false> --input <csv | rmat [options]> --output <print | hash | csv [options]>");
 		System.out.println();
 		System.out.println("options:");
 		System.out.println("  --input csv --type <integer | string> --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]");
@@ -174,7 +174,8 @@ public class ClusteringCoefficient {
 						gcc = newGraph
 							.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>());
 						lcc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>());
+							.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
+								.setIncludeZeroDegreeVertices(false));
 					} else {
 						Graph<IntValue, NullValue, NullValue> newGraph = graph
 							.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue()))
@@ -183,7 +184,8 @@ public class ClusteringCoefficient {
 						gcc = newGraph
 							.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>());
 						lcc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<IntValue, NullValue, NullValue>());
+							.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<IntValue, NullValue, NullValue>()
+								.setIncludeZeroDegreeVertices(false));
 					}
 				} else {
 					boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
@@ -195,7 +197,8 @@ public class ClusteringCoefficient {
 						gcc = newGraph
 							.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>());
 						lcc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>());
+							.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
+								.setIncludeZeroDegreeVertices(false));
 					} else {
 						Graph<IntValue, NullValue, NullValue> newGraph = graph
 							.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToIntValue()))
@@ -204,7 +207,8 @@ public class ClusteringCoefficient {
 						gcc = newGraph
 							.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>());
 						lcc = newGraph
-							.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<IntValue, NullValue, NullValue>());
+							.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<IntValue, NullValue, NullValue>()
+								.setIncludeZeroDegreeVertices(false));
 					}
 				}
 			} break;

http://git-wip-us.apache.org/repos/asf/flink/blob/58850f29/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java
index c772a3a..59612d9 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/HITS.java
@@ -65,7 +65,7 @@ public class HITS {
 			" scores for every vertex in a directed graph. A good \"hub\" links to good \"authorities\"" +
 			" and good \"authorities\" are linked from good \"hubs\".", 80));
 		System.out.println();
-		System.out.println("usage: HITS --input <csv | rmat [options]> --output <print | hash | csv [options]");
+		System.out.println("usage: HITS --input <csv | rmat [options]> --output <print | hash | csv [options]>");
 		System.out.println();
 		System.out.println("options:");
 		System.out.println("  --input csv --type <integer | string> --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]");

http://git-wip-us.apache.org/repos/asf/flink/blob/58850f29/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
index 2158fa2..824aab7 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
@@ -69,7 +69,7 @@ public class JaccardIndex {
 		System.out.println(WordUtils.wrap("This algorithm returns 4-tuples containing two vertex IDs, the" +
 			" number of shared neighbors, and the number of distinct neighbors.", 80));
 		System.out.println();
-		System.out.println("usage: JaccardIndex --input <csv | rmat [options]> --output <print | hash | csv [options]");
+		System.out.println("usage: JaccardIndex --input <csv | rmat [options]> --output <print | hash | csv [options]>");
 		System.out.println();
 		System.out.println("options:");
 		System.out.println("  --input csv --type <integer | string> --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]");

http://git-wip-us.apache.org/repos/asf/flink/blob/58850f29/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
index cd06dde..f3ce708 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
@@ -66,7 +66,7 @@ public class TriangleListing {
 		System.out.println(WordUtils.wrap("This algorithm returns tuples containing the vertex IDs for each triangle and" +
 			" for directed graphs a bitmask indicating the presence of the six potential connecting edges.", 80));
 		System.out.println();
-		System.out.println("usage: TriangleListing --directed <true | false> --input <csv | rmat [options]> --output <print | hash | csv [options]");
+		System.out.println("usage: TriangleListing --directed <true | false> --input <csv | rmat [options]> --output <print | hash | csv [options]>");
 		System.out.println();
 		System.out.println("options:");
 		System.out.println("  --input csv --type <integer | string> --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]");

http://git-wip-us.apache.org/repos/asf/flink/blob/58850f29/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
index 9fef221..84873bc 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
@@ -86,7 +86,7 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Vertex<K, Degrees>> {
 
 	@Override
 	protected String getAlgorithmName() {
-		return VertexOutDegree.class.getName();
+		return VertexDegrees.class.getName();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/58850f29/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
index e0defcd..22c8b41 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
@@ -32,6 +32,7 @@ import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees
 import org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result;
 import org.apache.flink.graph.utils.Murmur3_32;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
@@ -59,9 +60,26 @@ public class LocalClusteringCoefficient<K extends Comparable<K> & CopyableValue<
 extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
 
 	// Optional configuration
+	private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(true, true);
+
 	private int littleParallelism = PARALLELISM_DEFAULT;
 
 	/**
+	 * By default the vertex set is checked for zero degree vertices. When this
+	 * flag is disabled only clustering coefficient scores for vertices with
+	 * a degree of a least one will be produced.
+	 *
+	 * @param includeZeroDegreeVertices whether to output scores for vertices
+	 *                                  with a degree of zero
+	 * @return this
+	 */
+	public LocalClusteringCoefficient<K, VV, EV> setIncludeZeroDegreeVertices(boolean includeZeroDegreeVertices) {
+		this.includeZeroDegreeVertices.set(includeZeroDegreeVertices);
+
+		return this;
+	}
+
+	/**
 	 * Override the parallelism of operators processing small amounts of data.
 	 *
 	 * @param littleParallelism operator parallelism
@@ -90,6 +108,16 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
 
 		LocalClusteringCoefficient rhs = (LocalClusteringCoefficient) other;
 
+		// verify that configurations can be merged
+
+		if (includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices)) {
+			return false;
+		}
+
+		// merge configurations
+
+		includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
+
 		littleParallelism = Math.min(littleParallelism, rhs.littleParallelism);
 
 		return true;
@@ -128,8 +156,8 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
 		// u, deg(u)
 		DataSet<Vertex<K, Degrees>> vertexDegree = input
 			.run(new VertexDegrees<K, VV, EV>()
-				.setParallelism(littleParallelism)
-				.setIncludeZeroDegreeVertices(true));
+				.setIncludeZeroDegreeVertices(includeZeroDegreeVertices.get())
+				.setParallelism(littleParallelism));
 
 		// u, deg(u), triangle count
 		return vertexDegree

http://git-wip-us.apache.org/repos/asf/flink/blob/58850f29/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
index cd859d9..4b4bf07 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
@@ -32,6 +32,7 @@ import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
 import org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result;
 import org.apache.flink.graph.utils.Murmur3_32;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmDelegatingDataSet;
+import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
@@ -59,9 +60,26 @@ public class LocalClusteringCoefficient<K extends Comparable<K> & CopyableValue<
 extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
 
 	// Optional configuration
+	private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(true, true);
+
 	private int littleParallelism = PARALLELISM_DEFAULT;
 
 	/**
+	 * By default the vertex set is checked for zero degree vertices. When this
+	 * flag is disabled only clustering coefficient scores for vertices with
+	 * a degree of a least one will be produced.
+	 *
+	 * @param includeZeroDegreeVertices whether to output scores for vertices
+	 *                                  with a degree of zero
+	 * @return this
+	 */
+	public LocalClusteringCoefficient<K, VV, EV> setIncludeZeroDegreeVertices(boolean includeZeroDegreeVertices) {
+		this.includeZeroDegreeVertices.set(includeZeroDegreeVertices);
+
+		return this;
+	}
+
+	/**
 	 * Override the parallelism of operators processing small amounts of data.
 	 *
 	 * @param littleParallelism operator parallelism
@@ -91,6 +109,15 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
 
 		LocalClusteringCoefficient rhs = (LocalClusteringCoefficient) other;
 
+		// verify that configurations can be merged
+
+		if (includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices)) {
+			return false;
+		}
+
+		// merge configurations
+
+		includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
 		littleParallelism = Math.min(littleParallelism, rhs.littleParallelism);
 
 		return true;
@@ -129,8 +156,8 @@ extends GraphAlgorithmDelegatingDataSet<K, VV, EV, Result<K>> {
 		// u, deg(u)
 		DataSet<Vertex<K, LongValue>> vertexDegree = input
 			.run(new VertexDegree<K, VV, EV>()
-				.setParallelism(littleParallelism)
-				.setIncludeZeroDegreeVertices(true));
+				.setIncludeZeroDegreeVertices(includeZeroDegreeVertices.get())
+				.setParallelism(littleParallelism));
 
 		// u, deg(u), triangle count
 		return vertexDegree

http://git-wip-us.apache.org/repos/asf/flink/blob/58850f29/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
new file mode 100644
index 0000000..167e31c
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
@@ -0,0 +1,507 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.metric.directed;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.accumulators.LongMaximum;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.AbstractGraphAnalytic;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.degree.annotate.directed.EdgeDegreesPair;
+import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.graph.library.metric.directed.EdgeMetrics.Result;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.Collector;
+
+import java.io.IOException;
+import java.text.NumberFormat;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * Compute the following edge metrics in a directed graph:
+ *  - number of vertices
+ *  - number of edges
+ *  - number of triangle triplets
+ *  - number of rectangle triplets
+ *  - number of triplets
+ *  - maximum degree
+ *  - maximum out degree
+ *  - maximum in degree
+ *  - maximum number of triangle triplets
+ *  - maximum number of rectangle triplets
+ *  - maximum number of triplets
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class EdgeMetrics<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+extends AbstractGraphAnalytic<K, VV, EV, Result> {
+
+	private String id = new AbstractID().toString();
+
+	private int parallelism = PARALLELISM_DEFAULT;
+
+	/**
+	 * Override the operator parallelism.
+	 *
+	 * @param parallelism operator parallelism
+	 * @return this
+	 */
+	public EdgeMetrics<K, VV, EV> setParallelism(int parallelism) {
+		this.parallelism = parallelism;
+
+		return this;
+	}
+
+	/*
+	 * Implementation notes:
+	 *
+	 * Use aggregator to replace SumEdgeStats when aggregators are rewritten to use
+	 *   a hash-combineable hashable-reduce.
+	 *
+	 * Use distinct to replace ReduceEdgeStats when the combiner can be disabled
+	 *   with a sorted-reduce forced.
+	 */
+
+	@Override
+	public EdgeMetrics<K, VV, EV> run(Graph<K, VV, EV> input)
+			throws Exception {
+		super.run(input);
+
+		// s, t, (d(s), d(t))
+		DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> edgeDegreesPair = input
+			.run(new EdgeDegreesPair<K, VV, EV>()
+				.setParallelism(parallelism));
+
+		// s, d(s), count of (u, v) where deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
+		DataSet<Tuple3<K, Degrees, LongValue>> edgeStats = edgeDegreesPair
+			.flatMap(new EdgeStats<K, EV>())
+				.setParallelism(parallelism)
+				.name("Edge stats")
+			.groupBy(0, 1)
+			.reduceGroup(new ReduceEdgeStats<K>())
+				.setParallelism(parallelism)
+				.name("Reduce edge stats")
+			.groupBy(0)
+			.reduce(new SumEdgeStats<K>())
+				.setCombineHint(CombineHint.HASH)
+				.setParallelism(parallelism)
+				.name("Sum edge stats");
+
+		edgeStats
+			.output(new EdgeMetricsHelper<K, EV>(id))
+				.setParallelism(parallelism)
+				.name("Edge metrics");
+
+		return this;
+	}
+
+	@Override
+	public Result getResult() {
+		JobExecutionResult res = env.getLastJobExecutionResult();
+
+		long vertexCount = res.getAccumulatorResult(id + "-0");
+		long edgeCount = res.getAccumulatorResult(id + "-1");
+		long triangleTripletCount = res.getAccumulatorResult(id + "-2");
+		long rectangleTripletCount = res.getAccumulatorResult(id + "-3");
+		long tripletCount = res.getAccumulatorResult(id + "-4");
+		long maximumDegree = res.getAccumulatorResult(id + "-5");
+		long maximumOutDegree = res.getAccumulatorResult(id + "-6");
+		long maximumInDegree = res.getAccumulatorResult(id + "-7");
+		long maximumTriangleTriplets = res.getAccumulatorResult(id + "-8");
+		long maximumRectangleTriplets = res.getAccumulatorResult(id + "-9");
+		long maximumTriplets = res.getAccumulatorResult(id + "-a");
+
+		return new Result(vertexCount, edgeCount, triangleTripletCount, rectangleTripletCount, tripletCount,
+			maximumDegree, maximumOutDegree, maximumInDegree,
+			maximumTriangleTriplets, maximumRectangleTriplets, maximumTriplets);
+	}
+
+	/**
+	 * Produces a pair of tuples. The first tuple contains the source vertex ID,
+	 * the target vertex ID, the source degrees, and the low-order count. The
+	 * second tuple is the same with the source and target roles reversed.
+	 *
+	 * The low-order count is one if the source vertex degree is less than the
+	 * target vertex degree or if the degrees are equal and the source vertex
+	 * ID compares lower than the target vertex ID; otherwise the low-order
+	 * count is zero.
+	 *
+	 * @param <T> ID type
+	 * @param <ET> edge value type
+	 */
+	private static final class EdgeStats<T extends Comparable<T>, ET>
+	implements FlatMapFunction<Edge<T, Tuple3<ET, Degrees, Degrees>>, Tuple4<T, T, Degrees, LongValue>> {
+		private LongValue zero = new LongValue(0);
+
+		private LongValue one = new LongValue(1);
+
+		private Tuple4<T, T, Degrees, LongValue> output = new Tuple4<>();
+
+		@Override
+		public void flatMap(Edge<T, Tuple3<ET, Degrees, Degrees>> edge, Collector<Tuple4<T, T, Degrees, LongValue>> out)
+				throws Exception {
+			Tuple3<ET, Degrees, Degrees> degrees = edge.f2;
+			long sourceDegree = degrees.f1.getDegree().getValue();
+			long targetDegree = degrees.f2.getDegree().getValue();
+
+			boolean ordered = (sourceDegree < targetDegree
+				|| (sourceDegree == targetDegree && edge.f0.compareTo(edge.f1) < 0));
+
+			output.f0 = edge.f0;
+			output.f1 = edge.f1;
+			output.f2 = edge.f2.f1;
+			output.f3 = ordered ? one : zero;
+			out.collect(output);
+
+			output.f0 = edge.f1;
+			output.f1 = edge.f0;
+			output.f2 = edge.f2.f2;
+			output.f3 = ordered ? zero : one;
+			out.collect(output);
+		}
+	}
+
+	/**
+	 * Produces a distinct value for each edge.
+	 *
+	 * @param <T> ID type
+	 */
+	@ForwardedFields("0")
+	private static final class ReduceEdgeStats<T>
+	implements GroupReduceFunction<Tuple4<T, T, Degrees, LongValue>, Tuple3<T, Degrees, LongValue>> {
+		Tuple3<T, Degrees, LongValue> output = new Tuple3<>();
+
+		@Override
+		public void reduce(Iterable<Tuple4<T, T, Degrees, LongValue>> values, Collector<Tuple3<T, Degrees, LongValue>> out)
+				throws Exception {
+			Tuple4<T, T, Degrees, LongValue> value = values.iterator().next();
+
+			output.f0 = value.f0;
+			output.f1 = value.f2;
+			output.f2 = value.f3;
+
+			out.collect(output);
+		}
+	}
+
+	/**
+	 * Sums the low-order counts.
+	 *
+	 * @param <T> ID type
+	 */
+	private static class SumEdgeStats<T>
+	implements ReduceFunction<Tuple3<T, Degrees, LongValue>> {
+		@Override
+		public Tuple3<T, Degrees, LongValue> reduce(Tuple3<T, Degrees, LongValue> value1, Tuple3<T, Degrees, LongValue> value2)
+				throws Exception {
+			value1.f2.setValue(value1.f2.getValue() + value2.f2.getValue());
+			return value1;
+		}
+	}
+
+	/**
+	 * Helper class to collect edge metrics.
+	 *
+	 * @param <T> ID type
+	 */
+	private static class EdgeMetricsHelper<T extends Comparable<T>, ET>
+	extends RichOutputFormat<Tuple3<T, Degrees, LongValue>> {
+		private final String id;
+
+		private long vertexCount;
+		private long edgeCount;
+		private long triangleTripletCount;
+		private long rectangleTripletCount;
+		private long tripletCount;
+		private long maximumDegree;
+		private long maximumOutDegree;
+		private long maximumInDegree;
+		private long maximumTriangleTriplets;
+		private long maximumRectangleTriplets;
+		private long maximumTriplets;
+
+		/**
+		 * This helper class collects edge metrics by scanning over and
+		 * discarding elements from the given DataSet.
+		 *
+		 * The unique id is required because Flink's accumulator namespace is
+		 * among all operators.
+		 *
+		 * @param id unique string used for accumulator names
+		 */
+		public EdgeMetricsHelper(String id) {
+			this.id = id;
+		}
+
+		@Override
+		public void configure(Configuration parameters) {}
+
+		@Override
+		public void open(int taskNumber, int numTasks) throws IOException {}
+
+		@Override
+		public void writeRecord(Tuple3<T, Degrees, LongValue> record) throws IOException {
+			Degrees degrees = record.f1;
+			long degree = degrees.getDegree().getValue();
+			long outDegree = degrees.getOutDegree().getValue();
+			long inDegree = degrees.getInDegree().getValue();
+
+			long lowDegree = record.f2.getValue();
+			long highDegree = degree - lowDegree;
+
+			long triangleTriplets = lowDegree * (lowDegree - 1) / 2;
+			long rectangleTriplets = triangleTriplets + lowDegree * highDegree;
+			long triplets = degree * (degree - 1) / 2;
+
+			vertexCount++;
+			edgeCount += outDegree;
+			triangleTripletCount += triangleTriplets;
+			rectangleTripletCount += rectangleTriplets;
+			tripletCount += triplets;
+			maximumDegree = Math.max(maximumDegree, degree);
+			maximumOutDegree = Math.max(maximumOutDegree, outDegree);
+			maximumInDegree = Math.max(maximumInDegree, inDegree);
+			maximumTriangleTriplets = Math.max(maximumTriangleTriplets, triangleTriplets);
+			maximumRectangleTriplets = Math.max(maximumRectangleTriplets, rectangleTriplets);
+			maximumTriplets = Math.max(maximumTriplets, triplets);
+		}
+
+		@Override
+		public void close() throws IOException {
+			getRuntimeContext().addAccumulator(id + "-0", new LongCounter(vertexCount));
+			getRuntimeContext().addAccumulator(id + "-1", new LongCounter(edgeCount));
+			getRuntimeContext().addAccumulator(id + "-2", new LongCounter(triangleTripletCount));
+			getRuntimeContext().addAccumulator(id + "-3", new LongCounter(rectangleTripletCount));
+			getRuntimeContext().addAccumulator(id + "-4", new LongCounter(tripletCount));
+			getRuntimeContext().addAccumulator(id + "-5", new LongMaximum(maximumDegree));
+			getRuntimeContext().addAccumulator(id + "-6", new LongMaximum(maximumOutDegree));
+			getRuntimeContext().addAccumulator(id + "-7", new LongMaximum(maximumInDegree));
+			getRuntimeContext().addAccumulator(id + "-8", new LongMaximum(maximumTriangleTriplets));
+			getRuntimeContext().addAccumulator(id + "-9", new LongMaximum(maximumRectangleTriplets));
+			getRuntimeContext().addAccumulator(id + "-a", new LongMaximum(maximumTriplets));
+		}
+	}
+
+	/**
+	 * Wraps edge metrics.
+	 */
+	public static class Result {
+		private long vertexCount;
+		private long edgeCount;
+		private long triangleTripletCount;
+		private long rectangleTripletCount;
+		private long tripletCount;
+		private long maximumDegree;
+		private long maximumOutDegree;
+		private long maximumInDegree;
+		private long maximumTriangleTriplets;
+		private long maximumRectangleTriplets;
+		private long maximumTriplets;
+
+		public Result(long vertexCount, long edgeCount, long triangleTripletCount, long rectangleTripletCount, long tripletCount,
+				long maximumDegree, long maximumOutDegree, long maximumInDegree,
+				long maximumTriangleTriplets, long maximumRectangleTriplets, long maximumTriplets) {
+			this.vertexCount = vertexCount;
+			this.edgeCount = edgeCount;
+			this.triangleTripletCount = triangleTripletCount;
+			this.rectangleTripletCount = rectangleTripletCount;
+			this.tripletCount = tripletCount;
+			this.maximumDegree = maximumDegree;
+			this.maximumOutDegree = maximumOutDegree;
+			this.maximumInDegree = maximumInDegree;
+			this.maximumTriangleTriplets = maximumTriangleTriplets;
+			this.maximumRectangleTriplets = maximumRectangleTriplets;
+			this.maximumTriplets = maximumTriplets;
+		}
+
+		/**
+		 * Get the number of vertices.
+		 *
+		 * @return number of vertices
+		 */
+		public long getNumberOfVertices() {
+			return vertexCount;
+		}
+
+		/**
+		 * Get the number of edges.
+		 *
+		 * @return number of edges
+		 */
+		public long getNumberOfEdges() {
+			return edgeCount;
+		}
+
+		/**
+		 * Get the number of triangle triplets.
+		 *
+		 * @return number of triangle triplets
+		 */
+		public long getNumberOfTriangleTriplets() {
+			return triangleTripletCount;
+		}
+
+		/**
+		 * Get the number of rectangle triplets.
+		 *
+		 * @return number of rectangle triplets
+		 */
+		public long getNumberOfRectangleTriplets() {
+			return rectangleTripletCount;
+		}
+
+		/**
+		 * Get the number of triplets.
+		 *
+		 * @return number of triplets
+		 */
+		public long getNumberOfTriplets() {
+			return tripletCount;
+		}
+
+		/**
+		 * Get the maximum degree.
+		 *
+		 * @return maximum degree
+		 */
+		public long getMaximumDegree() {
+			return maximumDegree;
+		}
+
+		/**
+		 * Get the maximum out degree.
+		 *
+		 * @return maximum out degree
+		 */
+		public long getMaximumOutDegree() {
+			return maximumOutDegree;
+		}
+
+		/**
+		 * Get the maximum in degree.
+		 *
+		 * @return maximum in degree
+		 */
+		public long getMaximumInDegree() {
+			return maximumInDegree;
+		}
+
+		/**
+		 * Get the maximum triangle triplets.
+		 *
+		 * @return maximum triangle triplets
+		 */
+		public long getMaximumTriangleTriplets() {
+			return maximumTriangleTriplets;
+		}
+
+		/**
+		 * Get the maximum rectangle triplets.
+		 *
+		 * @return maximum rectangle triplets
+		 */
+		public long getMaximumRectangleTriplets() {
+			return maximumRectangleTriplets;
+		}
+
+		/**
+		 * Get the maximum triplets.
+		 *
+		 * @return maximum triplets
+		 */
+		public long getMaximumTriplets() {
+			return maximumTriplets;
+		}
+
+		@Override
+		public String toString() {
+			NumberFormat nf = NumberFormat.getInstance();
+
+			return "vertex count: " + nf.format(vertexCount)
+				+ "; edge count: " + nf.format(edgeCount)
+				+ "; triangle triplet count: " + nf.format(triangleTripletCount)
+				+ "; rectangle triplet count: " + nf.format(rectangleTripletCount)
+				+ "; triplet count: " + nf.format(tripletCount)
+				+ "; maximum degree: " + nf.format(maximumDegree)
+				+ "; maximum out degree: " + nf.format(maximumOutDegree)
+				+ "; maximum in degree: " + nf.format(maximumInDegree)
+				+ "; maximum triangle triplets: " + nf.format(maximumTriangleTriplets)
+				+ "; maximum rectangle triplets: " + nf.format(maximumRectangleTriplets)
+				+ "; maximum triplets: " + nf.format(maximumTriplets);
+		}
+
+		@Override
+		public int hashCode() {
+			return new HashCodeBuilder()
+				.append(vertexCount)
+				.append(edgeCount)
+				.append(triangleTripletCount)
+				.append(rectangleTripletCount)
+				.append(tripletCount)
+				.append(maximumDegree)
+				.append(maximumOutDegree)
+				.append(maximumInDegree)
+				.append(maximumTriangleTriplets)
+				.append(maximumRectangleTriplets)
+				.append(maximumTriplets)
+				.hashCode();
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj == null) { return false; }
+			if (obj == this) { return true; }
+			if (obj.getClass() != getClass()) { return false; }
+
+			Result rhs = (Result)obj;
+
+			return new EqualsBuilder()
+				.append(vertexCount, rhs.vertexCount)
+				.append(edgeCount, rhs.edgeCount)
+				.append(triangleTripletCount, rhs.triangleTripletCount)
+				.append(rectangleTripletCount, rhs.rectangleTripletCount)
+				.append(tripletCount, rhs.tripletCount)
+				.append(maximumDegree, rhs.maximumDegree)
+				.append(maximumOutDegree, rhs.maximumOutDegree)
+				.append(maximumInDegree, rhs.maximumInDegree)
+				.append(maximumTriangleTriplets, rhs.maximumTriangleTriplets)
+				.append(maximumRectangleTriplets, rhs.maximumRectangleTriplets)
+				.append(maximumTriplets, rhs.maximumTriplets)
+				.isEquals();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58850f29/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
index 434bd28..22f7733 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
@@ -22,6 +22,7 @@ import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.accumulators.LongMaximum;
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.configuration.Configuration;
@@ -35,12 +36,19 @@ import org.apache.flink.types.CopyableValue;
 import org.apache.flink.util.AbstractID;
 
 import java.io.IOException;
+import java.text.NumberFormat;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
- * Compute the number of vertices, number of edges, and number of triplets in
- * a directed graph.
+ * Compute the following vertex metrics in a directed graph:
+ *  - number of vertices
+ *  - number of edges
+ *  - number of triplets
+ *  - maximum degree
+ *  - maximum out degree
+ *  - maximum in degree
+ *  - maximum number of triplets
  *
  * @param <K> graph ID type
  * @param <VV> vertex value type
@@ -107,8 +115,12 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 		long vertexCount = res.getAccumulatorResult(id + "-0");
 		long edgeCount = res.getAccumulatorResult(id + "-1");
 		long tripletCount = res.getAccumulatorResult(id + "-2");
+		long maximumDegree = res.getAccumulatorResult(id + "-3");
+		long maximumOutDegree = res.getAccumulatorResult(id + "-4");
+		long maximumInDegree = res.getAccumulatorResult(id + "-5");
+		long maximumTriplets = res.getAccumulatorResult(id + "-6");
 
-		return new Result(vertexCount, edgeCount / 2, tripletCount);
+		return new Result(vertexCount, edgeCount, tripletCount, maximumDegree, maximumOutDegree, maximumInDegree, maximumTriplets);
 	}
 
 	/**
@@ -123,13 +135,17 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 		private long vertexCount;
 		private long edgeCount;
 		private long tripletCount;
+		private long maximumDegree;
+		private long maximumOutDegree;
+		private long maximumInDegree;
+		private long maximumTriplets;
 
 		/**
 		 * This helper class collects vertex metrics by scanning over and
 		 * discarding elements from the given DataSet.
 		 *
 		 * The unique id is required because Flink's accumulator namespace is
-		 * among all operators.
+		 * shared among all operators.
 		 *
 		 * @param id unique string used for accumulator names
 		 */
@@ -147,10 +163,16 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 		public void writeRecord(Vertex<T, Degrees> record) throws IOException {
 			long degree = record.f1.getDegree().getValue();
 			long outDegree = record.f1.getOutDegree().getValue();
+			long inDegree = record.f1.getInDegree().getValue();
+			long triplets = degree * (degree - 1) / 2;
 
 			vertexCount++;
 			edgeCount += outDegree;
-			tripletCount += degree * (degree - 1) / 2;
+			tripletCount += triplets;
+			maximumDegree = Math.max(maximumDegree, degree);
+			maximumOutDegree = Math.max(maximumOutDegree, outDegree);
+			maximumInDegree = Math.max(maximumInDegree, inDegree);
+			maximumTriplets = Math.max(maximumTriplets, triplets);
 		}
 
 		@Override
@@ -158,6 +180,10 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 			getRuntimeContext().addAccumulator(id + "-0", new LongCounter(vertexCount));
 			getRuntimeContext().addAccumulator(id + "-1", new LongCounter(edgeCount));
 			getRuntimeContext().addAccumulator(id + "-2", new LongCounter(tripletCount));
+			getRuntimeContext().addAccumulator(id + "-3", new LongMaximum(maximumDegree));
+			getRuntimeContext().addAccumulator(id + "-4", new LongMaximum(maximumOutDegree));
+			getRuntimeContext().addAccumulator(id + "-5", new LongMaximum(maximumInDegree));
+			getRuntimeContext().addAccumulator(id + "-6", new LongMaximum(maximumTriplets));
 		}
 	}
 
@@ -168,11 +194,19 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 		private long vertexCount;
 		private long edgeCount;
 		private long tripletCount;
+		private long maximumDegree;
+		private long maximumOutDegree;
+		private long maximumInDegree;
+		private long maximumTriplets;
 
-		public Result(long vertexCount, long edgeCount, long tripletCount) {
+		public Result(long vertexCount, long edgeCount, long tripletCount, long maximumDegree, long maximumOutDegree, long maximumInDegree, long maximumTriplets) {
 			this.vertexCount = vertexCount;
 			this.edgeCount = edgeCount;
 			this.tripletCount = tripletCount;
+			this.maximumDegree = maximumDegree;
+			this.maximumOutDegree = maximumOutDegree;
+			this.maximumInDegree = maximumInDegree;
+			this.maximumTriplets = maximumTriplets;
 		}
 
 		/**
@@ -202,11 +236,53 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 			return tripletCount;
 		}
 
+		/**
+		 * Get the maximum degree.
+		 *
+		 * @return maximum degree
+		 */
+		public long getMaximumDegree() {
+			return maximumDegree;
+		}
+
+		/**
+		 * Get the maximum out degree.
+		 *
+		 * @return maximum out degree
+		 */
+		public long getMaximumOutDegree() {
+			return maximumOutDegree;
+		}
+
+		/**
+		 * Get the maximum in degree.
+		 *
+		 * @return maximum in degree
+		 */
+		public long getMaximumInDegree() {
+			return maximumInDegree;
+		}
+
+		/**
+		 * Get the maximum triplets.
+		 *
+		 * @return maximum triplets
+		 */
+		public long getMaximumTriplets() {
+			return maximumTriplets;
+		}
+
 		@Override
 		public String toString() {
-			return "vertex count: " + vertexCount
-				+ ", edge count:" + edgeCount
-				+ ", triplet count: " + tripletCount;
+			NumberFormat nf = NumberFormat.getInstance();
+
+			return "vertex count: " + nf.format(vertexCount)
+				+ "; edge count: " + nf.format(edgeCount)
+				+ "; triplet count: " + nf.format(tripletCount)
+				+ "; maximum degree: " + nf.format(maximumDegree)
+				+ "; maximum out degree: " + nf.format(maximumOutDegree)
+				+ "; maximum in degree: " + nf.format(maximumInDegree)
+				+ "; maximum triplets: " + nf.format(maximumTriplets);
 		}
 
 		@Override
@@ -215,6 +291,10 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 				.append(vertexCount)
 				.append(edgeCount)
 				.append(tripletCount)
+				.append(maximumDegree)
+				.append(maximumOutDegree)
+				.append(maximumInDegree)
+				.append(maximumTriplets)
 				.hashCode();
 		}
 
@@ -230,6 +310,10 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 				.append(vertexCount, rhs.vertexCount)
 				.append(edgeCount, rhs.edgeCount)
 				.append(tripletCount, rhs.tripletCount)
+				.append(maximumDegree, rhs.maximumDegree)
+				.append(maximumOutDegree, rhs.maximumOutDegree)
+				.append(maximumInDegree, rhs.maximumInDegree)
+				.append(maximumTriplets, rhs.maximumTriplets)
 				.isEquals();
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/58850f29/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
new file mode 100644
index 0000000..1d5b664
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
@@ -0,0 +1,445 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.metric.undirected;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.accumulators.LongMaximum;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.AbstractGraphAnalytic;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeDegreePair;
+import org.apache.flink.graph.library.metric.undirected.EdgeMetrics.Result;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.AbstractID;
+
+import java.io.IOException;
+import java.text.NumberFormat;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * Compute the following edge metrics in an undirected graph:
+ *  - number of vertices
+ *  - number of edges
+ *  - number of triangle triplets
+ *  - number of rectangle triplets
+ *  - number of triplets
+ *  - maximum degree
+ *  - maximum number of triangle triplets
+ *  - maximum number of rectangle triplets
+ *  - maximum number of triplets
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class EdgeMetrics<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+extends AbstractGraphAnalytic<K, VV, EV, Result> {
+
+	private String id = new AbstractID().toString();
+
+	// Optional configuration
+	private boolean reduceOnTargetId = false;
+
+	private int parallelism = PARALLELISM_DEFAULT;
+
+	/**
+	 * The degree can be counted from either the edge source or target IDs.
+	 * By default the source IDs are counted. Reducing on target IDs may
+	 * optimize the algorithm if the input edge list is sorted by target ID.
+	 *
+	 * @param reduceOnTargetId set to {@code true} if the input edge list
+	 *                         is sorted by target ID
+	 * @return this
+	 */
+	public EdgeMetrics<K, VV, EV> setReduceOnTargetId(boolean reduceOnTargetId) {
+		this.reduceOnTargetId = reduceOnTargetId;
+
+		return this;
+	}
+
+	/**
+	 * Override the operator parallelism.
+	 *
+	 * @param parallelism operator parallelism
+	 * @return this
+	 */
+	public EdgeMetrics<K, VV, EV> setParallelism(int parallelism) {
+		this.parallelism = parallelism;
+
+		return this;
+	}
+
+	/*
+	 * Implementation notes:
+	 *
+	 * Use aggregator to replace SumEdgeStats when aggregators are rewritten to use
+	 *   a hash-combineable hashed-reduce.
+	 */
+
+	@Override
+	public EdgeMetrics<K, VV, EV> run(Graph<K, VV, EV> input)
+			throws Exception {
+		super.run(input);
+
+		// s, t, (d(s), d(t))
+		DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>> edgeDegreePair = input
+			.run(new EdgeDegreePair<K, VV, EV>()
+				.setReduceOnTargetId(reduceOnTargetId)
+				.setParallelism(parallelism));
+
+		// s, d(s), count of (u, v) where deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
+		DataSet<Tuple3<K, LongValue, LongValue>> edgeStats = edgeDegreePair
+			.map(new EdgeStats<K, EV>())
+				.setParallelism(parallelism)
+				.name("Edge stats")
+			.groupBy(0)
+			.reduce(new SumEdgeStats<K>())
+				.setCombineHint(CombineHint.HASH)
+				.setParallelism(parallelism)
+				.name("Sum edge stats");
+
+		edgeStats
+			.output(new EdgeMetricsHelper<K, EV>(id))
+				.setParallelism(parallelism)
+				.name("Edge metrics");
+
+		return this;
+	}
+
+	@Override
+	public Result getResult() {
+		JobExecutionResult res = env.getLastJobExecutionResult();
+
+		long vertexCount = res.getAccumulatorResult(id + "-0");
+		long edgeCount = res.getAccumulatorResult(id + "-1");
+		long triangleTripletCount = res.getAccumulatorResult(id + "-2");
+		long rectangleTripletCount = res.getAccumulatorResult(id + "-3");
+		long tripletCount = res.getAccumulatorResult(id + "-4");
+		long maximumDegree = res.getAccumulatorResult(id + "-5");
+		long maximumTriangleTriplets = res.getAccumulatorResult(id + "-6");
+		long maximumRectangleTriplets = res.getAccumulatorResult(id + "-7");
+		long maximumTriplets = res.getAccumulatorResult(id + "-8");
+
+		return new Result(vertexCount, edgeCount / 2, triangleTripletCount, rectangleTripletCount, tripletCount,
+			maximumDegree, maximumTriangleTriplets, maximumRectangleTriplets, maximumTriplets);
+	}
+
+	/**
+	 * Evaluates each edge and emits a tuple containing the source vertex ID,
+	 * the source vertex degree, and a value of zero or one indicating the
+	 * low-order count. The low-order count is one if the source vertex degree
+	 * is less than the target vertex degree or if the degrees are equal and
+	 * the source vertex ID compares lower than the target vertex ID; otherwise
+	 * the low-order count is zero.
+	 *
+	 * @param <T> ID type
+	 * @param <ET> edge value type
+	 */
+	@FunctionAnnotation.ForwardedFields("0; 2.1->1")
+	private static class EdgeStats<T extends Comparable<T>, ET>
+	implements MapFunction<Edge<T, Tuple3<ET, LongValue, LongValue>>, Tuple3<T, LongValue, LongValue>> {
+		private LongValue zero = new LongValue(0);
+
+		private LongValue one = new LongValue(1);
+
+		private Tuple3<T, LongValue, LongValue> output = new Tuple3<>();
+
+		@Override
+		public Tuple3<T, LongValue, LongValue> map(Edge<T, Tuple3<ET, LongValue, LongValue>> edge)
+				throws Exception {
+			Tuple3<ET, LongValue, LongValue> degrees = edge.f2;
+
+			output.f0 = edge.f0;
+			output.f1 = degrees.f1;
+
+			long sourceDegree = degrees.f1.getValue();
+			long targetDegree = degrees.f2.getValue();
+
+			if (sourceDegree < targetDegree ||
+					(sourceDegree == targetDegree && edge.f0.compareTo(edge.f1) < 0)) {
+				output.f2 = one;
+			} else {
+				output.f2 = zero;
+			}
+
+			return output;
+		}
+	}
+
+	/**
+	 * Sums the low-order counts.
+	 *
+	 * @param <T> ID type
+	 */
+	private static class SumEdgeStats<T>
+	implements ReduceFunction<Tuple3<T, LongValue, LongValue>> {
+		@Override
+		public Tuple3<T, LongValue, LongValue> reduce(Tuple3<T, LongValue, LongValue> value1, Tuple3<T, LongValue, LongValue> value2)
+				throws Exception {
+			value1.f2.setValue(value1.f2.getValue() + value2.f2.getValue());
+			return value1;
+		}
+	}
+
+	/**
+	 * Helper class to collect edge metrics.
+	 *
+	 * @param <T> ID type
+	 */
+	private static class EdgeMetricsHelper<T extends Comparable<T>, ET>
+	extends RichOutputFormat<Tuple3<T, LongValue, LongValue>> {
+		private final String id;
+
+		private long vertexCount;
+		private long edgeCount;
+		private long triangleTripletCount;
+		private long rectangleTripletCount;
+		private long tripletCount;
+		private long maximumDegree;
+		private long maximumTriangleTriplets;
+		private long maximumRectangleTriplets;
+		private long maximumTriplets;
+
+		/**
+		 * This helper class collects edge metrics by scanning over and
+		 * discarding elements from the given DataSet.
+		 *
+		 * The unique id is required because Flink's accumulator namespace is
+		 * among all operators.
+		 *
+		 * @param id unique string used for accumulator names
+		 */
+		public EdgeMetricsHelper(String id) {
+			this.id = id;
+		}
+
+		@Override
+		public void configure(Configuration parameters) {}
+
+		@Override
+		public void open(int taskNumber, int numTasks) throws IOException {}
+
+		@Override
+		public void writeRecord(Tuple3<T, LongValue, LongValue> record) throws IOException {
+			long degree = record.f1.getValue();
+			long lowDegree = record.f2.getValue();
+			long highDegree = degree - lowDegree;
+
+			long triangleTriplets = lowDegree * (lowDegree - 1) / 2;
+			long rectangleTriplets = triangleTriplets + lowDegree * highDegree;
+			long triplets = degree * (degree - 1) / 2;
+
+			vertexCount++;
+			edgeCount += degree;
+			triangleTripletCount += triangleTriplets;
+			rectangleTripletCount += rectangleTriplets;
+			tripletCount += triplets;
+			maximumDegree = Math.max(maximumDegree, degree);
+			maximumTriangleTriplets = Math.max(maximumTriangleTriplets, triangleTriplets);
+			maximumRectangleTriplets = Math.max(maximumRectangleTriplets, rectangleTriplets);
+			maximumTriplets = Math.max(maximumTriplets, triplets);
+		}
+
+		@Override
+		public void close() throws IOException {
+			getRuntimeContext().addAccumulator(id + "-0", new LongCounter(vertexCount));
+			getRuntimeContext().addAccumulator(id + "-1", new LongCounter(edgeCount));
+			getRuntimeContext().addAccumulator(id + "-2", new LongCounter(triangleTripletCount));
+			getRuntimeContext().addAccumulator(id + "-3", new LongCounter(rectangleTripletCount));
+			getRuntimeContext().addAccumulator(id + "-4", new LongCounter(tripletCount));
+			getRuntimeContext().addAccumulator(id + "-5", new LongMaximum(maximumDegree));
+			getRuntimeContext().addAccumulator(id + "-6", new LongMaximum(maximumTriangleTriplets));
+			getRuntimeContext().addAccumulator(id + "-7", new LongMaximum(maximumRectangleTriplets));
+			getRuntimeContext().addAccumulator(id + "-8", new LongMaximum(maximumTriplets));
+		}
+	}
+
+	/**
+	 * Wraps edge metrics.
+	 */
+	public static class Result {
+		private long vertexCount;
+		private long edgeCount;
+		private long triangleTripletCount;
+		private long rectangleTripletCount;
+		private long tripletCount;
+		private long maximumDegree;
+		private long maximumTriangleTriplets;
+		private long maximumRectangleTriplets;
+		private long maximumTriplets;
+
+		public Result(long vertexCount, long edgeCount, long triangleTripletCount, long rectangleTripletCount, long tripletCount,
+				long maximumDegree, long maximumTriangleTriplets, long maximumRectangleTriplets, long maximumTriplets) {
+			this.vertexCount = vertexCount;
+			this.edgeCount = edgeCount;
+			this.triangleTripletCount = triangleTripletCount;
+			this.rectangleTripletCount = rectangleTripletCount;
+			this.tripletCount = tripletCount;
+			this.maximumDegree = maximumDegree;
+			this.maximumTriangleTriplets = maximumTriangleTriplets;
+			this.maximumRectangleTriplets = maximumRectangleTriplets;
+			this.maximumTriplets = maximumTriplets;
+		}
+
+		/**
+		 * Get the number of vertices.
+		 *
+		 * @return number of vertices
+		 */
+		public long getNumberOfVertices() {
+			return vertexCount;
+		}
+
+		/**
+		 * Get the number of edges.
+		 *
+		 * @return number of edges
+		 */
+		public long getNumberOfEdges() {
+			return edgeCount;
+		}
+
+		/**
+		 * Get the number of triangle triplets.
+		 *
+		 * @return number of triangle triplets
+		 */
+		public long getNumberOfTriangleTriplets() {
+			return triangleTripletCount;
+		}
+
+		/**
+		 * Get the number of rectangle triplets.
+		 *
+		 * @return number of rectangle triplets
+		 */
+		public long getNumberOfRectangleTriplets() {
+			return rectangleTripletCount;
+		}
+
+		/**
+		 * Get the number of triplets.
+		 *
+		 * @return number of triplets
+		 */
+		public long getNumberOfTriplets() {
+			return tripletCount;
+		}
+
+		/**
+		 * Get the maximum degree.
+		 *
+		 * @return maximum degree
+		 */
+		public long getMaximumDegree() {
+			return maximumDegree;
+		}
+
+		/**
+		 * Get the maximum triangle triplets.
+		 *
+		 * @return maximum triangle triplets
+		 */
+		public long getMaximumTriangleTriplets() {
+			return maximumTriangleTriplets;
+		}
+
+		/**
+		 * Get the maximum rectangle triplets.
+		 *
+		 * @return maximum rectangle triplets
+		 */
+		public long getMaximumRectangleTriplets() {
+			return maximumRectangleTriplets;
+		}
+
+		/**
+		 * Get the maximum triplets.
+		 *
+		 * @return maximum triplets
+		 */
+		public long getMaximumTriplets() {
+			return maximumTriplets;
+		}
+
+		@Override
+		public String toString() {
+			NumberFormat nf = NumberFormat.getInstance();
+
+			return "vertex count: " + nf.format(vertexCount)
+				+ "; edge count: " + nf.format(edgeCount)
+				+ "; triangle triplet count: " + nf.format(triangleTripletCount)
+				+ "; rectangle triplet count: " + nf.format(rectangleTripletCount)
+				+ "; triplet count: " + nf.format(tripletCount)
+				+ "; maximum degree: " + nf.format(maximumDegree)
+				+ "; maximum triangle triplets: " + nf.format(maximumTriangleTriplets)
+				+ "; maximum rectangle triplets: " + nf.format(maximumRectangleTriplets)
+				+ "; maximum triplets: " + nf.format(maximumTriplets);
+		}
+
+		@Override
+		public int hashCode() {
+			return new HashCodeBuilder()
+				.append(vertexCount)
+				.append(edgeCount)
+				.append(triangleTripletCount)
+				.append(rectangleTripletCount)
+				.append(tripletCount)
+				.append(maximumDegree)
+				.append(maximumTriangleTriplets)
+				.append(maximumRectangleTriplets)
+				.append(maximumTriplets)
+				.hashCode();
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj == null) { return false; }
+			if (obj == this) { return true; }
+			if (obj.getClass() != getClass()) { return false; }
+
+			Result rhs = (Result)obj;
+
+			return new EqualsBuilder()
+				.append(vertexCount, rhs.vertexCount)
+				.append(edgeCount, rhs.edgeCount)
+				.append(triangleTripletCount, rhs.triangleTripletCount)
+				.append(rectangleTripletCount, rhs.rectangleTripletCount)
+				.append(tripletCount, rhs.tripletCount)
+				.append(maximumDegree, rhs.maximumDegree)
+				.append(maximumTriangleTriplets, rhs.maximumTriangleTriplets)
+				.append(maximumRectangleTriplets, rhs.maximumRectangleTriplets)
+				.append(maximumTriplets, rhs.maximumTriplets)
+				.isEquals();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58850f29/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
index 3c26e43..d04fa7b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
@@ -22,6 +22,7 @@ import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.accumulators.LongMaximum;
 import org.apache.flink.api.common.io.RichOutputFormat;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.configuration.Configuration;
@@ -35,12 +36,17 @@ import org.apache.flink.types.LongValue;
 import org.apache.flink.util.AbstractID;
 
 import java.io.IOException;
+import java.text.NumberFormat;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
- * Compute the number of vertices, number of edges, and number of triplets in
- * an undirected graph.
+ * Compute the following vertex metrics in an undirected graph:
+ *  - number of vertices
+ *  - number of edges
+ *  - number of triplets
+ *  - maximum degree
+ *  - maximum number of triplets
  *
  * @param <K> graph ID type
  * @param <VV> vertex value type
@@ -125,8 +131,10 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 		long vertexCount = res.getAccumulatorResult(id + "-0");
 		long edgeCount = res.getAccumulatorResult(id + "-1");
 		long tripletCount = res.getAccumulatorResult(id + "-2");
+		long maximumDegree = res.getAccumulatorResult(id + "-3");
+		long maximumTriplets = res.getAccumulatorResult(id + "-4");
 
-		return new Result(vertexCount, edgeCount / 2, tripletCount);
+		return new Result(vertexCount, edgeCount / 2, tripletCount, maximumDegree, maximumTriplets);
 	}
 
 	/**
@@ -141,13 +149,15 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 		private long vertexCount;
 		private long edgeCount;
 		private long tripletCount;
+		private long maximumDegree;
+		private long maximumTriplets;
 
 		/**
 		 * This helper class collects vertex metrics by scanning over and
 		 * discarding elements from the given DataSet.
 		 *
 		 * The unique id is required because Flink's accumulator namespace is
-		 * among all operators.
+		 * shared among all operators.
 		 *
 		 * @param id unique string used for accumulator names
 		 */
@@ -164,10 +174,13 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 		@Override
 		public void writeRecord(Vertex<T, LongValue> record) throws IOException {
 			long degree = record.f1.getValue();
+			long triplets = degree * (degree - 1) / 2;
 
 			vertexCount++;
 			edgeCount += degree;
-			tripletCount += degree * (degree - 1) / 2;
+			tripletCount += triplets;
+			maximumDegree = Math.max(maximumDegree, degree);
+			maximumTriplets = Math.max(maximumTriplets, triplets);
 		}
 
 		@Override
@@ -175,6 +188,8 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 			getRuntimeContext().addAccumulator(id + "-0", new LongCounter(vertexCount));
 			getRuntimeContext().addAccumulator(id + "-1", new LongCounter(edgeCount));
 			getRuntimeContext().addAccumulator(id + "-2", new LongCounter(tripletCount));
+			getRuntimeContext().addAccumulator(id + "-3", new LongMaximum(maximumDegree));
+			getRuntimeContext().addAccumulator(id + "-4", new LongMaximum(maximumTriplets));
 		}
 	}
 
@@ -185,11 +200,15 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 		private long vertexCount;
 		private long edgeCount;
 		private long tripletCount;
+		private long maximumDegree;
+		private long maximumTriplets;
 
-		public Result(long vertexCount, long edgeCount, long tripletCount) {
+		public Result(long vertexCount, long edgeCount, long tripletCount, long maximumDegree, long maximumTriplets) {
 			this.vertexCount = vertexCount;
 			this.edgeCount = edgeCount;
 			this.tripletCount = tripletCount;
+			this.maximumDegree = maximumDegree;
+			this.maximumTriplets = maximumTriplets;
 		}
 
 		/**
@@ -219,11 +238,33 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 			return tripletCount;
 		}
 
+		/**
+		 * Get the maximum degree.
+		 *
+		 * @return maximum degree
+		 */
+		public long getMaximumDegree() {
+			return maximumDegree;
+		}
+
+		/**
+		 * Get the maximum triplets.
+		 *
+		 * @return maximum triplets
+		 */
+		public long getMaximumTriplets() {
+			return maximumTriplets;
+		}
+
 		@Override
 		public String toString() {
-			return "vertex count: " + vertexCount
-					+ ", edge count:" + edgeCount
-					+ ", triplet count: " + tripletCount;
+			NumberFormat nf = NumberFormat.getInstance();
+
+			return "vertex count: " + nf.format(vertexCount)
+				+ "; edge count: " + nf.format(edgeCount)
+				+ "; triplet count: " + nf.format(tripletCount)
+				+ "; maximum degree: " + nf.format(maximumDegree)
+				+ "; maximum triplets: " + nf.format(maximumTriplets);
 		}
 
 		@Override
@@ -232,6 +273,8 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 				.append(vertexCount)
 				.append(edgeCount)
 				.append(tripletCount)
+				.append(maximumDegree)
+				.append(maximumTriplets)
 				.hashCode();
 		}
 
@@ -247,6 +290,8 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 				.append(vertexCount, rhs.vertexCount)
 				.append(edgeCount, rhs.edgeCount)
 				.append(tripletCount, rhs.tripletCount)
+				.append(maximumDegree, rhs.maximumDegree)
+				.append(maximumTriplets, rhs.maximumTriplets)
 				.isEquals();
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/58850f29/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/EdgeMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/EdgeMetricsTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/EdgeMetricsTest.java
new file mode 100644
index 0000000..af5a154
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/EdgeMetricsTest.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.metric.directed;
+
+import org.apache.commons.math3.util.CombinatoricsUtils;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.library.metric.directed.EdgeMetrics.Result;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class EdgeMetricsTest
+extends AsmTestBase {
+
+	@Test
+	public void testWithSimpleGraph()
+			throws Exception {
+		Result expectedResult = new Result(6, 7, 2, 6, 13, 4, 2, 3, 1, 3, 6);
+
+		Result edgeMetrics = new EdgeMetrics<IntValue, NullValue, NullValue>()
+			.run(directedSimpleGraph)
+			.execute();
+
+		assertEquals(expectedResult, edgeMetrics);
+	}
+
+	@Test
+	public void testWithCompleteGraph()
+			throws Exception {
+		long expectedDegree = completeGraphVertexCount - 1;
+		long expectedEdges = completeGraphVertexCount * expectedDegree;
+		long expectedMaximumTriplets = CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2);
+		long expectedTriplets = completeGraphVertexCount * expectedMaximumTriplets;
+
+		Result expectedResult = new Result(completeGraphVertexCount, expectedEdges, expectedTriplets / 3, 2 * expectedTriplets / 3, expectedTriplets,
+			expectedDegree, expectedDegree, expectedDegree,
+			expectedMaximumTriplets, expectedMaximumTriplets, expectedMaximumTriplets);
+
+		Result edgeMetrics = new EdgeMetrics<LongValue, NullValue, NullValue>()
+			.run(completeGraph)
+			.execute();
+
+		assertEquals(expectedResult, edgeMetrics);
+	}
+
+	@Test
+	public void testWithEmptyGraph()
+			throws Exception {
+		Result expectedResult;
+
+		expectedResult = new Result(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
+
+		Result withoutZeroDegreeVertices = new EdgeMetrics<LongValue, NullValue, NullValue>()
+			.run(emptyGraph)
+			.execute();
+
+		assertEquals(withoutZeroDegreeVertices, expectedResult);
+	}
+
+	@Test
+	public void testWithRMatGraph()
+			throws Exception {
+		Result expectedResult = new Result(902, 12009, 107817, 315537, 1003442, 463, 334, 342, 820, 3822, 106953);
+
+		Result withoutZeroDegreeVertices = new EdgeMetrics<LongValue, NullValue, NullValue>()
+			.run(directedRMatGraph)
+			.execute();
+
+		assertEquals(expectedResult, withoutZeroDegreeVertices);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58850f29/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/VertexMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/VertexMetricsTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/VertexMetricsTest.java
index 8145abd..e4362c0 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/VertexMetricsTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/directed/VertexMetricsTest.java
@@ -34,10 +34,10 @@ extends AsmTestBase {
 	@Test
 	public void testWithSimpleGraph()
 			throws Exception {
-		Result expectedResult = new Result(6, 7, 13);
+		Result expectedResult = new Result(6, 7, 13, 4, 2, 3, 6);
 
 		Result vertexMetrics = new VertexMetrics<IntValue, NullValue, NullValue>()
-			.run(undirectedSimpleGraph)
+			.run(directedSimpleGraph)
 			.execute();
 
 		assertEquals(expectedResult, vertexMetrics);
@@ -47,10 +47,11 @@ extends AsmTestBase {
 	public void testWithCompleteGraph()
 			throws Exception {
 		long expectedDegree = completeGraphVertexCount - 1;
-		long expectedEdges = completeGraphVertexCount * expectedDegree / 2;
-		long expectedTriplets = completeGraphVertexCount * CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2);
+		long expectedEdges = completeGraphVertexCount * expectedDegree;
+		long expectedMaximumTriplets = CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2);
+		long expectedTriplets = completeGraphVertexCount * expectedMaximumTriplets;
 
-		Result expectedResult = new Result(completeGraphVertexCount, expectedEdges, expectedTriplets);
+		Result expectedResult = new Result(completeGraphVertexCount, expectedEdges, expectedTriplets, expectedDegree, expectedDegree, expectedDegree, expectedMaximumTriplets);
 
 		Result vertexMetrics = new VertexMetrics<LongValue, NullValue, NullValue>()
 			.run(completeGraph)
@@ -64,7 +65,7 @@ extends AsmTestBase {
 			throws Exception {
 		Result expectedResult;
 
-		expectedResult = new Result(0, 0, 0);
+		expectedResult = new Result(0, 0, 0, 0, 0, 0, 0);
 
 		Result withoutZeroDegreeVertices = new VertexMetrics<LongValue, NullValue, NullValue>()
 			.setIncludeZeroDegreeVertices(false)
@@ -73,7 +74,7 @@ extends AsmTestBase {
 
 		assertEquals(withoutZeroDegreeVertices, expectedResult);
 
-		expectedResult = new Result(3, 0, 0);
+		expectedResult = new Result(3, 0, 0, 0, 0, 0, 0);
 
 		Result withZeroDegreeVertices = new VertexMetrics<LongValue, NullValue, NullValue>()
 			.setIncludeZeroDegreeVertices(true)
@@ -86,10 +87,10 @@ extends AsmTestBase {
 	@Test
 	public void testWithRMatGraph()
 			throws Exception {
-		Result expectedResult = new Result(902, 10442, 1003442);
+		Result expectedResult = new Result(902, 12009, 1003442, 463, 334, 342, 106953);
 
 		Result withoutZeroDegreeVertices = new VertexMetrics<LongValue, NullValue, NullValue>()
-			.run(undirectedRMatGraph)
+			.run(directedRMatGraph)
 			.execute();
 
 		assertEquals(expectedResult, withoutZeroDegreeVertices);

http://git-wip-us.apache.org/repos/asf/flink/blob/58850f29/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/EdgeMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/EdgeMetricsTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/EdgeMetricsTest.java
new file mode 100644
index 0000000..b300d66
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/EdgeMetricsTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.metric.undirected;
+
+import org.apache.commons.math3.util.CombinatoricsUtils;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.library.metric.undirected.EdgeMetrics.Result;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class EdgeMetricsTest
+extends AsmTestBase {
+
+	@Test
+	public void testWithSimpleGraph()
+			throws Exception {
+		Result expectedResult = new Result(6, 7, 2, 6, 13, 4, 1, 3, 6);
+
+		Result edgeMetrics = new EdgeMetrics<IntValue, NullValue, NullValue>()
+			.run(undirectedSimpleGraph)
+			.execute();
+
+		assertEquals(expectedResult, edgeMetrics);
+	}
+
+	@Test
+	public void testWithCompleteGraph()
+			throws Exception {
+		long expectedDegree = completeGraphVertexCount - 1;
+		long expectedEdges = completeGraphVertexCount * expectedDegree / 2;
+		long expectedMaximumTriplets = CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2);
+		long expectedTriplets = completeGraphVertexCount * expectedMaximumTriplets;
+
+		Result expectedResult = new Result(completeGraphVertexCount, expectedEdges, expectedTriplets / 3, 2 * expectedTriplets / 3, expectedTriplets,
+			expectedDegree, expectedMaximumTriplets, expectedMaximumTriplets, expectedMaximumTriplets);
+
+		Result edgeMetrics = new EdgeMetrics<LongValue, NullValue, NullValue>()
+			.run(completeGraph)
+			.execute();
+
+		assertEquals(expectedResult, edgeMetrics);
+	}
+
+	@Test
+	public void testWithEmptyGraph()
+			throws Exception {
+		Result expectedResult;
+
+		expectedResult = new Result(0, 0, 0, 0, 0, 0, 0, 0, 0);
+
+		Result withoutZeroDegreeVertices = new EdgeMetrics<LongValue, NullValue, NullValue>()
+			.run(emptyGraph)
+			.execute();
+
+		assertEquals(withoutZeroDegreeVertices, expectedResult);
+	}
+
+	@Test
+	public void testWithRMatGraph()
+			throws Exception {
+		Result expectedResult = new Result(902, 10442, 107817, 315537, 1003442, 463, 820, 3822, 106953);
+
+		Result withoutZeroDegreeVertices = new EdgeMetrics<LongValue, NullValue, NullValue>()
+			.run(undirectedRMatGraph)
+			.execute();
+
+		assertEquals(expectedResult, withoutZeroDegreeVertices);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/58850f29/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java
index a36ca94..8f7e1da 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java
@@ -34,7 +34,7 @@ extends AsmTestBase {
 	@Test
 	public void testWithSimpleGraph()
 			throws Exception {
-		Result expectedResult = new Result(6, 7, 13);
+		Result expectedResult = new Result(6, 7, 13, 4, 6);
 
 		Result vertexMetrics = new VertexMetrics<IntValue, NullValue, NullValue>()
 			.run(undirectedSimpleGraph)
@@ -48,9 +48,11 @@ extends AsmTestBase {
 			throws Exception {
 		long expectedDegree = completeGraphVertexCount - 1;
 		long expectedEdges = completeGraphVertexCount * expectedDegree / 2;
-		long expectedTriplets = completeGraphVertexCount * CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2);
+		long expectedMaximumTriplets = CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2);
+		long expectedTriplets = completeGraphVertexCount * expectedMaximumTriplets;
 
-		Result expectedResult = new Result(completeGraphVertexCount, expectedEdges, expectedTriplets);
+		Result expectedResult = new Result(completeGraphVertexCount, expectedEdges, expectedTriplets,
+			expectedDegree, expectedMaximumTriplets);
 
 		Result vertexMetrics = new VertexMetrics<LongValue, NullValue, NullValue>()
 			.run(completeGraph)
@@ -64,7 +66,7 @@ extends AsmTestBase {
 			throws Exception {
 		Result expectedResult;
 
-		expectedResult = new Result(0, 0, 0);
+		expectedResult = new Result(0, 0, 0, 0, 0);
 
 		Result withoutZeroDegreeVertices = new VertexMetrics<LongValue, NullValue, NullValue>()
 			.setIncludeZeroDegreeVertices(false)
@@ -73,7 +75,7 @@ extends AsmTestBase {
 
 		assertEquals(withoutZeroDegreeVertices, expectedResult);
 
-		expectedResult = new Result(3, 0, 0);
+		expectedResult = new Result(3, 0, 0, 0, 0);
 
 		Result withZeroDegreeVertices = new VertexMetrics<LongValue, NullValue, NullValue>()
 			.setIncludeZeroDegreeVertices(true)
@@ -86,7 +88,7 @@ extends AsmTestBase {
 	@Test
 	public void testWithRMatGraph()
 			throws Exception {
-		Result expectedResult = new Result(902, 10442, 1003442);
+		Result expectedResult = new Result(902, 10442, 1003442, 463, 106953);
 
 		Result withoutZeroDegreeVertices = new VertexMetrics<LongValue, NullValue, NullValue>()
 			.run(undirectedRMatGraph)


Mime
View raw message