flink-user mailing list archives

From Truong Duc Kien <duckientru...@gmail.com>
Subject Different CoGroup behavior inside DeltaIteration
Date Mon, 16 Nov 2015 00:02:25 GMT

When running a CoGroup between the solution set and another dataset
inside a DeltaIteration, the CoGroupFunction only gets called for keys
that exist in the other dataset, similar to an inner join. This is not
the documented behavior for CoGroup:

> If a DataSet has a group with no matching key in the other DataSet,
> the CoGroupFunction is called with an empty group for the non-existing
> group.
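To make the difference concrete, here is a minimal plain-Scala model of the two behaviours, using the same three tuples as the example below. No Flink classes are involved. The object and method names (`CoGroupSemantics`, `coGroupKeys`, `probeDrivenKeys`) are hypothetical. The "observed" function only reproduces what I see from the outside and says nothing about how Flink actually implements the solution-set CoGroup.

```scala
// Plain-Scala model of the two behaviours; no Flink classes are used.
object CoGroupSemantics {
  type Rec = (Int, Int)
  type Group = (Int, List[Rec], List[Rec]) // (key, left group, right group)

  // Documented CoGroup: every key from either input is visited exactly once,
  // and a side without that key contributes an empty group.
  def coGroupKeys(left: Seq[Rec], right: Seq[Rec]): List[Group] = {
    val l = left.groupBy(_._1)
    val r = right.groupBy(_._1)
    (l.keySet ++ r.keySet).toList.sorted.map { k =>
      (k, l.getOrElse(k, Nil).toList, r.getOrElse(k, Nil).toList)
    }
  }

  // Behaviour observed inside the DeltaIteration (a model, not Flink's code):
  // only keys that occur in the second, non-solution-set input are visited.
  def probeDrivenKeys(solution: Seq[Rec], other: Seq[Rec]): List[Group] =
    coGroupKeys(solution, other).filter { case (_, _, r) => r.nonEmpty }

  def main(args: Array[String]): Unit = {
    val d1 = Seq((1, 1), (2, 1), (3, 1))
    val d2 = d1.filter(_._1 != 1)
    println(coGroupKeys(d1, d2).map(_._1))     // List(1, 2, 3)
    println(probeDrivenKeys(d1, d2).map(_._1)) // List(2, 3)
  }
}
```

Key 1 is the case at issue. Under the documented semantics it reaches the function as `(1, List((1,1)), List())`, which is exactly when "Not in second set" should be printed. In the observed behaviour it is never visited at all.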

The following code shows the problem.

import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

object CoGroupExample {

  // Prints the first element of every left group whose key has no match in
  // the right group. Nothing is emitted, so only the printed output matters.
  def coGroupFunction(first: Iterator[(Int, Int)],
                      second: Iterator[(Int, Int)],
                      out: Collector[(Int, Int)]): Unit = {
    if (!second.hasNext && first.hasNext) {
      printf("Not in second set: %s\n", first.next())
      println("These two lines don't appear when " +
        "running coGroup on the solution set")
    }
  }

  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val d1 = env.fromElements((1, 1), (2, 1), (3, 1))

    println("CoGroup on solution set with delta iteration")
    // Solution set and initial workset are both d1, keyed on field 0,
    // with at most one iteration.
    val result = d1.iterateDelta(d1, 1, Array(0)) {
      (solutionSet, workSet) =>
        // Drop key 1 so that it exists only in the solution set.
        val f = workSet.filter(_._1 != 1)
        val newSolutionSet = solutionSet.coGroup(f)
          .where(0).equalTo(0)
          .apply(coGroupFunction _)
        (newSolutionSet, newSolutionSet)
    }
    // A sink is required, otherwise the iteration is never executed.
    result.print()

    println("Normal coGroup")
    val d2 = d1.filter(_._1 != 1)
    d1.coGroup(d2).where(0).equalTo(0).apply(coGroupFunction _).print()
  }
}

Is this the expected behavior, or should I file a bug report?

Best regards,
Kien Truong
