flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Stephan Ewen <se...@apache.org>
Subject Re: Different CoGroup behavior inside DeltaIteration
Date Mon, 16 Nov 2015 22:02:42 GMT
It is actually very important that the co group in delta iterations works
like that.
If the CoGroup touched every element in the solution set, the "decreasing
work" effect would not happen.

The delta iterations are designed for cases where specific updates to the
solution are made, driven by the workset.
Driving an operator by solution set contents would result in a "bulk
iteration" style pattern, so the idea would be to use a proper bulk
iteration for those cases.

Does that make sense?



On Mon, Nov 16, 2015 at 10:54 PM, Fabian Hueske <fhueske@gmail.com> wrote:

> Hi,
>
> this is an artifact of how the solution set is internally implemented.
> Usually, a CoGroup is executed using a sort-merge strategy, i.e., both
> input are sorted, merged, and handed to the CoGroup function in a streaming
> fashion. Both inputs are treated equally, and if one of both inputs does
> not contain a key which is contained in the other input, the CoGroup
> function is called with an empty iterator.
>
> The solution set of a delta iteration is stored in a hash table (with only
> one entry per key). When a solution set is coGrouped with another data set,
> the other input is sorted and probed against the hash table. The solution
> set iterator of the CoGroup function will contain one element if the hash
> table contains an element and be empty if the hash table doesn't contain an
> entry for the key. However, the hash table will not check that all elements
> of the hash table have been looked-up in order to identify elements of the
> solution set for which no corresponding element was present in the other
> data set.
>
> So, the CoGroup with a solution set works only in one direction as stated
> in the documentation. This is kind of intended by the way the solution set
> CoGroup is implemented, but we should definitely updated the documentation
> to cover this case!
>
> If you have a use case that requires a solution set CoGroup with the
> missing behavior you should open a JIRA issue.
> Otherwise it would be great if you could also open a JIRA issue to extend
> the documentation.
>
> Thank you, Fabian
>
> 2015-11-16 1:02 GMT+01:00 Truong Duc Kien <duckientruong@gmail.com>:
>
>> Hi,
>>
>> When running CoGroup between the solution set and a different dataset
>> inside a DeltaIteration, the CoGroupFunction only get called for items
>> that exist in the other dataset, simillar to an inner join. This is not
>> the documented behavior for CoGroup:
>>
>> If a DataSet has a group with no matching key in the other DataSet,
>>> the CoGroupFunction is called with an empty group for the non-existing
>>> group.
>>>
>>
>> The following code shows the problem.
>>
>> import org.apache.flink.api.scala._
>> import org.apache.flink.util.Collector
>>
>> object CoGroupExample {
>>
>>   def coGroupFuntion(first: Iterator[(Int, Int)],
>>                      second: Iterator[(Int, Int)],
>>                      out: Collector[(Int, Int)]): Unit = {
>>     if (second.hasNext) {
>>       out.collect(second.next)
>>     } else {
>>       printf("Not in second set: %s\n", first.next)
>>       println("These two lines doesn't appear when " +
>>         "running cogroup on solution set")
>>     }
>>   }
>>
>>   def main(args: Array[String]): Unit = {
>>     val env = ExecutionEnvironment.getExecutionEnvironment
>>     env.getConfig.disableSysoutLogging()
>>
>>     val d1 = env.fromElements(
>>       new Tuple2(1, 1),
>>       new Tuple2(2, 1) ,
>>       new Tuple2(3, 1)
>>     )
>>
>>     d1.iterateDelta(d1, 1, Array{0}) {
>>       (solutionSet, workSet) => {
>>         val f = workSet.filter(_._1 != 1)
>>         println("Cogroup on solution set with delta iteration")
>>         val newSolutionSet = solutionSet.coGroup(f)
>>           .where(0)
>>           .equalTo(0)
>>           .apply(coGroupFuntion _)
>>         (newSolutionSet, newSolutionSet)
>>       }
>>     }.print()
>>
>>     println("Normal cogroup")
>>     val d2 = d1.filter(_._1 != 1)
>>     d1.coGroup(d2).where(0).equalTo(0).apply(coGroupFuntion _).print()
>>   }
>> }
>>
>>
>>
>> Is this the expected behavior or should I file a bug about this ?
>>
>> Best regards,
>> Kien Truong
>>
>
>

Mime
View raw message