Return-Path: X-Original-To: apmail-flink-commits-archive@minotaur.apache.org Delivered-To: apmail-flink-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 62BDA18BE6 for ; Fri, 9 Oct 2015 16:05:40 +0000 (UTC) Received: (qmail 75974 invoked by uid 500); 9 Oct 2015 16:05:40 -0000 Delivered-To: apmail-flink-commits-archive@flink.apache.org Received: (qmail 75927 invoked by uid 500); 9 Oct 2015 16:05:40 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 75803 invoked by uid 99); 9 Oct 2015 16:05:40 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 09 Oct 2015 16:05:40 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 47525E07EE; Fri, 9 Oct 2015 16:05:39 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: vasia@apache.org To: commits@flink.apache.org Date: Fri, 09 Oct 2015 16:05:49 -0000 Message-Id: <30cc43786caa49888453aceba3c57a58@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [11/24] flink git commit: [FLINK-2833] [gelly] create a flink-libraries module and move gelly there http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala deleted file mode 100644 index 3dc90fc..0000000 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithEdgesITCase.scala +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.scala.test.operations - -import org.apache.flink.api.common.functions.MapFunction -import org.apache.flink.api.scala._ -import org.apache.flink.graph.Edge -import org.apache.flink.graph.scala._ -import org.apache.flink.graph.scala.test.TestGraphUtils -import org.apache.flink.graph.scala.utils.EdgeToTuple3Map -import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} -import org.junit.rules.TemporaryFolder -import org.junit.runner.RunWith -import org.junit.runners.Parameterized -import org.junit.{After, Before, Rule, Test} -import _root_.scala.collection.JavaConverters._ - -@RunWith(classOf[Parameterized]) -class JoinWithEdgesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends -MultipleProgramsTestBase(mode) { - - private var expectedResult: String = null - - @Test - @throws(classOf[Exception]) - def testWithEdgesInputDataset { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils - .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val result: Graph[Long, Long, Long] = graph.joinWithEdges(graph.getEdges.map(new - EdgeToTuple3Map[Long, Long]), new AddValuesMapper) - val res = result.getEdges.collect().toList - expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,46\n" + "3,4,68\n" + "3,5,70\n" + "4,5," + - "90\n" + "5,1,102\n" - TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) - } - - @Test - @throws(classOf[Exception]) - def testWithEdgesInputDatasetSugar { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils - .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val result: Graph[Long, Long, Long] = graph.joinWithEdges(graph.getEdges.map(new - EdgeToTuple3Map[Long, Long]), (originalValue: Long, tupleValue: Long) => - originalValue + tupleValue) - val res = result.getEdges.collect().toList - expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,46\n" + "3,4,68\n" + "3,5,70\n" + "4,5," + - "90\n" + "5,1,102\n" - TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) - } - - @Test - @throws(classOf[Exception]) - def testWithEdgesOnSource { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils - .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnSource[Long](graph.getEdges - .map(new ProjectSourceAndValueMapper), (originalValue: Long, tupleValue: Long) => - originalValue + tupleValue) - val res = result.getEdges.collect().toList - expectedResult = "1,2,24\n" + "1,3,25\n" + "2,3,46\n" + "3,4,68\n" + "3,5,69\n" + "4,5," + - "90\n" + "5,1,102\n" - TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) - } - - @Test - @throws(classOf[Exception]) - def testWithEdgesOnSourceSugar { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils - .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnSource[Long](graph.getEdges - .map(new ProjectSourceAndValueMapper), (originalValue: Long, tupleValue: Long) => - originalValue + tupleValue) - val res = result.getEdges.collect().toList - expectedResult = "1,2,24\n" + "1,3,25\n" + "2,3,46\n" + "3,4,68\n" + "3,5,69\n" + "4,5," + - "90\n" + "5,1,102\n" - TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) - } - - @Test - @throws(classOf[Exception]) - def testWithEdgesOnTarget { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils - .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnTarget[Long](graph.getEdges - .map(new ProjectTargetAndValueMapper), (originalValue: Long, tupleValue: Long) => - originalValue + tupleValue) - val res = result.getEdges.collect().toList - expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,36\n" + "3,4,68\n" + "3,5,70\n" + "4,5," + - "80\n" + "5,1,102\n" - TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) - } - - @Test - @throws(classOf[Exception]) - def testWithEdgesOnTargetSugar { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils - .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val result: Graph[Long, Long, Long] = graph.joinWithEdgesOnTarget[Long](graph.getEdges - .map(new ProjectTargetAndValueMapper), (originalValue: Long, tupleValue: Long) => - originalValue + tupleValue) - val res = result.getEdges.collect().toList - expectedResult = "1,2,24\n" + "1,3,26\n" + "2,3,36\n" + "3,4,68\n" + "3,5,70\n" + "4,5," + - "80\n" + "5,1,102\n" - TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) - } - - - final class AddValuesMapper extends MapFunction[(Long, Long), Long] { - @throws(classOf[Exception]) - def map(tuple: (Long, Long)): Long = { - tuple._1 + tuple._2 - } - } - - final class ProjectSourceAndValueMapper extends MapFunction[Edge[Long, Long], (Long, Long)] { - @throws(classOf[Exception]) - def map(edge: Edge[Long, Long]): (Long, Long) = { - (edge.getSource, edge.getValue) - } - } - - final class ProjectTargetAndValueMapper extends MapFunction[Edge[Long, Long], (Long, Long)] { - @throws(classOf[Exception]) - def map(edge: Edge[Long, Long]): (Long, Long) = { - (edge.getTarget, edge.getValue) - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala deleted file mode 100644 index 98ee8b6..0000000 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/JoinWithVerticesITCase.scala +++ /dev/null @@ -1,74 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.scala.test.operations - -import org.apache.flink.api.common.functions.MapFunction -import org.apache.flink.api.scala._ -import org.apache.flink.graph.scala._ -import org.apache.flink.graph.scala.test.TestGraphUtils -import org.apache.flink.graph.scala.utils.VertexToTuple2Map -import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} -import org.junit.rules.TemporaryFolder -import org.junit.runner.RunWith -import org.junit.runners.Parameterized -import org.junit.{After, Before, Rule, Test} -import _root_.scala.collection.JavaConverters._ - -@RunWith(classOf[Parameterized]) -class JoinWithVerticesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends -MultipleProgramsTestBase(mode) { - - private var expectedResult: String = null - - @Test - @throws(classOf[Exception]) - def testJoinWithVertexSet { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils - .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val result: Graph[Long, Long, Long] = graph.joinWithVertices(graph.getVertices.map(new - VertexToTuple2Map[Long, Long]), new AddValuesMapper) - val res = result.getVertices.collect().toList - expectedResult = "1,2\n" + "2,4\n" + "3,6\n" + "4,8\n" + "5,10\n" - TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) - } - - @Test - @throws(classOf[Exception]) - def testJoinWithVertexSetSugar { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils - .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val tupleSet = graph.getVertices.map(new VertexToTuple2Map[Long, Long]) - val result: Graph[Long, Long, Long] = graph.joinWithVertices[Long](tupleSet, - (originalvalue: Long, tuplevalue: Long) => originalvalue + tuplevalue) - val res = result.getVertices.collect().toList - expectedResult = "1,2\n" + "2,4\n" + "3,6\n" + "4,8\n" + "5,10\n" - TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) - } - - - final class AddValuesMapper extends MapFunction[(Long, Long), Long] { - @throws(classOf[Exception]) - def map(tuple: (Long, Long)): Long = { - tuple._1 + tuple._2 - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala deleted file mode 100644 index bdfd569..0000000 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapEdgesITCase.scala +++ /dev/null @@ -1,81 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.scala.test.operations - -import org.apache.flink.api.common.functions.MapFunction -import org.apache.flink.api.scala._ -import org.apache.flink.graph.Edge -import org.apache.flink.graph.scala._ -import org.apache.flink.graph.scala.test.TestGraphUtils -import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} -import org.junit.rules.TemporaryFolder -import org.junit.runner.RunWith -import org.junit.runners.Parameterized -import org.junit.{After, Before, Rule, Test} -import _root_.scala.collection.JavaConverters._ - -@RunWith(classOf[Parameterized]) -class MapEdgesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends -MultipleProgramsTestBase(mode) { - - private var expectedResult: String = null - - @Test - @throws(classOf[Exception]) - def testWithSameValue { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils - .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val res = graph.mapEdges(new AddOneMapper).getEdges.collect().toList - expectedResult = "1,2,13\n" + - "1,3,14\n" + "" + - "2,3,24\n" + - "3,4,35\n" + - "3,5,36\n" + - "4,5,46\n" + - "5,1,52\n" - TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) - } - - @Test - @throws(classOf[Exception]) - def testWithSameValueSugar { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils - .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val res = graph.mapEdges(edge => edge.getValue + 1) - .getEdges.collect().toList - expectedResult = "1,2,13\n" + - "1,3,14\n" + "" + - "2,3,24\n" + - "3,4,35\n" + - "3,5,36\n" + - "4,5,46\n" + - "5,1,52\n" - TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) - } - - final class AddOneMapper extends MapFunction[Edge[Long, Long], Long] { - @throws(classOf[Exception]) - def map(edge: Edge[Long, Long]): Long = { - edge.getValue + 1 - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala deleted file mode 100644 index 2e51d90..0000000 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/MapVerticesITCase.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.scala.test.operations - -import org.apache.flink.api.common.functions.MapFunction -import org.apache.flink.api.scala._ -import org.apache.flink.graph.Vertex -import org.apache.flink.graph.scala._ -import org.apache.flink.graph.scala.test.TestGraphUtils -import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} -import org.junit.rules.TemporaryFolder -import org.junit.runner.RunWith -import org.junit.runners.Parameterized -import org.junit.{After, Before, Rule, Test} -import _root_.scala.collection.JavaConverters._ - -@RunWith(classOf[Parameterized]) -class MapVerticesITCase(mode: MultipleProgramsTestBase.TestExecutionMode) extends -MultipleProgramsTestBase(mode) { - - private var expectedResult: String = null - - @Test - @throws(classOf[Exception]) - def testWithSameValue { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils - .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val res = graph.mapVertices(new AddOneMapper).getVertices.collect().toList - expectedResult = "1,2\n" + - "2,3\n" + - "3,4\n" + - "4,5\n" + - "5,6\n"; - TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) - } - - @Test - @throws(classOf[Exception]) - def testWithSameValueSugar { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils - .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val res = graph.mapVertices(vertex => vertex.getValue + 1).getVertices.collect().toList - expectedResult = "1,2\n" + - "2,3\n" + - "3,4\n" + - "4,5\n" + - "5,6\n"; - TestBaseUtils.compareResultAsTuples(res.asJava, expectedResult) - } - - final class AddOneMapper extends MapFunction[Vertex[Long, Long], Long] { - @throws(classOf[Exception]) - def map(vertex: Vertex[Long, Long]): Long = { - vertex.getValue + 1 - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala deleted file mode 100644 index dcd1deb..0000000 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnEdgesMethodsITCase.scala +++ /dev/null @@ -1,150 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.scala.test.operations - -import org.apache.flink.api.scala._ -import org.apache.flink.graph._ -import org.apache.flink.graph.scala.test.TestGraphUtils -import org.apache.flink.graph.scala.{EdgesFunction, EdgesFunctionWithVertexValue, Graph} -import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} -import org.apache.flink.util.Collector -import org.junit.rules.TemporaryFolder -import org.junit.runner.RunWith -import org.junit.runners.Parameterized -import org.junit.{After, Before, Rule, Test} -import _root_.scala.collection.JavaConverters._ - -@RunWith(classOf[Parameterized]) -class ReduceOnEdgesMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMode) - extends MultipleProgramsTestBase(mode) { - - private var expectedResult: String = null - - @Test - @throws(classOf[Exception]) - def testAllNeighborsWithValueGreaterThanFour { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils - .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val res = graph.groupReduceOnEdges(new SelectNeighborsValueGreaterThanFour, - EdgeDirection.ALL).collect().toList - expectedResult = "(5,1)\n" + "(5,3)\n" + "(5,4)" - TestBaseUtils.compareResultAsText(res.asJava, expectedResult) - } - - - @Test - @throws(classOf[Exception]) - def testAllNeighbors { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils - .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val res = graph.groupReduceOnEdges(new SelectNeighbors, EdgeDirection.ALL) - .collect().toList - expectedResult = "(1,2)\n" + "(1,3)\n" + "(1,5)\n" + "(2,1)\n" + "(2,3)\n" + - "(3,1)\n" + "(3,2)\n" + "(3,4)\n" + "(3,5)\n" + "(4,3)\n" + "(4,5)\n" + - "(5,1)\n" + "(5,3)\n" + "(5,4)" - TestBaseUtils.compareResultAsText(res.asJava, expectedResult) - } - - @Test - @throws(classOf[Exception]) - def testLowestWeightOutNeighborNoValue { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils - .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val verticesWithLowestOutNeighbor: DataSet[(Long, Long)] = graph.reduceOnEdges(new - SelectMinWeightNeighborNoValue, EdgeDirection.OUT) - val res = verticesWithLowestOutNeighbor.collect().toList - expectedResult = "(1,12)\n" + "(2,23)\n" + "(3,34)\n" + "(4,45)\n" + "(5,51)\n" - TestBaseUtils.compareResultAsText(res.asJava, expectedResult) - } - - @Test - @throws(classOf[Exception]) - def testLowestWeightInNeighborNoValue { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils - .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val verticesWithLowestOutNeighbor: DataSet[(Long, Long)] = graph.reduceOnEdges(new - SelectMinWeightNeighborNoValue, EdgeDirection.IN) - val res = verticesWithLowestOutNeighbor.collect().toList - expectedResult = "(1,51)\n" + "(2,12)\n" + "(3,13)\n" + "(4,34)\n" + "(5,35)\n" - TestBaseUtils.compareResultAsText(res.asJava, expectedResult) - } - - @Test - @throws(classOf[Exception]) - def testMaxWeightAllNeighbors { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils - .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val verticesWithMaxEdgeWeight: DataSet[(Long, Long)] = graph.reduceOnEdges(new - SelectMaxWeightNeighborNoValue, EdgeDirection.ALL) - val res = verticesWithMaxEdgeWeight.collect().toList - expectedResult = "(1,51)\n" + "(2,23)\n" + "(3,35)\n" + "(4,45)\n" + "(5,51)\n" - TestBaseUtils.compareResultAsText(res.asJava, expectedResult) - } - - final class SelectNeighborsValueGreaterThanFour extends EdgesFunctionWithVertexValue[Long, - Long, Long, (Long, Long)] { - @throws(classOf[Exception]) - override def iterateEdges(v: Vertex[Long, Long], edges: Iterable[Edge[Long, Long]], out: - Collector[(Long, Long)]): Unit = { - for (edge <- edges) { - if (v.getValue > 4) { - if (v.getId == edge.getTarget) { - out.collect((v.getId, edge.getSource)) - } - else { - out.collect((v.getId, edge.getTarget)) - } - } - } - } - } - - final class SelectNeighbors extends EdgesFunction[Long, Long, (Long, Long)] { - @throws(classOf[Exception]) - override def iterateEdges(edges: Iterable[(Long, Edge[Long, Long])], out: Collector[ - (Long, Long)]) { - for (edge <- edges) { - if (edge._1.equals(edge._2.getTarget)) { - out.collect(new Tuple2[Long, Long](edge._1, edge._2.getSource)) - } - else { - out.collect(new Tuple2[Long, Long](edge._1, edge._2.getTarget)) - } - } - } - } - - final class SelectMinWeightNeighborNoValue extends ReduceEdgesFunction[Long] { - override def reduceEdges(firstEdgeValue: Long, secondEdgeValue: Long): Long = { - Math.min(firstEdgeValue, secondEdgeValue) - } - } - - final class SelectMaxWeightNeighborNoValue extends ReduceEdgesFunction[Long] { - override def reduceEdges(firstEdgeValue: Long, secondEdgeValue: Long): Long = { - Math.max(firstEdgeValue, secondEdgeValue) - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala b/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala deleted file mode 100644 index aef5493..0000000 --- a/flink-staging/flink-gelly-scala/src/test/scala/org/apache/flink/graph/scala/test/operations/ReduceOnNeighborMethodsITCase.scala +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph.scala.test.operations - -import org.apache.flink.api.scala._ -import org.apache.flink.graph.scala.test.TestGraphUtils -import org.apache.flink.graph.scala.{NeighborsFunctionWithVertexValue, _} -import org.apache.flink.graph.{Edge, EdgeDirection, ReduceNeighborsFunction, Vertex} -import org.apache.flink.test.util.{MultipleProgramsTestBase, TestBaseUtils} -import org.apache.flink.util.Collector -import org.junit.rules.TemporaryFolder -import org.junit.runner.RunWith -import org.junit.runners.Parameterized -import org.junit.{After, Before, Rule, Test} -import _root_.scala.collection.JavaConverters._ - -@RunWith(classOf[Parameterized]) -class ReduceOnNeighborMethodsITCase(mode: MultipleProgramsTestBase.TestExecutionMode) - extends MultipleProgramsTestBase(mode) { - - private var expectedResult: String = null - - @Test - @throws(classOf[Exception]) - def testSumOfAllNeighborsNoValue { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils - .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val res = graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.ALL) - .collect().toList - expectedResult = "(1,10)\n" + "(2,4)\n" + "(3,12)\n" + "(4,8)\n" + "(5,8)\n" - TestBaseUtils.compareResultAsText(res.asJava, expectedResult) - } - - @Test - @throws(classOf[Exception]) - def testSumOfOutNeighborsNoValue { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils - .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val res = graph.reduceOnNeighbors(new SumNeighbors, EdgeDirection.OUT).collect().toList - expectedResult = "(1,5)\n" + "(2,3)\n" + "(3,9)\n" + "(4,5)\n" + "(5,1)\n" - TestBaseUtils.compareResultAsText(res.asJava, expectedResult) - } - - @Test - @throws(classOf[Exception]) - def testSumOfAllNeighbors { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils - .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val result = graph.groupReduceOnNeighbors(new SumAllNeighbors, EdgeDirection.ALL) - val res = result.collect().toList - expectedResult = "(1,11)\n" + "(2,6)\n" + "(3,15)\n" + "(4,12)\n" + "(5,13)\n" - TestBaseUtils.compareResultAsText(res.asJava, expectedResult) - } - - @Test - @throws(classOf[Exception]) - def testSumOfInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo = { - val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment - val graph: Graph[Long, Long, Long] = Graph.fromDataSet(TestGraphUtils - .getLongLongVertexData(env), TestGraphUtils.getLongLongEdgeData(env), env) - val result = graph.groupReduceOnNeighbors(new - SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo, EdgeDirection.IN) - val res = result.collect().toList - expectedResult = "(3,59)\n" + "(3,118)\n" + "(4,204)\n" + "(4,102)\n" + "(5,570)\n" + "(5,285)" - TestBaseUtils.compareResultAsText(res.asJava, expectedResult) - } - - final class SumNeighbors extends ReduceNeighborsFunction[Long] { - override def reduceNeighbors(firstNeighbor: Long, secondNeighbor: Long): Long = { - firstNeighbor + secondNeighbor - } - } - - final class SumAllNeighbors extends NeighborsFunctionWithVertexValue[Long, Long, Long, (Long, - Long)] { - @throws(classOf[Exception]) - def iterateNeighbors(vertex: Vertex[Long, Long], neighbors: Iterable[(Edge[Long, Long], - Vertex[Long, Long])], out: Collector[(Long, Long)]) { - var sum: Long = 0 - for (neighbor <- neighbors) { - sum += neighbor._2.getValue - } - out.collect((vertex.getId, sum + vertex.getValue)) - } - } - - final class SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo extends - NeighborsFunction[Long, Long, Long, (Long, Long)] { - @throws(classOf[Exception]) - def iterateNeighbors(neighbors: Iterable[(Long, Edge[Long, Long], Vertex[Long, Long])], - out: Collector[(Long, Long)]) { - var sum: Long = 0 - var next: (Long, Edge[Long, Long], Vertex[Long, Long]) = null - val neighborsIterator: Iterator[(Long, Edge[Long, Long], Vertex[Long, Long])] = - neighbors.iterator - while (neighborsIterator.hasNext) { - next = neighborsIterator.next - sum += next._3.getValue * next._2.getValue - } - if (next._1 > 2) { - out.collect(new Tuple2[Long, Long](next._1, sum)) - out.collect(new Tuple2[Long, Long](next._1, sum * 2)) - } - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/pom.xml ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/pom.xml b/flink-staging/flink-gelly/pom.xml deleted file mode 100644 index 9dce170..0000000 --- a/flink-staging/flink-gelly/pom.xml +++ /dev/null @@ -1,67 +0,0 @@ - - - - - 4.0.0 - - - org.apache.flink - flink-staging - 0.10-SNAPSHOT - .. - - - flink-gelly - flink-gelly - - jar - - - - org.apache.flink - flink-java - ${project.version} - - - org.apache.flink - flink-clients - ${project.version} - - - org.apache.flink - flink-test-utils - ${project.version} - test - - - org.apache.flink - flink-optimizer - ${project.version} - test-jar - test - - - com.google.guava - guava - ${guava.version} - - - http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java deleted file mode 100644 index d84badb..0000000 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph; - -import org.apache.flink.api.java.tuple.Tuple3; - -/** - * An Edge represents a link between two {@link Vertex vertices}, - * the source and the target and can carry an attached value. - * For edges with no value, use {@link org.apache.flink.types.NullValue} as the value type. - * - * @param the key type for the sources and target vertices - * @param the edge value type - */ -public class Edge extends Tuple3{ - - private static final long serialVersionUID = 1L; - - public Edge(){} - - public Edge(K src, K trg, V val) { - this.f0 = src; - this.f1 = trg; - this.f2 = val; - } - - /** - * Reverses the direction of this Edge. - * @return a new Edge, where the source is the original Edge's target - * and the target is the original Edge's source. - */ - public Edge reverse() { - return new Edge(this.f1, this.f0, this.f2); - } - - public void setSource(K src) { - this.f0 = src; - } - - public K getSource() { - return this.f0; - } - - public void setTarget(K target) { - this.f1 = target; - } - - public K getTarget() { - return f1; - } - - public void setValue(V value) { - this.f2 = value; - } - - public V getValue() { - return f2; - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java deleted file mode 100644 index 0a055bb..0000000 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph; - -/** - * The EdgeDirection is used to select a node's neighborhood - * by the {@link Graph#groupReduceOnEdges(EdgesFunction, EdgeDirection)}, - * {@link Graph#groupReduceOnEdges(EdgesFunctionWithVertexValue, EdgeDirection)}, - * {@link Graph#groupReduceOnNeighbors(NeighborsFunction, EdgeDirection)}, - * {@link Graph#groupReduceOnNeighbors(NeighborsFunctionWithVertexValue, EdgeDirection)}, - * {@link Graph#reduceOnEdges(ReduceEdgesFunction, EdgeDirection)} and - * {@link Graph#reduceOnNeighbors(ReduceNeighborsFunction, EdgeDirection)} - * methods. - */ -public enum EdgeDirection { - IN, - OUT, - ALL -} http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java deleted file mode 100644 index bf1d6a2..0000000 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph; - -import java.io.Serializable; - -import org.apache.flink.api.common.functions.Function; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.util.Collector; - -/** - * Interface to be implemented by the function applied to a vertex neighborhood - * in the {@link Graph#groupReduceOnEdges(EdgesFunction, EdgeDirection)} method. - * - * @param the vertex key type - * @param the edge value type - * @param the type of the return value - */ -public interface EdgesFunction extends Function, Serializable { - - void iterateEdges(Iterable>> edges, Collector out) throws Exception; -} http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java deleted file mode 100644 index 0b0ab0e..0000000 --- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.graph; - -import java.io.Serializable; - -import org.apache.flink.api.common.functions.Function; -import org.apache.flink.util.Collector; - -/** - * Interface to be implemented by the function applied to a vertex neighborhood - * in the {@link Graph#groupReduceOnEdges(EdgesFunctionWithVertexValue, EdgeDirection)} - * method. - * - * @param the vertex key type - * @param the vertex value type - * @param the edge value type - * @param the type of the return value - */ -public interface EdgesFunctionWithVertexValue extends Function, Serializable { - - void iterateEdges(Vertex v, Iterable> edges, Collector out) throws Exception; -}