pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject svn commit: r1604691 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
Date Mon, 23 Jun 2014 04:58:46 GMT
Author: daijy
Date: Mon Jun 23 04:58:46 2014
New Revision: 1604691

URL: http://svn.apache.org/r1604691
Log:
PIG-4035: Fix CollectedGroup e2e tests for tez

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1604691&r1=1604690&r2=1604691&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Jun 23 04:58:46 2014
@@ -40,6 +40,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-4035: Fix CollectedGroup e2e tests for tez (daijy)
+
 PIG-4034: Exclude TestTezAutoParallelism when -Dhadoopversion=20 (cheolsoo)
 
 PIG-4033: Fix MergeSparseJoin e2e tests on tez (daijy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java?rev=1604691&r1=1604690&r2=1604691&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java Mon Jun 23 04:58:46 2014
@@ -130,15 +130,7 @@ public class POCollectedGroup extends Ph
         // Since the output is buffered, we need to flush the last
         // set of records when the close method is called by mapper.
         if (this.parentPlan.endOfAllInput) {
-            if (outputBag != null) {
-                Tuple tup = mTupleFactory.newTuple(2);
-                tup.set(0, prevKey);
-                tup.set(1, outputBag);
-                outputBag = null;
-                return new Result(POStatus.STATUS_OK, tup);
-            }
-
-            return new Result(POStatus.STATUS_EOP, null);
+            return getStreamCloseResult();
         }
 
         Result inp = null;
@@ -146,8 +138,13 @@ public class POCollectedGroup extends Ph
 
         while (true) {
             inp = processInput();
-            if (inp.returnStatus == POStatus.STATUS_EOP ||
-                    inp.returnStatus == POStatus.STATUS_ERR) {
+            if (inp.returnStatus == POStatus.STATUS_EOP) {
+                if (this.parentPlan.endOfAllInput) {
+                    return getStreamCloseResult();
+                } else {
+                    break;
+                }
+            } else if (inp.returnStatus == POStatus.STATUS_ERR) {
                 break;
             }
 
@@ -278,4 +275,16 @@ public class POCollectedGroup extends Ph
     public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
         return null;
     }
+
+    private Result getStreamCloseResult() throws ExecException {
+        if (outputBag != null) {
+            Tuple tup = mTupleFactory.newTuple(2);
+            tup.set(0, prevKey);
+            tup.set(1, outputBag);
+            outputBag = null;
+            return new Result(POStatus.STATUS_OK, tup);
+        }
+
+        return new Result(POStatus.STATUS_EOP, null);
+    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1604691&r1=1604690&r2=1604691&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Mon Jun 23 04:58:46 2014
@@ -34,6 +34,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
+import org.apache.pig.CollectableLoadFunc;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.IndexableLoadFunc;
 import org.apache.pig.LoadFunc;
@@ -551,13 +552,46 @@ public class TezCompiler extends PhyPlan
         }
     }
 
-    // visit methods in alphabetical order
-
     @Override
     public void visitCollectedGroup(POCollectedGroup op) throws VisitorException {
-        int errCode = 2034;
-        String msg = "Cannot compile " + op.getClass().getSimpleName();
-        throw new TezCompilerException(msg, errCode, PigException.BUG);
+
+        List<PhysicalOperator> roots = curTezOp.plan.getRoots();
+        if(roots.size() != 1){
+            int errCode = 2171;
+            String errMsg = "Expected one but found more then one root physical operator in physical plan.";
+            throw new TezCompilerException(errMsg,errCode,PigException.BUG);
+        }
+
+        PhysicalOperator phyOp = roots.get(0);
+        if(! (phyOp instanceof POLoad)){
+            int errCode = 2172;
+            String errMsg = "Expected physical operator at root to be POLoad. Found : "+phyOp.getClass().getCanonicalName();
+            throw new TezCompilerException(errMsg,errCode,PigException.BUG);
+        }
+
+        LoadFunc loadFunc = ((POLoad)phyOp).getLoadFunc();
+        try {
+            if(!(CollectableLoadFunc.class.isAssignableFrom(loadFunc.getClass()))){
+                int errCode = 2249;
+                throw new TezCompilerException("While using 'collected' on group; data must be loaded via loader implementing CollectableLoadFunc.", errCode);
+            }
+            ((CollectableLoadFunc)loadFunc).ensureAllKeyInstancesInSameSplit();
+        } catch (TezCompilerException e){
+            throw (e);
+        } catch (IOException e) {
+            int errCode = 2034;
+            String msg = "Error compiling operator " + op.getClass().getSimpleName();
+            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
+        }
+
+        try{
+            nonBlocking(op);
+            phyToTezOpMap.put(op, curTezOp);
+        }catch(Exception e){
+            int errCode = 2034;
+            String msg = "Error compiling operator " + op.getClass().getSimpleName();
+            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
+        }
     }
 
     @Override



Mime
View raw message