pig-dev mailing list archives

From "Xianda Ke (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (PIG-4876) OutputConsumeIterator can't handle the last buffered tuples for some Operators
Date Fri, 29 Apr 2016 03:10:12 GMT

    [ https://issues.apache.org/jira/browse/PIG-4876?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15263459#comment-15263459 ]

Xianda Ke commented on PIG-4876:
--------------------------------

*The fragility & complexity of sharing a flag between operators*:

Hi [~mohitsabharwal], the flag may be reset by input.hasNext() or input.next() if any preceding
operator reaches its end.
{code:title=OutputConsumerIterator.java}
if (result == null) {
  // Calling beginOfInput() here does not work: input.hasNext() or input.next() may reset the flag.
  // beginOfInput();
  if (!input.hasNext()) {
    done = true;
    return;
  }
  Tuple v1 = input.next();
  beginOfInput();   // inserting the call here seems OK (?), but it needs to be tested carefully
  attach(v1);
}

if (!input.hasNext()) {
  endOfInput();
  // Another issue:
  // In MR mode, the flag is set after the last input tuple has been consumed by getNextResult().
  // Here, the flag is set before the last input tuple is consumed by getNextResult().
  // That does not matter for MergeCogroup and CollectedGroup; these operators work. But it is
  // a problem for the current implementation of POMergeJoin.
  // The implementation of POMergeJoin is tightly coupled to the MR runPipeline.
  // Even if beginOfInput() is called when attaching, we still have to add some dirty code
  // in getNextResult() for MergeJoin.
  // It seems that beginOfInput() is not good enough to solve all the problems.

  // It is not so easy to just move endOfInput() after getNextResult(), into the case of
  // POStatus.STATUS_EOP:
  // I have struggled with regression bugs for days.
}

result = getNextResult();

{code}
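
To make the ordering issue in the comments above concrete, here is a minimal, self-contained sketch. It is not Pig code: GroupingOp, consume() and run() are hypothetical names, and the operator simply groups consecutive equal strings. The driver looks ahead with hasNext() and raises the end-of-input signal before the last tuple is consumed, as in the snippet above. An operator written for that ordering has to consume the tuple first and only then flush its buffer.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class EndOfInputOrdering {
    /** Hypothetical buffering operator: groups consecutive equal tuples. */
    static class GroupingOp {
        private final List<String> buffer = new ArrayList<>();

        /**
         * Consumes one tuple. endOfInput is true while the LAST tuple is being
         * consumed (the ordering in the snippet above, not the MR ordering).
         */
        List<List<String>> consume(String tuple, boolean endOfInput) {
            List<List<String>> out = new ArrayList<>();
            // A different key closes the group that was buffered so far.
            if (!buffer.isEmpty() && !buffer.get(0).equals(tuple)) {
                out.add(new ArrayList<>(buffer));
                buffer.clear();
            }
            buffer.add(tuple);
            // Flush only after the last tuple has been added to the buffer.
            if (endOfInput) {
                out.add(new ArrayList<>(buffer));
                buffer.clear();
            }
            return out;
        }
    }

    /** Hypothetical driver loop that looks ahead to detect the last tuple. */
    static List<List<String>> run(Iterator<String> input) {
        GroupingOp op = new GroupingOp();
        List<List<String>> results = new ArrayList<>();
        while (input.hasNext()) {
            String tuple = input.next();
            boolean last = !input.hasNext(); // known before the tuple is consumed
            results.addAll(op.consume(tuple, last));
        }
        return results;
    }
}
```

Calling run() on the tuples a, a, b yields the groups [a, a] and [b]. An operator that assumes the flag is raised only after the last tuple has been consumed would need extra handling here, which is the kind of coupling described for POMergeJoin.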

Summary:
Option (a): leave the non-Spark code untouched by resetting the flag.
From my point of view, it is just a workaround. There seems to be no need to add an abstract
method beginOfInput().
Well, I will give it a try and do some testing for beginOfInput().

Option (b): add a flag for each operator in Spark mode. These flags work only in Spark mode
and should not affect the logic in MR mode.
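
A rough sketch of the difference between a shared flag and the per-operator flags of option (b). All names here (SharedPlan, Operator, markEndOfInput(), reset(), demo()) are hypothetical and are not taken from the Pig code base; the sketch only illustrates why a flag owned by each operator cannot be reset by a preceding operator.

```java
class PerOperatorFlagSketch {
    /** Hypothetical stand-in for a plan whose single flag is visible to every operator. */
    static class SharedPlan {
        boolean endOfAllInput;
    }

    /** Hypothetical operator that owns its end-of-input flag. */
    static class Operator {
        private boolean endOfInput;

        void markEndOfInput() { endOfInput = true; }
        void reset() { endOfInput = false; }
        boolean isEndOfInput() { return endOfInput; }
    }

    /** Returns {flag seen by downstream with a shared flag, flag seen with per-operator flags}. */
    static boolean[] demo() {
        // Shared flag: a reset triggered by a preceding operator is visible to everyone.
        SharedPlan plan = new SharedPlan();
        plan.endOfAllInput = true;   // set on behalf of the downstream operator
        plan.endOfAllInput = false;  // a preceding operator reaches its end and resets it
        boolean sharedSeen = plan.endOfAllInput;

        // Per-operator flags: the reset touches only the preceding operator's own flag.
        Operator upstream = new Operator();
        Operator downstream = new Operator();
        downstream.markEndOfInput();
        upstream.reset();
        boolean ownSeen = downstream.isEndOfInput();

        return new boolean[] { sharedSeen, ownSeen };
    }
}
```

With the shared flag the downstream operator no longer sees the end-of-input signal, while with its own flag it still does.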

> OutputConsumeIterator can't handle the last buffered tuples for some Operators
> ------------------------------------------------------------------------------
>
>                 Key: PIG-4876
>                 URL: https://issues.apache.org/jira/browse/PIG-4876
>             Project: Pig
>          Issue Type: Sub-task
>          Components: spark
>            Reporter: Xianda Ke
>            Assignee: Xianda Ke
>             Fix For: spark-branch
>
>         Attachments: PIG-4876.patch
>
>
> Some Operators, such as MergeCogroup, Stream, CollectedGroup etc. buffer some input records
> to constitute the result tuples. The last result tuples are buffered in the operator. These
> Operators need a flag to indicate the end of input, so that they can flush and constitute
> their last tuples.
> Currently, the flag 'parentPlan.endOfAllInput' is targeted for flushing the buffered
> tuples in MR mode. But it does not work with OutputConsumeIterator in Spark mode.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
