flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-2254) Add Bipartite Graph Support for Gelly
Date Mon, 05 Dec 2016 19:41:01 GMT

    [ https://issues.apache.org/jira/browse/FLINK-2254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15723161#comment-15723161 ]

ASF GitHub Bot commented on FLINK-2254:
---------------------------------------

Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r90937167
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java ---
    @@ -0,0 +1,364 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +
    +/**
    + *
    + * A bipartite graph is a graph whose vertices can be divided into two disjoint sets:
    + * top vertices and bottom vertices. Edges can only exist between a pair of vertices
    + * from different vertex sets, e.g. there can be no edges between a pair of top vertices.
    + *
    + * <p>Bipartite graphs are useful to represent graphs with two sets of objects, like
    + * researchers and their publications, where an edge represents that a particular
    + * publication was authored by a particular author.
    + *
    + * <p>The bipartite graph interface differs from the {@link Graph} interface, so to apply
    + * algorithms that work on a regular graph, a bipartite graph must first be converted into
    + * a {@link Graph} instance. This can be done with the
    + * {@link BipartiteGraph#projectionTopSimple()} or
    + * {@link BipartiteGraph#projectionBottomFull()} methods.
    + *
    + * @param <KT> the key type of the top vertices
    + * @param <KB> the key type of the bottom vertices
    + * @param <VVT> the top vertices value type
    + * @param <VVB> the bottom vertices value type
    + * @param <EV> the edge value type
    + */
    +public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
    +	private final ExecutionEnvironment context;
    +	private final DataSet<Vertex<KT, VVT>> topVertices;
    +	private final DataSet<Vertex<KB, VVB>> bottomVertices;
    +	private final DataSet<BipartiteEdge<KT, KB, EV>> edges;
    +
    +	private BipartiteGraph(
    +			DataSet<Vertex<KT, VVT>> topVertices,
    +			DataSet<Vertex<KB, VVB>> bottomVertices,
    +			DataSet<BipartiteEdge<KT, KB, EV>> edges,
    +			ExecutionEnvironment context) {
    +		this.topVertices = topVertices;
    +		this.bottomVertices = bottomVertices;
    +		this.edges = edges;
    +		this.context = context;
    +	}
    +
    +	/**
    +	 * Create bipartite graph from datasets.
    +	 *
    +	 * @param topVertices  dataset of top vertices in the graph
    +	 * @param bottomVertices dataset of bottom vertices in the graph
    +	 * @param edges dataset of edges between vertices
    +	 * @param context Flink execution context
    +	 * @param <KT> the key type of the top vertices
    +	 * @param <KB> the key type of the bottom vertices
    +	 * @param <VT> the top vertices value type
    +	 * @param <VB> the bottom vertices value type
    +	 * @param <EV> the edge value type
    +	 * @return new bipartite graph created from provided datasets
    +	 */
    +	public static <KT, KB, VT, VB, EV> BipartiteGraph<KT, KB, VT, VB, EV> fromDataSet(
    +			DataSet<Vertex<KT, VT>> topVertices,
    +			DataSet<Vertex<KB, VB>> bottomVertices,
    +			DataSet<BipartiteEdge<KT, KB, EV>> edges,
    +			ExecutionEnvironment context) {
    +		return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
    +	}
    +
    +	/**
    +	 * Get dataset with top vertices.
    +	 *
    +	 * @return dataset with top vertices
    +	 */
    +	public DataSet<Vertex<KT, VVT>> getTopVertices() {
    +		return topVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with bottom vertices.
    +	 *
    +	 * @return dataset with bottom vertices
    +	 */
    +	public DataSet<Vertex<KB, VVB>> getBottomVertices() {
    +		return bottomVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with graph edges.
    +	 *
    +	 * @return dataset with graph edges
    +	 */
    +	public DataSet<BipartiteEdge<KT, KB, EV>> getEdges() {
    +		return edges;
    +	}
    +
    +	/**
    +	 * Convert a bipartite graph into a graph that contains only top vertices. An edge between
    +	 * two vertices in the new graph will exist only if the original bipartite graph contains
    +	 * a bottom vertex they are both connected to.
    +	 *
    +	 * @return top projection of the bipartite graph where every edge contains a tuple with
    +	 * the values of the two edges that connect the top vertices in the original graph
    +	 */
    +	public Graph<KT, VVT, Tuple2<EV, EV>> projectionTopSimple() {
    +
    +		DataSet<Edge<KT, Tuple2<EV, EV>>> newEdges =  edges.join(edges)
    +			.where(1)
    +			.equalTo(1)
    +			.filter(new FilterFunction<Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>>>() {
    +				@Override
    +				public boolean filter(Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>> value) throws Exception {
    +					BipartiteEdge<KT, KB, EV> edge1 = value.f0;
    +					BipartiteEdge<KT, KB, EV> edge2 = value.f1;
    +					return !edge1.getTopId().equals(edge2.getTopId());
    +				}
    +			})
    +			.map(new MapFunction<Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>>, Edge<KT, Tuple2<EV, EV>>>() {
    +				@Override
    +				public Edge<KT, Tuple2<EV, EV>> map(Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>> value) throws Exception {
    +					return new Edge<>(
    +						value.f0.getTopId(),
    +						value.f1.getTopId(),
    +						new Tuple2<>(
    +							value.f0.getValue(),
    +							value.f1.getValue()));
    +				}
    +			});
    +
    +		return Graph.fromDataSet(topVertices, newEdges, context);
    +	}
    +
    +	/**
    +	 * Convert a bipartite graph into a graph that contains only bottom vertices. An edge between
    +	 * two vertices in the new graph will exist only if the original bipartite graph contains
    +	 * a top vertex they are both connected to.
    +	 *
    +	 * @return bottom projection of the bipartite graph where every edge contains a tuple with
    +	 * the values of the two edges that connect the bottom vertices in the original graph
    +	 */
    +	public Graph<KB, VVB, Tuple2<EV, EV>> projectionBottomSimple() {
    +
    +		DataSet<Edge<KB, Tuple2<EV, EV>>> newEdges =  edges.join(edges)
    +			.where(0)
    +			.equalTo(0)
    +			.filter(new FilterFunction<Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>>>() {
    +				@Override
    +				public boolean filter(Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>> value) throws Exception {
    +					BipartiteEdge<KT, KB, EV> edge1 = value.f0;
    +					BipartiteEdge<KT, KB, EV> edge2 = value.f1;
    +					return !edge1.getBottomId().equals(edge2.getBottomId());
    +				}
    +			})
    +			.map(new MapFunction<Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>>, Edge<KB, Tuple2<EV, EV>>>() {
    +				@Override
    +				public Edge<KB, Tuple2<EV, EV>> map(Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT, KB, EV>> value) throws Exception {
    +					return new Edge<>(
    +						value.f0.getBottomId(),
    +						value.f1.getBottomId(),
    +						new Tuple2<>(
    +							value.f0.getValue(),
    +							value.f1.getValue()));
    +				}
    +			});
    +
    +		return Graph.fromDataSet(bottomVertices, newEdges, context);
    +	}
    +
    +	/**
    +	 * Convert a bipartite graph into a graph that contains only bottom vertices. An edge between
    +	 * two vertices in the new graph will exist only if the original bipartite graph contains
    +	 * a top vertex they are both connected to.
    +	 *
    +	 * @return bottom projection of the bipartite graph where every edge contains data about
    +	 * the projected connection between vertices in the original graph
    +	 */
    +	public Graph<KB, VVB, Projection<KT, VVT, EV, VVB>> projectionBottomFull() {
    +
    +		// Join edges with bottom vertices to preserve edges values
    +		DataSet<Tuple2<BipartiteEdge<KT, KB, EV>, Vertex<KB, VVB>>> ds = edges.join(bottomVertices)
    +			.where(1)
    +			.equalTo(0);
    +
    +		DataSet<Edge<KB, Projection<KT, VVT, EV, VVB>>> newEdges = ds.join(ds)
    +			.where(new KeySelector<Tuple2<BipartiteEdge<KT,KB,EV>,Vertex<KB,VVB>>, KT>() {
    +				@Override
    +				public KT getKey(Tuple2<BipartiteEdge<KT, KB, EV>, Vertex<KB, VVB>> value) throws Exception {
    +					return value.f0.getTopId();
    +				}
    +			})
    +			.equalTo(new KeySelector<Tuple2<BipartiteEdge<KT,KB,EV>,Vertex<KB,VVB>>, KT>() {
    +				@Override
    +				public KT getKey(Tuple2<BipartiteEdge<KT, KB, EV>, Vertex<KB, VVB>> value) throws Exception {
    +					return value.f0.getTopId();
    +				}
    +			})
    +			// Filter out all pairs with the same bottom id
    +			.filter(new FilterFunction<Tuple2<
    +					Tuple2<BipartiteEdge<KT, KB, EV>, Vertex<KB, VVB>>,
    +					Tuple2<BipartiteEdge<KT, KB, EV>, Vertex<KB, VVB>>>>() {
    +				@Override
    +				public boolean filter(Tuple2<
    +						Tuple2<BipartiteEdge<KT, KB, EV>, Vertex<KB, VVB>>,
    +						Tuple2<BipartiteEdge<KT, KB, EV>, Vertex<KB, VVB>>> value) throws Exception {
    +					BipartiteEdge<KT, KB, EV> edge1 = value.f0.f0;
    +					BipartiteEdge<KT, KB, EV> edge2 = value.f1.f0;
    +					return !edge1.getBottomId().equals(edge2.getBottomId());
    +				}
    +			})
    +			// Join with top vertices to preserve top vertices values
    +			.join(topVertices)
    --- End diff --
    
    Wouldn't it be more efficient to do the Cartesian join last, assuming
    |vertices| << |bipartite edges| << |simple edges|? The code could also be reused
    between the full projection functions: first join the bipartite edges with the top
    vertices, then join the result with the bottom vertices (perhaps using the Projection
    class below with NullValue where appropriate).
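
    To illustrate the join order suggested here, the following is a minimal sketch in plain Java collections, not the Flink DataSet API. The names JoinOrderSketch, BEdge, enrich and projectBottom are hypothetical, and edge values are omitted for brevity. It assumes the vertex values fit in in-memory maps: each bipartite edge is joined with its top and bottom vertex values once, and only then is the pairwise self-join on the top id performed.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only; none of these names exist in Gelly.
class JoinOrderSketch {

    /** A bipartite edge (topId, bottomId); edge values are left out for brevity. */
    static final class BEdge {
        final String top;
        final String bottom;

        BEdge(String top, String bottom) {
            this.top = top;
            this.bottom = bottom;
        }
    }

    /**
     * Cheap joins first: attach the top and bottom vertex values to every
     * bipartite edge. The work is proportional to |bipartite edges|.
     * Each row is {topId, topValue, bottomId, bottomValue}.
     */
    static List<String[]> enrich(List<BEdge> edges,
                                 Map<String, String> topValues,
                                 Map<String, String> bottomValues) {
        List<String[]> enriched = new ArrayList<>();
        for (BEdge e : edges) {
            enriched.add(new String[] {
                e.top, topValues.get(e.top), e.bottom, bottomValues.get(e.bottom)});
        }
        return enriched;
    }

    /**
     * Expensive pairwise join last: pair up enriched edges that share a top id.
     * No further vertex lookups are needed, since the values travel with the rows.
     */
    static List<String> projectBottom(List<String[]> enriched) {
        Map<String, List<String[]>> byTop = new HashMap<>();
        for (String[] row : enriched) {
            byTop.computeIfAbsent(row[0], k -> new ArrayList<>()).add(row);
        }
        List<String> simpleEdges = new ArrayList<>();
        for (String[] left : enriched) {
            for (String[] right : byTop.get(left[0])) {
                // Skip pairs with the same bottom id, as in the diff above.
                if (!left[2].equals(right[2])) {
                    simpleEdges.add(left[2] + "-" + right[2] + " via " + left[0] + "(" + left[1] + ")");
                }
            }
        }
        return simpleEdges;
    }

    public static void main(String[] args) {
        Map<String, String> tops = new HashMap<>();
        tops.put("t1", "A");
        tops.put("t2", "B");
        Map<String, String> bottoms = new HashMap<>();
        bottoms.put("b1", "x");
        bottoms.put("b2", "y");
        List<BEdge> edges = Arrays.asList(
            new BEdge("t1", "b1"), new BEdge("t1", "b2"), new BEdge("t2", "b2"));
        for (String edge : projectBottom(enrich(edges, tops, bottoms))) {
            System.out.println(edge);
        }
    }
}
```

    Under the size assumption above, the vertex joins in this order touch each bipartite edge once, rather than each (much more numerous) projected edge.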


> Add Bipartite Graph Support for Gelly
> -------------------------------------
>
>                 Key: FLINK-2254
>                 URL: https://issues.apache.org/jira/browse/FLINK-2254
>             Project: Flink
>          Issue Type: New Feature
>          Components: Gelly
>    Affects Versions: 0.10.0
>            Reporter: Andra Lungu
>            Assignee: Ivan Mushketyk
>              Labels: requires-design-doc
>
> A bipartite graph is a graph whose set of vertices can be divided into two disjoint sets
> such that each edge with a source vertex in the first set has a target vertex in the
> second set. We would like to support efficient operations for this type of graph, along
> with a set of metrics (http://jponnela.com/web_documents/twomode.pdf).



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
