flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Different CoGroup behavior inside DeltaIteration
Date Mon, 16 Nov 2015 21:54:08 GMT

This is an artifact of how the solution set is implemented internally.
Usually, a CoGroup is executed using a sort-merge strategy, i.e., both
inputs are sorted, merged, and handed to the CoGroup function in a streaming
fashion. Both inputs are treated equally: if one input does not contain a
key that is contained in the other input, the CoGroup function is called
with an empty iterator for the missing side.
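To make the sort-merge semantics concrete, here is a minimal single-machine sketch in Scala. It is not Flink's implementation: the names SortMergeCoGroupSketch and sortMergeCoGroup are hypothetical, and both inputs are assumed to fit in memory. The function is invoked once per distinct key found in either input, with an empty iterator for the side that lacks the key.

```scala
// Hypothetical in-memory sketch of sort-merge CoGroup semantics;
// NOT Flink's implementation.
object SortMergeCoGroupSketch {

  // Calls `f` once per distinct key found in EITHER input. The side that
  // lacks a key receives an empty iterator for that call.
  def sortMergeCoGroup[K, A, B, O](left: Seq[A], right: Seq[B])(
      keyA: A => K, keyB: B => K)(
      f: (Iterator[A], Iterator[B]) => Seq[O])(implicit ord: Ordering[K]): Seq[O] = {
    // Sort phase: order both inputs by key (Vector for cheap indexed access).
    val ls = left.sortBy(keyA).toVector
    val rs = right.sortBy(keyB).toVector
    val out = Seq.newBuilder[O]
    var i = 0
    var j = 0
    // Merge phase: advance through both sorted inputs together.
    while (i < ls.length || j < rs.length) {
      // The next group's key is the smallest key at the head of either input.
      val k =
        if (i >= ls.length) keyB(rs(j))
        else if (j >= rs.length) keyA(ls(i))
        else ord.min(keyA(ls(i)), keyB(rs(j)))
      // Take the run of elements with key k from each side (may be empty).
      val lStart = i
      while (i < ls.length && ord.equiv(keyA(ls(i)), k)) i += 1
      val rStart = j
      while (j < rs.length && ord.equiv(keyB(rs(j)), k)) j += 1
      out ++= f(ls.slice(lStart, i).iterator, rs.slice(rStart, j).iterator)
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val first  = Seq((1, 1), (2, 1), (3, 1))
    val second = Seq((2, 1), (3, 1))
    val calls = sortMergeCoGroup(first, second)(_._1, _._1) { (l, r) =>
      val lsl = l.toList
      val rsl = r.toList
      // Report (key, #first elements, #second elements) per call.
      Seq(((lsl ++ rsl).head._1, lsl.size, rsl.size))
    }
    println(calls) // List((1,1,0), (2,1,1), (3,1,1))
  }
}
```

On the data from the question (keys 1, 2, 3 against keys 2, 3), the function runs three times, and the call for key 1 receives an empty second iterator.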

The solution set of a delta iteration is stored in a hash table (with only
one entry per key). When a solution set is coGrouped with another data set,
the other input is sorted and probed against the hash table. For each key,
the solution set iterator of the CoGroup function contains one element if
the hash table has an entry for that key, and is empty otherwise. However,
the hash table does not track which of its entries have been looked up, so
solution set elements without a corresponding element in the other data set
are never identified, and the CoGroup function is not called for them.
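The solution set variant can be sketched the same way. Again, this is a hypothetical in-memory illustration of the behavior described above, not Flink's actual code (SolutionSetCoGroupSketch and solutionSetCoGroup are made-up names): the solution set becomes a hash map with one entry per key, the other input is sorted and grouped, and each group probes the map. Keys that occur only in the solution set are never visited.

```scala
import scala.collection.mutable

// Hypothetical in-memory sketch of a CoGroup against a delta-iteration
// solution set; NOT Flink's implementation.
object SolutionSetCoGroupSketch {

  // The solution set is held in a hash table (one entry per key). Only keys
  // that occur in `other` are ever looked up, so solution-set entries whose
  // key is missing from `other` never reach `f`.
  def solutionSetCoGroup[K, S, B, O](solutionSet: Seq[S], other: Seq[B])(
      keyS: S => K, keyB: B => K)(
      f: (Iterator[S], Iterator[B]) => Seq[O])(implicit ord: Ordering[K]): Seq[O] = {
    // Build phase: one entry per key; a later element replaces an earlier one.
    val table = mutable.HashMap.empty[K, S]
    solutionSet.foreach(s => table(keyS(s)) = s)

    // Probe phase: group the other input by key, sort the groups, probe each.
    val out = Seq.newBuilder[O]
    for ((k, group) <- other.groupBy(keyB).toSeq.sortBy(_._1)) {
      // Zero or one solution-set element for this key.
      val hit: Iterator[S] = table.get(k).iterator
      out ++= f(hit, group.iterator)
    }
    // No pass over unprobed table entries: solution-set-only keys are dropped.
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val solution = Seq((1, 1), (2, 1), (3, 1))
    val other    = Seq((2, 1), (3, 1), (4, 1))
    val calls = solutionSetCoGroup(solution, other)(_._1, _._1) { (s, b) =>
      val ss = s.toList
      val bs = b.toList
      // Report (key, #solution-set elements, #other elements) per call.
      Seq(((ss ++ bs).head._1, ss.size, bs.size))
    }
    println(calls) // List((2,1,1), (3,1,1), (4,0,1))
  }
}
```

With keys 1, 2, 3 in the solution set and keys 2, 3, 4 in the other input (key 4 is added here only for illustration), the function runs for keys 2, 3, and 4; key 4 sees an empty solution set iterator, while key 1 is silently skipped, which matches the inner-join-like behavior reported in the question.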

So, the CoGroup with a solution set works only in one direction, as stated
in the documentation. This is kind of intended by the way the solution set
CoGroup is implemented, but we should definitely update the documentation
to cover this case!

If you have a use case that requires a solution set CoGroup with the
missing behavior, you should open a JIRA issue.
Otherwise, it would be great if you could open a JIRA issue to extend
the documentation.

Thank you,
Fabian

2015-11-16 1:02 GMT+01:00 Truong Duc Kien <duckientruong@gmail.com>:

> Hi,
> When running CoGroup between the solution set and a different dataset
> inside a DeltaIteration, the CoGroupFunction only gets called for items
> that exist in the other dataset, similar to an inner join. This is not
> the documented behavior for CoGroup:
>> If a DataSet has a group with no matching key in the other DataSet,
>> the CoGroupFunction is called with an empty group for the non-existing
>> group.
> The following code shows the problem.
> import org.apache.flink.api.scala._
> import org.apache.flink.util.Collector
>
> object CoGroupExample {
>   // Emits the element from the second input if there is one; otherwise
>   // reports the element whose key exists only in the first input.
>   def coGroupFunction(first: Iterator[(Int, Int)],
>                       second: Iterator[(Int, Int)],
>                       out: Collector[(Int, Int)]): Unit = {
>     if (second.hasNext) {
>       out.collect(second.next())
>     } else {
>       printf("Not in second set: %s\n", first.next())
>       println("These two lines don't appear when " +
>         "running cogroup on solution set")
>     }
>   }
>
>   def main(args: Array[String]): Unit = {
>     val env = ExecutionEnvironment.getExecutionEnvironment
>     env.getConfig.disableSysoutLogging()
>     val d1 = env.fromElements((1, 1), (2, 1), (3, 1))
>
>     // Delta iteration keyed on field 0; the solution set is the
>     // first input of the CoGroup.
>     d1.iterateDelta(d1, 1, Array(0)) {
>       (solutionSet, workSet) => {
>         val f = workSet.filter(_._1 != 1)
>         println("Cogroup on solution set with delta iteration")
>         val newSolutionSet = solutionSet.coGroup(f)
>           .where(0)
>           .equalTo(0)
>           .apply(coGroupFunction _)
>         (newSolutionSet, newSolutionSet)
>       }
>     }.print()
>
>     // The same CoGroup between two ordinary DataSets, outside an iteration.
>     println("Normal cogroup")
>     val d2 = d1.filter(_._1 != 1)
>     d1.coGroup(d2).where(0).equalTo(0).apply(coGroupFunction _).print()
>   }
> }
> Is this the expected behavior, or should I file a bug about this?
> Best regards,
> Kien Truong
