Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 76F88107F4 for ; Wed, 4 Mar 2015 20:33:16 +0000 (UTC) Received: (qmail 67591 invoked by uid 500); 4 Mar 2015 20:33:03 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 67515 invoked by uid 500); 4 Mar 2015 20:33:03 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 67430 invoked by uid 99); 4 Mar 2015 20:33:03 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 04 Mar 2015 20:33:03 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5340EE104E; Wed, 4 Mar 2015 20:33:03 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vasia@apache.org To: commits@flink.apache.org Date: Wed, 04 Mar 2015 20:33:08 -0000 Message-Id: <8ce466d989184b338476acbeabf775fe@git.apache.org> In-Reply-To: <66f43b15b7204cbc9fc9a2e55f038278@git.apache.org> References: <66f43b15b7204cbc9fc9a2e55f038278@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [6/7] flink git commit: [gelly] refactored tests; removed duplicate data from TestGraphUtils http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapVerticesITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapVerticesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapVerticesITCase.java deleted file mode 100644 index 4e2c858..0000000 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/MapVerticesITCase.java +++ /dev/null @@ -1,233 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test; - -import org.apache.flink.api.common.functions.MapFunction; -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple1; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; -import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType; -import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class MapVerticesITCase extends MultipleProgramsTestBase { - - public MapVerticesITCase(MultipleProgramsTestBase.ExecutionMode mode){ - super(mode); - } - - private String resultPath; - private String expectedResult; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } - - @Test - public void testWithSameValue() throws Exception { - /* - * Test mapVertices() keeping the same value type - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet> mappedVertices = graph.mapVertices(new AddOneMapper()).getVertices(); - - mappedVertices.writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,2\n" + - "2,3\n" + - "3,4\n" + - "4,5\n" + - "5,6\n"; - } - - @Test - public void testWithStringValue() throws Exception { - /* - * Test mapVertices() and change the value type to String - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet> mappedVertices = graph.mapVertices(new ToStringMapper()).getVertices(); - - mappedVertices.writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,one\n" + - "2,two\n" + - "3,three\n" + - "4,four\n" + - "5,five\n"; - } - - @Test - public void testWithtuple1Value() throws Exception { - /* - * Test mapVertices() and change the value type to a Tuple1 - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet>> mappedVertices = graph.mapVertices(new ToTuple1Mapper()).getVertices(); - - mappedVertices.writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,(1)\n" + - "2,(2)\n" + - "3,(3)\n" + - "4,(4)\n" + - "5,(5)\n"; - } - - @Test - public void testWithCustomType() throws Exception { - /* - * Test mapVertices() and change the value type to a custom type - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet> mappedVertices = graph.mapVertices(new ToCustomTypeMapper()).getVertices(); - - mappedVertices.writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,(T,1)\n" + - "2,(T,2)\n" + - "3,(T,3)\n" + - "4,(T,4)\n" + - "5,(T,5)\n"; - } - - @Test - public void testWithCustomParametrizedType() throws Exception { - /* - * Test mapVertices() and change the value type to a parameterized custom type - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - - Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet>> mappedVertices = graph.mapVertices( - new ToCustomParametrizedTypeMapper()).getVertices(); - - mappedVertices.writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,(1.0,1)\n" + - "2,(2.0,2)\n" + - "3,(3.0,3)\n" + - "4,(4.0,4)\n" + - "5,(5.0,5)\n"; - } - - @SuppressWarnings("serial") - private static final class AddOneMapper implements MapFunction, Long> { - public Long map(Vertex value) throws Exception { - return value.getValue()+1; - } - } - - @SuppressWarnings("serial") - private static final class ToStringMapper implements MapFunction, String> { - public String map(Vertex vertex) throws Exception { - String stringValue; - if (vertex.getValue() == 1) { - stringValue = "one"; - } - else if (vertex.getValue() == 2) { - stringValue = "two"; - } - else if (vertex.getValue() == 3) { - stringValue = "three"; - } - else if (vertex.getValue() == 4) { - stringValue = "four"; - } - else if (vertex.getValue() == 5) { - stringValue = "five"; - } - else { - stringValue = ""; - } - return stringValue; - } - } - - @SuppressWarnings("serial") - private static final class ToTuple1Mapper implements MapFunction, Tuple1> { - public Tuple1 map(Vertex vertex) throws Exception { - Tuple1 tupleValue = new Tuple1(); - tupleValue.setFields(vertex.getValue()); - return tupleValue; - } - } - - @SuppressWarnings("serial") - private static final class ToCustomTypeMapper implements MapFunction, DummyCustomType> { - public DummyCustomType map(Vertex vertex) throws Exception { - DummyCustomType dummyValue = new DummyCustomType(); - dummyValue.setIntField(vertex.getValue().intValue()); - return dummyValue; - } - } - - @SuppressWarnings("serial") - private static final class ToCustomParametrizedTypeMapper implements MapFunction, - DummyCustomParameterizedType> { - - public DummyCustomParameterizedType map(Vertex vertex) throws Exception { - DummyCustomParameterizedType dummyValue = new DummyCustomParameterizedType(); - dummyValue.setIntField(vertex.getValue().intValue()); - dummyValue.setTField(new Double(vertex.getValue())); - return dummyValue; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnEdgesMethodsITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnEdgesMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnEdgesMethodsITCase.java deleted file mode 100644 index 29d76f0..0000000 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnEdgesMethodsITCase.java +++ /dev/null @@ -1,317 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.EdgeDirection; -import org.apache.flink.graph.EdgesFunction; -import org.apache.flink.graph.EdgesFunctionWithVertexValue; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.Vertex; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase { - - public ReduceOnEdgesMethodsITCase(MultipleProgramsTestBase.ExecutionMode mode){ - super(mode); - } - - private String resultPath; - private String expectedResult; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } - - @Test - public void testLowestWeightOutNeighbor() throws Exception { - /* - * Get the lowest-weight out-neighbor - * for each vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet> verticesWithLowestOutNeighbor = - graph.reduceOnEdges(new SelectMinWeightNeighbor(), EdgeDirection.OUT); - verticesWithLowestOutNeighbor.writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2\n" + - "2,3\n" + - "3,4\n" + - "4,5\n" + - "5,1\n"; - } - - @Test - public void testLowestWeightInNeighbor() throws Exception { - /* - * Get the lowest-weight in-neighbor - * for each vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet> verticesWithLowestOutNeighbor = - graph.reduceOnEdges(new SelectMinWeightInNeighbor(), EdgeDirection.IN); - verticesWithLowestOutNeighbor.writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,5\n" + - "2,1\n" + - "3,1\n" + - "4,3\n" + - "5,3\n"; - } - - @Test - public void testMaxWeightEdge() throws Exception { - /* - * Get the maximum weight among all edges - * of a vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet> verticesWithMaxEdgeWeight = - graph.reduceOnEdges(new SelectMaxWeightNeighbor(), EdgeDirection.ALL); - verticesWithMaxEdgeWeight.writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,51\n" + - "2,23\n" + - "3,35\n" + - "4,45\n" + - "5,51\n"; - } - - @Test - public void testLowestWeightOutNeighborNoValue() throws Exception { - /* - * Get the lowest-weight out-neighbor - * for each vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet> verticesWithLowestOutNeighbor = - graph.reduceOnEdges(new SelectMinWeightNeighborNoValue(), EdgeDirection.OUT); - verticesWithLowestOutNeighbor.writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,2\n" + - "2,3\n" + - "3,4\n" + - "4,5\n" + - "5,1\n"; - } - - @Test - public void testLowestWeightInNeighborNoValue() throws Exception { - /* - * Get the lowest-weight in-neighbor - * for each vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet> verticesWithLowestOutNeighbor = - graph.reduceOnEdges(new SelectMinWeightInNeighborNoValue(), EdgeDirection.IN); - verticesWithLowestOutNeighbor.writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,5\n" + - "2,1\n" + - "3,1\n" + - "4,3\n" + - "5,3\n"; - } - - @Test - public void testMaxWeightAllNeighbors() throws Exception { - /* - * Get the maximum weight among all edges - * of a vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet> verticesWithMaxEdgeWeight = - graph.reduceOnEdges(new SelectMaxWeightNeighborNoValue(), EdgeDirection.ALL); - verticesWithMaxEdgeWeight.writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,51\n" + - "2,23\n" + - "3,35\n" + - "4,45\n" + - "5,51\n"; - } - - @SuppressWarnings("serial") - private static final class SelectMinWeightNeighbor implements EdgesFunctionWithVertexValue> { - - public Tuple2 iterateEdges( - Vertex v, - Iterable> edges) { - - long weight = Long.MAX_VALUE; - long minNeighorId = 0; - - for (Edge edge: edges) { - if (edge.getValue() < weight) { - weight = edge.getValue(); - minNeighorId = edge.getTarget(); - } - } - return new Tuple2(v.getId(), minNeighorId); - } - } - - @SuppressWarnings("serial") - private static final class SelectMaxWeightNeighbor implements EdgesFunctionWithVertexValue> { - - public Tuple2 iterateEdges(Vertex v, - Iterable> edges) { - - long weight = Long.MIN_VALUE; - - for (Edge edge: edges) { - if (edge.getValue() > weight) { - weight = edge.getValue(); - } - } - return new Tuple2(v.getId(), weight); - } - } - - @SuppressWarnings("serial") - private static final class SelectMinWeightNeighborNoValue implements EdgesFunction> { - - public Tuple2 iterateEdges(Iterable>> edges) { - - long weight = Long.MAX_VALUE; - long minNeighorId = 0; - long vertexId = -1; - long i=0; - - for (Tuple2> edge: edges) { - if (edge.f1.getValue() < weight) { - weight = edge.f1.getValue(); - minNeighorId = edge.f1.getTarget(); - } - if (i==0) { - vertexId = edge.f0; - } i++; - } - return new Tuple2(vertexId, minNeighorId); - } - } - - @SuppressWarnings("serial") - private static final class SelectMaxWeightNeighborNoValue implements EdgesFunction> { - - public Tuple2 iterateEdges(Iterable>> edges) { - - long weight = Long.MIN_VALUE; - long vertexId = -1; - long i=0; - - for (Tuple2> edge: edges) { - if (edge.f1.getValue() > weight) { - weight = edge.f1.getValue(); - } - if (i==0) { - vertexId = edge.f0; - } i++; - } - return new Tuple2(vertexId, weight); - } - } - - @SuppressWarnings("serial") - private static final class SelectMinWeightInNeighbor implements EdgesFunctionWithVertexValue> { - - public Tuple2 iterateEdges( - Vertex v, - Iterable> edges) { - - long weight = Long.MAX_VALUE; - long minNeighorId = 0; - - for (Edge edge: edges) { - if (edge.getValue() < weight) { - weight = edge.getValue(); - minNeighorId = edge.getSource(); - } - } - return new Tuple2(v.getId(), minNeighorId); - } - } - - @SuppressWarnings("serial") - private static final class SelectMinWeightInNeighborNoValue implements EdgesFunction> { - - public Tuple2 iterateEdges(Iterable>> edges) { - - long weight = Long.MAX_VALUE; - long minNeighorId = 0; - long vertexId = -1; - long i=0; - - for (Tuple2> edge: edges) { - if (edge.f1.getValue() < weight) { - weight = edge.f1.getValue(); - minNeighorId = edge.f1.getSource(); - } - if (i==0) { - vertexId = edge.f0; - } i++; - } - return new Tuple2(vertexId, minNeighorId); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnNeighborMethodsITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnNeighborMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnNeighborMethodsITCase.java deleted file mode 100644 index d385399..0000000 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/ReduceOnNeighborMethodsITCase.java +++ /dev/null @@ -1,303 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.test; - -import java.util.Iterator; - -import org.apache.flink.api.java.DataSet; -import org.apache.flink.api.java.ExecutionEnvironment; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.api.java.tuple.Tuple3; -import org.apache.flink.graph.Edge; -import org.apache.flink.graph.EdgeDirection; -import org.apache.flink.graph.Graph; -import org.apache.flink.graph.NeighborsFunction; -import org.apache.flink.graph.NeighborsFunctionWithVertexValue; -import org.apache.flink.graph.Vertex; -import org.apache.flink.test.util.MultipleProgramsTestBase; -import org.junit.After; -import org.junit.Before; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; - -@RunWith(Parameterized.class) -public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase { - - public ReduceOnNeighborMethodsITCase(MultipleProgramsTestBase.ExecutionMode mode){ - super(mode); - } - - private String resultPath; - private String expectedResult; - - @Rule - public TemporaryFolder tempFolder = new TemporaryFolder(); - - @Before - public void before() throws Exception{ - resultPath = tempFolder.newFile().toURI().toString(); - } - - @After - public void after() throws Exception{ - compareResultsByLinesInMemory(expectedResult, resultPath); - } - - @Test - public void testSumOfOutNeighbors() throws Exception { - /* - * Get the sum of out-neighbor values - * for each vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet> verticesWithSumOfOutNeighborValues = - graph.reduceOnNeighbors(new SumOutNeighbors(), EdgeDirection.OUT); - - verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,5\n" + - "2,3\n" + - "3,9\n" + - "4,5\n" + - "5,1\n"; - } - - @Test - public void testSumOfInNeighbors() throws Exception { - /* - * Get the sum of in-neighbor values - * times the edge weights for each vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet> verticesWithSum = - graph.reduceOnNeighbors(new SumInNeighbors(), EdgeDirection.IN); - - verticesWithSum.writeAsCsv(resultPath); - env.execute(); - expectedResult = "1,255\n" + - "2,12\n" + - "3,59\n" + - "4,102\n" + - "5,285\n"; - } - - @Test - public void testSumOfOAllNeighbors() throws Exception { - /* - * Get the sum of all neighbor values - * including own vertex value - * for each vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet> verticesWithSumOfOutNeighborValues = - graph.reduceOnNeighbors(new SumAllNeighbors(), EdgeDirection.ALL); - - verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,11\n" + - "2,6\n" + - "3,15\n" + - "4,12\n" + - "5,13\n"; - } - - @Test - public void testSumOfOutNeighborsNoValue() throws Exception { - /* - * Get the sum of out-neighbor values - * for each vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet> verticesWithSumOfOutNeighborValues = - graph.reduceOnNeighbors(new SumOutNeighborsNoValue(), EdgeDirection.OUT); - - verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,5\n" + - "2,3\n" + - "3,9\n" + - "4,5\n" + - "5,1\n"; - } - - @Test - public void testSumOfInNeighborsNoValue() throws Exception { - /* - * Get the sum of in-neighbor values - * times the edge weights for each vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet> verticesWithSum = - graph.reduceOnNeighbors(new SumInNeighborsNoValue(), EdgeDirection.IN); - - verticesWithSum.writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,255\n" + - "2,12\n" + - "3,59\n" + - "4,102\n" + - "5,285\n"; - } - - @Test - public void testSumOfAllNeighborsNoValue() throws Exception { - /* - * Get the sum of all neighbor values - * for each vertex - */ - final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); - Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), - TestGraphUtils.getLongLongEdgeData(env), env); - - DataSet> verticesWithSumOfOutNeighborValues = - graph.reduceOnNeighbors(new SumAllNeighborsNoValue(), EdgeDirection.ALL); - - verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath); - env.execute(); - - expectedResult = "1,10\n" + - "2,4\n" + - "3,12\n" + - "4,8\n" + - "5,8\n"; - } - - @SuppressWarnings("serial") - private static final class SumOutNeighbors implements NeighborsFunctionWithVertexValue> { - - public Tuple2 iterateNeighbors(Vertex vertex, - Iterable, Vertex>> neighbors) { - - long sum = 0; - for (Tuple2, Vertex> neighbor : neighbors) { - sum += neighbor.f1.getValue(); - } - return new Tuple2(vertex.getId(), sum); - } - } - - @SuppressWarnings("serial") - private static final class SumInNeighbors implements NeighborsFunctionWithVertexValue> { - - public Tuple2 iterateNeighbors(Vertex vertex, - Iterable, Vertex>> neighbors) { - - long sum = 0; - for (Tuple2, Vertex> neighbor : neighbors) { - sum += neighbor.f0.getValue() * neighbor.f1.getValue(); - } - return new Tuple2(vertex.getId(), sum); - } - } - - @SuppressWarnings("serial") - private static final class SumAllNeighbors implements NeighborsFunctionWithVertexValue> { - - public Tuple2 iterateNeighbors(Vertex vertex, - Iterable, Vertex>> neighbors) { - - long sum = 0; - for (Tuple2, Vertex> neighbor : neighbors) { - sum += neighbor.f1.getValue(); - } - return new Tuple2(vertex.getId(), sum + vertex.getValue()); - } - } - - @SuppressWarnings("serial") - private static final class SumOutNeighborsNoValue implements NeighborsFunction> { - - public Tuple2 iterateNeighbors( - Iterable, Vertex>> neighbors) { - - long sum = 0; - Tuple3, Vertex> next = null; - Iterator, Vertex>> neighborsIterator = - neighbors.iterator(); - while(neighborsIterator.hasNext()) { - next = neighborsIterator.next(); - sum += next.f2.getValue(); - } - return new Tuple2(next.f0, sum); - } - } - - @SuppressWarnings("serial") - private static final class SumInNeighborsNoValue implements NeighborsFunction> { - - public Tuple2 iterateNeighbors( - Iterable, Vertex>> neighbors) { - - long sum = 0; - Tuple3, Vertex> next = null; - Iterator, Vertex>> neighborsIterator = - neighbors.iterator(); - while(neighborsIterator.hasNext()) { - next = neighborsIterator.next(); - sum += next.f2.getValue() * next.f1.getValue(); - } - return new Tuple2(next.f0, sum); - } - } - - @SuppressWarnings("serial") - private static final class SumAllNeighborsNoValue implements NeighborsFunction> { - - public Tuple2 iterateNeighbors( - Iterable, Vertex>> neighbors) { - - long sum = 0; - Tuple3, Vertex> next = null; - Iterator, Vertex>> neighborsIterator = - neighbors.iterator(); - while(neighborsIterator.hasNext()) { - next = neighborsIterator.next(); - sum += next.f2.getValue(); - } - return new Tuple2(next.f0, sum); - } - } -} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java index 9f6569b..75355f0 100644 --- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/TestGraphUtils.java @@ -33,27 +33,47 @@ public class TestGraphUtils { public static final DataSet> getLongLongVertexData( ExecutionEnvironment env) { - List> vertices = new ArrayList>(); - vertices.add(new Vertex(1L, 1L)); - vertices.add(new Vertex(2L, 2L)); - vertices.add(new Vertex(3L, 3L)); - vertices.add(new Vertex(4L, 4L)); - vertices.add(new Vertex(5L, 5L)); - - return env.fromCollection(vertices); + + return env.fromCollection(getLongLongVertices()); } public static final DataSet> getLongLongEdgeData( ExecutionEnvironment env) { - List> edges = new ArrayList>(); - edges.add(new Edge(1L, 2L, 12L)); - edges.add(new Edge(1L, 3L, 13L)); - edges.add(new Edge(2L, 3L, 23L)); - edges.add(new Edge(3L, 4L, 34L)); - edges.add(new Edge(3L, 5L, 35L)); - edges.add(new Edge(4L, 5L, 45L)); - edges.add(new Edge(5L, 1L, 51L)); - + + return env.fromCollection(getLongLongEdges()); + } + + public static final DataSet> getLongLongEdgeInvalidSrcData( + ExecutionEnvironment env) { + List> edges = getLongLongEdges(); + + edges.remove(1); + edges.add(new Edge(13L, 3L, 13L)); + + return env.fromCollection(edges); + } + + public static final DataSet> getLongLongEdgeInvalidTrgData( + ExecutionEnvironment env) { + List> edges = getLongLongEdges(); + + edges.remove(0); + edges.add(new Edge(13L, 3L, 13L)); + + return env.fromCollection(edges); + } + + public static final DataSet> getLongLongEdgeInvalidSrcTrgData( + ExecutionEnvironment env) { + List> edges = getLongLongEdges(); + + edges.remove(0); + edges.remove(1); + edges.remove(2); + edges.add(new Edge(13L, 3L, 13L)); + edges.add(new Edge(1L, 12L, 12L)); + edges.add(new Edge(13L, 33L, 13L)); + return env.fromCollection(edges); } @@ -193,12 +213,10 @@ public class TestGraphUtils { */ public static final DataSet> getLongLongInvalidVertexData( ExecutionEnvironment env) { - List> vertices = new ArrayList>(); + List> vertices = getLongLongVertices(); + + vertices.remove(0); vertices.add(new Vertex(15L, 1L)); - vertices.add(new Vertex(2L, 2L)); - vertices.add(new Vertex(3L, 3L)); - vertices.add(new Vertex(4L, 4L)); - vertices.add(new Vertex(5L, 5L)); return env.fromCollection(vertices); } http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationExampleITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationExampleITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationExampleITCase.java new file mode 100755 index 0000000..185d922 --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/LabelPropagationExampleITCase.java @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test.example; + +import com.google.common.base.Charsets; +import com.google.common.io.Files; +import org.apache.flink.graph.example.LabelPropagationExample; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; + +@RunWith(Parameterized.class) +public class LabelPropagationExampleITCase extends MultipleProgramsTestBase { + + public LabelPropagationExampleITCase(ExecutionMode mode){ + super(mode); + } + + private String resultPath; + private String expectedResult; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Test + public void testSingleIteration() throws Exception { + /* + * Test one iteration of label propagation example with a simple graph + */ + + final String vertices = "1 10\n" + + "2 10\n" + + "3 30\n" + + "4 40\n" + + "5 40\n" + + "6 40\n" + + "7 70\n"; + + final String edges = "1 3\n" + + "2 3\n" + + "4 7\n" + + "5 7\n" + + "6 7\n" + + "7 3\n"; + + String verticesPath = createTempFile(vertices); + String edgesPath = createTempFile(edges); + + LabelPropagationExample.main(new String[] {verticesPath, edgesPath, resultPath, "1"}); + + expectedResult = "1,10\n" + + "2,10\n" + + "3,10\n" + + "4,40\n" + + "5,40\n" + + "6,40\n" + + "7,40\n"; + } + + @Test + public void testTieBreaker() throws Exception { + /* + * Test the label propagation example where a tie must be broken + */ + + final String vertices = "1 10\n" + + "2 10\n" + + "3 10\n" + + "4 10\n" + + "5 0\n" + + "6 20\n" + + "7 20\n" + + "8 20\n" + + "9 20\n"; + + final String edges = "1 5\n" + + "2 5\n" + + "3 5\n" + + "4 5\n" + + "6 5\n" + + "7 5\n" + + "8 5\n" + + "9 5\n"; + + String verticesPath = createTempFile(vertices); + String edgesPath = createTempFile(edges); + + LabelPropagationExample.main(new String[] {verticesPath, edgesPath, resultPath, "1"}); + + expectedResult = "1,10\n" + + "2,10\n" + + "3,10\n" + + "4,10\n" + + "5,20\n" + + "6,20\n" + + "7,20\n" + + "8,20\n" + + "9,20\n"; + } + + // ------------------------------------------------------------------------- + // Util methods + // ------------------------------------------------------------------------- + + private String createTempFile(final String rows) throws Exception { + File tempFile = tempFolder.newFile(); + Files.write(rows, tempFile, Charsets.UTF_8); + return tempFile.toURI().toString(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java new file mode 100644 index 0000000..8c363a5 --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test.operations; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.test.TestGraphUtils; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.types.NullValue; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class DegreesITCase extends MultipleProgramsTestBase { + + public DegreesITCase(MultipleProgramsTestBase.ExecutionMode mode){ + super(mode); + } + + private String resultPath; + private String expectedResult; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Test + public void testOutDegrees() throws Exception { + /* + * Test outDegrees() + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + graph.outDegrees().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2\n" + + "2,1\n" + + "3,2\n" + + "4,1\n" + + "5,1\n"; + } + + @Test + public void testOutDegreesWithNoOutEdges() throws Exception { + /* + * Test outDegrees() no outgoing edges + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeDataWithZeroDegree(env), env); + + graph.outDegrees().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,3\n" + + "2,1\n" + + "3,1\n" + + "4,1\n" + + "5,0\n"; + } + + @Test + public void testInDegrees() throws Exception { + /* + * Test inDegrees() + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + graph.inDegrees().writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,1\n" + + "2,1\n" + + "3,2\n" + + "4,1\n" + + "5,2\n"; + } + + @Test + public void testInDegreesWithNoInEdge() throws Exception { + /* + * Test inDegrees() no ingoing edge + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeDataWithZeroDegree(env), env); + + graph.inDegrees().writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,0\n" + + "2,1\n" + + "3,1\n" + + "4,1\n" + + "5,3\n"; + } + + @Test + public void testGetDegrees() throws Exception { + /* + * Test getDegrees() + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + graph.getDegrees().writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,3\n" + + "2,2\n" + + "3,4\n" + + "4,2\n" + + "5,3\n"; + } + + @Test + public void testGetDegreesWithDisconnectedData() throws Exception { + /* + * Test getDegrees() with disconnected data + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = + Graph.fromDataSet(TestGraphUtils.getDisconnectedLongLongEdgeData(env), env); + + graph.outDegrees().writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,2\n" + + "2,1\n" + + "3,0\n" + + "4,1\n" + + "5,0\n"; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/FromCollectionITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/FromCollectionITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/FromCollectionITCase.java new file mode 100644 index 0000000..975d21a --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/FromCollectionITCase.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test.operations; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.test.TestGraphUtils; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.types.NullValue; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class FromCollectionITCase extends MultipleProgramsTestBase { + + public FromCollectionITCase(MultipleProgramsTestBase.ExecutionMode mode){ + super(mode); + } + + private String resultPath; + private String expectedResult; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Test + public void testFromCollectionVerticesEdges() throws Exception { + /* + * Test fromCollection(vertices, edges): + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(), + TestGraphUtils.getLongLongEdges(), env); + + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,2,12\n" + + "1,3,13\n" + + "2,3,23\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + + @Test + public void testFromCollectionEdgesNoInitialValue() throws Exception { + /* + * Test fromCollection(edges) with no initial value for the vertices + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph graph = Graph.fromCollection(TestGraphUtils.getLongLongEdges(), + env); + + graph.getVertices().writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,(null)\n" + + "2,(null)\n" + + "3,(null)\n" + + "4,(null)\n" + + "5,(null)\n"; + } + + @Test + public void testFromCollectionEdgesWithInitialValue() throws Exception { + /* + * Test fromCollection(edges) with vertices initialised by a + * function that takes the id and doubles it + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph graph = Graph.fromCollection(TestGraphUtils.getLongLongEdges(), + new InitVerticesMapper(), env); + + graph.getVertices().writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,2\n" + + "2,4\n" + + "3,6\n" + + "4,8\n" + + "5,10\n"; + } + + @SuppressWarnings("serial") + private static final class InitVerticesMapper implements MapFunction { + public Long map(Long vertexId) { + return vertexId * 2; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java new file mode 100644 index 0000000..6848dad --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationITCase.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test.operations; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.test.TestGraphUtils; +import org.apache.flink.graph.test.TestGraphUtils.DummyCustomParameterizedType; +import org.apache.flink.graph.validation.InvalidVertexIdsValidator; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.apache.flink.types.NullValue; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class GraphCreationITCase extends MultipleProgramsTestBase { + + public GraphCreationITCase(MultipleProgramsTestBase.ExecutionMode mode){ + super(mode); + } + + private String resultPath; + private String expectedResult; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Test + public void testCreateWithoutVertexValues() throws Exception { + /* + * Test create() with edge dataset and no vertex values + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env), env); + + graph.getVertices().writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,(null)\n" + + "2,(null)\n" + + "3,(null)\n" + + "4,(null)\n" + + "5,(null)\n"; + } + + @Test + public void testCreateWithMapper() throws Exception { + /* + * Test create() with edge dataset and a mapper that assigns the id as value + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env), + new AssignIdAsValueMapper(), env); + + graph.getVertices().writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,1\n" + + "2,2\n" + + "3,3\n" + + "4,4\n" + + "5,5\n"; + } + + @Test + public void testCreateWithCustomVertexValue() throws Exception { + /* + * Test create() with edge dataset and a mapper that assigns a parametrized custom vertex value + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph, Long> graph = Graph.fromDataSet( + TestGraphUtils.getLongLongEdgeData(env), new AssignCustomVertexValueMapper(), env); + + graph.getVertices().writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,(2.0,0)\n" + + "2,(4.0,1)\n" + + "3,(6.0,2)\n" + + "4,(8.0,3)\n" + + "5,(10.0,4)\n"; + } + + @Test + public void testValidate() throws Exception { + /* + * Test validate(): + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> vertices = TestGraphUtils.getLongLongVertexData(env); + DataSet> edges = TestGraphUtils.getLongLongEdgeData(env); + + Graph graph = Graph.fromDataSet(vertices, edges, env); + DataSet result = graph.validate(new InvalidVertexIdsValidator()); + + result.writeAsText(resultPath); + env.execute(); + + expectedResult = "true\n"; + } + + @Test + public void testValidateWithInvalidIds() throws Exception { + /* + * Test validate() - invalid vertex ids + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + DataSet> vertices = TestGraphUtils.getLongLongInvalidVertexData(env); + DataSet> edges = TestGraphUtils.getLongLongEdgeData(env); + + Graph graph = Graph.fromDataSet(vertices, edges, env); + DataSet result = graph.validate(new InvalidVertexIdsValidator()); + result.writeAsText(resultPath); + env.execute(); + + expectedResult = "false\n"; + } + + @SuppressWarnings("serial") + private static final class AssignIdAsValueMapper implements MapFunction { + public Long map(Long vertexId) { + return vertexId; + } + } + + @SuppressWarnings("serial") + private static final class AssignCustomVertexValueMapper implements + MapFunction> { + + DummyCustomParameterizedType dummyValue = + new DummyCustomParameterizedType(); + + public DummyCustomParameterizedType map(Long vertexId) { + dummyValue.setIntField(vertexId.intValue()-1); + dummyValue.setTField(vertexId*2.0); + return dummyValue; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java new file mode 100644 index 0000000..010ae1d --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphCreationWithMapperITCase.java @@ -0,0 +1,159 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test.operations; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.test.TestGraphUtils; +import org.apache.flink.graph.test.TestGraphUtils.DummyCustomType; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class GraphCreationWithMapperITCase extends MultipleProgramsTestBase { + + public GraphCreationWithMapperITCase(MultipleProgramsTestBase.ExecutionMode mode){ + super(mode); + } + + private String resultPath; + private String expectedResult; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Test + public void testWithDoubleValueMapper() throws Exception { + /* + * Test create() with edge dataset and a mapper that assigns a double constant as value + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongEdgeData(env), + new AssignDoubleValueMapper(), env); + + graph.getVertices().writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,0.1\n" + + "2,0.1\n" + + "3,0.1\n" + + "4,0.1\n" + + "5,0.1\n"; + } + + @Test + public void testWithTuple2ValueMapper() throws Exception { + /* + * Test create() with edge dataset and a mapper that assigns a Tuple2 as value + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph, Long> graph = Graph.fromDataSet( + TestGraphUtils.getLongLongEdgeData(env), new AssignTuple2ValueMapper(), env); + + graph.getVertices().writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,(2,42)\n" + + "2,(4,42)\n" + + "3,(6,42)\n" + + "4,(8,42)\n" + + "5,(10,42)\n"; + } + + @Test + public void testWithConstantValueMapper() throws Exception { + /* + * Test create() with edge dataset with String key type + * and a mapper that assigns a double constant as value + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph graph = Graph.fromDataSet(TestGraphUtils.getStringLongEdgeData(env), + new AssignDoubleConstantMapper(), env); + + graph.getVertices().writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,0.1\n" + + "2,0.1\n" + + "3,0.1\n" + + "4,0.1\n" + + "5,0.1\n"; + } + + @Test + public void testWithDCustomValueMapper() throws Exception { + /* + * Test create() with edge dataset and a mapper that assigns a custom vertex value + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + Graph graph = Graph.fromDataSet( + TestGraphUtils.getLongLongEdgeData(env), new AssignCustomValueMapper(), env); + + graph.getVertices().writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,(F,0)\n" + + "2,(F,1)\n" + + "3,(F,2)\n" + + "4,(F,3)\n" + + "5,(F,4)\n"; + } + + @SuppressWarnings("serial") + private static final class AssignDoubleValueMapper implements MapFunction { + public Double map(Long value) { + return 0.1d; + } + } + + @SuppressWarnings("serial") + private static final class AssignTuple2ValueMapper implements MapFunction> { + public Tuple2 map(Long vertexId) { + return new Tuple2(vertexId*2, 42l); + } + } + + @SuppressWarnings("serial") + private static final class AssignDoubleConstantMapper implements MapFunction { + public Double map(String value) { + return 0.1d; + } + } + + @SuppressWarnings("serial") + private static final class AssignCustomValueMapper implements MapFunction { + public DummyCustomType map(Long vertexId) { + return new DummyCustomType(vertexId.intValue()-1, false); + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java new file mode 100644 index 0000000..502d529 --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphMutationsITCase.java @@ -0,0 +1,274 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test.operations; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.test.TestGraphUtils; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class GraphMutationsITCase extends MultipleProgramsTestBase { + + public GraphMutationsITCase(MultipleProgramsTestBase.ExecutionMode mode){ + super(mode); + } + + private String resultPath; + private String expectedResult; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Test + public void testAddVertex() throws Exception { + /* + * Test addVertex() -- simple case + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + List> edges = new ArrayList>(); + edges.add(new Edge(6L, 1L, 61L)); + graph = graph.addVertex(new Vertex(6L, 6L), edges); + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,12\n" + + "1,3,13\n" + + "2,3,23\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n" + + "6,1,61\n"; + } + + @Test + public void testAddVertexExisting() throws Exception { + /* + * Test addVertex() -- add an existing vertex + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + List> edges = new ArrayList>(); + edges.add(new Edge(1L, 5L, 15L)); + graph = graph.addVertex(new Vertex(1L, 1L), edges); + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,12\n" + + "1,3,13\n" + + "1,5,15\n" + + "2,3,23\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + + @Test + public void testAddVertexNoEdges() throws Exception { + /* + * Test addVertex() -- add vertex with empty edge set + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + List> edges = new ArrayList>(); + graph = graph.addVertex(new Vertex(6L, 6L), edges); + graph.getVertices().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,1\n" + + "2,2\n" + + "3,3\n" + + "4,4\n" + + "5,5\n" + + "6,6\n"; + } + + @Test + public void testRemoveVertex() throws Exception { + /* + * Test removeVertex() -- simple case + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + graph = graph.removeVertex(new Vertex(5L, 5L)); + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,12\n" + + "1,3,13\n" + + "2,3,23\n" + + "3,4,34\n"; + } + + @Test + public void testRemoveInvalidVertex() throws Exception { + /* + * Test removeVertex() -- remove an invalid vertex + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + graph = graph.removeVertex(new Vertex(6L, 6L)); + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,12\n" + + "1,3,13\n" + + "2,3,23\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + + @Test + public void testAddEdge() throws Exception { + /* + * Test addEdge() -- simple case + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + graph = graph.addEdge(new Vertex(6L, 6L), new Vertex(1L, 1L), + 61L); + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,12\n" + + "1,3,13\n" + + "2,3,23\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n" + + "6,1,61\n"; + } + + @Test + public void testAddExistingEdge() throws Exception { + /* + * Test addEdge() -- add already existing edge + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + graph = graph.addEdge(new Vertex(1L, 1L), new Vertex(2L, 2L), + 12L); + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,12\n" + + "1,2,12\n" + + "1,3,13\n" + + "2,3,23\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + + @Test + public void testRemoveVEdge() throws Exception { + /* + * Test removeEdge() -- simple case + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + graph = graph.removeEdge(new Edge(5L, 1L, 51L)); + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,12\n" + + "1,3,13\n" + + "2,3,23\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n"; + } + + @Test + public void testRemoveInvalidEdge() throws Exception { + /* + * Test removeEdge() -- invalid edge + */ + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + graph = graph.removeEdge(new Edge(6L, 1L, 61L)); + graph.getEdges().writeAsCsv(resultPath); + env.execute(); + + expectedResult = "1,2,12\n" + + "1,3,13\n" + + "2,3,23\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/flink/blob/e1e03062/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java new file mode 100644 index 0000000..6c4f1ef --- /dev/null +++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.test.operations; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.flink.api.common.functions.FilterFunction; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.graph.test.TestGraphUtils; +import org.apache.flink.test.util.MultipleProgramsTestBase; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class GraphOperationsITCase extends MultipleProgramsTestBase { + + public GraphOperationsITCase(MultipleProgramsTestBase.ExecutionMode mode){ + super(mode); + } + + private String resultPath; + private String expectedResult; + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + + @Before + public void before() throws Exception{ + resultPath = tempFolder.newFile().toURI().toString(); + } + + @After + public void after() throws Exception{ + compareResultsByLinesInMemory(expectedResult, resultPath); + } + + @Test + public void testUndirected() throws Exception { + /* + * Test getUndirected() + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + graph.getUndirected().getEdges().writeAsCsv(resultPath); + env.execute(); + expectedResult = "1,2,12\n" + "2,1,12\n" + + "1,3,13\n" + "3,1,13\n" + + "2,3,23\n" + "3,2,23\n" + + "3,4,34\n" + "4,3,34\n" + + "3,5,35\n" + "5,3,35\n" + + "4,5,45\n" + "5,4,45\n" + + "5,1,51\n" + "1,5,51\n"; + } + + @Test + public void testReverse() throws Exception { + /* + * Test reverse() + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + graph.reverse().getEdges().writeAsCsv(resultPath); + env.execute(); + expectedResult = "2,1,12\n" + + "3,1,13\n" + + "3,2,23\n" + + "4,3,34\n" + + "5,3,35\n" + + "5,4,45\n" + + "1,5,51\n"; + } + + @SuppressWarnings("serial") + @Test + public void testSubGraph() throws Exception { + /* + * Test subgraph: + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + graph.subgraph(new FilterFunction>() { + public boolean filter(Vertex vertex) throws Exception { + return (vertex.getValue() > 2); + } + }, + new FilterFunction>() { + public boolean filter(Edge edge) throws Exception { + return (edge.getValue() > 34); + } + }).getEdges().writeAsCsv(resultPath); + + env.execute(); + expectedResult = "3,5,35\n" + + "4,5,45\n"; + } + + @SuppressWarnings("serial") + @Test + public void testFilterVertices() throws Exception { + /* + * Test filterOnVertices: + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + graph.filterOnVertices(new FilterFunction>() { + public boolean filter(Vertex vertex) throws Exception { + return (vertex.getValue() > 2); + } + }).getEdges().writeAsCsv(resultPath); + + env.execute(); + expectedResult = "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n"; + } + + @SuppressWarnings("serial") + @Test + public void testFilterEdges() throws Exception { + /* + * Test filterOnEdges: + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + graph.filterOnEdges(new FilterFunction>() { + public boolean filter(Edge edge) throws Exception { + return (edge.getValue() > 34); + } + }).getEdges().writeAsCsv(resultPath); + + env.execute(); + expectedResult = "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n"; + } + + @Test + public void testNumberOfVertices() throws Exception { + /* + * Test numberOfVertices() + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + graph.numberOfVertices().writeAsText(resultPath); + + env.execute(); + expectedResult = "5"; + } + + @Test + public void testNumberOfEdges() throws Exception { + /* + * Test numberOfEdges() + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + graph.numberOfEdges().writeAsText(resultPath); + + env.execute(); + expectedResult = "7"; + } + + @Test + public void testVertexIds() throws Exception { + /* + * Test getVertexIds() + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + graph.getVertexIds().writeAsText(resultPath); + + env.execute(); + expectedResult = "1\n2\n3\n4\n5\n"; + } + + @Test + public void testEdgesIds() throws Exception { + /* + * Test getEdgeIds() + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + graph.getEdgeIds().writeAsCsv(resultPath); + + env.execute(); + expectedResult = "1,2\n" + "1,3\n" + + "2,3\n" + "3,4\n" + + "3,5\n" + "4,5\n" + + "5,1\n"; + } + + @Test + public void testUnion() throws Exception { + /* + * Test union() + */ + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + Graph graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), + TestGraphUtils.getLongLongEdgeData(env), env); + + List> vertices = new ArrayList>(); + List> edges = new ArrayList>(); + + vertices.add(new Vertex(6L, 6L)); + edges.add(new Edge(6L, 1L, 61L)); + + graph = graph.union(Graph.fromCollection(vertices, edges, env)); + + graph.getEdges().writeAsCsv(resultPath); + + env.execute(); + + expectedResult = "1,2,12\n" + + "1,3,13\n" + + "2,3,23\n" + + "3,4,34\n" + + "3,5,35\n" + + "4,5,45\n" + + "5,1,51\n" + + "6,1,61\n"; + } +} \ No newline at end of file