flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [3/4] flink git commit: [FLINK-6393] [gelly] Add Circulant and Echo graph generators
Date Thu, 11 May 2017 17:35:57 GMT
[FLINK-6393] [gelly] Add Circulant and Echo graph generators

This closes #3802


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cfaecda3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cfaecda3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cfaecda3

Branch: refs/heads/release-1.3
Commit: cfaecda3ac9a736213f8bfe643b8f57ce492e243
Parents: 9c1b90a
Author: FlorianFan <fanzhidongyzby@163.com>
Authored: Thu Apr 27 20:41:53 2017 +0800
Committer: Greg Hogan <code@greghogan.com>
Committed: Thu May 11 12:44:14 2017 -0400

----------------------------------------------------------------------
 docs/dev/libs/gelly/graph_generators.md         |  91 ++++++-
 .../java/org/apache/flink/graph/Runner.java     |  11 +-
 .../graph/drivers/input/CirculantGraph.java     | 129 ++++++++++
 .../flink/graph/drivers/input/EchoGraph.java    |  67 +++++
 .../flink/graph/drivers/input/GridGraph.java    |   3 +-
 .../flink/graph/drivers/EdgeListITCase.java     |  90 ++++++-
 .../flink/graph/generator/CirculantGraph.java   | 246 +++++++++++++++++++
 .../flink/graph/generator/CompleteGraph.java    |  57 +----
 .../apache/flink/graph/generator/EchoGraph.java |  82 +++++++
 .../apache/flink/graph/generator/GridGraph.java |   4 +-
 .../graph/generator/CirculantGraphTest.java     |  88 +++++++
 .../flink/graph/generator/EchoGraphTest.java    | 128 ++++++++++
 12 files changed, 926 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cfaecda3/docs/dev/libs/gelly/graph_generators.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/gelly/graph_generators.md b/docs/dev/libs/gelly/graph_generators.md
index 2532ee4..d4ad229 100644
--- a/docs/dev/libs/gelly/graph_generators.md
+++ b/docs/dev/libs/gelly/graph_generators.md
@@ -72,6 +72,42 @@ val graph = new GridGraph(env.getJavaEnv).addDimension(2, wrapEndpoints).addDime
 </div>
 </div>
 
+## Circulant Graph
+
+A [circulant graph](http://mathworld.wolfram.com/CirculantGraph.html) is an
+[oriented graph](http://mathworld.wolfram.com/OrientedGraph.html) configured
+with one or more contiguous ranges of offsets. Edges connect integer vertex IDs
+whose difference equals a configured offset. The circulant graph with no offsets
+is the [empty graph](#empty-graph) and the graph with the maximum range is the
+[complete graph](#complete-graph).
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+long vertexCount = 5;
+
+Graph<LongValue, NullValue, NullValue> graph = new CirculantGraph(env, vertexCount)
+    .addRange(1, 2)
+    .generate();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.generator.CirculantGraph
+
+val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+
+val vertexCount = 5
+
+val graph = new CirculantGraph(env.getJavaEnv, vertexCount).addRange(1, 2).generate()
+{% endhighlight %}
+</div>
+</div>
+
 ## Complete Graph
 
 An undirected graph connecting every distinct pair of vertices.
@@ -83,7 +119,7 @@ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 long vertexCount = 5;
 
-Graph<LongValue,NullValue,NullValue> graph = new CompleteGraph(env, vertexCount)
+Graph<LongValue, NullValue, NullValue> graph = new CompleteGraph(env, vertexCount)
     .generate();
 {% endhighlight %}
 </div>
@@ -148,7 +184,7 @@ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 long vertexCount = 5;
 
-Graph<LongValue,NullValue,NullValue> graph = new CycleGraph(env, vertexCount)
+Graph<LongValue, NullValue, NullValue> graph = new CycleGraph(env, vertexCount)
     .generate();
 {% endhighlight %}
 </div>
@@ -193,6 +229,41 @@ val graph = new CycleGraph(env.getJavaEnv, vertexCount).generate()
     <text x="51" y="199">4</text>
 </svg>
 
+## Echo Graph
+
+An [echo graph](http://mathworld.wolfram.com/EchoGraph.html) is a
+[circulant graph](#circulant-graph) with `n` vertices defined by the width of a
+single range of offsets centered at `n/2`. A vertex is connected to 'far'
+vertices, which connect to 'near' vertices, which connect to 'far' vertices, and so on.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+long vertexCount = 5;
+long vertexDegree = 2;
+
+Graph<LongValue, NullValue, NullValue> graph = new EchoGraph(env, vertexCount, vertexDegree)
+    .generate();
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import org.apache.flink.api.scala._
+import org.apache.flink.graph.generator.EchoGraph
+
+val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment
+
+val vertexCount = 5
+val vertexDegree = 2
+
+val graph = new EchoGraph(env.getJavaEnv, vertexCount, vertexDegree).generate()
+{% endhighlight %}
+</div>
+</div>
+
 ## Empty Graph
 
 A graph containing no edges.
@@ -204,7 +275,7 @@ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 long vertexCount = 5;
 
-Graph<LongValue,NullValue,NullValue> graph = new EmptyGraph(env, vertexCount)
+Graph<LongValue, NullValue, NullValue> graph = new EmptyGraph(env, vertexCount)
     .generate();
 {% endhighlight %}
 </div>
@@ -257,7 +328,7 @@ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 boolean wrapEndpoints = false;
 
-Graph<LongValue,NullValue,NullValue> graph = new GridGraph(env)
+Graph<LongValue, NullValue, NullValue> graph = new GridGraph(env)
     .addDimension(2, wrapEndpoints)
     .addDimension(4, wrapEndpoints)
     .generate();
@@ -327,7 +398,7 @@ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 long dimensions = 3;
 
-Graph<LongValue,NullValue,NullValue> graph = new HypercubeGraph(env, dimensions)
+Graph<LongValue, NullValue, NullValue> graph = new HypercubeGraph(env, dimensions)
     .generate();
 {% endhighlight %}
 </div>
@@ -403,7 +474,7 @@ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 long vertexCount = 5
 
-Graph<LongValue,NullValue,NullValue> graph = new PathGraph(env, vertexCount)
+Graph<LongValue, NullValue, NullValue> graph = new PathGraph(env, vertexCount)
     .generate();
 {% endhighlight %}
 </div>
@@ -464,7 +535,7 @@ RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory()
 int vertexCount = 1 << scale;
 int edgeCount = edgeFactor * vertexCount;
 
-Graph<LongValue,NullValue,NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
+Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
     .generate();
 {% endhighlight %}
 </div>
@@ -505,7 +576,7 @@ int edgeCount = edgeFactor * vertexCount;
 
 boolean clipAndFlip = false;
 
-Graph<LongValue,NullValue,NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
+Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
     .setConstants(0.57f, 0.19f, 0.19f)
     .setNoise(true, 0.10f)
     .generate();
@@ -542,7 +613,7 @@ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 long vertexPairCount = 4
 
 // note: configured with the number of vertex pairs
-Graph<LongValue,NullValue,NullValue> graph = new SingletonEdgeGraph(env, vertexPairCount)
+Graph<LongValue, NullValue, NullValue> graph = new SingletonEdgeGraph(env, vertexPairCount)
     .generate();
 {% endhighlight %}
 </div>
@@ -607,7 +678,7 @@ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 long vertexCount = 6;
 
-Graph<LongValue,NullValue,NullValue> graph = new StarGraph(env, vertexCount)
+Graph<LongValue, NullValue, NullValue> graph = new StarGraph(env, vertexCount)
     .generate();
 {% endhighlight %}
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/cfaecda3/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java
index 5ffe681..07cad1f 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java
@@ -35,8 +35,10 @@ import org.apache.flink.graph.drivers.HITS;
 import org.apache.flink.graph.drivers.JaccardIndex;
 import org.apache.flink.graph.drivers.PageRank;
 import org.apache.flink.graph.drivers.TriangleListing;
+import org.apache.flink.graph.drivers.input.CirculantGraph;
 import org.apache.flink.graph.drivers.input.CompleteGraph;
 import org.apache.flink.graph.drivers.input.CycleGraph;
+import org.apache.flink.graph.drivers.input.EchoGraph;
 import org.apache.flink.graph.drivers.input.EmptyGraph;
 import org.apache.flink.graph.drivers.input.GridGraph;
 import org.apache.flink.graph.drivers.input.HypercubeGraph;
@@ -76,9 +78,11 @@ public class Runner {
 	private static final String OUTPUT = "output";
 
 	private static ParameterizedFactory<Input> inputFactory = new ParameterizedFactory<Input>()
+		.addClass(CirculantGraph.class)
 		.addClass(CompleteGraph.class)
 		.addClass(org.apache.flink.graph.drivers.input.CSV.class)
 		.addClass(CycleGraph.class)
+		.addClass(EchoGraph.class)
 		.addClass(EmptyGraph.class)
 		.addClass(GridGraph.class)
 		.addClass(HypercubeGraph.class)
@@ -236,7 +240,12 @@ public class Runner {
 
 		// Input
 
-		input.configure(parameters);
+		try {
+			input.configure(parameters);
+		} catch (RuntimeException ex) {
+			throw new ProgramParametrizationException(ex.getMessage());
+		}
+
 		Graph graph = input.create(env);
 
 		// Algorithm

http://git-wip-us.apache.org/repos/asf/flink/blob/cfaecda3/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CirculantGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CirculantGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CirculantGraph.java
new file mode 100644
index 0000000..14ee816
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CirculantGraph.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.client.program.ProgramParametrizationException;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
+import org.apache.flink.graph.generator.CirculantGraph.OffsetRange;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+import static org.apache.flink.graph.generator.CirculantGraph.MINIMUM_VERTEX_COUNT;
+
+/**
+ * Generate a {@link org.apache.flink.graph.generator.CirculantGraph}.
+ */
+public class CirculantGraph
+extends GeneratedGraph<LongValue> {
+
+	private static final String PREFIX = "range";
+
+	private LongParameter vertexCount = new LongParameter(this, "vertex_count")
+		.setMinimumValue(MINIMUM_VERTEX_COUNT);
+
+	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
+		.setDefaultValue(PARALLELISM_DEFAULT);
+
+	private List<OffsetRange> offsetRanges = new ArrayList<>();
+
+	@Override
+	public String getName() {
+		return CirculantGraph.class.getSimpleName();
+	}
+
+	@Override
+	public String getUsage() {
+		return "--" + PREFIX + "0 offset:length [--" + PREFIX + "1 offset:length [--" + PREFIX + "2 ...]]"
+			+ super.getUsage();
+	}
+
+	@Override
+	public void configure(ParameterTool parameterTool) throws ProgramParametrizationException {
+		super.configure(parameterTool);
+
+		// add offset ranges as ordered by offset ID (range0, range1, range2, ...)
+
+		Map<Integer, String> offsetRangeMap = new TreeMap<>();
+
+		// first parse all offset ranges into a sorted map
+		for (String key : parameterTool.toMap().keySet()) {
+			if (key.startsWith(PREFIX)) {
+				int offsetId = Integer.parseInt(key.substring(PREFIX.length()));
+				offsetRangeMap.put(offsetId, parameterTool.get(key));
+			}
+		}
+
+		// then store offset ranges in order
+		for (String field : offsetRangeMap.values()) {
+			ProgramParametrizationException exception = new ProgramParametrizationException("Circulant offset range" +
+				" must use a colon to separate the integer offset and integer length:" + field + "'");
+
+			if (! field.contains(":")) {
+				throw exception;
+			}
+
+			String[] parts = field.split(":");
+
+			if (parts.length != 2) {
+				throw exception;
+			}
+
+			try {
+				long offset = Long.parseLong(parts[0]);
+				long length = Long.parseLong(parts[1]);
+				offsetRanges.add(new OffsetRange(offset, length));
+			} catch (NumberFormatException ex) {
+				throw exception;
+			}
+		}
+	}
+
+	@Override
+	public String getIdentity() {
+		return getTypeName() + " " + getName() + " (" + offsetRanges + ")";
+	}
+
+	@Override
+	protected long vertexCount() {
+		return vertexCount.getValue();
+	}
+
+	@Override
+	public Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) {
+		org.apache.flink.graph.generator.CirculantGraph graph = new org.apache.flink.graph.generator.CirculantGraph(env,
+			vertexCount.getValue());
+
+		for (OffsetRange offsetRange : offsetRanges) {
+			graph.addRange(offsetRange.getOffset(), offsetRange.getLength());
+		}
+
+		return graph
+			.setParallelism(littleParallelism.getValue().intValue())
+			.generate();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cfaecda3/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EchoGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EchoGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EchoGraph.java
new file mode 100644
index 0000000..c9b0874
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EchoGraph.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.drivers.input;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+import static org.apache.flink.graph.generator.EchoGraph.MINIMUM_VERTEX_COUNT;
+import static org.apache.flink.graph.generator.EchoGraph.MINIMUM_VERTEX_DEGREE;
+
+/**
+ * Generate an {@link org.apache.flink.graph.generator.EchoGraph}.
+ */
+public class EchoGraph
+extends GeneratedGraph<LongValue> {
+
+	private LongParameter vertexCount = new LongParameter(this, "vertex_count")
+		.setMinimumValue(MINIMUM_VERTEX_COUNT);
+
+	private LongParameter vertexDegree = new LongParameter(this, "vertex_degree")
+		.setMinimumValue(MINIMUM_VERTEX_DEGREE);
+
+	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
+		.setDefaultValue(PARALLELISM_DEFAULT);
+
+	@Override
+	public String getName() {
+		return EchoGraph.class.getSimpleName();
+	}
+
+	@Override
+	public String getIdentity() {
+		return getTypeName() + " " + getName() + " (" + vertexCount.getValue() + ":" + vertexDegree.getValue() + ")";
+	}
+
+	@Override
+	protected long vertexCount() {
+		return vertexCount.getValue();
+	}
+
+	@Override
+	protected Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) throws Exception {
+		return new org.apache.flink.graph.generator.EchoGraph(env, vertexCount.getValue(), vertexDegree.getValue())
+			.setParallelism(littleParallelism.getValue().intValue())
+			.generate();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cfaecda3/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
index b41b86e..2ce3c77 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
@@ -54,7 +54,8 @@ extends GeneratedGraph<LongValue> {
 
 	@Override
 	public String getUsage() {
-		return "--dim0 size:wrap_endpoints [--dim1 size:wrap_endpoints [--dim2 ...]]" + super.getUsage();
+		return "--" + PREFIX + "0 size:wrap_endpoints [--" + PREFIX + " size:wrap_endpoints [--" + PREFIX + " ...]]"
+			+ super.getUsage();
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/cfaecda3/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java
index f566218..d3ba4fb 100644
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/drivers/EdgeListITCase.java
@@ -44,6 +44,50 @@ extends DriverBaseITCase {
 	}
 
 	@Test
+	public void testHashWithCirculantGraph() throws Exception {
+		long checksum;
+		switch (idType) {
+			case "byte":
+			case "nativeByte":
+			case "short":
+			case "nativeShort":
+			case "char":
+			case "nativeChar":
+			case "integer":
+			case "nativeInteger":
+			case "nativeLong":
+				checksum = 0x0000000000344448L;
+				break;
+
+			case "long":
+				checksum = 0x0000000000a19d48L;
+				break;
+
+			case "string":
+			case "nativeString":
+				checksum = 0x000000000c47ca48L;
+				break;
+
+			default:
+				throw new IllegalArgumentException("Unknown type: " + idType);
+		}
+
+		expectedChecksum(
+			parameters("CirculantGraph", "hash", "--vertex_count", "42", "--range0", "13:4"),
+			168, checksum);
+	}
+
+	@Test
+	public void testPrintWithCirculantGraph() throws Exception {
+		// skip 'char' since it is not printed as a number
+		Assume.assumeFalse(idType.equals("char") || idType.equals("nativeChar"));
+
+		expectedOutputChecksum(
+			parameters("CirculantGraph", "print", "--vertex_count", "42", "--range0", "13:4"),
+			new Checksum(168, 0x0000004bdcc52cbcL));
+	}
+
+	@Test
 	public void testLongDescription() throws Exception {
 		String expected = regexSubstring(new EdgeList().getLongDescription());
 
@@ -142,10 +186,54 @@ extends DriverBaseITCase {
 	}
 
 	@Test
+	public void testHashWithEchoGraph() throws Exception {
+		long checksum;
+		switch (idType) {
+			case "byte":
+			case "nativeByte":
+			case "short":
+			case "nativeShort":
+			case "char":
+			case "nativeChar":
+			case "integer":
+			case "nativeInteger":
+			case "nativeLong":
+				checksum = 0x0000000000a9ddeaL;
+				break;
+
+			case "long":
+				checksum = 0x00000000020d3f2aL;
+				break;
+
+			case "string":
+			case "nativeString":
+				checksum = 0x0000000027e9516aL;
+				break;
+
+			default:
+				throw new IllegalArgumentException("Unknown type: " + idType);
+		}
+
+		expectedChecksum(
+			parameters("EchoGraph", "hash", "--vertex_count", "42", "--vertex_degree", "13"),
+			546, checksum);
+	}
+
+	@Test
+	public void testPrintWithEchoGraph() throws Exception {
+		// skip 'char' since it is not printed as a number
+		Assume.assumeFalse(idType.equals("char") || idType.equals("nativeChar"));
+
+		expectedOutputChecksum(
+			parameters("EchoGraph", "print", "--vertex_count", "42", "--vertex_degree", "13"),
+			new Checksum(546, 0x000000f7190b8fcaL));
+	}
+
+	@Test
 	public void testHashWithEmptyGraph() throws Exception {
 		expectedChecksum(
 			parameters("EmptyGraph", "hash", "--vertex_count", "42"),
-			0, 0x0000000000000000);
+			0, 0x0000000000000000L);
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/flink/blob/cfaecda3/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java
new file mode 100644
index 0000000..9569b74
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.LongValueSequenceIterator;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * @see <a href="http://mathworld.wolfram.com/CirculantGraph.html">Circulant Graph at Wolfram MathWorld</a>
+ */
+public class CirculantGraph
+extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
+
+	public static final int MINIMUM_VERTEX_COUNT = 2;
+
+	public static final int MINIMUM_OFFSET = 1;
+
+	// Required to create the DataSource
+	private final ExecutionEnvironment env;
+
+	// Required configuration
+	private long vertexCount;
+
+	private List<OffsetRange> offsetRanges = new ArrayList<>();
+
+	/**
+	 * An oriented {@link Graph} with {@code n} vertices where each vertex
+	 * v<sub>i</sub> is connected to vertex v<sub>(i+j)%n</sub> for each
+	 * configured offset {@code j}.
+	 *
+	 * @param env the Flink execution environment
+	 * @param vertexCount number of vertices
+	 */
+	public CirculantGraph(ExecutionEnvironment env, long vertexCount) {
+		Preconditions.checkArgument(vertexCount >= MINIMUM_VERTEX_COUNT,
+			"Vertex count must be at least " + MINIMUM_VERTEX_COUNT);
+
+		this.env = env;
+		this.vertexCount = vertexCount;
+	}
+
+	/**
+	 * Required configuration for each range of offsets in the graph.
+	 *
+	 * @param offset first offset appointing the vertices' position
+	 * @param length number of contiguous offsets in range
+	 * @return this
+	 */
+	public CirculantGraph addRange(long offset, long length) {
+		Preconditions.checkArgument(offset >= MINIMUM_OFFSET,
+			"Range offset must be at least " + MINIMUM_OFFSET);
+		Preconditions.checkArgument(length <= vertexCount - offset,
+			"Range length must not be greater than the vertex count minus the range offset.");
+
+		offsetRanges.add(new OffsetRange(offset, length));
+
+		return this;
+	}
+
+	@Override
+	public Graph<LongValue, NullValue, NullValue> generate() {
+		// Vertices
+		DataSet<Vertex<LongValue, NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
+
+		// Edges
+		LongValueSequenceIterator iterator = new LongValueSequenceIterator(0, this.vertexCount - 1);
+
+		// Validate ranges
+		Collections.sort(offsetRanges);
+		Iterator<OffsetRange> iter = offsetRanges.iterator();
+		OffsetRange lastRange = iter.next();
+
+		while (iter.hasNext()) {
+			OffsetRange nextRange = iter.next();
+
+			if (lastRange.overlaps(nextRange)) {
+				throw new IllegalArgumentException("Overlapping ranges " + lastRange + " and " + nextRange);
+			}
+
+			lastRange = nextRange;
+		}
+
+		DataSet<Edge<LongValue, NullValue>> edges = env
+			.fromParallelCollection(iterator, LongValue.class)
+				.setParallelism(parallelism)
+				.name("Edge iterators")
+			.flatMap(new LinkVertexToOffsets(vertexCount, offsetRanges))
+				.setParallelism(parallelism)
+				.name("Circulant graph edges");
+
+		// Graph
+		return Graph.fromDataSet(vertices, edges, env);
+	}
+
+	@FunctionAnnotation.ForwardedFields("*->f0")
+	private static class LinkVertexToOffsets
+	implements FlatMapFunction<LongValue, Edge<LongValue, NullValue>> {
+		private final long vertexCount;
+
+		private final List<OffsetRange> offsetRanges;
+
+		private LongValue target = new LongValue();
+
+		private Edge<LongValue, NullValue> edge = new Edge<>(null, target, NullValue.getInstance());
+
+		public LinkVertexToOffsets(long vertexCount, List<OffsetRange> offsetRanges) {
+			this.vertexCount = vertexCount;
+			this.offsetRanges = offsetRanges;
+		}
+
+		@Override
+		public void flatMap(LongValue source, Collector<Edge<LongValue, NullValue>> out)
+				throws Exception {
+			edge.f0 = source;
+			long sourceID = source.getValue();
+
+			for (OffsetRange offsetRange : offsetRanges) {
+				long targetID = sourceID + offsetRange.getOffset();
+
+				for (long i = offsetRange.getLength(); i > 0; i--) {
+					// add positive offset
+					target.setValue(targetID++ % vertexCount);
+					out.collect(edge);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Stores the start offset and length configuration for an offset range.
+	 */
+	public static class OffsetRange implements Serializable, Comparable<OffsetRange> {
+		private long offset;
+
+		private long length;
+
+		/**
+		 * Construct a range with the given offset and length.
+		 *
+		 * @param offset the range offset
+		 * @param length the range length
+		 */
+		public OffsetRange(long offset, long length) {
+			this.offset = offset;
+			this.length = length;
+		}
+
+		/**
+		 * Get the range offset
+		 *
+		 * @return the offset
+		 */
+		public long getOffset() {
+			return offset;
+		}
+
+		/**
+		 * Get the range length
+		 *
+		 * @return the length
+		 */
+		public long getLength() {
+			return length;
+		}
+
+		/**
+		 * Get the offset of the last index in the range
+		 *
+		 * @return last offset
+		 */
+		public long getLastOffset() {
+			return offset + length - 1;
+		}
+
+		/**
+		 * Return true if and only if the other range and this range share a
+		 * common offset ID.
+		 *
+		 * @param other other range
+		 * @return whether ranges are overlapping
+		 */
+		public boolean overlaps(OffsetRange other) {
+			boolean overlapping = false;
+
+			long lastOffset = getLastOffset();
+			long otherLastOffset = other.getLastOffset();
+
+			// check whether this range contains other
+			overlapping |= (offset <= other.offset && other.offset <= lastOffset);
+			overlapping |= (offset <= otherLastOffset && otherLastOffset <= lastOffset);
+
+			// check whether other contains this range
+			overlapping |= (other.offset <= offset && offset <= otherLastOffset);
+			overlapping |= (other.offset <= lastOffset && lastOffset <= otherLastOffset);
+
+			return overlapping;
+		}
+
+		@Override
+		public String toString() {
+			return Long.toString(offset) + ":" + Long.toString(length);
+		}
+
+		@Override
+		public int compareTo(OffsetRange o) {
+			int cmp = Long.compare(offset, o.offset);
+			if (cmp != 0) {
+				return cmp;
+			}
+
+			return Long.compare(length, o.length);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cfaecda3/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
index 11c0bb0..9dabe56 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
@@ -18,17 +18,10 @@
 
 package org.apache.flink.graph.generator;
 
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.LongValueSequenceIterator;
 import org.apache.flink.util.Preconditions;
 
 /**
@@ -61,53 +54,9 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 
 	@Override
 	public Graph<LongValue, NullValue, NullValue> generate() {
-		// Vertices
-		DataSet<Vertex<LongValue, NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
-
-		// Edges
-		LongValueSequenceIterator iterator = new LongValueSequenceIterator(0, this.vertexCount - 1);
-
-		DataSet<Edge<LongValue, NullValue>> edges = env
-			.fromParallelCollection(iterator, LongValue.class)
+		return new CirculantGraph(env, vertexCount)
+			.addRange(1, vertexCount - 1)
 				.setParallelism(parallelism)
-				.name("Edge iterators")
-			.flatMap(new LinkVertexToAll(vertexCount))
-				.setParallelism(parallelism)
-				.name("Complete graph edges");
-
-		// Graph
-		return Graph.fromDataSet(vertices, edges, env);
-	}
-
-	@ForwardedFields("*->f0")
-	private static class LinkVertexToAll
-	implements FlatMapFunction<LongValue, Edge<LongValue, NullValue>> {
-		private final long vertexCount;
-
-		private LongValue target = new LongValue();
-
-		private Edge<LongValue, NullValue> edge = new Edge<>(null, target, NullValue.getInstance());
-
-		public LinkVertexToAll(long vertexCount) {
-			this.vertexCount = vertexCount;
-		}
-
-		@Override
-		public void flatMap(LongValue source, Collector<Edge<LongValue, NullValue>> out)
-				throws Exception {
-			edge.f0 = source;
-
-			long s = source.getValue();
-			long t = (s + 1) % vertexCount;
-
-			while (s != t) {
-				target.setValue(t);
-				out.collect(edge);
-
-				if (++t == vertexCount) {
-					t = 0;
-				}
-			}
-		}
+				.generate();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cfaecda3/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java
new file mode 100644
index 0000000..c15cdca
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * A {@link CirculantGraph} with {@code n} vertices defined by the width of a
+ * single range of offsets centered at {@code n/2}. A vertex is connected to
+ * 'far' vertices, which connect to 'near' vertices, which connect to 'far'
+ * vertices, ....
+ * <p>
+ * Every {@link Vertex} in the {@link EchoGraph} has the same degree,
+ * and vertices as far apart as possible are chosen to be linked.
+ * {@link EchoGraph} is a specific case of {@link CirculantGraph}.
+ */
+public class EchoGraph
+extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
+
+	public static final int MINIMUM_VERTEX_COUNT = 2;
+
+	public static final int MINIMUM_VERTEX_DEGREE = 1;
+
+	// Required to create the DataSource
+	private final ExecutionEnvironment env;
+
+	// Required configuration
+	private long vertexCount;
+
+	private long vertexDegree;
+
+	/**
+	 * An undirected {@link Graph} whose vertices have the same degree.
+	 *
+	 * @param env the Flink execution environment
+	 * @param vertexCount number of vertices
+	 * @param vertexDegree degree of vertices
+	 */
+	public EchoGraph(ExecutionEnvironment env, long vertexCount, long vertexDegree) {
+		Preconditions.checkArgument(vertexCount >= MINIMUM_VERTEX_COUNT,
+			"Vertex count must be at least " + MINIMUM_VERTEX_COUNT);
+		Preconditions.checkArgument(vertexDegree >= MINIMUM_VERTEX_DEGREE,
+			"Vertex degree must be at least " + MINIMUM_VERTEX_DEGREE);
+		Preconditions.checkArgument(vertexDegree < vertexCount,
+			"Vertex degree must be less than the vertex count.");
+		Preconditions.checkArgument(vertexCount % 2 == 0 ^ vertexDegree % 2 == 0,
+			"Vertex count or vertex degree must be an even number but not both.");
+
+		this.env = env;
+		this.vertexCount = vertexCount;
+		this.vertexDegree = vertexDegree;
+	}
+
+	@Override
+	public Graph<LongValue, NullValue, NullValue> generate() {
+		return new CirculantGraph(env, vertexCount)
+			.addRange((vertexCount - vertexDegree + 1) / 2, vertexDegree)
+			.setParallelism(parallelism)
+			.generate();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cfaecda3/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
index 23a6f98..0570dd2 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
@@ -85,9 +85,7 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 
 	@Override
 	public Graph<LongValue, NullValue, NullValue> generate() {
-		if (dimensions.isEmpty()) {
-			throw new RuntimeException("No dimensions added to GridGraph");
-		}
+		Preconditions.checkState(!dimensions.isEmpty(), "No dimensions added to GridGraph");
 
 		// Vertices
 		DataSet<Vertex<LongValue, NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);

http://git-wip-us.apache.org/repos/asf/flink/blob/cfaecda3/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CirculantGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CirculantGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CirculantGraphTest.java
new file mode 100644
index 0000000..aae88ca
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CirculantGraphTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static org.junit.Assert.assertEquals;
+
+public class CirculantGraphTest
+extends AbstractGraphTest {
+
+	@Test
+	public void testGraph()
+			throws Exception {
+		Graph<LongValue, NullValue, NullValue> graph = new CirculantGraph(env, 10)
+			.addRange(4, 3)
+			.generate();
+
+		String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9";
+		String edges = "0,4; 0,5; 0,6; 1,5; 1,6; 1,7; 2,6;" +
+			"2,7; 2,8; 3,7; 3,8; 3,9; 4,0; 4,8; 4,9;" +
+			"5,0; 5,1; 5,9; 6,0; 6,1; 6,2; 7,1; 7,2; 7,3;" +
+			"8,2; 8,3; 8,4; 9,3; 9,4; 9,5";
+
+		TestUtils.compareGraph(graph, vertices, edges);
+	}
+
+	@Test
+	public void testGraphMetrics()
+			throws Exception {
+		int vertexCount = 10;
+		int offset = 4;
+		int length = 2;
+
+		Graph<LongValue, NullValue, NullValue> graph = new CirculantGraph(env, 10)
+			.addRange(offset, length)
+			.generate();
+
+		assertEquals(vertexCount, graph.numberOfVertices());
+		assertEquals(vertexCount * length, graph.numberOfEdges());
+
+		long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue();
+		long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue();
+
+		assertEquals(length, maxInDegree);
+		assertEquals(length, maxOutDegree);
+	}
+
+	@Test
+	public void testParallelism()
+			throws Exception {
+		int parallelism = 2;
+
+		Graph<LongValue, NullValue, NullValue> graph = new CirculantGraph(env, 10)
+			.addRange(4, 2)
+			.setParallelism(parallelism)
+			.generate();
+
+		graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue, NullValue>>());
+		graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue, NullValue>>());
+
+		TestUtils.verifyParallelism(env, parallelism);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cfaecda3/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EchoGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EchoGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EchoGraphTest.java
new file mode 100644
index 0000000..777b576
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EchoGraphTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+import static org.junit.Assert.assertEquals;
+
+public class EchoGraphTest
+extends AbstractGraphTest {
+
+	@Rule
+	public ExpectedException thrown = ExpectedException.none();
+
+	@Test
+	public void testGraphWithEvenVertexCountWithOddVertexDegree()
+			throws Exception {
+		Graph<LongValue, NullValue, NullValue> graph = new EchoGraph(env, 10, 3)
+			.generate();
+
+		String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9";
+		String edges = "0,4; 0,5; 0,6; 1,5; 1,6; 1,7; 2,6;" +
+			"2,7; 2,8; 3,7; 3,8; 3,9; 4,0; 4,8; 4,9;" +
+			"5,0; 5,1; 5,9; 6,0; 6,1; 6,2; 7,1; 7,2; 7,3;" +
+			"8,2; 8,3; 8,4; 9,3; 9,4; 9,5";
+
+		TestUtils.compareGraph(graph, vertices, edges);
+	}
+
+	@Test
+	public void testGraphWithOddVertexCountWithEvenVertexDegree()
+			throws Exception {
+		Graph<LongValue, NullValue, NullValue> graph = new EchoGraph(env, 9, 2)
+			.generate();
+
+		String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8";
+		String edges = "0,4; 0,5; 1,5; 1,6; 2,6; 2,7;" +
+			"3,7; 3,8; 4,8; 4,0; 5,0; 5,1;" +
+			"6,1; 6,2; 7,2; 7,3; 8,3; 8,4";
+
+		TestUtils.compareGraph(graph, vertices, edges);
+	}
+
+	@Test
+	public void testGraphWithOddVertexCountWithOddVertexDegree()
+			throws Exception {
+		thrown.expect(IllegalArgumentException.class);
+		thrown.expectMessage("Vertex count or vertex degree must be an even number but not both.");
+
+		new EchoGraph(env, 5, 3).generate();
+	}
+
+	@Test
+	public void testGraphWithEvenVertexCountWithEvenVertexDegree()
+			throws Exception {
+		thrown.expect(IllegalArgumentException.class);
+		thrown.expectMessage("Vertex count or vertex degree must be an even number but not both.");
+
+		new EchoGraph(env, 6, 2).generate();
+	}
+
+	@Test
+	public void testGraphWithVertexDegreeTooLarge()
+			throws Exception {
+		thrown.expect(IllegalArgumentException.class);
+		thrown.expectMessage("Vertex degree must be less than the vertex count.");
+
+		new EchoGraph(env, 8, 8).generate();
+	}
+
+	@Test
+	public void testGraphMetrics()
+			throws Exception {
+		int vertexCount = 10;
+		int vertexDegree = 3;
+
+		Graph<LongValue, NullValue, NullValue> graph = new EchoGraph(env, vertexCount, vertexDegree)
+			.generate();
+
+		assertEquals(vertexCount, graph.numberOfVertices());
+		assertEquals(vertexCount * vertexDegree, graph.numberOfEdges());
+
+		long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue();
+		long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue();
+
+		assertEquals(vertexDegree, maxInDegree);
+		assertEquals(vertexDegree, maxOutDegree);
+	}
+
+	@Test
+	public void testParallelism()
+			throws Exception {
+		int parallelism = 2;
+
+		Graph<LongValue, NullValue, NullValue> graph = new EchoGraph(env, 10, 3)
+			.setParallelism(parallelism)
+			.generate();
+
+		graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue, NullValue>>());
+		graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue, NullValue>>());
+
+		TestUtils.verifyParallelism(env, parallelism);
+	}
+}


Mime
View raw message