pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From xu...@apache.org
Subject svn commit: r1744179 - in /pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine: physicalLayer/relationalOperators/ spark/converter/
Date Tue, 17 May 2016 04:33:57 GMT
Author: xuefu
Date: Tue May 17 04:33:57 2016
New Revision: 1744179

URL: http://svn.apache.org/viewvc?rev=1744179&view=rev
Log:
PIG-4876: OutputConsumeIterator can't handle the last buffered tuples for some Operators (Xianda via Xuefu)

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java?rev=1744179&r1=1744178&r2=1744179&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java	(original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java	Tue May 17 04:33:57 2016
@@ -70,6 +70,15 @@ public class POCollectedGroup extends Ph
 
     private transient boolean useDefaultBag;
 
+    //For Spark
+    private boolean endOfInput = false;
+    public boolean isEndOfInput() {
+        return endOfInput;
+    }
+    public void setEndOfInput (boolean isEndOfInput) {
+        endOfInput = isEndOfInput;
+    }
+
     public POCollectedGroup(OperatorKey k) {
         this(k, -1, null);
     }
@@ -132,7 +141,7 @@ public class POCollectedGroup extends Ph
             if (inp.returnStatus == POStatus.STATUS_EOP) {
                 // Since the output is buffered, we need to flush the last
                 // set of records when the close method is called by mapper.
-                if (this.parentPlan.endOfAllInput) {
+                if (this.parentPlan.endOfAllInput || isEndOfInput()) {
                     return getStreamCloseResult();
                 } else {
                     break;

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java?rev=1744179&r1=1744178&r2=1744179&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java	(original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java	Tue May 17 04:33:57 2016
@@ -122,6 +122,15 @@ public class POMergeCogroup extends Phys
         this.endOfRecordMark = endOfRecordMark;
     }
 
+    //For Spark
+    private boolean endOfInput = false;
+    public boolean isEndOfInput() {
+        return endOfInput;
+    }
+    public void setEndOfInput (boolean isEndOfInput) {
+        endOfInput = isEndOfInput;
+    }
+
     @Override
     public Result getNextTuple() throws ExecException {
 
@@ -145,7 +154,7 @@ public class POMergeCogroup extends Phys
                 break;
 
             case POStatus.STATUS_EOP:
-                if(!this.parentPlan.endOfAllInput)
+                if(!(this.parentPlan.endOfAllInput || isEndOfInput()))
                     return baseInp;
 
                 if(lastTime)

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java?rev=1744179&r1=1744178&r2=1744179&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java	(original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/CollectedGroupConverter.java	Tue May 17 04:33:57 2016
@@ -73,22 +73,12 @@ public class CollectedGroupConverter imp
 
                         @Override
                         protected Result getNextResult() throws ExecException {
-
-                            // if endOfAllInput was set as true by the predecessors, but input.hasNext() is true.
-                            // it means that the predecessor has consumed all of its input,
-                            // but poCollectedGroup still hasn't consumed all of its input.
-                            //
-                            // set endOfAllInput as false here, so that POCollectedGroup.getNextTuple() can work correctly
-                            if (poCollectedGroup.getParentPlan().endOfAllInput && input.hasNext()) {
-                                poCollectedGroup.getParentPlan().endOfAllInput = false;
-                            }
-
                             return poCollectedGroup.getNextTuple();
                         }
 
                         @Override
                         protected void endOfInput() {
-                            poCollectedGroup.getParentPlan().endOfAllInput = true;
+                            poCollectedGroup.setEndOfInput(true);
                         }
                     };
                 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java?rev=1744179&r1=1744178&r2=1744179&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java	(original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/MergeCogroupConverter.java	Tue May 17 04:33:57 2016
@@ -66,7 +66,7 @@ public class MergeCogroupConverter imple
 
                         @Override
                         protected void endOfInput() {
-                            poMergeCogroup.getParentPlan().endOfAllInput = true;
+                            poMergeCogroup.setEndOfInput(true);
                         }
                     };
                 }



Mime
View raw message