pig-dev mailing list archives

From "Brian Johnson (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (PIG-4166) Collected group drops last record when combined with merge join
Date Thu, 16 Oct 2014 19:28:33 GMT

    [ https://issues.apache.org/jira/browse/PIG-4166?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14174127#comment-14174127 ]

Brian Johnson commented on PIG-4166:
------------------------------------

Why does this keep getting pushed back? The current code isn't handling the final status correctly
and will drop the last record in many instances.

> Collected group drops last record when combined with merge join
> ---------------------------------------------------------------
>
>                 Key: PIG-4166
>                 URL: https://issues.apache.org/jira/browse/PIG-4166
>             Project: Pig
>          Issue Type: Bug
>    Affects Versions: 0.12.0
>            Reporter: Brian Johnson
>             Fix For: 0.15.0
>
>
> If the final two keys in each relation join, they will never make it to the final output.
> The reason is that POMergeJoin does a read-ahead and POCollectedGroup doesn't call processInput
> when this.parentPlan.endOfAllInput == true. This prevents the final join from being output
> because POMergeJoin never sees endOfAllInput == true.
> {code}
> diff --git a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
> index c355d1d..8fd44fa 100644
> --- a/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
> +++ b/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
> @@ -127,28 +127,30 @@ public class POCollectedGroup extends PhysicalOperator {
>      @Override
>      public Result getNextTuple() throws ExecException {
>  
> -        // Since the output is buffered, we need to flush the last
> -        // set of records when the close method is called by mapper.
> -        if (this.parentPlan.endOfAllInput) {
> -            if (outputBag != null) {
> -                Tuple tup = mTupleFactory.newTuple(2);
> -                tup.set(0, prevKey);
> -                tup.set(1, outputBag);
> -                outputBag = null;
> -                return new Result(POStatus.STATUS_OK, tup);
> -            }
> -
> -            return new Result(POStatus.STATUS_EOP, null);
> -        }
> +        
>  
>          Result inp = null;
>          Result res = null;
>  
>          while (true) {
>              inp = processInput();
> +
>              if (inp.returnStatus == POStatus.STATUS_EOP ||
>                      inp.returnStatus == POStatus.STATUS_ERR) {
> -                break;
> +               // Since the output is buffered, we need to flush the last
> +                // set of records when the close method is called by mapper.
> +                if (this.parentPlan.endOfAllInput) {
> +                    if (outputBag != null) {
> +                        Tuple tup = mTupleFactory.newTuple(2);
> +                        tup.set(0, prevKey);
> +                        tup.set(1, outputBag);
> +                        outputBag = null;
> +                        return new Result(POStatus.STATUS_OK, tup);
> +                    }
> +
> +                    return new Result(POStatus.STATUS_EOP, null);
> +                } else
> +                       break;
>              }
>  
>              if (inp.returnStatus == POStatus.STATUS_NULL) {
> {code}
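
To illustrate the interaction described in the issue, here is a minimal standalone sketch. It is not Pig code: ReadAheadSource, CollectedGroup, the flushBeforeInput switch and the "key:count" output format are all hypothetical stand-ins, and a single static flag plays the role of parentPlan.endOfAllInput. It assumes the upstream operator holds one record back until it is polled after the flag is set, which is a simplification of the read-ahead in POMergeJoin.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class CollectedGroupFlushSketch {
    // Hypothetical stand-in for parentPlan.endOfAllInput.
    static boolean endOfAllInput = false;

    // Upstream operator with read-ahead: keeps one record in reserve
    // until it is polled after endOfAllInput has been set.
    static class ReadAheadSource {
        private final Deque<String> pending = new ArrayDeque<>();

        void offer(String key) { pending.add(key); }

        // Returns the next key, or null to signal end of processing (EOP).
        String next() {
            int reserve = endOfAllInput ? 0 : 1;
            return pending.size() > reserve ? pending.poll() : null;
        }
    }

    // Groups consecutive equal keys and buffers the group in progress.
    static class CollectedGroup {
        private final ReadAheadSource input;
        private final boolean flushBeforeInput; // true mimics the pre-patch order
        private String prevKey;                 // null when nothing is buffered
        private int count;

        CollectedGroup(ReadAheadSource input, boolean flushBeforeInput) {
            this.input = input;
            this.flushBeforeInput = flushBeforeInput;
        }

        // Returns "key:count" for a finished group, or null for EOP.
        String next() {
            if (flushBeforeInput && endOfAllInput) {
                return flush(); // upstream is never polled again
            }
            while (true) {
                String key = input.next();
                if (key == null) {
                    // Patched order: flush only once upstream reports EOP.
                    return endOfAllInput ? flush() : null;
                }
                if (prevKey == null) {
                    prevKey = key;
                    count = 1;
                } else if (key.equals(prevKey)) {
                    count++;
                } else {
                    String out = prevKey + ":" + count;
                    prevKey = key;
                    count = 1;
                    return out;
                }
            }
        }

        private String flush() {
            if (prevKey == null) return null;
            String out = prevKey + ":" + count;
            prevKey = null;
            return out;
        }
    }

    // Drives the pipeline the way a mapper would: a normal pass, then one
    // more pass after endOfAllInput is set (the "close" call).
    static List<String> run(boolean flushBeforeInput, String... keys) {
        endOfAllInput = false;
        ReadAheadSource source = new ReadAheadSource();
        for (String k : keys) source.offer(k);
        CollectedGroup group = new CollectedGroup(source, flushBeforeInput);
        List<String> out = new ArrayList<>();
        for (String g = group.next(); g != null; g = group.next()) out.add(g);
        endOfAllInput = true;
        for (String g = group.next(); g != null; g = group.next()) out.add(g);
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(true, "a", "a", "b", "c"));  // [a:2, b:1]
        System.out.println(run(false, "a", "a", "b", "c")); // [a:2, b:1, c:1]
    }
}
```

With the flush checked first, the record still held upstream ("c") never reaches the output. Moving the flush to the point where the input returns EOP, as the patch does, gives the upstream operator one more call with the flag set, so it can release what it was holding.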



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
