flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject [1/2] flink git commit: [FLINK-2909] [gelly] Graph Generators
Date Wed, 13 Apr 2016 18:29:11 GMT
Repository: flink
Updated Branches:
  refs/heads/master 5350bc48a -> b0a7a1b81


http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/AbstractGeneratorFactory.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/AbstractGeneratorFactory.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/AbstractGeneratorFactory.java
new file mode 100644
index 0000000..3bb904e
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/AbstractGeneratorFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator.random;
+
+import org.apache.commons.math3.random.RandomGenerator;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This base class handles the task of dividing the requested work into the
+ * appropriate number of blocks of near-equal size.
+ *
+ * @param <T> the type of the {@code RandomGenerator}
+ */
+public abstract class AbstractGeneratorFactory<T extends RandomGenerator>
+implements RandomGenerableFactory<T> {
+
+	// A large computation will run in parallel but blocks are generated on
+	// and distributed from a single node. This limit should be greater
+	// than the maximum expected parallelism.
+	public static final int MAXIMUM_BLOCK_COUNT = 1 << 20;
+
+	// This should be sufficiently large relative to the cost of instantiating
+	// and initializing the random generator and sufficiently small relative to
+	// the cost of generating random values.
+	protected abstract int getMinimumCyclesPerBlock();
+
+	protected abstract RandomGenerable<T> next();
+
+	@Override
+	public List<BlockInfo<T>> getRandomGenerables(long elementCount, int cyclesPerElement) {
+		long cycles = elementCount * cyclesPerElement;
+		int blockCount = Math.min((int) Math.ceil(cycles / (float) getMinimumCyclesPerBlock()), MAXIMUM_BLOCK_COUNT);
+
+		long elementsPerBlock = elementCount / blockCount;
+		long elementRemainder = elementCount % blockCount;
+
+		List<BlockInfo<T>> blocks = new ArrayList<>(blockCount);
+		long blockStart = 0;
+
+		for (int blockIndex = 0 ; blockIndex < blockCount ; blockIndex++) {
+			if (blockIndex == blockCount - elementRemainder) {
+				elementsPerBlock++;
+			}
+
+			RandomGenerable<T> randomGenerable = next();
+
+			blocks.add(new BlockInfo<>(randomGenerable, blockIndex, blockCount, blockStart, elementsPerBlock));
+
+			blockStart += elementsPerBlock;
+		}
+
+		return blocks;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/BlockInfo.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/BlockInfo.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/BlockInfo.java
new file mode 100644
index 0000000..5e30a3f
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/BlockInfo.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator.random;
+
+import org.apache.commons.math3.random.RandomGenerator;
+
+/**
+ * Defines a source of randomness and a unit of work.
+ *
+ * @param <T> the type of the {@code RandomGenerator}
+ */
+public class BlockInfo<T extends RandomGenerator> {
+
+	private final RandomGenerable<T> randomGenerable;
+
+	private final int blockIndex;
+
+	private final int blockCount;
+
+	private final long firstElement;
+
+	private final long elementCount;
+
+	public BlockInfo(RandomGenerable<T> randomGenerable, int blockIndex, int blockCount, long firstElement, long elementCount) {
+		this.randomGenerable = randomGenerable;
+		this.blockIndex = blockIndex;
+		this.blockCount = blockCount;
+		this.firstElement = firstElement;
+		this.elementCount = elementCount;
+	}
+
+	/**
+	 * @return the source of randomness
+	 */
+	public RandomGenerable<T> getRandomGenerable() {
+		return randomGenerable;
+	}
+
+	/**
+	 * @return the index of this block within the list of blocks
+	 */
+	public int getBlockIndex() {
+		return blockIndex;
+	}
+
+	/**
+	 * @return the total number of blocks
+	 */
+	public int getBlockCount() {
+		return blockCount;
+	}
+
+	/**
+	 * @return the index of the first element in this block
+	 */
+	public long getFirstElement() {
+		return firstElement;
+	}
+
+	/**
+	 * @return the total number of elements across all blocks
+	 */
+	public long getElementCount() {
+		return elementCount;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/JDKRandomGeneratorFactory.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/JDKRandomGeneratorFactory.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/JDKRandomGeneratorFactory.java
new file mode 100644
index 0000000..2024cae
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/JDKRandomGeneratorFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator.random;
+
+import org.apache.commons.math3.random.JDKRandomGenerator;
+
+/**
+ * Uses a seeded {@link JDKRandomGenerator} to generate seeds for the
+ * distributed collection of {@link JDKRandomGenerator}.
+ */
+public class JDKRandomGeneratorFactory
+extends AbstractGeneratorFactory<JDKRandomGenerator> {
+
+	public static final long DEFAULT_SEED = 0x4b6f7e18198de7a4L;
+
+	public static final int MINIMUM_CYCLES_PER_BLOCK = 1 << 20;
+
+	private final JDKRandomGenerator random = new JDKRandomGenerator();
+
+	public JDKRandomGeneratorFactory() {
+		this(DEFAULT_SEED);
+	}
+
+	public JDKRandomGeneratorFactory(long seed) {
+		random.setSeed(seed);
+	}
+
+	@Override
+	protected int getMinimumCyclesPerBlock() {
+		return MINIMUM_CYCLES_PER_BLOCK;
+	}
+
+	@Override
+	protected JDKRandomGenerable next() {
+		return new JDKRandomGenerable(random.nextLong());
+	}
+
+	private static class JDKRandomGenerable
+	implements RandomGenerable<JDKRandomGenerator> {
+
+		private final long seed;
+
+		public JDKRandomGenerable(long seed) {
+			this.seed = seed;
+		}
+
+		@Override
+		public JDKRandomGenerator generator() {
+			JDKRandomGenerator random = new JDKRandomGenerator();
+
+			random.setSeed(seed);
+
+			return random;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/MersenneTwisterFactory.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/MersenneTwisterFactory.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/MersenneTwisterFactory.java
new file mode 100644
index 0000000..22a7b04
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/MersenneTwisterFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator.random;
+
+import org.apache.commons.math3.random.MersenneTwister;
+
+/**
+ * Uses a seeded {@link MersenneTwister} to generate seeds for the
+ * distributed collection of {@link MersenneTwister}.
+ */
+public class MersenneTwisterFactory
+extends AbstractGeneratorFactory<MersenneTwister> {
+
+	public static final long DEFAULT_SEED = 0x74c8cc8a58a9ceb9L;
+
+	public static final int MINIMUM_CYCLES_PER_BLOCK = 1 << 20;
+
+	private final MersenneTwister random = new MersenneTwister();
+
+	public MersenneTwisterFactory() {
+		this(DEFAULT_SEED);
+	}
+
+	public MersenneTwisterFactory(long seed) {
+		random.setSeed(seed);
+	}
+
+	@Override
+	protected int getMinimumCyclesPerBlock() {
+		return MINIMUM_CYCLES_PER_BLOCK;
+	}
+
+	@Override
+	protected MersenneTwisterGenerable next() {
+		return new MersenneTwisterGenerable(random.nextLong());
+	}
+
+	private static class MersenneTwisterGenerable
+	implements RandomGenerable<MersenneTwister> {
+
+		private final long seed;
+
+		public MersenneTwisterGenerable(long seed) {
+			this.seed = seed;
+		}
+
+		@Override
+		public MersenneTwister generator() {
+			MersenneTwister random = new MersenneTwister();
+
+			random.setSeed(seed);
+
+			return random;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/RandomGenerable.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/RandomGenerable.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/RandomGenerable.java
new file mode 100644
index 0000000..318b508
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/RandomGenerable.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator.random;
+
+import org.apache.commons.math3.random.RandomGenerator;
+
+/**
+ * A RandomGenerable provides deferred instantiation and initialization of a
+ * RandomGenerator. This allows pre-processing or discovery to be distributed
+ * and performed in parallel by Flink tasks.
+ *
+ * A distributed PRNG is described by Matsumoto and Takuji in
+ * "Dynamic Creation of Pseudorandom Number Generators".
+ *
+ * @param <T> the type of the {@code RandomGenerator}
+ */
+public interface RandomGenerable<T extends RandomGenerator> {
+
+	/**
+	 * Returns an initialized {@link RandomGenerator}.
+	 *
+	 * @return a {@code RandomGenerator} of type {@code T}
+	 */
+	T generator();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/RandomGenerableFactory.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/RandomGenerableFactory.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/RandomGenerableFactory.java
new file mode 100644
index 0000000..ead29fc
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/RandomGenerableFactory.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator.random;
+
+import org.apache.commons.math3.random.RandomGenerator;
+
+import java.util.List;
+
+/**
+ * A {@code RandomGenerableFactory} returns a scale-free collection of sources
+ * of pseudorandomness which can be used to perform repeatable parallel
+ * computation regardless of parallelism.
+ *
+ * <pre>
+ * {@code
+ * RandomGenerableFactory<JDKRandomGenerator> factory = new JDKRandomGeneratorFactory()
+ *
+ * List<BlockInfo<T>> generatorBlocks = factory
+ *     .getRandomGenerables(elementCount, cyclesPerElement);
+ *
+ * DataSet<...> generatedEdges = env
+ *     .fromCollection(generatorBlocks)
+ *         .name("Random generators")
+ *     .flatMap(...
+ * }
+ * </pre>
+ *
+ * @param <T> the type of the {@code RandomGenerator}
+ */
+public interface RandomGenerableFactory<T extends RandomGenerator> {
+
+	/**
+	 * The amount of work ({@code elementCount * cyclerPerElement}) is used to
+	 * generate a list of blocks of work of near-equal size.
+	 *
+	 * @param elementCount number of elements, as indexed in the {@code BlockInfo}
+	 * @param cyclesPerElement number of cycles of the PRNG per element
+	 * @return the list of configuration blocks
+	 */
+	List<BlockInfo<T>> getRandomGenerables(long elementCount, int cyclesPerElement);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/AbstractGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/AbstractGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/AbstractGraphTest.java
new file mode 100644
index 0000000..1cac80b
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/AbstractGraphTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.junit.Before;
+
+public class AbstractGraphTest {
+
+	protected ExecutionEnvironment env;
+
+    @Before
+    public void setup() {
+		env = ExecutionEnvironment.createCollectionsEnvironment();
+		env.getConfig().disableSysoutLogging();
+    }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java
new file mode 100644
index 0000000..af47fdc
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class CompleteGraphTest
+extends AbstractGraphTest {
+
+	@Test
+	public void testGraph()
+			throws Exception {
+		int vertexCount = 4;
+
+		Graph<LongValue,NullValue,NullValue> graph = new CompleteGraph(env, vertexCount)
+			.generate();
+
+		String vertices = "0; 1; 2; 3";
+		String edges = "0,1; 0,2; 0,3; 1,0; 1,2; 1,3; 2,0; 2,1; 2,3; 3,0; 3,1; 3,2";
+
+		TestUtils.compareGraph(graph, vertices, edges);
+	}
+
+	@Test
+	public void testGraphMetrics()
+			throws Exception {
+		int vertexCount = 10;
+
+		Graph<LongValue,NullValue,NullValue> graph = new CompleteGraph(env, vertexCount)
+			.generate();
+
+		assertEquals(vertexCount, graph.numberOfVertices());
+		assertEquals(vertexCount*(vertexCount-1), graph.numberOfEdges());
+
+		long minInDegree = graph.inDegrees().min(1).collect().get(0).f1;
+		long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1;
+		long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
+		long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1;
+
+		assertEquals(vertexCount - 1, minInDegree);
+		assertEquals(vertexCount - 1, minOutDegree);
+		assertEquals(vertexCount - 1, maxInDegree);
+		assertEquals(vertexCount - 1, maxOutDegree);
+	}
+
+	@Test
+	public void testParallelism()
+			throws Exception {
+		int parallelism = 2;
+
+		Graph<LongValue,NullValue,NullValue> graph = new CompleteGraph(env, 10)
+			.setParallelism(parallelism)
+			.generate();
+
+		graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue,NullValue>>());
+		graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue,NullValue>>());
+
+		TestUtils.verifyParallelism(env, parallelism);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java
new file mode 100644
index 0000000..fb6799b
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class CycleGraphTest
+extends AbstractGraphTest {
+
+	@Test
+	public void testGraph()
+			throws Exception {
+		Graph<LongValue,NullValue,NullValue> graph = new CycleGraph(env, 10)
+			.generate();
+
+		String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9";
+		String edges = "0,1; 1,0; 1,2; 2,1; 2,3; 3,2; 3,4; 4,3; 4,5; 5,4;" +
+			"5,6; 6,5; 6,7; 7,6; 7,8; 8,7; 8,9; 9,8; 9,0; 0,9";
+
+		TestUtils.compareGraph(graph, vertices, edges);
+	}
+
+	@Test
+	public void testGraphMetrics()
+			throws Exception {
+		int vertexCount = 100;
+
+		Graph<LongValue,NullValue,NullValue> graph = new CycleGraph(env, vertexCount)
+			.generate();
+
+		assertEquals(vertexCount, graph.numberOfVertices());
+		assertEquals(2 * vertexCount, graph.numberOfEdges());
+
+		long minInDegree = graph.inDegrees().min(1).collect().get(0).f1;
+		long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1;
+		long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
+		long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1;
+
+		assertEquals(2, minInDegree);
+		assertEquals(2, minOutDegree);
+		assertEquals(2, maxInDegree);
+		assertEquals(2, maxOutDegree);
+	}
+
+	@Test
+	public void testParallelism()
+			throws Exception {
+		int parallelism = 2;
+
+		Graph<LongValue,NullValue,NullValue> graph = new CycleGraph(env, 100)
+			.setParallelism(parallelism)
+			.generate();
+
+		graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue,NullValue>>());
+		graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue,NullValue>>());
+
+		TestUtils.verifyParallelism(env, parallelism);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java
new file mode 100644
index 0000000..bc1ef77
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class EmptyGraphTest
+extends AbstractGraphTest {
+
+	@Test
+	public void testGraph()
+			throws Exception {
+		Graph<LongValue,NullValue,NullValue> graph = new EmptyGraph(env, 10)
+			.generate();
+
+		String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9";
+		String edges = null;
+
+		TestUtils.compareGraph(graph, vertices, edges);
+	}
+
+	@Test
+	public void testGraphMetrics()
+			throws Exception {
+		int vertexCount = 100;
+
+		Graph<LongValue,NullValue,NullValue> graph = new EmptyGraph(env, vertexCount)
+			.generate();
+
+		assertEquals(vertexCount, graph.numberOfVertices());
+		assertEquals(0, graph.numberOfEdges());
+
+		long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
+		long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1;
+
+		assertEquals(0, maxInDegree);
+		assertEquals(0, maxOutDegree);
+	}
+
+	@Test
+	public void testParallelism()
+			throws Exception {
+		int parallelism = 2;
+
+		Graph<LongValue,NullValue,NullValue> graph = new EmptyGraph(env, 100)
+			.setParallelism(parallelism)
+			.generate();
+
+		graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue,NullValue>>());
+		graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue,NullValue>>());
+
+		TestUtils.verifyParallelism(env, parallelism);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java
new file mode 100644
index 0000000..f3fa7db
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class GridGraphTest
+extends AbstractGraphTest {
+
+	@Test
+	public void testGraph()
+			throws Exception {
+		Graph<LongValue, NullValue, NullValue> graph = new GridGraph(env)
+			.addDimension(2, false)
+			.addDimension(3, false)
+			.generate();
+
+		// 0 1 2
+		// 3 4 5
+		String vertices = "0; 1; 2; 3; 4; 5";
+		String edges = "0,1; 0,3; 1,0; 1,2; 1,4; 2,1; 2,5; 3,0; 3,4; 4,1;" +
+			"4,3; 4,5; 5,2; 5,4";
+
+		TestUtils.compareGraph(graph, vertices, edges);
+	}
+
+	@Test
+	public void testGraphMetrics()
+			throws Exception {
+		Graph<LongValue, NullValue, NullValue> graph = new GridGraph(env)
+			.addDimension(2, true)
+			.addDimension(3, true)
+			.addDimension(5, true)
+			.addDimension(7, true)
+			.generate();
+
+		// Each vertex is the source of one edge in the first dimension of size 2,
+		// and the source of two edges in each dimension of size greater than 2.
+		assertEquals(2*3*5*7, graph.numberOfVertices());
+		assertEquals(7 * 2*3*5*7, graph.numberOfEdges());
+
+		long minInDegree = graph.inDegrees().min(1).collect().get(0).f1;
+		long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1;
+		long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
+		long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1;
+
+		assertEquals(7, minInDegree);
+		assertEquals(7, minOutDegree);
+		assertEquals(7, maxInDegree);
+		assertEquals(7, maxOutDegree);
+	}
+
+	@Test
+	public void testParallelism()
+			throws Exception {
+		int parallelism = 2;
+
+		Graph<LongValue,NullValue,NullValue> graph = new GridGraph(env)
+			.addDimension(3, false)
+			.addDimension(5, false)
+			.setParallelism(parallelism)
+			.generate();
+
+		graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue,NullValue>>());
+		graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue,NullValue>>());
+
+		TestUtils.verifyParallelism(env, parallelism);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java
new file mode 100644
index 0000000..77eed89
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class HypercubeGraphTest
+extends AbstractGraphTest {
+
+	@Test
+	public void testGraph()
+			throws Exception {
+		int dimensions = 3;
+
+		Graph<LongValue, NullValue, NullValue> graph = new HypercubeGraph(env, dimensions)
+			.generate();
+
+		String vertices = "0; 1; 2; 3; 4; 5; 6; 7";
+		String edges = "0,1; 0,2; 0,4; 1,0; 1,3; 1,5; 2,0; 2,3; 2,6; 3,1; 3,2; 3,7;" +
+			"4,0; 4,5; 4,6; 5,1; 5,4; 5,7; 6,2; 6,4; 6,7; 7,3; 7,6; 7,5";
+
+		TestUtils.compareGraph(graph, vertices, edges);
+	}
+
+	@Test
+	public void testGraphMetrics()
+			throws Exception {
+		int dimensions = 10;
+
+		Graph<LongValue, NullValue, NullValue> graph = new HypercubeGraph(env, dimensions)
+			.generate();
+
+		assertEquals(1 << dimensions, graph.numberOfVertices());
+		assertEquals(dimensions * (1 << dimensions), graph.numberOfEdges());
+
+		long minInDegree = graph.inDegrees().min(1).collect().get(0).f1;
+		long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1;
+		long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
+		long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1;
+
+		assertEquals(dimensions, minInDegree);
+		assertEquals(dimensions, minOutDegree);
+		assertEquals(dimensions, maxInDegree);
+		assertEquals(dimensions, maxOutDegree);
+	}
+
+	@Test
+	public void testParallelism()
+			throws Exception {
+		int parallelism = 2;
+
+		Graph<LongValue,NullValue,NullValue> graph = new HypercubeGraph(env, 4)
+			.setParallelism(parallelism)
+			.generate();
+
+		graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue,NullValue>>());
+		graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue,NullValue>>());
+
+		TestUtils.verifyParallelism(env, parallelism);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java
new file mode 100644
index 0000000..b8a409f
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class PathGraphTest
+extends AbstractGraphTest {
+
+	@Test
+	public void testGraph()
+			throws Exception {
+		Graph<LongValue,NullValue,NullValue> graph = new PathGraph(env, 10)
+			.generate();
+
+		String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9";
+		String edges = "0,1; 1,0; 1,2; 2,1; 2,3; 3,2; 3,4; 4,3; 4,5; 5,4;" +
+				"5,6; 6,5; 6,7; 7,6; 7,8; 8,7; 8,9; 9,8";
+
+		TestUtils.compareGraph(graph, vertices, edges);
+	}
+
+	@Test
+	public void testGraphMetrics()
+			throws Exception {
+		int vertexCount = 100;
+
+		Graph<LongValue,NullValue,NullValue> graph = new PathGraph(env, vertexCount)
+			.generate();
+
+		assertEquals(vertexCount, graph.numberOfVertices());
+		assertEquals(2 * (vertexCount - 1), graph.numberOfEdges());
+
+		long minInDegree = graph.inDegrees().min(1).collect().get(0).f1;
+		long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1;
+		long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
+		long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1;
+
+		assertEquals(1, minInDegree);
+		assertEquals(1, minOutDegree);
+		assertEquals(2, maxInDegree);
+		assertEquals(2, maxOutDegree);
+	}
+
+	@Test
+	public void testParallelism()
+			throws Exception {
+		int parallelism = 2;
+
+		Graph<LongValue,NullValue,NullValue> graph = new PathGraph(env, 100)
+			.setParallelism(parallelism)
+			.generate();
+
+		graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue,NullValue>>());
+		graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue,NullValue>>());
+
+		TestUtils.verifyParallelism(env, parallelism);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/RMatGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/RMatGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/RMatGraphTest.java
new file mode 100644
index 0000000..a06c63f
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/RMatGraphTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
+import org.apache.flink.graph.generator.random.RandomGenerableFactory;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+public class RMatGraphTest
+extends AbstractGraphTest {
+
+	@Test
+	public void testGraphMetrics()
+			throws Exception {
+		long vertexCount = 100;
+
+		long edgeCount = 1000;
+
+		RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
+
+		Graph<LongValue,NullValue,NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
+			.generate();
+
+		assertTrue(vertexCount >= graph.numberOfVertices());
+		assertEquals(edgeCount, graph.numberOfEdges());
+	}
+
+	@Test
+	public void testParallelism()
+			throws Exception {
+		int parallelism = 2;
+
+		RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
+
+		Graph<LongValue,NullValue,NullValue> graph = new RMatGraph<>(env, rnd, 100, 1000)
+			.setParallelism(parallelism)
+			.generate();
+
+		graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue,NullValue>>());
+		graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue,NullValue>>());
+
+		TestUtils.verifyParallelism(env, parallelism);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java
new file mode 100644
index 0000000..3877717
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class SingletonEdgeGraphTest
+extends AbstractGraphTest {
+
+	@Test
+	public void testGraph()
+			throws Exception {
+		int vertexPairCount = 5;
+
+		Graph<LongValue,NullValue,NullValue> graph = new SingletonEdgeGraph(env, vertexPairCount)
+			.generate();
+
+		String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9";
+		String edges = "0,1; 1,0; 2,3; 3,2; 4,5; 5,4; 6,7; 7,6; 8,9; 9,8";
+
+		TestUtils.compareGraph(graph, vertices, edges);
+	}
+
+	@Test
+	public void testGraphMetrics()
+			throws Exception {
+		int vertexPairCount = 10;
+
+		Graph<LongValue,NullValue,NullValue> graph = new SingletonEdgeGraph(env, vertexPairCount)
+			.generate();
+
+		assertEquals(2 * vertexPairCount, graph.numberOfVertices());
+		assertEquals(2 * vertexPairCount, graph.numberOfEdges());
+
+		long minInDegree = graph.inDegrees().min(1).collect().get(0).f1;
+		long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1;
+		long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
+		long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1;
+
+		assertEquals(1, minInDegree);
+		assertEquals(1, minOutDegree);
+		assertEquals(1, maxInDegree);
+		assertEquals(1, maxOutDegree);
+	}
+
+	@Test
+	public void testParallelism()
+			throws Exception {
+		int parallelism = 2;
+
+		Graph<LongValue,NullValue,NullValue> graph = new SingletonEdgeGraph(env, 10)
+			.setParallelism(parallelism)
+			.generate();
+
+		graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue,NullValue>>());
+		graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue,NullValue>>());
+
+		TestUtils.verifyParallelism(env, parallelism);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java
new file mode 100644
index 0000000..2b090db
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class StarGraphTest
+extends AbstractGraphTest {
+
+	@Test
+	public void testGraph()
+			throws Exception {
+		int vertexCount = 10;
+
+		Graph<LongValue,NullValue,NullValue> graph = new StarGraph(env, vertexCount)
+			.generate();
+
+		String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9";
+		String edges = "0,1; 1,0; 0,2; 2,0; 0,3; 3,0; 0,4; 4,0; 0,5; 5,0;" +
+				"0,6; 6,0; 0,7; 7,0; 0,8; 8,0; 0,9; 9,0";
+
+		TestUtils.compareGraph(graph, vertices, edges);
+	}
+
+	@Test
+	public void testGraphMetrics()
+			throws Exception {
+		int vertexCount = 100;
+
+		Graph<LongValue,NullValue,NullValue> graph = new StarGraph(env, vertexCount)
+			.generate();
+
+		assertEquals(vertexCount, graph.numberOfVertices());
+		assertEquals(2 * (vertexCount - 1), graph.numberOfEdges());
+
+		long minInDegree = graph.inDegrees().min(1).collect().get(0).f1;
+		long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1;
+		long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
+		long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1;
+
+		assertEquals(1, minInDegree);
+		assertEquals(1, minOutDegree);
+		assertEquals(vertexCount - 1, maxInDegree);
+		assertEquals(vertexCount - 1, maxOutDegree);
+	}
+
+	@Test
+	public void testParallelism()
+			throws Exception {
+		int parallelism = 2;
+
+		Graph<LongValue,NullValue,NullValue> graph = new StarGraph(env, 100)
+			.setParallelism(parallelism)
+			.generate();
+
+		graph.getVertices().output(new DiscardingOutputFormat<Vertex<LongValue,NullValue>>());
+		graph.getEdges().output(new DiscardingOutputFormat<Edge<LongValue,NullValue>>());
+
+		TestUtils.verifyParallelism(env, parallelism);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java
new file mode 100644
index 0000000..3ea5a44
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.generator;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.costs.DefaultCostEstimator;
+import org.apache.flink.optimizer.plan.Channel;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.test.util.TestBaseUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertTrue;
+
+public final class TestUtils {
+
+	/**
+	 * Compare graph vertices and edges against expected values.
+	 *
+	 * @param graph graph under test
+	 * @param expectedVertices vertex labels separated by semi-colons; whitespace is ignored
+	 * @param expectedEdges edges of the form "source,target" separated by semi-colons; whitespace is ignored
+	 * @param <K> the key type for edge and vertex identifiers
+	 * @param <VV> the value type for vertices
+	 * @param <EV> the value type for edges
+	 * @throws Exception
+	 */
+	public static <K,VV,EV> void compareGraph(Graph<K,VV,EV> graph, String expectedVertices, String expectedEdges)
+			throws Exception {
+		// Vertices
+		if (expectedVertices != null) {
+			List<String> resultVertices = new ArrayList<>();
+
+			for (Vertex<K, VV> vertex : graph.getVertices().collect()) {
+				resultVertices.add(vertex.f0.toString());
+			}
+
+			TestBaseUtils.compareResultAsText(resultVertices, expectedVertices.replaceAll("\\s","").replace(";", "\n"));
+		}
+
+		// Edges
+		if (expectedEdges != null) {
+			List<String> resultEdges = new ArrayList<>();
+
+			for (Edge<K, EV> edge : graph.getEdges().collect()) {
+				resultEdges.add(edge.f0.toString() + "," + edge.f1.toString());
+			}
+
+			TestBaseUtils.compareResultAsText(resultEdges, expectedEdges.replaceAll("\\s","").replace(";", "\n"));
+		}
+	}
+
+	/**
+	 * Verify operator parallelism.
+	 *
+	 * @param env the Flink execution environment.
+	 * @param expectedParallelism expected operator parallelism
+	 */
+	public static void verifyParallelism(ExecutionEnvironment env, int expectedParallelism) {
+		env.setParallelism(2 * expectedParallelism);
+
+		Optimizer compiler = new Optimizer(null, new DefaultCostEstimator(), new Configuration());
+		OptimizedPlan optimizedPlan = compiler.compile(env.createProgramPlan());
+
+		List<PlanNode> queue = new ArrayList<>();
+		queue.addAll(optimizedPlan.getDataSinks());
+
+		while (queue.size() > 0) {
+			PlanNode node = queue.remove(queue.size() - 1);
+
+			// Data sources may have parallelism of 1, so simply check that the node
+			// parallelism has not been increased by setting the default parallelism
+			assertTrue("Wrong parallelism for " + node.toString(), node.getParallelism() <= expectedParallelism);
+
+			for (Channel channel : node.getInputs()) {
+				queue.add(channel.getSource());
+			}
+		}
+	}
+}


Mime
View raw message