Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id BCEA919E94 for ; Wed, 13 Apr 2016 18:29:11 +0000 (UTC) Received: (qmail 17224 invoked by uid 500); 13 Apr 2016 18:29:11 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 17185 invoked by uid 500); 13 Apr 2016 18:29:11 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 17176 invoked by uid 99); 13 Apr 2016 18:29:11 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 13 Apr 2016 18:29:11 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 706C1DFB73; Wed, 13 Apr 2016 18:29:11 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: greg@apache.org To: commits@flink.apache.org Date: Wed, 13 Apr 2016 18:29:11 -0000 Message-Id: <88e009da72e14b808673af2932f3b099@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] flink git commit: [FLINK-2909] [gelly] Graph Generators Repository: flink Updated Branches: refs/heads/master 5350bc48a -> b0a7a1b81 http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/AbstractGeneratorFactory.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/AbstractGeneratorFactory.java 
b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/AbstractGeneratorFactory.java new file mode 100644 index 0000000..3bb904e --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/AbstractGeneratorFactory.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator.random; + +import org.apache.commons.math3.random.RandomGenerator; + +import java.util.ArrayList; +import java.util.List; + +/** + * This base class handles the task of dividing the requested work into the + * appropriate number of blocks of near-equal size. + * + * @param the type of the {@code RandomGenerator} + */ +public abstract class AbstractGeneratorFactory +implements RandomGenerableFactory { + + // A large computation will run in parallel but blocks are generated on + // and distributed from a single node. This limit should be greater + // than the maximum expected parallelism. 
+ public static final int MAXIMUM_BLOCK_COUNT = 1 << 20; + + // This should be sufficiently large relative to the cost of instantiating + // and initializing the random generator and sufficiently small relative to + // the cost of generating random values. + protected abstract int getMinimumCyclesPerBlock(); + + protected abstract RandomGenerable next(); + + @Override + public List> getRandomGenerables(long elementCount, int cyclesPerElement) { + long cycles = elementCount * cyclesPerElement; + int blockCount = Math.min((int) Math.ceil(cycles / (float) getMinimumCyclesPerBlock()), MAXIMUM_BLOCK_COUNT); + + long elementsPerBlock = elementCount / blockCount; + long elementRemainder = elementCount % blockCount; + + List> blocks = new ArrayList<>(blockCount); + long blockStart = 0; + + for (int blockIndex = 0 ; blockIndex < blockCount ; blockIndex++) { + if (blockIndex == blockCount - elementRemainder) { + elementsPerBlock++; + } + + RandomGenerable randomGenerable = next(); + + blocks.add(new BlockInfo<>(randomGenerable, blockIndex, blockCount, blockStart, elementsPerBlock)); + + blockStart += elementsPerBlock; + } + + return blocks; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/BlockInfo.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/BlockInfo.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/BlockInfo.java new file mode 100644 index 0000000..5e30a3f --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/BlockInfo.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator.random; + +import org.apache.commons.math3.random.RandomGenerator; + +/** + * Defines a source of randomness and a unit of work. + * + * @param the type of the {@code RandomGenerator} + */ +public class BlockInfo { + + private final RandomGenerable randomGenerable; + + private final int blockIndex; + + private final int blockCount; + + private final long firstElement; + + private final long elementCount; + + public BlockInfo(RandomGenerable randomGenerable, int blockIndex, int blockCount, long firstElement, long elementCount) { + this.randomGenerable = randomGenerable; + this.blockIndex = blockIndex; + this.blockCount = blockCount; + this.firstElement = firstElement; + this.elementCount = elementCount; + } + + /** + * @return the source of randomness + */ + public RandomGenerable getRandomGenerable() { + return randomGenerable; + } + + /** + * @return the index of this block within the list of blocks + */ + public int getBlockIndex() { + return blockIndex; + } + + /** + * @return the total number of blocks + */ + public int getBlockCount() { + return blockCount; + } + + /** + * @return the index of the first element in this block + */ + public long getFirstElement() { + return firstElement; + } + + /** + * @return the number of elements in this block + */ + public long getElementCount() { + return elementCount; + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/JDKRandomGeneratorFactory.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/JDKRandomGeneratorFactory.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/JDKRandomGeneratorFactory.java new file mode 100644 index 0000000..2024cae --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/JDKRandomGeneratorFactory.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator.random; + +import org.apache.commons.math3.random.JDKRandomGenerator; + +/** + * Uses a seeded {@link JDKRandomGenerator} to generate seeds for the + * distributed collection of {@link JDKRandomGenerator}. 
+ */ +public class JDKRandomGeneratorFactory +extends AbstractGeneratorFactory { + + public static final long DEFAULT_SEED = 0x4b6f7e18198de7a4L; + + public static final int MINIMUM_CYCLES_PER_BLOCK = 1 << 20; + + private final JDKRandomGenerator random = new JDKRandomGenerator(); + + public JDKRandomGeneratorFactory() { + this(DEFAULT_SEED); + } + + public JDKRandomGeneratorFactory(long seed) { + random.setSeed(seed); + } + + @Override + protected int getMinimumCyclesPerBlock() { + return MINIMUM_CYCLES_PER_BLOCK; + } + + @Override + protected JDKRandomGenerable next() { + return new JDKRandomGenerable(random.nextLong()); + } + + private static class JDKRandomGenerable + implements RandomGenerable { + + private final long seed; + + public JDKRandomGenerable(long seed) { + this.seed = seed; + } + + @Override + public JDKRandomGenerator generator() { + JDKRandomGenerator random = new JDKRandomGenerator(); + + random.setSeed(seed); + + return random; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/MersenneTwisterFactory.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/MersenneTwisterFactory.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/MersenneTwisterFactory.java new file mode 100644 index 0000000..22a7b04 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/MersenneTwisterFactory.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator.random; + +import org.apache.commons.math3.random.MersenneTwister; + +/** + * Uses a seeded {@link MersenneTwister} to generate seeds for the + * distributed collection of {@link MersenneTwister}. + */ +public class MersenneTwisterFactory +extends AbstractGeneratorFactory { + + public static final long DEFAULT_SEED = 0x74c8cc8a58a9ceb9L; + + public static final int MINIMUM_CYCLES_PER_BLOCK = 1 << 20; + + private final MersenneTwister random = new MersenneTwister(); + + public MersenneTwisterFactory() { + this(DEFAULT_SEED); + } + + public MersenneTwisterFactory(long seed) { + random.setSeed(seed); + } + + @Override + protected int getMinimumCyclesPerBlock() { + return MINIMUM_CYCLES_PER_BLOCK; + } + + @Override + protected MersenneTwisterGenerable next() { + return new MersenneTwisterGenerable(random.nextLong()); + } + + private static class MersenneTwisterGenerable + implements RandomGenerable { + + private final long seed; + + public MersenneTwisterGenerable(long seed) { + this.seed = seed; + } + + @Override + public MersenneTwister generator() { + MersenneTwister random = new MersenneTwister(); + + random.setSeed(seed); + + return random; + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/RandomGenerable.java 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/RandomGenerable.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/RandomGenerable.java new file mode 100644 index 0000000..318b508 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/RandomGenerable.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator.random; + +import org.apache.commons.math3.random.RandomGenerator; + +/** + * A RandomGenerable provides deferred instantiation and initialization of a + * RandomGenerator. This allows pre-processing or discovery to be distributed + * and performed in parallel by Flink tasks. + * + * A distributed PRNG is described by Matsumoto and Nishimura in + * "Dynamic Creation of Pseudorandom Number Generators". + * + * @param the type of the {@code RandomGenerator} + */ +public interface RandomGenerable { + + /** + * Returns an initialized {@link RandomGenerator}. 
+ * + * @return a {@code RandomGenerator} of type {@code T} + */ + T generator(); +} http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/RandomGenerableFactory.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/RandomGenerableFactory.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/RandomGenerableFactory.java new file mode 100644 index 0000000..ead29fc --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/random/RandomGenerableFactory.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator.random; + +import org.apache.commons.math3.random.RandomGenerator; + +import java.util.List; + +/** + * A {@code RandomGenerableFactory} returns a scale-free collection of sources + * of pseudorandomness which can be used to perform repeatable parallel + * computation regardless of parallelism. + * + *
+ * {@code
+ * RandomGenerableFactory<JDKRandomGenerator> factory = new JDKRandomGeneratorFactory();
+ *
+ * List<BlockInfo<JDKRandomGenerator>> generatorBlocks = factory
+ *     .getRandomGenerables(elementCount, cyclesPerElement);
+ *
+ * DataSet<...> generatedEdges = env
+ *     .fromCollection(generatorBlocks)
+ *         .name("Random generators")
+ *     .flatMap(...
+ * }
+ * 
+ * + * @param the type of the {@code RandomGenerator} + */ +public interface RandomGenerableFactory { + + /** + * The amount of work ({@code elementCount * cyclesPerElement}) is used to + * generate a list of blocks of work of near-equal size. + * + * @param elementCount number of elements, as indexed in the {@code BlockInfo} + * @param cyclesPerElement number of cycles of the PRNG per element + * @return the list of configuration blocks + */ + List> getRandomGenerables(long elementCount, int cyclesPerElement); +} http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/AbstractGraphTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/AbstractGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/AbstractGraphTest.java new file mode 100644 index 0000000..1cac80b --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/AbstractGraphTest.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.junit.Before; + +public class AbstractGraphTest { + + protected ExecutionEnvironment env; + + @Before + public void setup() { + env = ExecutionEnvironment.createCollectionsEnvironment(); + env.getConfig().disableSysoutLogging(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java new file mode 100644 index 0000000..af47fdc --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class CompleteGraphTest +extends AbstractGraphTest { + + @Test + public void testGraph() + throws Exception { + int vertexCount = 4; + + Graph graph = new CompleteGraph(env, vertexCount) + .generate(); + + String vertices = "0; 1; 2; 3"; + String edges = "0,1; 0,2; 0,3; 1,0; 1,2; 1,3; 2,0; 2,1; 2,3; 3,0; 3,1; 3,2"; + + TestUtils.compareGraph(graph, vertices, edges); + } + + @Test + public void testGraphMetrics() + throws Exception { + int vertexCount = 10; + + Graph graph = new CompleteGraph(env, vertexCount) + .generate(); + + assertEquals(vertexCount, graph.numberOfVertices()); + assertEquals(vertexCount*(vertexCount-1), graph.numberOfEdges()); + + long minInDegree = graph.inDegrees().min(1).collect().get(0).f1; + long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1; + long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1; + long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1; + + assertEquals(vertexCount - 1, minInDegree); + assertEquals(vertexCount - 1, minOutDegree); + assertEquals(vertexCount - 1, maxInDegree); + assertEquals(vertexCount - 1, maxOutDegree); + } + + @Test + public void testParallelism() + throws Exception { + int parallelism = 2; + + Graph graph = new CompleteGraph(env, 10) + .setParallelism(parallelism) + .generate(); + + graph.getVertices().output(new DiscardingOutputFormat>()); + graph.getEdges().output(new DiscardingOutputFormat>()); + + TestUtils.verifyParallelism(env, parallelism); + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java new file mode 100644 index 0000000..fb6799b --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class CycleGraphTest +extends AbstractGraphTest { + + @Test + public void testGraph() + throws Exception { + Graph graph = new CycleGraph(env, 10) + .generate(); + + String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9"; + String edges = "0,1; 1,0; 1,2; 2,1; 2,3; 3,2; 3,4; 4,3; 4,5; 5,4;" + + "5,6; 6,5; 6,7; 7,6; 7,8; 8,7; 8,9; 9,8; 9,0; 0,9"; + + TestUtils.compareGraph(graph, vertices, edges); + } + + @Test + public void testGraphMetrics() + throws Exception { + int vertexCount = 100; + + Graph graph = new CycleGraph(env, vertexCount) + .generate(); + + assertEquals(vertexCount, graph.numberOfVertices()); + assertEquals(2 * vertexCount, graph.numberOfEdges()); + + long minInDegree = graph.inDegrees().min(1).collect().get(0).f1; + long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1; + long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1; + long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1; + + assertEquals(2, minInDegree); + assertEquals(2, minOutDegree); + assertEquals(2, maxInDegree); + assertEquals(2, maxOutDegree); + } + + @Test + public void testParallelism() + throws Exception { + int parallelism = 2; + + Graph graph = new CycleGraph(env, 100) + .setParallelism(parallelism) + .generate(); + + graph.getVertices().output(new DiscardingOutputFormat>()); + graph.getEdges().output(new DiscardingOutputFormat>()); + + TestUtils.verifyParallelism(env, parallelism); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java new file mode 100644 index 0000000..bc1ef77 --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class EmptyGraphTest +extends AbstractGraphTest { + + @Test + public void testGraph() + throws Exception { + Graph graph = new EmptyGraph(env, 10) + .generate(); + + String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9"; + String edges = null; + + TestUtils.compareGraph(graph, vertices, edges); + } + + @Test + public void testGraphMetrics() + throws Exception { + int vertexCount = 100; + + Graph graph = new EmptyGraph(env, vertexCount) + .generate(); + + assertEquals(vertexCount, graph.numberOfVertices()); + assertEquals(0, graph.numberOfEdges()); + + long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1; + long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1; + + assertEquals(0, maxInDegree); + assertEquals(0, maxOutDegree); + } + + @Test + public void testParallelism() + throws Exception { + int parallelism = 2; + + Graph graph = new EmptyGraph(env, 100) + .setParallelism(parallelism) + .generate(); + + graph.getVertices().output(new DiscardingOutputFormat>()); + graph.getEdges().output(new DiscardingOutputFormat>()); + + TestUtils.verifyParallelism(env, parallelism); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java new file mode 100644 index 0000000..f3fa7db --- /dev/null +++ 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class GridGraphTest +extends AbstractGraphTest { + + @Test + public void testGraph() + throws Exception { + Graph graph = new GridGraph(env) + .addDimension(2, false) + .addDimension(3, false) + .generate(); + + // 0 1 2 + // 3 4 5 + String vertices = "0; 1; 2; 3; 4; 5"; + String edges = "0,1; 0,3; 1,0; 1,2; 1,4; 2,1; 2,5; 3,0; 3,4; 4,1;" + + "4,3; 4,5; 5,2; 5,4"; + + TestUtils.compareGraph(graph, vertices, edges); + } + + @Test + public void testGraphMetrics() + throws Exception { + Graph graph = new GridGraph(env) + .addDimension(2, true) + .addDimension(3, true) + .addDimension(5, true) + .addDimension(7, true) + .generate(); + + // Each vertex is the source of one edge 
in the first dimension of size 2, + // and the source of two edges in each dimension of size greater than 2. + assertEquals(2*3*5*7, graph.numberOfVertices()); + assertEquals(7 * 2*3*5*7, graph.numberOfEdges()); + + long minInDegree = graph.inDegrees().min(1).collect().get(0).f1; + long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1; + long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1; + long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1; + + assertEquals(7, minInDegree); + assertEquals(7, minOutDegree); + assertEquals(7, maxInDegree); + assertEquals(7, maxOutDegree); + } + + @Test + public void testParallelism() + throws Exception { + int parallelism = 2; + + Graph graph = new GridGraph(env) + .addDimension(3, false) + .addDimension(5, false) + .setParallelism(parallelism) + .generate(); + + graph.getVertices().output(new DiscardingOutputFormat>()); + graph.getEdges().output(new DiscardingOutputFormat>()); + + TestUtils.verifyParallelism(env, parallelism); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java new file mode 100644 index 0000000..77eed89 --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class HypercubeGraphTest +extends AbstractGraphTest { + + @Test + public void testGraph() + throws Exception { + int dimensions = 3; + + Graph graph = new HypercubeGraph(env, dimensions) + .generate(); + + String vertices = "0; 1; 2; 3; 4; 5; 6; 7"; + String edges = "0,1; 0,2; 0,4; 1,0; 1,3; 1,5; 2,0; 2,3; 2,6; 3,1; 3,2; 3,7;" + + "4,0; 4,5; 4,6; 5,1; 5,4; 5,7; 6,2; 6,4; 6,7; 7,3; 7,6; 7,5"; + + TestUtils.compareGraph(graph, vertices, edges); + } + + @Test + public void testGraphMetrics() + throws Exception { + int dimensions = 10; + + Graph graph = new HypercubeGraph(env, dimensions) + .generate(); + + assertEquals(1 << dimensions, graph.numberOfVertices()); + assertEquals(dimensions * (1 << dimensions), graph.numberOfEdges()); + + long minInDegree = graph.inDegrees().min(1).collect().get(0).f1; + long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1; + long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1; + long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1; + + 
assertEquals(dimensions, minInDegree); + assertEquals(dimensions, minOutDegree); + assertEquals(dimensions, maxInDegree); + assertEquals(dimensions, maxOutDegree); + } + + @Test + public void testParallelism() + throws Exception { + int parallelism = 2; + + Graph graph = new HypercubeGraph(env, 4) + .setParallelism(parallelism) + .generate(); + + graph.getVertices().output(new DiscardingOutputFormat>()); + graph.getEdges().output(new DiscardingOutputFormat>()); + + TestUtils.verifyParallelism(env, parallelism); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java new file mode 100644 index 0000000..b8a409f --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class PathGraphTest +extends AbstractGraphTest { + + @Test + public void testGraph() + throws Exception { + Graph graph = new PathGraph(env, 10) + .generate(); + + String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9"; + String edges = "0,1; 1,0; 1,2; 2,1; 2,3; 3,2; 3,4; 4,3; 4,5; 5,4;" + + "5,6; 6,5; 6,7; 7,6; 7,8; 8,7; 8,9; 9,8"; + + TestUtils.compareGraph(graph, vertices, edges); + } + + @Test + public void testGraphMetrics() + throws Exception { + int vertexCount = 100; + + Graph graph = new PathGraph(env, vertexCount) + .generate(); + + assertEquals(vertexCount, graph.numberOfVertices()); + assertEquals(2 * (vertexCount - 1), graph.numberOfEdges()); + + long minInDegree = graph.inDegrees().min(1).collect().get(0).f1; + long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1; + long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1; + long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1; + + assertEquals(1, minInDegree); + assertEquals(1, minOutDegree); + assertEquals(2, maxInDegree); + assertEquals(2, maxOutDegree); + } + + @Test + public void testParallelism() + throws Exception { + int parallelism = 2; + + Graph graph = new PathGraph(env, 100) + .setParallelism(parallelism) + .generate(); + + graph.getVertices().output(new DiscardingOutputFormat>()); + graph.getEdges().output(new DiscardingOutputFormat>()); + + TestUtils.verifyParallelism(env, parallelism); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/RMatGraphTest.java 
---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/RMatGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/RMatGraphTest.java new file mode 100644 index 0000000..a06c63f --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/RMatGraphTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.generator; + +import org.apache.commons.math3.random.JDKRandomGenerator; +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory; +import org.apache.flink.graph.generator.random.RandomGenerableFactory; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class RMatGraphTest +extends AbstractGraphTest { + + @Test + public void testGraphMetrics() + throws Exception { + long vertexCount = 100; + + long edgeCount = 1000; + + RandomGenerableFactory rnd = new JDKRandomGeneratorFactory(); + + Graph graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount) + .generate(); + + assertTrue(vertexCount >= graph.numberOfVertices()); + assertEquals(edgeCount, graph.numberOfEdges()); + } + + @Test + public void testParallelism() + throws Exception { + int parallelism = 2; + + RandomGenerableFactory rnd = new JDKRandomGeneratorFactory(); + + Graph graph = new RMatGraph<>(env, rnd, 100, 1000) + .setParallelism(parallelism) + .generate(); + + graph.getVertices().output(new DiscardingOutputFormat>()); + graph.getEdges().output(new DiscardingOutputFormat>()); + + TestUtils.verifyParallelism(env, parallelism); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java new file mode 100644 index 0000000..3877717 
--- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class SingletonEdgeGraphTest +extends AbstractGraphTest { + + @Test + public void testGraph() + throws Exception { + int vertexPairCount = 5; + + Graph graph = new SingletonEdgeGraph(env, vertexPairCount) + .generate(); + + String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9"; + String edges = "0,1; 1,0; 2,3; 3,2; 4,5; 5,4; 6,7; 7,6; 8,9; 9,8"; + + TestUtils.compareGraph(graph, vertices, edges); + } + + @Test + public void testGraphMetrics() + throws Exception { + int vertexPairCount = 10; + + Graph graph = new SingletonEdgeGraph(env, vertexPairCount) + .generate(); + + assertEquals(2 * vertexPairCount, graph.numberOfVertices()); + assertEquals(2 * 
vertexPairCount, graph.numberOfEdges()); + + long minInDegree = graph.inDegrees().min(1).collect().get(0).f1; + long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1; + long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1; + long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1; + + assertEquals(1, minInDegree); + assertEquals(1, minOutDegree); + assertEquals(1, maxInDegree); + assertEquals(1, maxOutDegree); + } + + @Test + public void testParallelism() + throws Exception { + int parallelism = 2; + + Graph graph = new SingletonEdgeGraph(env, 10) + .setParallelism(parallelism) + .generate(); + + graph.getVertices().output(new DiscardingOutputFormat>()); + graph.getEdges().output(new DiscardingOutputFormat>()); + + TestUtils.verifyParallelism(env, parallelism); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java new file mode 100644 index 0000000..2b090db --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.java.io.DiscardingOutputFormat; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.types.LongValue; +import org.apache.flink.types.NullValue; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class StarGraphTest +extends AbstractGraphTest { + + @Test + public void testGraph() + throws Exception { + int vertexCount = 10; + + Graph graph = new StarGraph(env, vertexCount) + .generate(); + + String vertices = "0; 1; 2; 3; 4; 5; 6; 7; 8; 9"; + String edges = "0,1; 1,0; 0,2; 2,0; 0,3; 3,0; 0,4; 4,0; 0,5; 5,0;" + + "0,6; 6,0; 0,7; 7,0; 0,8; 8,0; 0,9; 9,0"; + + TestUtils.compareGraph(graph, vertices, edges); + } + + @Test + public void testGraphMetrics() + throws Exception { + int vertexCount = 100; + + Graph graph = new StarGraph(env, vertexCount) + .generate(); + + assertEquals(vertexCount, graph.numberOfVertices()); + assertEquals(2 * (vertexCount - 1), graph.numberOfEdges()); + + long minInDegree = graph.inDegrees().min(1).collect().get(0).f1; + long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1; + long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1; + long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1; + + assertEquals(1, minInDegree); + assertEquals(1, minOutDegree); + assertEquals(vertexCount - 1, maxInDegree); + assertEquals(vertexCount - 1, maxOutDegree); + } + + @Test + public void testParallelism() + throws 
Exception { + int parallelism = 2; + + Graph graph = new StarGraph(env, 100) + .setParallelism(parallelism) + .generate(); + + graph.getVertices().output(new DiscardingOutputFormat>()); + graph.getEdges().output(new DiscardingOutputFormat>()); + + TestUtils.verifyParallelism(env, parallelism); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/b0a7a1b8/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java new file mode 100644 index 0000000..3ea5a44 --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.generator; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.optimizer.Optimizer; +import org.apache.flink.optimizer.costs.DefaultCostEstimator; +import org.apache.flink.optimizer.plan.Channel; +import org.apache.flink.optimizer.plan.OptimizedPlan; +import org.apache.flink.optimizer.plan.PlanNode; +import org.apache.flink.test.util.TestBaseUtils; + +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertTrue; + +public final class TestUtils { + + /** + * Compare graph vertices and edges against expected values. + * + * @param graph graph under test + * @param expectedVertices vertex labels separated by semi-colons; whitespace is ignored + * @param expectedEdges edges of the form "source,target" separated by semi-colons; whitespace is ignored + * @param the key type for edge and vertex identifiers + * @param the value type for vertices + * @param the value type for edges + * @throws Exception + */ + public static void compareGraph(Graph graph, String expectedVertices, String expectedEdges) + throws Exception { + // Vertices + if (expectedVertices != null) { + List resultVertices = new ArrayList<>(); + + for (Vertex vertex : graph.getVertices().collect()) { + resultVertices.add(vertex.f0.toString()); + } + + TestBaseUtils.compareResultAsText(resultVertices, expectedVertices.replaceAll("\\s","").replace(";", "\n")); + } + + // Edges + if (expectedEdges != null) { + List resultEdges = new ArrayList<>(); + + for (Edge edge : graph.getEdges().collect()) { + resultEdges.add(edge.f0.toString() + "," + edge.f1.toString()); + } + + TestBaseUtils.compareResultAsText(resultEdges, expectedEdges.replaceAll("\\s","").replace(";", "\n")); + } + } + + /** + * Verify operator parallelism. 
+ * + * @param env the Flink execution environment. + * @param expectedParallelism expected operator parallelism + */ + public static void verifyParallelism(ExecutionEnvironment env, int expectedParallelism) { + env.setParallelism(2 * expectedParallelism); + + Optimizer compiler = new Optimizer(null, new DefaultCostEstimator(), new Configuration()); + OptimizedPlan optimizedPlan = compiler.compile(env.createProgramPlan()); + + List queue = new ArrayList<>(); + queue.addAll(optimizedPlan.getDataSinks()); + + while (queue.size() > 0) { + PlanNode node = queue.remove(queue.size() - 1); + + // Data sources may have parallelism of 1, so simply check that the node + // parallelism has not been increased by setting the default parallelism + assertTrue("Wrong parallelism for " + node.toString(), node.getParallelism() <= expectedParallelism); + + for (Channel channel : node.getInputs()) { + queue.add(channel.getSource()); + } + } + } +}