flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [6/6] flink git commit: [FLINK-7006] [gelly] Base class using POJOs for Gelly algorithms
Date Wed, 05 Jul 2017 20:53:11 GMT
[FLINK-7006] [gelly] Base class using POJOs for Gelly algorithms

Gelly algorithms commonly have a Result class extending a Tuple type and
implementing one of the Unary/Binary/TertiaryResult interfaces.

Add a Unary/Binary/TertiaryResultBase class implementing each interface
and convert the Result classes to POJOs extending the base result
classes.

This closes #4201


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/98a15004
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/98a15004
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/98a15004

Branch: refs/heads/master
Commit: 98a15004cd815d14f4c243dd25a13aa4b122112c
Parents: 7c150a6
Author: Greg Hogan <code@greghogan.com>
Authored: Mon Jun 26 10:21:50 2017 -0400
Committer: Greg Hogan <code@greghogan.com>
Committed: Wed Jul 5 15:22:48 2017 -0400

----------------------------------------------------------------------
 .../apache/flink/graph/drivers/input/CSV.java   |   7 +-
 .../drivers/input/GeneratedMultiGraph.java      |   7 +-
 .../flink/graph/drivers/input/RMatGraph.java    |   7 +-
 .../drivers/parameter/ChoiceParameter.java      |   7 +-
 .../drivers/parameter/StringParameter.java      |   7 +-
 .../graph/drivers/TriangleListingITCase.java    |  24 +--
 .../apache/flink/graph/asm/dataset/Collect.java |   2 -
 .../graph/asm/dataset/DataSetAnalytic.java      |  27 +--
 .../annotate/directed/EdgeDegreesPair.java      |   8 +-
 .../annotate/directed/EdgeSourceDegrees.java    |   8 +-
 .../annotate/directed/EdgeTargetDegrees.java    |   8 +-
 .../degree/annotate/directed/VertexDegrees.java |   8 +-
 .../annotate/directed/VertexInDegree.java       |   8 +-
 .../annotate/directed/VertexOutDegree.java      |   8 +-
 .../annotate/undirected/EdgeDegreePair.java     |   8 +-
 .../annotate/undirected/EdgeSourceDegree.java   |   8 +-
 .../annotate/undirected/EdgeTargetDegree.java   |   8 +-
 .../annotate/undirected/VertexDegree.java       |   8 +-
 .../degree/filter/undirected/MaximumDegree.java |   8 +-
 .../flink/graph/asm/result/BinaryResult.java    |  28 ++-
 .../graph/asm/result/BinaryResultBase.java      |  53 +++++
 .../flink/graph/asm/result/ResultBase.java      |  36 ++++
 .../flink/graph/asm/result/TertiaryResult.java  |   5 +-
 .../graph/asm/result/TertiaryResultBase.java    |  66 +++++++
 .../flink/graph/asm/result/UnaryResult.java     |   5 +-
 .../flink/graph/asm/result/UnaryResultBase.java |  42 ++++
 .../graph/asm/simple/directed/Simplify.java     |   8 +-
 .../graph/asm/simple/undirected/Simplify.java   |   8 +-
 .../asm/translate/TranslateEdgeValues.java      |   8 +-
 .../graph/asm/translate/TranslateGraphIds.java  |   8 +-
 .../asm/translate/TranslateVertexValues.java    |   8 +-
 .../library/clustering/TriangleListingBase.java |  91 +++++++++
 .../directed/LocalClusteringCoefficient.java    |  97 ++++++----
 .../clustering/directed/TriadicCensus.java      |  34 ++--
 .../clustering/directed/TriangleListing.java    | 192 +++++++------------
 .../undirected/LocalClusteringCoefficient.java  |  99 ++++++----
 .../clustering/undirected/TriangleListing.java  | 145 ++++----------
 .../flink/graph/library/linkanalysis/HITS.java  |  89 +++++----
 .../graph/library/linkanalysis/PageRank.java    |  70 ++++---
 .../graph/library/similarity/AdamicAdar.java    |  99 +++++-----
 .../graph/library/similarity/JaccardIndex.java  | 151 +++++++--------
 .../utils/proxy/GraphAlgorithmWrappingBase.java |  61 ++++++
 .../proxy/GraphAlgorithmWrappingDataSet.java    |  33 +---
 .../proxy/GraphAlgorithmWrappingGraph.java      |  32 +---
 .../directed/TriangleListingTest.java           |   4 +-
 .../undirected/TriangleListingTest.java         |   2 +-
 .../graph/library/linkanalysis/HITSTest.java    |   4 +-
 .../library/linkanalysis/PageRankTest.java      |  12 +-
 48 files changed, 933 insertions(+), 733 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CSV.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CSV.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CSV.java
index 697da97..7193e5f 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CSV.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CSV.java
@@ -70,12 +70,7 @@ extends InputBase<K, NullValue, NullValue> {
 		return WordUtils.capitalize(getName()) + WordUtils.capitalize(type.getValue()) + " (" + inputFilename + ")";
 	}
 
-	/**
-	 * Generate the graph as configured.
-	 *
-	 * @param env execution environment
-	 * @return input graph
-	 */
+	@Override
 	public Graph<K, NullValue, NullValue> create(ExecutionEnvironment env) throws Exception {
 		GraphCsvReader reader = Graph.fromCsvReader(inputFilename.getValue(), env)
 			.ignoreCommentsEdges(commentPrefix.getValue())

http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GeneratedMultiGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GeneratedMultiGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GeneratedMultiGraph.java
index c0a16a8..02c8e58 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GeneratedMultiGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GeneratedMultiGraph.java
@@ -42,12 +42,7 @@ extends GeneratedGraph<K> {
 		return simplify.getShortString();
 	}
 
-	/**
-	 * Generate the graph as configured.
-	 *
-	 * @param env Flink execution environment
-	 * @return input graph
-	 */
+	@Override
 	public Graph<K, NullValue, NullValue> create(ExecutionEnvironment env)
 			throws Exception {
 		Graph<K, NullValue, NullValue> graph = super.create(env);

http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java
index 66ba888..cce7afa 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java
@@ -93,12 +93,7 @@ extends GeneratedMultiGraph<LongValue> {
 		return 1L << scale.getValue();
 	}
 
-	/**
-	 * Generate the graph as configured.
-	 *
-	 * @param env Flink execution environment
-	 * @return input graph
-	 */
+	@Override
 	public Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) throws Exception {
 		int lp = littleParallelism.getValue().intValue();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ChoiceParameter.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ChoiceParameter.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ChoiceParameter.java
index c8033de..2c4544f 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ChoiceParameter.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/ChoiceParameter.java
@@ -51,12 +51,7 @@ extends SimpleParameter<String> {
 		super(owner, name);
 	}
 
-	/**
-	 * Set the default value and add to the list of choices.
-	 *
-	 * @param defaultValue the default value.
-	 * @return this
-	 */
+	@Override
 	public ChoiceParameter setDefaultValue(String defaultValue) {
 		super.setDefaultValue(defaultValue);
 		choices.add(defaultValue);

http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/StringParameter.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/StringParameter.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/StringParameter.java
index 34194ec..9ac31f2 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/StringParameter.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/parameter/StringParameter.java
@@ -37,12 +37,7 @@ extends SimpleParameter<String> {
 		super(owner, name);
 	}
 
-	/**
-	 * Set the default value.
-	 *
-	 * @param defaultValue the default value.
-	 * @return this
-	 */
+	@Override
 	public StringParameter setDefaultValue(String defaultValue) {
 		super.setDefaultValue(defaultValue);
 		return this;

http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java
index 1e330dd..c691922 100644
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/TriangleListingITCase.java
@@ -69,15 +69,15 @@ extends CopyableValueDriverBaseITCase {
 			case "short":
 			case "char":
 			case "integer":
-				checksum = 0x000000003d2f0a9aL;
+				checksum = 0x00000784d2c336cdL;
 				break;
 
 			case "long":
-				checksum = 0x000000016aba3720L;
+				checksum = 0x0000078e5ebf2927L;
 				break;
 
 			case "string":
-				checksum = 0x0000005bfef84facL;
+				checksum = 0x0000077eddf67481L;
 				break;
 
 			default:
@@ -115,15 +115,15 @@ extends CopyableValueDriverBaseITCase {
 			case "short":
 			case "char":
 			case "integer":
-				checksum = 0x0000000001f92b0cL;
+				checksum = 0x0000075b6b9a0ad0L;
 				break;
 
 			case "long":
-				checksum = 0x000000000bb355c6L;
+				checksum = 0x00000761619e7f3cL;
 				break;
 
 			case "string":
-				checksum = 0x00000002f7b5576aL;
+				checksum = 0x0000079b15eb30acL;
 				break;
 
 			default:
@@ -154,15 +154,15 @@ extends CopyableValueDriverBaseITCase {
 			case "short":
 			case "char":
 			case "integer":
-				checksum = 0x00000248fef26209L;
+				checksum = 0x0003a986d6bedc53L;
 				break;
 
 			case "long":
-				checksum = 0x000002dcdf0fbb1bL;
+				checksum = 0x0003a8d91c92b884L;
 				break;
 
 			case "string":
-				checksum = 0x00035b760ab9da74L;
+				checksum = 0x0003a88fffc33f27L;
 				break;
 
 			default:
@@ -205,15 +205,15 @@ extends CopyableValueDriverBaseITCase {
 			case "short":
 			case "char":
 			case "integer":
-				checksum = 0x00000012dee4bf2cL;
+				checksum = 0x0003a95630aae344L;
 				break;
 
 			case "long":
-				checksum = 0x00000017a40efbdaL;
+				checksum = 0x0003a9b1e055d59dL;
 				break;
 
 			case "string":
-				checksum = 0x000159e8be3e370bL;
+				checksum = 0x0003aa1e3d8f2c6bL;
 				break;
 
 			default:

http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Collect.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Collect.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Collect.java
index ad2886f..b456db6 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Collect.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Collect.java
@@ -21,7 +21,6 @@ package org.apache.flink.graph.asm.dataset;
 import org.apache.flink.api.common.accumulators.SerializedListAccumulator;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.graph.AnalyticHelper;
 
 import java.io.IOException;
@@ -47,7 +46,6 @@ extends DataSetAnalyticBase<T, List<T>> {
 			throws Exception {
 		super.run(input);
 
-		ExecutionEnvironment env = input.getExecutionEnvironment();
 		serializer = input.getType().createSerializer(env.getConfig());
 
 		collectHelper = new CollectHelper<>(serializer);

http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/DataSetAnalytic.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/DataSetAnalytic.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/DataSetAnalytic.java
index 9c5c448..d8d796b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/DataSetAnalytic.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/DataSetAnalytic.java
@@ -35,14 +35,16 @@ import org.apache.flink.api.java.operators.CustomUnaryOperation;
 public interface DataSetAnalytic<T, R> {
 
 	/**
-	 * This method must be called after the program has executed.
-	 *  1) "run" analytics and algorithms
-	 *  2) call ExecutionEnvironment.execute()
-	 *  3) get analytic results
+	 * All {@code DataSetAnalytic} processing must be terminated by an
+	 * {@link OutputFormat} and obtained via accumulators rather than
+	 * returned by a {@link DataSet}.
 	 *
-	 * @return the result
+	 * @param input input dataset
+	 * @return this
+	 * @throws Exception
 	 */
-	R getResult();
+
+	DataSetAnalytic<T, R> run(DataSet<T> input) throws Exception;
 
 	/**
 	 * Execute the program and return the result.
@@ -62,13 +64,12 @@ public interface DataSetAnalytic<T, R> {
 	R execute(String jobName) throws Exception;
 
 	/**
-	 * All {@code DataSetAnalytic} processing must be terminated by an
-	 * {@link OutputFormat} and obtained via accumulators rather than
-	 * returned by a {@link DataSet}.
+	 * This method must be called after the program has executed.
+	 *  1) "run" analytics and algorithms
+	 *  2) call ExecutionEnvironment.execute()
+	 *  3) get analytic results
 	 *
-	 * @param input input dataset
-	 * @return this
-	 * @throws Exception
+	 * @return the result
 	 */
-	DataSetAnalytic<T, R> run(DataSet<T> input) throws Exception;
+	R getResult();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
index 27c829b..685031c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
@@ -27,6 +27,7 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.util.Preconditions;
 
@@ -59,12 +60,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple3<EV, Degrees, Deg
 	}
 
 	@Override
-	protected String getAlgorithmName() {
-		return EdgeDegreesPair.class.getName();
-	}
-
-	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
+	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
 		Preconditions.checkNotNull(other);
 
 		if (!EdgeDegreesPair.class.isAssignableFrom(other.getClass())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
index 3c4e611..0299839 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
@@ -26,6 +26,7 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.util.Preconditions;
 
@@ -58,12 +59,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>> {
 	}
 
 	@Override
-	protected String getAlgorithmName() {
-		return EdgeSourceDegrees.class.getName();
-	}
-
-	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
+	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
 		Preconditions.checkNotNull(other);
 
 		if (!EdgeSourceDegrees.class.isAssignableFrom(other.getClass())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
index 94788e2..7c13b39 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
@@ -26,6 +26,7 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.util.Preconditions;
 
@@ -58,12 +59,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>> {
 	}
 
 	@Override
-	protected String getAlgorithmName() {
-		return EdgeTargetDegrees.class.getName();
-	}
-
-	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
+	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
 		Preconditions.checkNotNull(other);
 
 		if (!EdgeTargetDegrees.class.isAssignableFrom(other.getClass())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
index 0333b8b..b0cb7d7 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
@@ -33,6 +33,7 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
 import org.apache.flink.graph.utils.MurmurHash;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.ByteValue;
@@ -85,12 +86,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, Degrees>> {
 	}
 
 	@Override
-	protected String getAlgorithmName() {
-		return VertexDegrees.class.getName();
-	}
-
-	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
+	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
 		Preconditions.checkNotNull(other);
 
 		if (!VertexDegrees.class.isAssignableFrom(other.getClass())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
index f316b9b..94c2667 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
@@ -25,6 +25,7 @@ import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.DegreeCount;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexWithVertexDegree;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToTargetId;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.LongValue;
@@ -78,12 +79,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 	}
 
 	@Override
-	protected String getAlgorithmName() {
-		return VertexInDegree.class.getName();
-	}
-
-	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
+	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
 		Preconditions.checkNotNull(other);
 
 		if (!VertexInDegree.class.isAssignableFrom(other.getClass())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
index b04391d..00f2f89 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
@@ -25,6 +25,7 @@ import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.DegreeCount;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexWithVertexDegree;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToSourceId;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.LongValue;
@@ -78,12 +79,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 	}
 
 	@Override
-	protected String getAlgorithmName() {
-		return VertexOutDegree.class.getName();
-	}
-
-	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
+	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
 		Preconditions.checkNotNull(other);
 
 		if (!VertexOutDegree.class.isAssignableFrom(other.getClass())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
index c6d0646..6228cac 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
@@ -26,6 +26,7 @@ import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.LongValue;
@@ -80,12 +81,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple3<EV, LongValue, L
 	}
 
 	@Override
-	protected String getAlgorithmName() {
-		return EdgeDegreePair.class.getName();
-	}
-
-	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
+	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
 		Preconditions.checkNotNull(other);
 
 		if (!EdgeSourceDegree.class.isAssignableFrom(other.getClass())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
index 88bab5a..01ff7d0 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
@@ -25,6 +25,7 @@ import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.LongValue;
@@ -78,12 +79,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>>
 	}
 
 	@Override
-	protected String getAlgorithmName() {
-		return EdgeSourceDegree.class.getName();
-	}
-
-	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
+	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
 		Preconditions.checkNotNull(other);
 
 		if (!EdgeSourceDegree.class.isAssignableFrom(other.getClass())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
index 21918c7..d3316ea 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
@@ -25,6 +25,7 @@ import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.LongValue;
@@ -78,12 +79,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>>
 	}
 
 	@Override
-	protected String getAlgorithmName() {
-		return EdgeTargetDegree.class.getName();
-	}
-
-	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
+	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
 		Preconditions.checkNotNull(other);
 
 		if (!EdgeSourceDegree.class.isAssignableFrom(other.getClass())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
index 9ea99a7..626c11b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
@@ -28,6 +28,7 @@ import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.Degr
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexWithVertexDegree;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToSourceId;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToTargetId;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.LongValue;
@@ -98,12 +99,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 	}
 
 	@Override
-	protected String getAlgorithmName() {
-		return VertexDegree.class.getName();
-	}
-
-	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
+	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
 		Preconditions.checkNotNull(other);
 
 		if (!VertexDegree.class.isAssignableFrom(other.getClass())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
index ae1e5b6..b485507 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
@@ -29,6 +29,7 @@ import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.LongValue;
@@ -115,12 +116,7 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
 	}
 
 	@Override
-	protected String getAlgorithmName() {
-		return MaximumDegree.class.getName();
-	}
-
-	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) {
+	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
 		Preconditions.checkNotNull(other);
 
 		if (!MaximumDegree.class.isAssignableFrom(other.getClass())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/BinaryResult.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/BinaryResult.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/BinaryResult.java
index b54f00c..f41268c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/BinaryResult.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/BinaryResult.java
@@ -18,12 +18,17 @@
 
 package org.apache.flink.graph.asm.result;
 
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
 
 /**
  * A {@link GraphAlgorithm} result for a pair vertices.
  */
-public interface BinaryResult<T> {
+public interface BinaryResult<T>
+extends Serializable {
 
 	/**
 	 * Get the first vertex ID.
@@ -52,4 +57,25 @@ public interface BinaryResult<T> {
 	 * @param value new vertex ID
 	 */
 	void setVertexId1(T value);
+
+	/**
+	 * Output each input and a second result with the vertex order flipped.
+	 *
+	 * @param <T> ID type
+	 * @param <RT> result type
+	 */
+	class MirrorResult<T, RT extends BinaryResult<T>>
+	implements FlatMapFunction<RT, RT> {
+		@Override
+		public void flatMap(RT value, Collector<RT> out)
+				throws Exception {
+			out.collect(value);
+
+			T tmp = value.getVertexId0();
+			value.setVertexId0(value.getVertexId1());
+			value.setVertexId1(tmp);
+
+			out.collect(value);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/BinaryResultBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/BinaryResultBase.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/BinaryResultBase.java
new file mode 100644
index 0000000..45791ee
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/BinaryResultBase.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.result;
+
+/**
+ * Base class for algorithm results for a pair of vertices.
+ *
+ * @param <K> graph ID type
+ */
+public abstract class BinaryResultBase<K>
+extends ResultBase
+implements BinaryResult<K> {
+
+	private K vertexId0;
+
+	private K vertexId1;
+
+	@Override
+	public K getVertexId0() {
+		return vertexId0;
+	}
+
+	@Override
+	public void setVertexId0(K vertexId0) {
+		this.vertexId0 = vertexId0;
+	}
+
+	@Override
+	public K getVertexId1() {
+		return vertexId1;
+	}
+
+	@Override
+	public void setVertexId1(K vertexId1) {
+		this.vertexId1 = vertexId1;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/ResultBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/ResultBase.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/ResultBase.java
new file mode 100644
index 0000000..01b13c6
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/ResultBase.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.asm.result;
+
+/**
+ * Base class for algorithm results.
+ */
+public abstract class ResultBase {
+
+	/**
+	 * {@link Object#toString()} must be overridden to write POJO values in the
+	 * same form as {@link org.apache.flink.api.java.tuple.Tuple}. Values are
+	 * comma-separated and enclosed in parenthesis, e.g. "(f0,f1)".
+	 *
+	 * @return tuple representation string
+	 */
+	@Override
+	public abstract String toString();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/TertiaryResult.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/TertiaryResult.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/TertiaryResult.java
index 246b8cb..a546387 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/TertiaryResult.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/TertiaryResult.java
@@ -20,10 +20,13 @@ package org.apache.flink.graph.asm.result;
 
 import org.apache.flink.graph.GraphAlgorithm;
 
+import java.io.Serializable;
+
 /**
  * A {@link GraphAlgorithm} result for three vertices.
  */
-public interface TertiaryResult<T> {
+public interface TertiaryResult<T>
+extends Serializable {
 
 	/**
 	 * Get the first vertex ID.

http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/TertiaryResultBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/TertiaryResultBase.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/TertiaryResultBase.java
new file mode 100644
index 0000000..eb505b8
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/TertiaryResultBase.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.asm.result;
+
+/**
+ * Base class for algorithm results for three vertices.
+ *
+ * @param <K> graph ID type
+ */
+public abstract class TertiaryResultBase<K>
+extends ResultBase
+implements TertiaryResult<K> {
+
+	private K vertexId0;
+
+	private K vertexId1;
+
+	private K vertexId2;
+
+	@Override
+	public K getVertexId0() {
+		return vertexId0;
+	}
+
+	@Override
+	public void setVertexId0(K vertexId0) {
+		this.vertexId0 = vertexId0;
+	}
+
+	@Override
+	public K getVertexId1() {
+		return vertexId1;
+	}
+
+	@Override
+	public void setVertexId1(K vertexId1) {
+		this.vertexId1 = vertexId1;
+	}
+
+	@Override
+	public K getVertexId2() {
+		return vertexId2;
+	}
+
+	@Override
+	public void setVertexId2(K vertexId2) {
+		this.vertexId2 = vertexId2;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/UnaryResult.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/UnaryResult.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/UnaryResult.java
index 04a2d30..d861b43 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/UnaryResult.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/UnaryResult.java
@@ -20,10 +20,13 @@ package org.apache.flink.graph.asm.result;
 
 import org.apache.flink.graph.GraphAlgorithm;
 
+import java.io.Serializable;
+
 /**
  * A {@link GraphAlgorithm} result for a single vertex.
  */
-public interface UnaryResult<T> {
+public interface UnaryResult<T>
+extends Serializable {
 
 	/**
 	 * Get the first vertex ID.

http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/UnaryResultBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/UnaryResultBase.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/UnaryResultBase.java
new file mode 100644
index 0000000..5bd3229
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/result/UnaryResultBase.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.asm.result;
+
+/**
+ * Base class for algorithm results for a single vertex.
+ *
+ * @param <K> graph ID type
+ */
+public abstract class UnaryResultBase<K>
+extends ResultBase
+implements UnaryResult<K> {
+
+	private K vertexId0;
+
+	@Override
+	public K getVertexId0() {
+		return vertexId0;
+	}
+
+	@Override
+	public void setVertexId0(K vertexId0) {
+		this.vertexId0 = vertexId0;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
index d978096..0d60cd9 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph;
 import org.apache.flink.util.Preconditions;
 
@@ -56,12 +57,7 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
 	}
 
 	@Override
-	protected String getAlgorithmName() {
-		return Simplify.class.getName();
-	}
-
-	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) {
+	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
 		Preconditions.checkNotNull(other);
 
 		if (!Simplify.class.isAssignableFrom(other.getClass())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
index 617dce1..dd3a9b3 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
@@ -74,12 +75,7 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
 	}
 
 	@Override
-	protected String getAlgorithmName() {
-		return Simplify.class.getName();
-	}
-
-	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) {
+	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
 		Preconditions.checkNotNull(other);
 
 		if (!Simplify.class.isAssignableFrom(other.getClass())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
index 3956a81..e444b5c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
@@ -21,6 +21,7 @@ package org.apache.flink.graph.asm.translate;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph;
 import org.apache.flink.util.Preconditions;
 
@@ -71,12 +72,7 @@ extends GraphAlgorithmWrappingGraph<K, VV, OLD, K, VV, NEW> {
 	}
 
 	@Override
-	protected String getAlgorithmName() {
-		return TranslateEdgeValues.class.getName();
-	}
-
-	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) {
+	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
 		Preconditions.checkNotNull(other);
 
 		if (!TranslateEdgeValues.class.isAssignableFrom(other.getClass())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
index d8c5676..3eb5113 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph;
 import org.apache.flink.util.Preconditions;
 
@@ -73,12 +74,7 @@ extends GraphAlgorithmWrappingGraph<OLD, VV, EV, NEW, VV, EV> {
 	}
 
 	@Override
-	protected String getAlgorithmName() {
-		return TranslateGraphIds.class.getName();
-	}
-
-	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) {
+	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
 		Preconditions.checkNotNull(other);
 
 		if (!TranslateGraphIds.class.isAssignableFrom(other.getClass())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
index 452cb26..38f415c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
@@ -21,6 +21,7 @@ package org.apache.flink.graph.asm.translate;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph;
 import org.apache.flink.util.Preconditions;
 
@@ -71,12 +72,7 @@ extends GraphAlgorithmWrappingGraph<K, OLD, EV, K, NEW, EV> {
 	}
 
 	@Override
-	protected String getAlgorithmName() {
-		return TranslateVertexValues.class.getName();
-	}
-
-	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) {
+	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
 		Preconditions.checkNotNull(other);
 
 		if (!TranslateVertexValues.class.isAssignableFrom(other.getClass())) {

http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/TriangleListingBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/TriangleListingBase.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/TriangleListingBase.java
new file mode 100644
index 0000000..7357fbc
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/TriangleListingBase.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.flink.graph.library.clustering;
+
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
+import org.apache.flink.graph.utils.proxy.OptionalBoolean;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * Common configuration for directed and undirected Triangle Listing algorithms.
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ * @param <R> result type
+ */
+public abstract class TriangleListingBase<K, VV, EV, R>
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, R> {
+
+	// Optional configuration
+	protected OptionalBoolean sortTriangleVertices = new OptionalBoolean(false, true);
+
+	protected int littleParallelism = PARALLELISM_DEFAULT;
+
+	/**
+	 * Normalize the triangle listing such that for each result (K0, K1, K2)
+	 * the vertex IDs are sorted K0 < K1 < K2.
+	 *
+	 * @param sortTriangleVertices whether to output each triangle's vertices in sorted order
+	 * @return this
+	 */
+	public TriangleListingBase<K, VV, EV, R> setSortTriangleVertices(boolean sortTriangleVertices) {
+		this.sortTriangleVertices.set(sortTriangleVertices);
+
+		return this;
+	}
+
+	/**
+	 * Override the parallelism of operators processing small amounts of data.
+	 *
+	 * @param littleParallelism operator parallelism
+	 * @return this
+	 */
+	public TriangleListingBase<K, VV, EV, R> setLittleParallelism(int littleParallelism) {
+		Preconditions.checkArgument(littleParallelism > 0 || littleParallelism == PARALLELISM_DEFAULT,
+			"The parallelism must be greater than zero.");
+
+		this.littleParallelism = littleParallelism;
+
+		return this;
+	}
+
+	@Override
+	protected final boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
+		Preconditions.checkNotNull(other);
+
+		if (!TriangleListingBase.class.isAssignableFrom(other.getClass())) {
+			return false;
+		}
+
+		TriangleListingBase rhs = (TriangleListingBase) other;
+
+		// merge configurations
+
+		sortTriangleVertices.mergeWith(rhs.sortTriangleVertices);
+		littleParallelism = (littleParallelism == PARALLELISM_DEFAULT) ? rhs.littleParallelism :
+			((rhs.littleParallelism == PARALLELISM_DEFAULT) ? littleParallelism : Math.min(littleParallelism, rhs.littleParallelism));
+
+		return true;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
index b980244..03c8808 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
@@ -27,15 +27,15 @@ import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
 import org.apache.flink.graph.asm.result.PrintableResult;
-import org.apache.flink.graph.asm.result.UnaryResult;
+import org.apache.flink.graph.asm.result.UnaryResultBase;
 import org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient.Result;
 import org.apache.flink.graph.utils.MurmurHash;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.CopyableValue;
@@ -100,12 +100,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	}
 
 	@Override
-	protected String getAlgorithmName() {
-		return LocalClusteringCoefficient.class.getName();
-	}
-
-	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
+	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
 		Preconditions.checkNotNull(other);
 
 		if (!LocalClusteringCoefficient.class.isAssignableFrom(other.getClass())) {
@@ -192,17 +187,17 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		@Override
 		public void flatMap(TriangleListing.Result<T> value, Collector<Tuple2<T, LongValue>> out)
 				throws Exception {
-			byte bitmask = value.f3.getValue();
+			byte bitmask = value.getBitmask().getValue();
 
-			output.f0 = value.f0;
+			output.f0 = value.getVertexId0();
 			output.f1 = ((bitmask & 0b000011) == 0b000011) ? two : one;
 			out.collect(output);
 
-			output.f0 = value.f1;
+			output.f0 = value.getVertexId1();
 			output.f1 = ((bitmask & 0b001100) == 0b001100) ? two : one;
 			out.collect(output);
 
-			output.f0 = value.f2;
+			output.f0 = value.getVertexId2();
 			output.f1 = ((bitmask & 0b110000) == 0b110000) ? two : one;
 			out.collect(output);
 		}
@@ -229,8 +224,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	 *
 	 * @param <T> ID type
 	 */
-	@ForwardedFieldsFirst("0; 1.0->1")
-	@ForwardedFieldsSecond("0")
+	@ForwardedFieldsFirst("0->vertexId0; 1.0->degree")
+	@ForwardedFieldsSecond("0->vertexId0")
 	private static class JoinVertexDegreeWithTriangleCount<T>
 	implements JoinFunction<Vertex<T, Degrees>, Tuple2<T, LongValue>, Result<T>> {
 		private LongValue zero = new LongValue(0);
@@ -240,35 +235,25 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		@Override
 		public Result<T> join(Vertex<T, Degrees> vertexAndDegree, Tuple2<T, LongValue> vertexAndTriangleCount)
 				throws Exception {
-			output.f0 = vertexAndDegree.f0;
-			output.f1 = vertexAndDegree.f1.f0;
-			output.f2 = (vertexAndTriangleCount == null) ? zero : vertexAndTriangleCount.f1;
+			output.setVertexId0(vertexAndDegree.f0);
+			output.setDegree(vertexAndDegree.f1.f0);
+			output.setTriangleCount((vertexAndTriangleCount == null) ? zero : vertexAndTriangleCount.f1);
 
 			return output;
 		}
 	}
 
 	/**
-	 * Wraps {@link Tuple3} to encapsulate results from the Local Clustering Coefficient algorithm.
+	 * A result for the directed Local Clustering Coefficient algorithm.
 	 *
 	 * @param <T> ID type
 	 */
 	public static class Result<T>
-	extends Tuple3<T, LongValue, LongValue>
-	implements PrintableResult, UnaryResult<T> {
-		public static final int HASH_SEED = 0x37a208c4;
+	extends UnaryResultBase<T>
+	implements PrintableResult {
+		private LongValue degree;
 
-		private MurmurHash hasher = new MurmurHash(HASH_SEED);
-
-		@Override
-		public T getVertexId0() {
-			return f0;
-		}
-
-		@Override
-		public void setVertexId0(T value) {
-			f0 = value;
-		}
+		private LongValue triangleCount;
 
 		/**
 		 * Get the vertex degree.
@@ -276,7 +261,16 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		 * @return vertex degree
 		 */
 		public LongValue getDegree() {
-			return f1;
+			return degree;
+		}
+
+		/**
+		 * Set the vertex degree.
+		 *
+		 * @param degree vertex degree
+		 */
+		public void setDegree(LongValue degree) {
+			this.degree = degree;
 		}
 
 		/**
@@ -286,7 +280,17 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		 * @return triangle count
 		 */
 		public LongValue getTriangleCount() {
-			return f2;
+			return triangleCount;
+		}
+
+		/**
+		 * Set the number of triangles containing this vertex; equivalently,
+		 * this is the number of edges between neighbors of this vertex.
+		 *
+		 * @param triangleCount triangle count
+		 */
+		public void setTriangleCount(LongValue triangleCount) {
+			this.triangleCount = triangleCount;
 		}
 
 		/**
@@ -306,11 +310,20 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 			return (neighborPairs == 0) ? Double.NaN : getTriangleCount().getValue() / (double) neighborPairs;
 		}
 
+		@Override
+		public String toString() {
+			return "(" + getVertexId0()
+				+ "," + getDegree()
+				+ "," + getTriangleCount()
+				+ ")";
+		}
+
 		/**
 		 * Format values into a human-readable string.
 		 *
 		 * @return verbose string
 		 */
+		@Override
 		public String toPrintableString() {
 			return "Vertex ID: " + getVertexId0()
 				+ ", vertex degree: " + getDegree()
@@ -318,12 +331,22 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 				+ ", local clustering coefficient: " + getLocalClusteringCoefficientScore();
 		}
 
+		// ----------------------------------------------------------------------------------------
+
+		public static final int HASH_SEED = 0x37a208c4;
+
+		private transient MurmurHash hasher;
+
 		@Override
 		public int hashCode() {
+			if (hasher == null) {
+				hasher = new MurmurHash(HASH_SEED);
+			}
+
 			return hasher.reset()
-				.hash(f0.hashCode())
-				.hash(f1.getValue())
-				.hash(f2.getValue())
+				.hash(getVertexId0().hashCode())
+				.hash(degree.getValue())
+				.hash(triangleCount.getValue())
 				.hash();
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
index 949dd4c..6ec9f0f 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
@@ -102,24 +102,24 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 		BigInteger three = BigInteger.valueOf(3);
 		BigInteger six = BigInteger.valueOf(6);
 
-		BigInteger vertexCount = BigInteger.valueOf((Long) vertexDegreesHelper.getAccumulator(env, "vc"));
-		BigInteger unidirectionalEdgeCount = BigInteger.valueOf((Long) vertexDegreesHelper.getAccumulator(env, "uec") / 2);
-		BigInteger bidirectionalEdgeCount = BigInteger.valueOf((Long) vertexDegreesHelper.getAccumulator(env, "bec") / 2);
-		BigInteger triplet021dCount = BigInteger.valueOf((Long) vertexDegreesHelper.getAccumulator(env, "021d"));
-		BigInteger triplet021uCount = BigInteger.valueOf((Long) vertexDegreesHelper.getAccumulator(env, "021u"));
-		BigInteger triplet021cCount = BigInteger.valueOf((Long) vertexDegreesHelper.getAccumulator(env, "021c"));
-		BigInteger triplet111dCount = BigInteger.valueOf((Long) vertexDegreesHelper.getAccumulator(env, "111d"));
-		BigInteger triplet111uCount = BigInteger.valueOf((Long) vertexDegreesHelper.getAccumulator(env, "111u"));
-		BigInteger triplet201Count = BigInteger.valueOf((Long) vertexDegreesHelper.getAccumulator(env, "201"));
+		BigInteger vertexCount = BigInteger.valueOf(vertexDegreesHelper.<Long>getAccumulator(env, "vc"));
+		BigInteger unidirectionalEdgeCount = BigInteger.valueOf(vertexDegreesHelper.<Long>getAccumulator(env, "uec") / 2);
+		BigInteger bidirectionalEdgeCount = BigInteger.valueOf(vertexDegreesHelper.<Long>getAccumulator(env, "bec") / 2);
+		BigInteger triplet021dCount = BigInteger.valueOf(vertexDegreesHelper.<Long>getAccumulator(env, "021d"));
+		BigInteger triplet021uCount = BigInteger.valueOf(vertexDegreesHelper.<Long>getAccumulator(env, "021u"));
+		BigInteger triplet021cCount = BigInteger.valueOf(vertexDegreesHelper.<Long>getAccumulator(env, "021c"));
+		BigInteger triplet111dCount = BigInteger.valueOf(vertexDegreesHelper.<Long>getAccumulator(env, "111d"));
+		BigInteger triplet111uCount = BigInteger.valueOf(vertexDegreesHelper.<Long>getAccumulator(env, "111u"));
+		BigInteger triplet201Count = BigInteger.valueOf(vertexDegreesHelper.<Long>getAccumulator(env, "201"));
 
 		// triads with three connecting edges = closed triplet = triangle
-		BigInteger triangle030tCount = BigInteger.valueOf((Long) triangleListingHelper.getAccumulator(env, "030t"));
-		BigInteger triangle030cCount = BigInteger.valueOf((Long) triangleListingHelper.getAccumulator(env, "030c"));
-		BigInteger triangle120dCount = BigInteger.valueOf((Long) triangleListingHelper.getAccumulator(env, "120d"));
-		BigInteger triangle120uCount = BigInteger.valueOf((Long) triangleListingHelper.getAccumulator(env, "120u"));
-		BigInteger triangle120cCount = BigInteger.valueOf((Long) triangleListingHelper.getAccumulator(env, "120c"));
-		BigInteger triangle210Count = BigInteger.valueOf((Long) triangleListingHelper.getAccumulator(env, "210"));
-		BigInteger triangle300Count = BigInteger.valueOf((Long) triangleListingHelper.getAccumulator(env, "300"));
+		BigInteger triangle030tCount = BigInteger.valueOf(triangleListingHelper.<Long>getAccumulator(env, "030t"));
+		BigInteger triangle030cCount = BigInteger.valueOf(triangleListingHelper.<Long>getAccumulator(env, "030c"));
+		BigInteger triangle120dCount = BigInteger.valueOf(triangleListingHelper.<Long>getAccumulator(env, "120d"));
+		BigInteger triangle120uCount = BigInteger.valueOf(triangleListingHelper.<Long>getAccumulator(env, "120u"));
+		BigInteger triangle120cCount = BigInteger.valueOf(triangleListingHelper.<Long>getAccumulator(env, "120c"));
+		BigInteger triangle210Count = BigInteger.valueOf(triangleListingHelper.<Long>getAccumulator(env, "210"));
+		BigInteger triangle300Count = BigInteger.valueOf(triangleListingHelper.<Long>getAccumulator(env, "300"));
 
 		// triads with two connecting edges = open triplet;
 		// each triangle deducts the count of three triplets
@@ -211,7 +211,7 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 
 		@Override
 		public void writeRecord(TriangleListing.Result<T> record) throws IOException {
-			triangleCount[record.f3.getValue()]++;
+			triangleCount[record.getBitmask().getValue()]++;
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
index 38b0746..23e88d1 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
@@ -36,21 +36,18 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.asm.degree.annotate.directed.EdgeDegreesPair;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
 import org.apache.flink.graph.asm.result.PrintableResult;
-import org.apache.flink.graph.asm.result.TertiaryResult;
+import org.apache.flink.graph.asm.result.TertiaryResultBase;
+import org.apache.flink.graph.library.clustering.TriangleListingBase;
 import org.apache.flink.graph.library.clustering.directed.TriangleListing.Result;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
-import org.apache.flink.graph.utils.proxy.OptionalBoolean;
+import org.apache.flink.graph.utils.MurmurHash;
 import org.apache.flink.types.ByteValue;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.Preconditions;
 
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * Generates a listing of distinct triangles from the input graph.
  *
@@ -68,62 +65,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class TriangleListing<K extends Comparable<K> & CopyableValue<K>, VV, EV>
-extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
-
-	// Optional configuration
-	private OptionalBoolean sortTriangleVertices = new OptionalBoolean(false, true);
-
-	private int littleParallelism = PARALLELISM_DEFAULT;
-
-	/**
-	 * Normalize the triangle listing such that for each result (K0, K1, K2)
-	 * the vertex IDs are sorted K0 < K1 < K2.
-	 *
-	 * @param sortTriangleVertices whether to output each triangle's vertices in sorted order
-	 * @return this
-	 */
-	public TriangleListing<K, VV, EV> setSortTriangleVertices(boolean sortTriangleVertices) {
-		this.sortTriangleVertices.set(sortTriangleVertices);
-
-		return this;
-	}
-
-	/**
-	 * Override the parallelism of operators processing small amounts of data.
-	 *
-	 * @param littleParallelism operator parallelism
-	 * @return this
-	 */
-	public TriangleListing<K, VV, EV> setLittleParallelism(int littleParallelism) {
-		Preconditions.checkArgument(littleParallelism > 0 || littleParallelism == PARALLELISM_DEFAULT,
-			"The parallelism must be greater than zero.");
-
-		this.littleParallelism = littleParallelism;
-
-		return this;
-	}
-
-	@Override
-	protected String getAlgorithmName() {
-		return TriangleListing.class.getName();
-	}
-
-	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
-		Preconditions.checkNotNull(other);
-
-		if (!TriangleListing.class.isAssignableFrom(other.getClass())) {
-			return false;
-		}
-
-		TriangleListing rhs = (TriangleListing) other;
-
-		sortTriangleVertices.mergeWith(rhs.sortTriangleVertices);
-		littleParallelism = (littleParallelism == PARALLELISM_DEFAULT) ? rhs.littleParallelism :
-			((rhs.littleParallelism == PARALLELISM_DEFAULT) ? littleParallelism : Math.min(littleParallelism, rhs.littleParallelism));
-
-		return true;
-	}
+extends TriangleListingBase<K, VV, EV, Result<K>> {
 
 	/*
 	 * Implementation notes:
@@ -349,8 +291,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	 *
 	 * @param <T> ID type
 	 */
-	@ForwardedFieldsFirst("0; 1; 2")
-	@ForwardedFieldsSecond("0; 1")
+	@ForwardedFieldsFirst("0->vertexId0; 1->vertexId1; 2->vertexId2")
+	@ForwardedFieldsSecond("0->vertexId0; 1->vertexId1")
 	private static final class ProjectTriangles<T>
 	implements JoinFunction<Tuple4<T, T, T, ByteValue>, Tuple3<T, T, ByteValue>, Result<T>> {
 		private Result<T> output = new Result<>();
@@ -358,10 +300,10 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		@Override
 		public Result<T> join(Tuple4<T, T, T, ByteValue> triplet, Tuple3<T, T, ByteValue> edge)
 				throws Exception {
-			output.f0 = triplet.f0;
-			output.f1 = triplet.f1;
-			output.f2 = triplet.f2;
-			output.f3.setValue((byte) (triplet.f3.getValue() | edge.f2.getValue()));
+			output.setVertexId0(triplet.f0);
+			output.setVertexId1(triplet.f1);
+			output.setVertexId2(triplet.f2);
+			output.setBitmask((byte) (triplet.f3.getValue() | edge.f2.getValue()));
 			return output;
 		}
 	}
@@ -378,29 +320,29 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		public Result<T> map(Result<T> value)
 				throws Exception {
 			// by the triangle listing algorithm we know f1 < f2
-			if (value.f0.compareTo(value.f1) > 0) {
-				byte bitmask = value.f3.getValue();
+			if (value.getVertexId0().compareTo(value.getVertexId1()) > 0) {
+				byte bitmask = value.getBitmask().getValue();
 
-				T tempVal = value.f0;
-				value.f0 = value.f1;
+				T tempVal = value.getVertexId0();
+				value.setVertexId0(value.getVertexId1());
 
-				if (tempVal.compareTo(value.f2) < 0) {
-					value.f1 = tempVal;
+				if (tempVal.compareTo(value.getVertexId2()) < 0) {
+					value.setVertexId1(tempVal);
 
 					int f0f1 = ((bitmask & 0b100000) >>> 1) | ((bitmask & 0b010000) << 1);
 					int f0f2 = (bitmask & 0b001100) >>> 2;
 					int f1f2 = (bitmask & 0b000011) << 2;
 
-					value.f3.setValue((byte) (f0f1 | f0f2 | f1f2));
+					value.setBitmask((byte) (f0f1 | f0f2 | f1f2));
 				} else {
-					value.f1 = value.f2;
-					value.f2 = tempVal;
+					value.setVertexId1(value.getVertexId2());
+					value.setVertexId2(tempVal);
 
 					int f0f1 = (bitmask & 0b000011) << 4;
 					int f0f2 = ((bitmask & 0b100000) >>> 3) | ((bitmask & 0b010000) >>> 1);
 					int f1f2 = ((bitmask & 0b001000) >>> 3) | ((bitmask & 0b000100) >>> 1);
 
-					value.f3.setValue((byte) (f0f1 | f0f2 | f1f2));
+					value.setBitmask((byte) (f0f1 | f0f2 | f1f2));
 				}
 			}
 
@@ -409,49 +351,14 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	}
 
 	/**
-	 * Wraps {@link Tuple4} to encapsulate results from the directed Triangle Listing algorithm.
+	 * A result for the directed Triangle Listing algorithm.
 	 *
 	 * @param <T> ID type
 	 */
 	public static class Result<T>
-	extends Tuple4<T, T, T, ByteValue>
-	implements PrintableResult, TertiaryResult<T> {
-		/**
-		 * No-args constructor.
-		 */
-		public Result() {
-			f3 = new ByteValue();
-		}
-
-		@Override
-		public T getVertexId0() {
-			return f0;
-		}
-
-		@Override
-		public void setVertexId0(T value) {
-			f0 = value;
-		}
-
-		@Override
-		public T getVertexId1() {
-			return f1;
-		}
-
-		@Override
-		public void setVertexId1(T value) {
-			f1 = value;
-		}
-
-		@Override
-		public T getVertexId2() {
-			return f2;
-		}
-
-		@Override
-		public void setVertexId2(T value) {
-			f2 = value;
-		}
+	extends TertiaryResultBase<T>
+	implements PrintableResult {
+		private ByteValue bitmask = new ByteValue();
 
 		/**
 		 * Get the bitmask indicating the presence of the six potential
@@ -462,16 +369,37 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		 * @see EdgeOrder
 		 */
 		public ByteValue getBitmask() {
-			return f3;
+			return bitmask;
 		}
 
 		/**
-		 * Format values into a human-readable string.
+		 * Set the bitmask indicating the presence of the six potential
+		 * connecting edges.
 		 *
-		 * @return verbose string
+		 * @param bitmask the edge bitmask
+		 *
+		 * @see EdgeOrder
 		 */
+		public void setBitmask(ByteValue bitmask) {
+			this.bitmask = bitmask;
+		}
+
+		private void setBitmask(byte bitmask) {
+			this.bitmask.setValue(bitmask);
+		}
+
+		@Override
+		public String toString() {
+			return "(" + getVertexId0()
+				+ "," + getVertexId1()
+				+ "," + getVertexId2()
+				+ "," + bitmask
+				+ ")";
+		}
+
+		@Override
 		public String toPrintableString() {
-			byte bitmask = f3.getValue();
+			byte bitmask = getBitmask().getValue();
 
 			return "1st vertex ID: " + getVertexId0()
 				+ ", 2nd vertex ID: " + getVertexId1()
@@ -492,8 +420,28 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 				return "<->";
 			} else {
 				throw new IllegalArgumentException("Bitmask is missing an edge (mask = "
-					+ mask + ", shift = " + shift);
+					+ mask + ", shift = " + shift + ")");
 			}
 		}
+
+		// ----------------------------------------------------------------------------------------
+
+		public static final int HASH_SEED = 0x0846ea21;
+
+		private transient MurmurHash hasher;
+
+		@Override
+		public int hashCode() {
+			if (hasher == null) {
+				hasher = new MurmurHash(HASH_SEED);
+			}
+
+			return hasher.reset()
+				.hash(getVertexId0().hashCode())
+				.hash(getVertexId1().hashCode())
+				.hash(getVertexId2().hashCode())
+				.hash(bitmask.getValue())
+				.hash();
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/98a15004/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
index e94310f..3cb4b87 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
@@ -27,14 +27,14 @@ import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
 import org.apache.flink.graph.asm.result.PrintableResult;
-import org.apache.flink.graph.asm.result.UnaryResult;
+import org.apache.flink.graph.asm.result.UnaryResultBase;
 import org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result;
 import org.apache.flink.graph.utils.MurmurHash;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.CopyableValue;
@@ -99,12 +99,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	}
 
 	@Override
-	protected String getAlgorithmName() {
-		return LocalClusteringCoefficient.class.getName();
-	}
-
-	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
+	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
 		Preconditions.checkNotNull(other);
 
 		if (!LocalClusteringCoefficient.class.isAssignableFrom(other.getClass())) {
@@ -186,13 +181,13 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		@Override
 		public void flatMap(TriangleListing.Result<T> value, Collector<Tuple2<T, LongValue>> out)
 				throws Exception {
-			output.f0 = value.f0;
+			output.f0 = value.getVertexId0();
 			out.collect(output);
 
-			output.f0 = value.f1;
+			output.f0 = value.getVertexId1();
 			out.collect(output);
 
-			output.f0 = value.f2;
+			output.f0 = value.getVertexId2();
 			out.collect(output);
 		}
 	}
@@ -218,8 +213,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	 *
 	 * @param <T> ID type
 	 */
-	@ForwardedFieldsFirst("0; 1")
-	@ForwardedFieldsSecond("0")
+	@ForwardedFieldsFirst("0->vertexId0; 1->degree")
+	@ForwardedFieldsSecond("0->vertexId0")
 	private static class JoinVertexDegreeWithTriangleCount<T>
 	implements JoinFunction<Vertex<T, LongValue>, Tuple2<T, LongValue>, Result<T>> {
 		private LongValue zero = new LongValue(0);
@@ -229,35 +224,25 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		@Override
 		public Result<T> join(Vertex<T, LongValue> vertexAndDegree, Tuple2<T, LongValue> vertexAndTriangleCount)
 				throws Exception {
-			output.f0 = vertexAndDegree.f0;
-			output.f1 = vertexAndDegree.f1;
-			output.f2 = (vertexAndTriangleCount == null) ? zero : vertexAndTriangleCount.f1;
+			output.setVertexId0(vertexAndDegree.f0);
+			output.setDegree(vertexAndDegree.f1);
+			output.setTriangleCount((vertexAndTriangleCount == null) ? zero : vertexAndTriangleCount.f1);
 
 			return output;
 		}
 	}
 
 	/**
-	 * Wraps {@link Tuple3} to encapsulate results from the Local Clustering Coefficient algorithm.
+	 * A result for the undirected Local Clustering Coefficient algorithm.
 	 *
 	 * @param <T> ID type
 	 */
 	public static class Result<T>
-	extends Tuple3<T, LongValue, LongValue>
-	implements PrintableResult, UnaryResult<T> {
-		private static final int HASH_SEED = 0xc23937c1;
-
-		private MurmurHash hasher = new MurmurHash(HASH_SEED);
-
-		@Override
-		public T getVertexId0() {
-			return f0;
-		}
+	extends UnaryResultBase<T>
+	implements PrintableResult {
+		private LongValue degree;
 
-		@Override
-		public void setVertexId0(T value) {
-			f0 = value;
-		}
+		private LongValue triangleCount;
 
 		/**
 		 * Get the vertex degree.
@@ -265,7 +250,16 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		 * @return vertex degree
 		 */
 		public LongValue getDegree() {
-			return f1;
+			return degree;
+		}
+
+		/**
+		 * Set the vertex degree.
+		 *
+		 * @param degree vertex degree
+		 */
+		public void setDegree(LongValue degree) {
+			this.degree = degree;
 		}
 
 		/**
@@ -275,7 +269,17 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		 * @return triangle count
 		 */
 		public LongValue getTriangleCount() {
-			return f2;
+			return triangleCount;
+		}
+
+		/**
+		 * Set the number of triangles containing this vertex; equivalently,
+		 * this is the number of edges between neighbors of this vertex.
+		 *
+		 * @param triangleCount triangle count
+		 */
+		public void setTriangleCount(LongValue triangleCount) {
+			this.triangleCount = triangleCount;
 		}
 
 		/**
@@ -295,24 +299,43 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 			return (neighborPairs == 0) ? Double.NaN : getTriangleCount().getValue() / (double) neighborPairs;
 		}
 
+		@Override
+		public String toString() {
+			return "(" + getVertexId0()
+				+ "," + degree
+				+ "," + triangleCount
+				+ ")";
+		}
+
 		/**
 		 * Format values into a human-readable string.
 		 *
 		 * @return verbose string
 		 */
+		@Override
 		public String toPrintableString() {
 			return "Vertex ID: " + getVertexId0()
-				+ ", vertex degree: " + getDegree()
-				+ ", triangle count: " + getTriangleCount()
+				+ ", vertex degree: " + degree
+				+ ", triangle count: " + triangleCount
 				+ ", local clustering coefficient: " + getLocalClusteringCoefficientScore();
 		}
 
+		// ----------------------------------------------------------------------------------------
+
+		public static final int HASH_SEED = 0xc23937c1;
+
+		private transient MurmurHash hasher;
+
 		@Override
 		public int hashCode() {
+			if (hasher == null) {
+				hasher = new MurmurHash(HASH_SEED);
+			}
+
 			return hasher.reset()
-				.hash(f0.hashCode())
-				.hash(f1.getValue())
-				.hash(f2.getValue())
+				.hash(getVertexId0().hashCode())
+				.hash(degree.getValue())
+				.hash(triangleCount.getValue())
 				.hash();
 		}
 	}


Mime
View raw message