flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From greghogan <...@git.apache.org>
Subject [GitHub] flink pull request #2564: [FLINK-2254] Add BipartiateGraph class
Date Mon, 05 Dec 2016 19:40:22 GMT
Github user greghogan commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2564#discussion_r90937167
  
    --- Diff: flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/BipartiteGraph.java
---
    @@ -0,0 +1,364 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.graph;
    +
    +import org.apache.flink.api.common.functions.FilterFunction;
    +import org.apache.flink.api.common.functions.MapFunction;
    +import org.apache.flink.api.java.DataSet;
    +import org.apache.flink.api.java.ExecutionEnvironment;
    +import org.apache.flink.api.java.functions.KeySelector;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +
    +/**
    + *
    + * A bipartite graph is a graph whose vertices can be divided into two disjoint sets: top
vertices and bottom vertices.
    + * Edges can only exist between a pair of vertices from different vertex sets. E.g.
there can be no edges between
    + * a pair of top vertices.
    + *
    + * <p>Bipartite graphs are useful to represent graphs with two sets of objects,
like researchers and their publications,
    + * where an edge represents that a particular publication was authored by a particular
author.
    + *
    + * <p>Bipartite interface is different from {@link Graph} interface, so to apply
algorithms that work on a regular graph
    + * a bipartite graph should be first converted into a {@link Graph} instance. This can
be achieved by using
    + * {@link BipartiteGraph#projectionTopSimple()} or
    + * {@link BipartiteGraph#projectionBottomFull()} methods.
    + *
    + * @param <KT> the key type of the top vertices
    + * @param <KB> the key type of the bottom vertices
    + * @param <VVT> the top vertices value type
    + * @param <VVB> the bottom vertices value type
    + * @param <EV> the edge value type
    + */
    +public class BipartiteGraph<KT, KB, VVT, VVB, EV> {
    +	private final ExecutionEnvironment context;
    +	private final DataSet<Vertex<KT, VVT>> topVertices;
    +	private final DataSet<Vertex<KB, VVB>> bottomVertices;
    +	private final DataSet<BipartiteEdge<KT, KB, EV>> edges;
    +
    +	private BipartiteGraph(
    +			DataSet<Vertex<KT, VVT>> topVertices,
    +			DataSet<Vertex<KB, VVB>> bottomVertices,
    +			DataSet<BipartiteEdge<KT, KB, EV>> edges,
    +			ExecutionEnvironment context) {
    +		this.topVertices = topVertices;
    +		this.bottomVertices = bottomVertices;
    +		this.edges = edges;
    +		this.context = context;
    +	}
    +
    +	/**
    +	 * Create bipartite graph from datasets.
    +	 *
    +	 * @param topVertices  dataset of top vertices in the graph
    +	 * @param bottomVertices dataset of bottom vertices in the graph
    +	 * @param edges dataset of edges between vertices
    +	 * @param context Flink execution context
    +	 * @param <KT> the key type of the top vertices
    +	 * @param <KB> the key type of the bottom vertices
    +	 * @param <VT> the top vertices value type
    +	 * @param <VB> the bottom vertices value type
    +	 * @param <EV> the edge value type
    +	 * @return new bipartite graph created from provided datasets
    +	 */
    +	public static <KT, KB, VT, VB, EV> BipartiteGraph<KT, KB, VT, VB, EV> fromDataSet(
    +			DataSet<Vertex<KT, VT>> topVertices,
    +			DataSet<Vertex<KB, VB>> bottomVertices,
    +			DataSet<BipartiteEdge<KT, KB, EV>> edges,
    +			ExecutionEnvironment context) {
    +		return new BipartiteGraph<>(topVertices, bottomVertices, edges, context);
    +	}
    +
    +	/**
    +	 * Get dataset with top vertices.
    +	 *
    +	 * @return dataset with top vertices
    +	 */
    +	public DataSet<Vertex<KT, VVT>> getTopVertices() {
    +		return topVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with bottom vertices.
    +	 *
    +	 * @return dataset with bottom vertices
    +	 */
    +	public DataSet<Vertex<KB, VVB>> getBottomVertices() {
    +		return bottomVertices;
    +	}
    +
    +	/**
    +	 * Get dataset with graph edges.
    +	 *
    +	 * @return dataset with graph edges
    +	 */
    +	public DataSet<BipartiteEdge<KT, KB, EV>> getEdges() {
    +		return edges;
    +	}
    +
    +	/**
    +	 * Convert a bipartite graph into a graph that contains only top vertices. An edge between
two vertices in the new
    +	 * graph will exist only if the original bipartite graph contains a bottom vertex they
are both connected to.
    +	 *
    +	 * @return top projection of the bipartite graph where every edge contains a tuple with
values of two edges that
    +	 * connect top vertices in the original graph
    +	 */
    +	public Graph<KT, VVT, Tuple2<EV, EV>> projectionTopSimple() {
    +
    +		DataSet<Edge<KT, Tuple2<EV, EV>>> newEdges =  edges.join(edges)
    +			.where(1)
    +			.equalTo(1)
    +			.filter(new FilterFunction<Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT,
KB, EV>>>() {
    +				@Override
    +				public boolean filter(Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT,
KB, EV>> value) throws Exception {
    +					BipartiteEdge<KT, KB, EV> edge1 = value.f0;
    +					BipartiteEdge<KT, KB, EV> edge2 = value.f1;
    +					return !edge1.getTopId().equals(edge2.getTopId());
    +				}
    +			})
    +			.map(new MapFunction<Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT,
KB, EV>>, Edge<KT, Tuple2<EV, EV>>>() {
    +				@Override
    +				public Edge<KT, Tuple2<EV, EV>> map(Tuple2<BipartiteEdge<KT, KB,
EV>, BipartiteEdge<KT, KB, EV>> value) throws Exception {
    +					return new Edge<>(
    +						value.f0.getTopId(),
    +						value.f1.getTopId(),
    +						new Tuple2<>(
    +							value.f0.getValue(),
    +							value.f1.getValue()));
    +				}
    +			});
    +
    +		return Graph.fromDataSet(topVertices, newEdges, context);
    +	}
    +
    +	/**
    +	 * Convert a bipartite graph into a graph that contains only bottom vertices. An edge between
two vertices in the new
    +	 * graph will exist only if the original bipartite graph contains a top vertex they
are both connected to.
    +	 *
    +	 * @return bottom projection of the bipartite graph where every edge contains a tuple
with values of two edges that
    +	 * connect bottom vertices in the original graph
    +	 */
    +	public Graph<KB, VVB, Tuple2<EV, EV>> projectionBottomSimple() {
    +
    +		DataSet<Edge<KB, Tuple2<EV, EV>>> newEdges =  edges.join(edges)
    +			.where(0)
    +			.equalTo(0)
    +			.filter(new FilterFunction<Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT,
KB, EV>>>() {
    +				@Override
    +				public boolean filter(Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT,
KB, EV>> value) throws Exception {
    +					BipartiteEdge<KT, KB, EV> edge1 = value.f0;
    +					BipartiteEdge<KT, KB, EV> edge2 = value.f1;
    +					return !edge1.getBottomId().equals(edge2.getBottomId());
    +				}
    +			})
    +			.map(new MapFunction<Tuple2<BipartiteEdge<KT, KB, EV>, BipartiteEdge<KT,
KB, EV>>, Edge<KB, Tuple2<EV, EV>>>() {
    +				@Override
    +				public Edge<KB, Tuple2<EV, EV>> map(Tuple2<BipartiteEdge<KT, KB,
EV>, BipartiteEdge<KT, KB, EV>> value) throws Exception {
    +					return new Edge<>(
    +						value.f0.getBottomId(),
    +						value.f1.getBottomId(),
    +						new Tuple2<>(
    +							value.f0.getValue(),
    +							value.f1.getValue()));
    +				}
    +			});
    +
    +		return Graph.fromDataSet(bottomVertices, newEdges, context);
    +	}
    +
    +	/**
    +	 * Convert a bipartite graph into a graph that contains only bottom vertices. An edge between
two vertices in the new
    +	 * graph will exist only if the original bipartite graph contains a top vertex they
are both connected to.
    +	 *
    +	 * @return bottom projection of the bipartite graph where every edge contains data
about the projected connection
    +	 * between vertices in the original graph
    +	 */
    +	public Graph<KB, VVB, Projection<KT, VVT, EV, VVB>> projectionBottomFull()
{
    +
    +		// Join edges with bottom vertices to preserve edges values
    +		DataSet<Tuple2<BipartiteEdge<KT, KB, EV>, Vertex<KB, VVB>>>
ds = edges.join(bottomVertices)
    +			.where(1)
    +			.equalTo(0);
    +
    +		DataSet<Edge<KB, Projection<KT, VVT, EV, VVB>>> newEdges = ds.join(ds)
    +			.where(new KeySelector<Tuple2<BipartiteEdge<KT,KB,EV>,Vertex<KB,VVB>>,
KT>() {
    +				@Override
    +				public KT getKey(Tuple2<BipartiteEdge<KT, KB, EV>, Vertex<KB, VVB>>
value) throws Exception {
    +					return value.f0.getTopId();
    +				}
    +			})
    +			.equalTo(new KeySelector<Tuple2<BipartiteEdge<KT,KB,EV>,Vertex<KB,VVB>>,
KT>() {
    +				@Override
    +				public KT getKey(Tuple2<BipartiteEdge<KT, KB, EV>, Vertex<KB, VVB>>
value) throws Exception {
    +					return value.f0.getTopId();
    +				}
    +			})
    +			// Filter out all pairs with the same bottom id
    +			.filter(new FilterFunction<Tuple2<
    +					Tuple2<BipartiteEdge<KT, KB, EV>, Vertex<KB, VVB>>,
    +					Tuple2<BipartiteEdge<KT, KB, EV>, Vertex<KB, VVB>>>>() {
    +				@Override
    +				public boolean filter(Tuple2<
    +						Tuple2<BipartiteEdge<KT, KB, EV>, Vertex<KB, VVB>>,
    +						Tuple2<BipartiteEdge<KT, KB, EV>, Vertex<KB, VVB>>> value)
throws Exception {
    +					BipartiteEdge<KT, KB, EV> edge1 = value.f0.f0;
    +					BipartiteEdge<KT, KB, EV> edge2 = value.f1.f0;
    +					return !edge1.getBottomId().equals(edge2.getBottomId());
    +				}
    +			})
    +			// Join with top vertices to preserve top vertices values
    +			.join(topVertices)
    --- End diff --
    
    Is it not more efficient to do the Cartesian join last, if we assume that |vertices|
<< |bipartite edges| << |simple edges|? The code can be reused between the full
projection functions: first join the bipartite edge with the top vertex, then join the result
with the bottom vertex (perhaps using the Projection class below with NullValue where appropriate).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message