Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 48D71200BD9 for ; Fri, 9 Dec 2016 21:12:14 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 476C1160B1D; Fri, 9 Dec 2016 20:12:14 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 96657160AFD for ; Fri, 9 Dec 2016 21:12:12 +0100 (CET) Received: (qmail 82330 invoked by uid 500); 9 Dec 2016 20:12:11 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 82321 invoked by uid 99); 9 Dec 2016 20:12:11 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 09 Dec 2016 20:12:11 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 9324CE02A3; Fri, 9 Dec 2016 20:12:11 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: greg@apache.org To: commits@flink.apache.org Message-Id: <9d8c18b9b0844e7b8bcd59cda5c53282@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: flink git commit: [FLINK-4646] [gelly] Add BipartiateGraph Date: Fri, 9 Dec 2016 20:12:11 +0000 (UTC) archived-at: Fri, 09 Dec 2016 20:12:14 -0000 Repository: flink Updated Branches: refs/heads/master 9ab494a73 -> 365cd987c [FLINK-4646] [gelly] Add BipartiateGraph This closes #2564 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/365cd987 Tree: 
http://git-wip-us.apache.org/repos/asf/flink/tree/365cd987 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/365cd987 Branch: refs/heads/master Commit: 365cd987cc90fa9b399acbb4fe0af3f995f604e3 Parents: 9ab494a Author: Ivan Mushketyk Authored: Tue Sep 27 23:14:09 2016 +0100 Committer: Greg Hogan Committed: Fri Dec 9 13:58:50 2016 -0500 ---------------------------------------------------------------------- .../main/java/org/apache/flink/graph/Edge.java | 12 +- .../flink/graph/bipartite/BipartiteEdge.java | 68 ++++ .../flink/graph/bipartite/BipartiteGraph.java | 319 +++++++++++++++++++ .../flink/graph/bipartite/Projection.java | 76 +++++ .../graph/bipartite/BipartiteEdgeTest.java | 70 ++++ .../graph/bipartite/BipartiteGraphTest.java | 146 +++++++++ .../flink/graph/bipartite/ProjectionTest.java | 76 +++++ .../apache/flink/graph/generator/TestUtils.java | 11 +- .../apache/flink/test/util/TestBaseUtils.java | 74 ++--- 9 files changed, 807 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/365cd987/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java index 2bcce29..8e5f916 100644 --- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Edge.java @@ -34,10 +34,10 @@ public class Edge extends Tuple3{ public Edge(){} - public Edge(K src, K trg, V val) { - this.f0 = src; - this.f1 = trg; - this.f2 = val; + public Edge(K source, K target, V value) { + this.f0 = source; + this.f1 = target; + this.f2 = value; } /** @@ -49,8 +49,8 @@ public class Edge extends Tuple3{ return new Edge<>(this.f1, this.f0, this.f2); } - public void 
setSource(K src) { - this.f0 = src; + public void setSource(K source) { + this.f0 = source; } public K getSource() { http://git-wip-us.apache.org/repos/asf/flink/blob/365cd987/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteEdge.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteEdge.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteEdge.java new file mode 100644 index 0000000..167e4ec --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteEdge.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.bipartite; + +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.graph.Edge; + +/** + * A BipartiteEdge represents a link between top and bottom vertices + * in a {@link BipartiteGraph}. It is a generalized form of {@link Edge} + * where the source and target vertex IDs can be of different types.
+ * + * @param the key type of the top vertices + * @param the key type of the bottom vertices + * @param the edge value type + */ +public class BipartiteEdge extends Tuple3 { + + private static final long serialVersionUID = 1L; + + public BipartiteEdge() {} + + public BipartiteEdge(KT topId, KB bottomId, EV value) { + this.f0 = topId; + this.f1 = bottomId; + this.f2 = value; + } + + public KT getTopId() { + return this.f0; + } + + public void setTopId(KT topId) { + this.f0 = topId; + } + + public KB getBottomId() { + return this.f1; + } + + public void setBottomId(KB bottomId) { + this.f1 = bottomId; + } + + public EV getValue() { + return this.f2; + } + + public void setValue(EV value) { + this.f2 = value; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/365cd987/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteGraph.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteGraph.java new file mode 100644 index 0000000..b325103 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/BipartiteGraph.java @@ -0,0 +1,319 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.bipartite; + +import org.apache.flink.api.common.functions.FlatJoinFunction; +import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint; +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst; +import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.graph.Edge; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.util.Collector; + +/** + * The vertices of a bipartite graph are divided into two disjoint sets, referenced by the names "top" and "bottom". + * Top and bottom vertices with the same key value represent distinct entities and must be specially handled + * when projecting to a simple {@link Graph}. Edges can only exist between a pair of vertices from different vertex + * sets. E.g. there can be no edges between a pair of top vertices. + * + *

Bipartite graphs are useful to represent graphs with two sets of objects, like researchers and their publications, + * where an edge represents that a particular publication was authored by a particular researcher. + * + *

Bipartite interface is different from {@link Graph} interface, so to apply algorithms that work on a regular graph + * a bipartite graph should be first converted into a {@link Graph} instance. This can be achieved by using + * the projection methods. + * + * @param the key type of top vertices + * @param the key type of bottom vertices + * @param the vertex value type of top vertices + * @param the vertex value type of bottom vertices + * @param the edge value type + */ +public class BipartiteGraph { + + private final ExecutionEnvironment context; + private final DataSet> topVertices; + private final DataSet> bottomVertices; + private final DataSet> edges; + + private BipartiteGraph( + DataSet> topVertices, + DataSet> bottomVertices, + DataSet> edges, + ExecutionEnvironment context) { + this.topVertices = topVertices; + this.bottomVertices = bottomVertices; + this.edges = edges; + this.context = context; + } + + /** + * Create bipartite graph from datasets. + * + * @param topVertices dataset of top vertices in the graph + * @param bottomVertices dataset of bottom vertices in the graph + * @param edges dataset of edges between vertices + * @param context Flink execution context + * @return new bipartite graph created from provided datasets + */ + public static BipartiteGraph fromDataSet( + DataSet> topVertices, + DataSet> bottomVertices, + DataSet> edges, + ExecutionEnvironment context) { + return new BipartiteGraph<>(topVertices, bottomVertices, edges, context); + } + + /** + * Get dataset with top vertices. + * + * @return dataset with top vertices + */ + public DataSet> getTopVertices() { + return topVertices; + } + + /** + * Get dataset with bottom vertices. + * + * @return dataset with bottom vertices + */ + public DataSet> getBottomVertices() { + return bottomVertices; + } + + /** + * Get dataset with graph edges. 
+ * + * @return dataset with graph edges + */ + public DataSet> getEdges() { + return edges; + } + + /** + * Convert a bipartite graph into an undirected graph that contains only top vertices. An edge between two vertices + * in the new graph will exist only if the original bipartite graph contains a bottom vertex they are both + * connected to. + * + * The simple projection performs a single join and returns edges containing the bipartite edge values. + * + * Note: KT must override .equals(). This requirement may be removed in a future release. + * + * @return simple top projection of the bipartite graph + */ + public Graph> projectionTopSimple() { + DataSet>> newEdges = edges.join(edges) + .where(1) + .equalTo(1) + .with(new ProjectionTopSimple()) + .name("Simple top projection"); + + return Graph.fromDataSet(topVertices, newEdges, context); + } + + @ForwardedFieldsFirst("0; 2->2.0") + @ForwardedFieldsSecond("0->1; 2->2.1") + private static class ProjectionTopSimple + implements FlatJoinFunction, BipartiteEdge, Edge>> { + private Tuple2 edgeValues = new Tuple2<>(); + + private Edge> edge = new Edge<>(null, null, edgeValues); + + @Override + public void join(BipartiteEdge first, BipartiteEdge second, Collector>> out) + throws Exception { + if (!first.f0.equals(second.f0)) { + edge.f0 = first.f0; + edge.f1 = second.f0; + + edgeValues.f0 = first.f2; + edgeValues.f1 = second.f2; + + out.collect(edge); + } + } + } + + /** + * Convert a bipartite graph into an undirected graph that contains only bottom vertices. An edge between two + * vertices in the new graph will exist only if the original bipartite graph contains a top vertex they are both + * connected to. + * + * The simple projection performs a single join and returns edges containing the bipartite edge values. + * + * Note: KB must override .equals(). This requirement may be removed in a future release. 
+ * + * @return simple bottom projection of the bipartite graph + */ + public Graph> projectionBottomSimple() { + DataSet>> newEdges = edges.join(edges) + .where(0) + .equalTo(0) + .with(new ProjectionBottomSimple()) + .name("Simple bottom projection"); + + return Graph.fromDataSet(bottomVertices, newEdges, context); + } + + @ForwardedFieldsFirst("1->0; 2->2.0") + @ForwardedFieldsSecond("1; 2->2.1") + private static class ProjectionBottomSimple + implements FlatJoinFunction, BipartiteEdge, Edge>> { + private Tuple2 edgeValues = new Tuple2<>(); + + private Edge> edge = new Edge<>(null, null, edgeValues); + + @Override + public void join(BipartiteEdge first, BipartiteEdge second, Collector>> out) + throws Exception { + if (!first.f1.equals(second.f1)) { + edge.f0 = first.f1; + edge.f1 = second.f1; + + edgeValues.f0 = first.f2; + edgeValues.f1 = second.f2; + + out.collect(edge); + } + } + } + + /** + * Convert a bipartite graph into a graph that contains only top vertices. An edge between two vertices in the new + * graph will exist only if the original bipartite graph contains at least one bottom vertex they both connect to. + * + * The full projection performs three joins and returns edges containing the the connecting vertex ID and value, + * both top vertex values, and both bipartite edge values. + * + * Note: KT must override .equals(). This requirement may be removed in a future release. 
+ * + * @return full top projection of the bipartite graph + */ + public Graph> projectionTopFull() { + DataSet> edgesWithVertices = joinEdgeWithVertices(); + + DataSet>> newEdges = edgesWithVertices.join(edgesWithVertices) + .where(1) + .equalTo(1) + .with(new ProjectionTopFull()) + .name("Full top projection"); + + return Graph.fromDataSet(topVertices, newEdges, context); + } + + private DataSet> joinEdgeWithVertices() { + return edges + .join(topVertices, JoinHint.REPARTITION_HASH_SECOND) + .where(0) + .equalTo(0) + .projectFirst(0, 1, 2) + .>projectSecond(1) + .name("Edge with vertex") + .join(bottomVertices, JoinHint.REPARTITION_HASH_SECOND) + .where(1) + .equalTo(0) + .projectFirst(0, 1, 2, 3) + .>projectSecond(1) + .name("Edge with vertices"); + } + + @ForwardedFieldsFirst("0; 1->2.0; 2->2.4; 3->2.2; 4->2.1") + @ForwardedFieldsSecond("0->1; 2->2.5; 3->2.3") + private static class ProjectionTopFull + implements FlatJoinFunction, Tuple5, Edge>> { + private Projection projection = new Projection<>(); + + private Edge> edge = new Edge<>(null, null, projection); + + @Override + public void join(Tuple5 first, Tuple5 second, Collector>> out) + throws Exception { + if (!first.f0.equals(second.f0)) { + edge.f0 = first.f0; + edge.f1 = second.f0; + + projection.f0 = first.f1; + projection.f1 = first.f4; + projection.f2 = first.f3; + projection.f3 = second.f3; + projection.f4 = first.f2; + projection.f5 = second.f2; + + out.collect(edge); + } + } + } + + /** + * Convert a bipartite graph into a graph that contains only bottom vertices. An edge between two vertices in the + * new graph will exist only if the original bipartite graph contains at least one top vertex they both connect to. + * + * The full projection performs three joins and returns edges containing the the connecting vertex ID and value, + * both bottom vertex values, and both bipartite edge values. + * + * Note: KB must override .equals(). This requirement may be removed in a future release. 
+ * + * @return full bottom projection of the bipartite graph + */ + public Graph> projectionBottomFull() { + DataSet> edgesWithVertices = joinEdgeWithVertices(); + + DataSet>> newEdges = edgesWithVertices.join(edgesWithVertices) + .where(0) + .equalTo(0) + .with(new ProjectionBottomFull()) + .name("Full bottom projection"); + + return Graph.fromDataSet(bottomVertices, newEdges, context); + } + + @ForwardedFieldsFirst("1->0; 2->2.4; 3->2.1; 4->2.2") + @ForwardedFieldsSecond("1; 2->2.5; 4->2.3") + private static class ProjectionBottomFull + implements FlatJoinFunction, Tuple5, Edge>> { + private Projection projection = new Projection<>(); + + private Edge> edge = new Edge<>(null, null, projection); + + @Override + public void join(Tuple5 first, Tuple5 second, Collector>> out) + throws Exception { + if (!first.f1.equals(second.f1)) { + edge.f0 = first.f1; + edge.f1 = second.f1; + + projection.f0 = first.f0; + projection.f1 = first.f3; + projection.f2 = first.f4; + projection.f3 = second.f4; + projection.f4 = first.f2; + projection.f5 = second.f2; + + out.collect(edge); + } + } + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/365cd987/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/Projection.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/Projection.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/Projection.java new file mode 100644 index 0000000..95a9cf6 --- /dev/null +++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/bipartite/Projection.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.bipartite; + +import org.apache.flink.api.java.tuple.Tuple6; +import org.apache.flink.graph.Vertex; + +/** + * The edge value of a full bipartite projection contains: + *

    + *
  • the ID and vertex value of the connecting vertex
  • + *
  • the vertex value for the source and target vertex
  • + *
  • both edge values from the bipartite edges
  • + *
+ * + * @param the key type of connecting vertices + * @param the vertex value type of connecting vertices + * @param the vertex value type of top or bottom vertices + * @param the edge value type from bipartite edges + */ +public class Projection extends Tuple6 { + + public Projection() {} + + public Projection( + Vertex connectingVertex, + VV sourceVertexValue, VV targetVertexValue, + EV sourceEdgeValue, EV targetEdgeValue) { + this.f0 = connectingVertex.getId(); + this.f1 = connectingVertex.getValue(); + this.f2 = sourceVertexValue; + this.f3 = targetVertexValue; + this.f4 = sourceEdgeValue; + this.f5 = targetEdgeValue; + } + + public KC getIntermediateVertexId() { + return this.f0; + } + + public VVC getIntermediateVertexValue() { + return this.f1; + } + + public VV getsSourceVertexValue() { + return this.f2; + } + + public VV getTargetVertexValue() { + return this.f3; + } + + public EV getSourceEdgeValue() { + return this.f4; + } + + public EV getTargetEdgeValue() { + return this.f5; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/365cd987/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/BipartiteEdgeTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/BipartiteEdgeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/BipartiteEdgeTest.java new file mode 100644 index 0000000..ad0106b --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/BipartiteEdgeTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.graph.bipartite; + +import org.apache.flink.graph.bipartite.BipartiteEdge; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class BipartiteEdgeTest { + + private static final int BOTTOM_ID = 0; + private static final int TOP_ID = 1; + private static final String VALUE = "value"; + + private final BipartiteEdge edge = createEdge(); + + @Test + public void testGetBottomId() { + assertEquals(BOTTOM_ID, (long) edge.getBottomId()); + } + + @Test + public void testGetTopId() { + assertEquals(TOP_ID, (long) edge.getTopId()); + } + + @Test + public void testGetValue() { + assertEquals(VALUE, edge.getValue()); + } + + @Test + public void testSetBottomId() { + edge.setBottomId(100); + assertEquals(100, (long) edge.getBottomId()); + } + + @Test + public void testSetTopId() { + edge.setTopId(100); + assertEquals(100, (long) edge.getTopId()); + } + + @Test + public void testSetValue() { + edge.setValue("newVal"); + assertEquals("newVal", edge.getValue()); + } + + private BipartiteEdge createEdge() { + return new BipartiteEdge<>(TOP_ID, BOTTOM_ID, VALUE); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/365cd987/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/BipartiteGraphTest.java ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/BipartiteGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/BipartiteGraphTest.java new file mode 100644 index 0000000..366cf8e --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/BipartiteGraphTest.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.bipartite; + +import org.apache.flink.api.java.DataSet; +import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.graph.Graph; +import org.apache.flink.graph.Vertex; +import org.apache.flink.test.util.TestBaseUtils; +import org.junit.Test; + +import java.util.Arrays; + +import static org.apache.flink.graph.generator.TestUtils.compareGraph; +import static org.junit.Assert.assertEquals; + +public class BipartiteGraphTest { + + @Test + public void testGetTopVertices() throws Exception { + BipartiteGraph bipartiteGraph = createBipartiteGraph(); + + assertEquals( + Arrays.asList( + new Vertex<>(4, "top4"), + new Vertex<>(5, "top5"), + new Vertex<>(6, "top6")), + bipartiteGraph.getTopVertices().collect()); + } + + @Test + public void testGetBottomVertices() throws Exception { + BipartiteGraph bipartiteGraph = createBipartiteGraph(); + + assertEquals( + Arrays.asList( + new Vertex<>(1, "bottom1"), + new Vertex<>(2, "bottom2"), + new Vertex<>(3, "bottom3")), + bipartiteGraph.getBottomVertices().collect()); + } + + @Test + public void testSimpleTopProjection() throws Exception { + BipartiteGraph bipartiteGraph = createBipartiteGraph(); + Graph> graph = bipartiteGraph.projectionTopSimple(); + + compareGraph(graph, "4; 5; 6", "5,4; 4,5; 5,6; 6,5"); + + String expected = + "(5,4,(5-1,4-1))\n" + + "(4,5,(4-1,5-1))\n" + + "(6,5,(6-2,5-2))\n" + + "(5,6,(5-2,6-2))"; + TestBaseUtils.compareResultAsText(graph.getEdges().collect(), expected); + } + + @Test + public void testSimpleBottomProjection() throws Exception { + BipartiteGraph bipartiteGraph = createBipartiteGraph(); + Graph> graph = bipartiteGraph.projectionBottomSimple(); + + compareGraph(graph, "1; 2; 3", "1,2; 2,1; 2,3; 3,2"); + + String expected = + "(3,2,(6-3,6-2))\n" + + "(2,3,(6-2,6-3))\n" + + "(2,1,(5-2,5-1))\n" + + "(1,2,(5-1,5-2))"; + TestBaseUtils.compareResultAsText(graph.getEdges().collect(), expected); + 
} + + @Test + public void testFullTopProjection() throws Exception { + BipartiteGraph bipartiteGraph = createBipartiteGraph(); + Graph> graph = bipartiteGraph.projectionTopFull(); + + graph.getEdges().print(); + compareGraph(graph, "4; 5; 6", "5,4; 4,5; 5,6; 6,5"); + + String expected = + "(5,4,(1,bottom1,top5,top4,5-1,4-1))\n" + + "(4,5,(1,bottom1,top4,top5,4-1,5-1))\n" + + "(6,5,(2,bottom2,top6,top5,6-2,5-2))\n" + + "(5,6,(2,bottom2,top5,top6,5-2,6-2))"; + TestBaseUtils.compareResultAsText(graph.getEdges().collect(), expected); + } + + @Test + public void testFullBottomProjection() throws Exception { + BipartiteGraph bipartiteGraph = createBipartiteGraph(); + Graph> graph = bipartiteGraph.projectionBottomFull(); + + compareGraph(graph, "1; 2; 3", "1,2; 2,1; 2,3; 3,2"); + + String expected = + "(3,2,(6,top6,bottom3,bottom2,6-3,6-2))\n" + + "(2,3,(6,top6,bottom2,bottom3,6-2,6-3))\n" + + "(2,1,(5,top5,bottom2,bottom1,5-2,5-1))\n" + + "(1,2,(5,top5,bottom1,bottom2,5-1,5-2))"; + TestBaseUtils.compareResultAsText(graph.getEdges().collect(), expected); + } + + private BipartiteGraph createBipartiteGraph() { + ExecutionEnvironment executionEnvironment = ExecutionEnvironment.createCollectionsEnvironment(); + + DataSet> topVertices = executionEnvironment.fromCollection(Arrays.asList( + new Vertex<>(4, "top4"), + new Vertex<>(5, "top5"), + new Vertex<>(6, "top6") + )); + + DataSet> bottomVertices = executionEnvironment.fromCollection(Arrays.asList( + new Vertex<>(1, "bottom1"), + new Vertex<>(2, "bottom2"), + new Vertex<>(3, "bottom3") + )); + + DataSet> edges = executionEnvironment.fromCollection(Arrays.asList( + new BipartiteEdge<>(4, 1, "4-1"), + new BipartiteEdge<>(5, 1, "5-1"), + new BipartiteEdge<>(5, 2, "5-2"), + new BipartiteEdge<>(6, 2, "6-2"), + new BipartiteEdge<>(6, 3, "6-3") + )); + + return BipartiteGraph.fromDataSet(topVertices, bottomVertices, edges, executionEnvironment); + } +} 
http://git-wip-us.apache.org/repos/asf/flink/blob/365cd987/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/ProjectionTest.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/ProjectionTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/ProjectionTest.java new file mode 100644 index 0000000..3aafe64 --- /dev/null +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/bipartite/ProjectionTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.graph.bipartite; + +import org.apache.flink.graph.Vertex; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +public class ProjectionTest { + + private static final int ID = 10; + + private static final String VERTEX_VALUE = "vertex-value"; + private static final String SOURCE_EDGE_VALUE = "source-edge-value"; + private static final String TARGET_EDGE_VALUE = "target-edge-value"; + private static final String SOURCE_VERTEX_VALUE = "source-vertex-value"; + private static final String TARGET_VERTEX_VALUE = "target-vertex-value"; + + private Projection projection = createProjection(); + + @Test + public void testIntermediateVertexGetId() { + assertEquals(Integer.valueOf(ID), projection.getIntermediateVertexId()); + } + + @Test + public void testGetIntermediateVertexValue() { + assertEquals(VERTEX_VALUE, projection.getIntermediateVertexValue()); + } + + @Test + public void testGetSourceEdgeValue() { + assertEquals(SOURCE_EDGE_VALUE, projection.getSourceEdgeValue()); + } + + @Test + public void testGetTargetEdgeValue() { + assertEquals(TARGET_EDGE_VALUE, projection.getTargetEdgeValue()); + } + + @Test + public void testGetSourceVertexValue() { + assertEquals(SOURCE_VERTEX_VALUE, projection.getsSourceVertexValue()); + } + + @Test + public void testGetTargetVertexValue() { + assertEquals(TARGET_VERTEX_VALUE, projection.getTargetVertexValue()); + } + + private Projection createProjection() { + return new Projection<>( + new Vertex<>(ID, VERTEX_VALUE), + SOURCE_VERTEX_VALUE, + TARGET_VERTEX_VALUE, + SOURCE_EDGE_VALUE, + TARGET_EDGE_VALUE); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/365cd987/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java 
b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java index 3ea5a44..a302a30 100644 --- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java +++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/TestUtils.java @@ -50,7 +50,12 @@ public final class TestUtils { */ public static void compareGraph(Graph graph, String expectedVertices, String expectedEdges) throws Exception { - // Vertices + compareVertices(graph, expectedVertices); + compareEdges(graph, expectedEdges); + } + + private static void compareVertices(Graph graph, String expectedVertices) + throws Exception { if (expectedVertices != null) { List resultVertices = new ArrayList<>(); @@ -60,8 +65,10 @@ public final class TestUtils { TestBaseUtils.compareResultAsText(resultVertices, expectedVertices.replaceAll("\\s","").replace(";", "\n")); } + } - // Edges + private static void compareEdges(Graph graph, String expectedEdges) + throws Exception { if (expectedEdges != null) { List resultEdges = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/flink/blob/365cd987/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java ---------------------------------------------------------------------- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java index 804b3d4..5e15076 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestBaseUtils.java @@ -18,17 +18,12 @@ package org.apache.flink.test.util; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - import akka.actor.ActorRef; import akka.dispatch.Futures; import 
akka.pattern.Patterns; import akka.util.Timeout; - import org.apache.commons.io.FileUtils; import org.apache.commons.io.IOUtils; - import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; @@ -37,14 +32,10 @@ import org.apache.flink.core.testutils.CommonTestUtils; import org.apache.flink.runtime.messages.TaskManagerMessages; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.util.TestLogger; - import org.apache.hadoop.fs.FileSystem; - import org.junit.Assert; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import scala.concurrent.Await; import scala.concurrent.ExecutionContext; import scala.concurrent.ExecutionContext$; @@ -77,6 +68,9 @@ import java.util.concurrent.TimeUnit; import java.util.regex.Matcher; import java.util.regex.Pattern; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + public class TestBaseUtils extends TestLogger { private static final Logger LOG = LoggerFactory.getLogger(TestBaseUtils.class); @@ -92,32 +86,32 @@ public class TestBaseUtils extends TestLogger { public static FiniteDuration DEFAULT_TIMEOUT = new FiniteDuration(DEFAULT_AKKA_ASK_TIMEOUT, TimeUnit.SECONDS); // ------------------------------------------------------------------------ - + protected static File logDir; protected TestBaseUtils(){ verifyJvmOptions(); } - + private static void verifyJvmOptions() { long heap = Runtime.getRuntime().maxMemory() >> 20; Assert.assertTrue("Insufficient java heap space " + heap + "mb - set JVM option: -Xmx" + MINIMUM_HEAP_SIZE_MB + "m", heap > MINIMUM_HEAP_SIZE_MB - 50); } - - + + public static LocalFlinkMiniCluster startCluster( int numTaskManagers, int taskManagerNumSlots, boolean startWebserver, boolean startZooKeeper, boolean singleActorSystem) throws Exception { - + Configuration config = new Configuration(); 
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, taskManagerNumSlots); - + config.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, startWebserver); if (startZooKeeper) { @@ -146,7 +140,7 @@ public class TestBaseUtils extends TestLogger { config.setInteger(ConfigConstants.JOB_MANAGER_WEB_PORT_KEY, 8081); config.setString(ConfigConstants.JOB_MANAGER_WEB_LOG_PATH_KEY, logFile.toString()); - + config.setString(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, logFile.toString()); LocalFlinkMiniCluster cluster = new LocalFlinkMiniCluster(config, singleActorSystem); @@ -164,7 +158,7 @@ public class TestBaseUtils extends TestLogger { if (executor != null) { int numUnreleasedBCVars = 0; int numActiveConnections = 0; - + if (executor.running()) { List tms = executor.getTaskManagersAsJava(); List> bcVariableManagerResponseFutures = new ArrayList<>(); @@ -249,7 +243,7 @@ public class TestBaseUtils extends TestLogger { } return readers; } - + public static BufferedInputStream[] getResultInputStream(String resultPath) throws IOException { return getResultInputStream(resultPath, new String[]{}); } @@ -268,13 +262,13 @@ public class TestBaseUtils extends TestLogger { readAllResultLines(target, resultPath, new String[]{}); } - public static void readAllResultLines(List target, String resultPath, String[] excludePrefixes) + public static void readAllResultLines(List target, String resultPath, String[] excludePrefixes) throws IOException { - + readAllResultLines(target, resultPath, excludePrefixes, false); } - public static void readAllResultLines(List target, String resultPath, + public static void readAllResultLines(List target, String resultPath, String[] excludePrefixes, boolean inOrderOfFiles) throws IOException { final BufferedReader[] readers = getResultReader(resultPath, excludePrefixes, inOrderOfFiles); @@ -453,14 +447,14 @@ public class TestBaseUtils extends TestLogger { public static 
void compareOrderedResultAsText(List result, String expected, boolean asTuples) { compareResult(result, expected, asTuples, false); } - + private static void compareResult(List result, String expected, boolean asTuples, boolean sort) { String[] expectedStrings = expected.split("\n"); String[] resultStrings = new String[result.size()]; - + for (int i = 0; i < resultStrings.length; i++) { T val = result.get(i); - + if (asTuples) { if (val instanceof Tuple) { Tuple t = (Tuple) val; @@ -480,19 +474,25 @@ public class TestBaseUtils extends TestLogger { resultStrings[i] = (val == null) ? "null" : val.toString(); } } - - assertEquals("Wrong number of elements result", expectedStrings.length, resultStrings.length); if (sort) { Arrays.sort(expectedStrings); Arrays.sort(resultStrings); } - + + // Include content of both arrays to provide more context in case of a test failure + String msg = String.format( + "Different elements in arrays: expected %d elements and received %d\n expected: %s\n received: %s", + expectedStrings.length, resultStrings.length, + Arrays.toString(expectedStrings), Arrays.toString(resultStrings)); + + assertEquals(msg, expectedStrings.length, resultStrings.length); + for (int i = 0; i < expectedStrings.length; i++) { - assertEquals(expectedStrings[i], resultStrings[i]); + assertEquals(msg, expectedStrings[i], resultStrings[i]); } } - + // -------------------------------------------------------------------------------------------- // Comparison methods for tests using sample // -------------------------------------------------------------------------------------------- @@ -523,7 +523,7 @@ public class TestBaseUtils extends TestLogger { // -------------------------------------------------------------------------------------------- // Miscellaneous helper methods // -------------------------------------------------------------------------------------------- - + protected static Collection toParameterList(Configuration ... 
testConfigs) { ArrayList configs = new ArrayList<>(); for (Configuration testConfig : testConfigs) { @@ -560,7 +560,7 @@ public class TestBaseUtils extends TestLogger { System.err.println("Failed to delete file " + f.getAbsolutePath()); } } - + public static String constructTestPath(Class forClass, String folder) { // we create test path that depends on class to prevent name clashes when two tests // create temp files with the same name @@ -571,7 +571,7 @@ public class TestBaseUtils extends TestLogger { path += (forClass.getName() + "-" + folder); return path; } - + public static String constructTestURI(Class forClass, String folder) { return new File(constructTestPath(forClass, folder)).toURI().toString(); } @@ -597,7 +597,7 @@ public class TestBaseUtils extends TestLogger { return IOUtils.toString(is, connection.getContentEncoding() != null ? connection.getContentEncoding() : "UTF-8"); } - + public static class TupleComparator implements Comparator { @Override @@ -612,7 +612,7 @@ public class TestBaseUtils extends TestLogger { for (int i = 0; i < o1.getArity(); i++) { Object val1 = o1.getField(i); Object val2 = o2.getField(i); - + int cmp; if (val1 != null && val2 != null) { cmp = compareValues(val1, val2); @@ -620,16 +620,16 @@ public class TestBaseUtils extends TestLogger { else { cmp = val1 == null ? (val2 == null ? 0 : -1) : 1; } - + if (cmp != 0) { return cmp; } } - + return 0; } } - + @SuppressWarnings("unchecked") private static > int compareValues(Object o1, Object o2) { if (o1 instanceof Comparable && o2 instanceof Comparable) {