flink-dev mailing list archives

From: Vasiliki Kalavri <vasilikikala...@gmail.com>
Subject: Re: Fast strategy for intersect
Date: Wed, 28 Oct 2015 17:45:20 GMT

Hi Martin,

Isn't finding the intersection of the edges enough in this case?
Assuming there are no duplicate edges, I believe a join should do the
trick.
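
A minimal sketch of that idea, written as a plain-Java model rather than
against the Flink API: an edge is reduced to its (source, target) pair, one
side is indexed in a hash set, and the other side is probed against it. The
names EdgeIntersectionSketch, Edge and intersect are hypothetical and exist
only for this illustration. With no duplicate edges on either side, the result
matches what an inner join keyed on both edge fields and returning the left
element would produce.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

// Plain-Java model of "intersect edges with a join"; no Flink classes are used.
class EdgeIntersectionSketch {

    // Hypothetical stand-in for a Gelly edge, reduced to its (source, target) key.
    static final class Edge {
        final long source;
        final long target;

        Edge(long source, long target) {
            this.source = source;
            this.target = target;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Edge)) {
                return false;
            }
            Edge other = (Edge) o;
            return source == other.source && target == other.target;
        }

        @Override
        public int hashCode() {
            return Objects.hash(source, target);
        }
    }

    // Build side: index the right edges by key.
    // Probe side: keep each left edge whose key is present on the right.
    // Assumption: neither input contains duplicate edges.
    static List<Edge> intersect(List<Edge> left, List<Edge> right) {
        Set<Edge> rightKeys = new HashSet<>(right);
        List<Edge> result = new ArrayList<>();
        for (Edge e : left) {
            if (rightKeys.contains(e)) {
                result.add(e);
            }
        }
        return result;
    }
}
```

If duplicates were possible, a real join would emit one row per matching pair,
so the inputs would need to be made distinct first.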

Cheers,
-Vasia.

On 28 October 2015 at 13:15, Martin Junghanns <m.junghanns@mailbox.org>
wrote:

> Hi all!
>
> While working on FLINK-2905, I was wondering what a good (and fast) way to
> compute the intersection of two data sets (Gelly vertices in my case)
> of unknown size would be.
>
> I came up with three ways to solve this:
>
> Consider two sets:
>
> DataSet<Vertex<K, VV>> verticesLeft =  this.getVertices();
> DataSet<Vertex<K, VV>> verticesRight = graph.getVertices();
>
> Way 1 (join)
>
> intersectVertices = verticesLeft.join(verticesRight)
>  .where(0)
>  .equalTo(0)
>  .with(new JoinFunction<Vertex<K, VV>, /* .. */>() {
>   @Override
>   public Vertex<K, VV> join(Vertex<K, VV> first, Vertex<K, VV> second)
>    throws Exception {
>     return first;
>   }
> });
>
> Way 2 (coGroup)
>
> intersectVertices = verticesLeft.coGroup(verticesRight)
>  .where(0)
>  .equalTo(0)
>  .with(new CoGroupFunction<Vertex<K, VV>, /* .. */ >() {
>   @Override
>   public void coGroup(Iterable<Vertex<K, VV>> first,
>         Iterable<Vertex<K, VV>> second,
>         Collector<Vertex<K, VV>> out) throws Exception {
>    Iterator<Vertex<K, VV>> leftIt = first.iterator();
>    Iterator<Vertex<K, VV>> rightIt = second.iterator();
>    if (leftIt.hasNext() && rightIt.hasNext()) {
>     out.collect(leftIt.next());
>    }
>   }
>  });
>
> Way 3 (union + groupBy + aggregate)
>
> intersectVertices = verticesLeft.union(verticesRight)
>  .map(new MapFunction<Vertex<K, VV>, Tuple3<K, VV, Integer>>() {
>   @Override
>   public Tuple3<K, VV, Integer> map(Vertex<K, VV> vertex)
>         throws Exception {
>    return new Tuple3<>(vertex.f0, vertex.f1, 1);
>   }
>  }).withForwardedFields("f0;f1")
>  .groupBy(0) // vertex id
>  .aggregate(Aggregations.SUM, 2)
>  .filter(new FilterFunction<Tuple3<K, VV, Integer>>() {
>   @Override
>   public boolean filter(Tuple3<K, VV, Integer> value) {
>    return value.f2 == 2;
>   }
>  })
>  .map(new MapFunction<Tuple3<K, VV, Integer>, Vertex<K, VV>>() {
>   @Override
>   public Vertex<K, VV> map(Tuple3<K, VV, Integer> vertexWithAggregate) {
>    return new Vertex<>(vertexWithAggregate.f0, vertexWithAggregate.f1);
>   }
>  }).withForwardedFields("f0;f1");
>
> Thanks for your input.
>
> Best,
>
> Martin
