pig-dev mailing list archives

From "Xianda Ke (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (PIG-4876) OutputConsumeIterator can't handle the last buffered tuples for some Operators
Date Fri, 29 Apr 2016 03:10:12 GMT

    [ https://issues.apache.org/jira/browse/PIG-4876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263459#comment-15263459 ]

Xianda Ke commented on PIG-4876:

*The fragility & complexity of sharing a flag between operators*:

Hi [~mohitsabharwal], the flag may be reset by input.hasNext() or input.next() if any preceding
operator reaches the end of its input.
if (result == null) {
  // Calling beginOfInput() here does not work: input.hasNext() or input.next() may reset the flag.
  if (!input.hasNext()) {
    done = true;
    return;
  }
  Tuple v1 = input.next();
  beginOfInput();   // it seems OK to insert the call here (?), but it needs to be tested carefully
}

if (!input.hasNext()) {
  // Another issue:
  // in MR mode, the flag is set after the last input tuple is consumed by getNextResult();
  // here, the flag is set before the last input tuple is consumed in getNextResult().
  // This doesn't matter for MergeCogroup and CollectedGroup; these operators work. But it is
  // a problem for the current implementation of POMergeJoin, which is tightly coupled with
  // the MR runPipeline. Even if beginOfInput() is called when attaching, we still have to add
  // some dirty code in getNextResult() for MergeJoin.
  // It seems that beginOfInput() is not good enough to solve all the problems.

  // It is not as simple as just moving endOfInput() after getNextResult() in the case of ...
  // I have struggled with regression bugs for days.
}

result = getNextResult();
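The timing difference described in the comments of the fragment above can be reproduced with a toy driver. This is a minimal sketch under stated assumptions: `FlagTimingDemo`, `RecordingOperator`, `mrStyle` and `lookaheadStyle` are hypothetical names, not Pig classes, and the MR side is simplified to "process every tuple, then raise the flag and make one extra call". It only shows where the flag becomes visible relative to the last tuple.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class FlagTimingDemo {
    // Hypothetical stand-in for a physical operator: for each tuple it
    // processes, it records whether the end-of-input flag was already set.
    static final class RecordingOperator {
        boolean endOfInput;                       // plays the role of the flag
        final List<String> seen = new ArrayList<>();

        void process(String tuple) {
            seen.add(tuple + (endOfInput ? ":eoi" : ""));
        }
    }

    // Simplified MR-style driver (assumption): every tuple is processed first;
    // the flag is raised only after the last tuple has been consumed, and the
    // operator then gets one extra call in which it can flush.
    static List<String> mrStyle(Iterator<String> input) {
        RecordingOperator op = new RecordingOperator();
        while (input.hasNext()) {
            op.process(input.next());
        }
        op.endOfInput = true;
        op.process("<flush>");
        return op.seen;
    }

    // Lookahead driver shaped like the fragment above: the flag is raised as
    // soon as input.hasNext() returns false, i.e. BEFORE the last tuple has
    // been handed to the operator.
    static List<String> lookaheadStyle(Iterator<String> input) {
        RecordingOperator op = new RecordingOperator();
        while (input.hasNext()) {
            String t = input.next();
            if (!input.hasNext()) {
                op.endOfInput = true;
            }
            op.process(t);
        }
        return op.seen;
    }

    public static void main(String[] args) {
        List<String> data = List.of("a", "b", "c");
        System.out.println(mrStyle(data.iterator()));        // [a, b, c, <flush>:eoi]
        System.out.println(lookaheadStyle(data.iterator())); // [a, b, c:eoi]
    }
}
```

Under the first ordering the operator sees the flag on a separate call after the last tuple; under the second it sees the flag together with the last tuple, so code written against one ordering does not automatically work with the other.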


Option (a): avoid touching non-Spark code by resetting the flag.
From my point of view, this is just a workaround; it seems there is no need to add the abstract
method beginOfInput().
Well, I will give it a try and do some testing with beginOfInput().

Option (b): add a flag for each operator in Spark mode. These flags only take effect in Spark mode
and should not affect the logic in MR mode.
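Option (b) can be sketched by giving each operator its own flag object instead of one plan-level flag. This is a minimal sketch, assuming a simplified consume iterator that follows the shape of the readNext() fragment above (pull a tuple, raise the flag if the input is now exhausted, then run the operator); `Flag`, `Op`, `ConsumeIterator`, `PASS` and `counter()` are hypothetical names, not Pig's API. With a shared flag, the upstream pass-through iterator raises the flag while the downstream counter still has one tuple left, so the counter flushes early.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

public class PerOperatorFlagDemo {
    // Mutable end-of-input flag. Operators either share one instance
    // (plan-level flag) or each own a private one (option (b)).
    static final class Flag { boolean endOfInput; }

    // Hypothetical operator: maps one input tuple to at most one output tuple.
    interface Op { String process(String tuple, Flag flag); }

    // Consume iterator loosely following the fragment: pull a tuple, raise the
    // flag if the input is now exhausted, then let the operator run.
    static final class ConsumeIterator implements Iterator<String> {
        private final Iterator<String> input;
        private final Op op;
        private final Flag flag;
        private String pending;

        ConsumeIterator(Iterator<String> input, Op op, Flag flag) {
            this.input = input;
            this.op = op;
            this.flag = flag;
        }

        @Override public boolean hasNext() {
            while (pending == null && input.hasNext()) {
                String t = input.next();
                if (!input.hasNext()) {
                    flag.endOfInput = true;   // analogous to endOfInput() in the fragment
                }
                pending = op.process(t, flag);
            }
            return pending != null;
        }

        @Override public String next() {
            if (!hasNext()) throw new NoSuchElementException();
            String r = pending;
            pending = null;
            return r;
        }
    }

    // Pass-through operator: never buffers.
    static final Op PASS = (t, f) -> t;

    // Buffering operator: counts tuples and emits the count only once its
    // flag says the input has ended. Create one instance per pipeline.
    static Op counter() {
        int[] count = {0};
        return (t, f) -> {
            count[0]++;
            return f.endOfInput ? String.valueOf(count[0]) : null;
        };
    }

    // Runs source -> PASS -> counter with one shared flag or one flag per operator.
    static List<String> run(List<String> source, boolean sharedFlag) {
        Flag upstreamFlag = new Flag();
        Flag downstreamFlag = sharedFlag ? upstreamFlag : new Flag();
        Iterator<String> up = new ConsumeIterator(source.iterator(), PASS, upstreamFlag);
        Iterator<String> down = new ConsumeIterator(up, counter(), downstreamFlag);
        List<String> out = new ArrayList<>();
        down.forEachRemaining(out::add);
        return out;
    }

    public static void main(String[] args) {
        List<String> data = List.of("a", "b", "c");
        System.out.println(run(data, true));   // [2, 3]: premature flush
        System.out.println(run(data, false));  // [3]
    }
}
```

Because each iterator writes only the flag of the operator it drives, an upstream operator reaching its end can no longer change what a downstream operator sees; the MR code path never touches these per-operator flags.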

> OutputConsumeIterator can't handle the last buffered tuples for some Operators
> ------------------------------------------------------------------------------
>                 Key: PIG-4876
>                 URL: https://issues.apache.org/jira/browse/PIG-4876
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: Xianda Ke
>            Assignee: Xianda Ke
>             Fix For: spark-branch
>         Attachments: PIG-4876.patch
> Some operators, such as MergeCogroup, Stream, and CollectedGroup, buffer input records
> to assemble their result tuples, so the last result tuples remain buffered in the operator. These
> operators need a flag to indicate the end of input, so that they can flush and assemble
> their last tuples.
> Currently, the flag 'parentPlan.endOfAllInput' is used to flush the buffered tuples
> in MR mode, but it does not work with OutputConsumeIterator in Spark mode.
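To illustrate why such buffering operators need an explicit end-of-input signal, here is a minimal, hypothetical sketch loosely modelled on a collected group: it assumes records arrive already collected by key, buffers values until the key changes, and can only emit the last group when told the input has ended. `CollectedGroupSketch`, `add`, `flush` and `run` are illustrative names, not Pig's POCollectedGroup API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class CollectedGroupSketch {
    private String currentKey;                        // key of the group being buffered
    private final List<Integer> buffer = new ArrayList<>();

    // Feeds one (key, value) record. Returns the finished group when the key
    // changes, or null while the current group is still being buffered.
    String add(String key, int value) {
        String finished = null;
        if (currentKey != null && !currentKey.equals(key)) {
            finished = flush();
        }
        currentKey = key;
        buffer.add(value);
        return finished;
    }

    // Must be called once the end of input is known; emits the last buffered group.
    String flush() {
        if (currentKey == null) return null;
        String group = currentKey + "=" + buffer;
        buffer.clear();
        currentKey = null;
        return group;
    }

    static List<String> run(List<Map.Entry<String, Integer>> records, boolean signalEndOfInput) {
        CollectedGroupSketch op = new CollectedGroupSketch();
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Integer> r : records) {
            String g = op.add(r.getKey(), r.getValue());
            if (g != null) out.add(g);
        }
        if (signalEndOfInput) {                       // without this, the last group stays buffered
            String g = op.flush();
            if (g != null) out.add(g);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> records = List.of(
            Map.entry("a", 1), Map.entry("a", 2), Map.entry("b", 3));
        System.out.println(run(records, false)); // [a=[1, 2]]  (group b is lost)
        System.out.println(run(records, true));  // [a=[1, 2], b=[3]]
    }
}
```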

This message was sent by Atlassian JIRA
