pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject svn commit: r1165121 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java
Date Sun, 04 Sep 2011 21:59:47 GMT
Author: daijy
Date: Sun Sep  4 21:59:47 2011
New Revision: 1165121

URL: http://svn.apache.org/viewvc?rev=1165121&view=rev
Log:
PIG-2163: Improve nested cross to stream one relation

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1165121&r1=1165120&r2=1165121&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sun Sep  4 21:59:47 2011
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-2163: Improve nested cross to stream one relation (zjshen via daijy)
+
 PIG-2249: Enable pig e2e testing on EC2 (gates)
 
 PIG-2256: Upgrade Avro dependency to 1.5.3 (tucu00 via dvryaboy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java?rev=1165121&r1=1165120&r2=1165121&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java Sun Sep  4 21:59:47 2011
@@ -49,6 +49,8 @@ public class POCross extends PhysicalOpe
     protected Tuple[] data;
 
     protected transient Iterator<Tuple>[] its;
+    
+    protected Tuple tupleOfLastBag;
 
     public POCross(OperatorKey k) {
         super(k);
@@ -112,26 +114,37 @@ public class POCross extends PhysicalOpe
         int noItems = inputs.size();
         if (inputBags == null) {
             accumulateData();
+            if (!loadLastBag()) {
+                res.returnStatus = POStatus.STATUS_EOP;
+                clearMemory();
+                return res;
+            }
         }
 
         if (its != null) {
             // we check if we are done with processing
             // we do that by checking if all the iterators are used up
             boolean finished = true;
+            boolean empty = false;
             for (int i = 0; i < its.length; i++) {
                 if (inputBags[i].size() == 0) {
-                    finished = true;
+                    empty = true;
                     break;
                 }
                 finished &= !its[i].hasNext();
             }
-            if (finished) {
+            if (empty) {
+                // if one bag is empty, the cross product is empty.
+                // simply drain the remaining input tuples of the last bag and finish.
+                int index = inputs.size() - 1;
+                for (Result resOfLastBag = inputs.get(index).getNext(dummyTuple); resOfLastBag.returnStatus
!=
+                    POStatus.STATUS_EOP; resOfLastBag = inputs.get(index).getNext(dummyTuple));
                 res.returnStatus = POStatus.STATUS_EOP;
-                // reset inputBags, its, data to null so that in the next round
-                // of getNext, the new input data will be loaded.
-                inputBags = null;
-                its = null;
-                data = null;
+                clearMemory();
+                return res;
+            } else if (finished && !loadLastBag()) {
+                res.returnStatus = POStatus.STATUS_EOP;
+                clearMemory();
                 return res;
             }
 
@@ -142,7 +155,8 @@ public class POCross extends PhysicalOpe
             // data we instantiate the template array and start populating it
             // with data
             data = new Tuple[noItems];
-            for (int i = 0; i < noItems; ++i) {
+            data[noItems - 1] = tupleOfLastBag;
+            for (int i = 0; i < noItems - 1; ++i) {
                 data[i] = its[i].next();
 
             }
@@ -150,7 +164,9 @@ public class POCross extends PhysicalOpe
             res.returnStatus = POStatus.STATUS_OK;
             return res;
         } else {
-            for (int index = noItems - 1; index >= 0; --index) {
+            data[noItems - 1] = tupleOfLastBag;
+            int length = noItems - 1;
+            for (int index = 0; index < length; ++index) {
                 if (its[index].hasNext()) {
                     data[index] = its[index].next();
                     res.result = createTuple(data);
@@ -166,20 +182,21 @@ public class POCross extends PhysicalOpe
                     its[index] = (inputBags[index]).iterator();
                     data[index] = its[index].next();
                 }
-
             }
+            res.result = createTuple(data);
+            res.returnStatus = POStatus.STATUS_OK;
+            return res;
         }
-
-        return null;
     }
 
     @SuppressWarnings("unchecked")
     private void accumulateData() throws ExecException {
         int count = 0;
-        inputBags = new DataBag[inputs.size()];
-
-        its = new Iterator[inputs.size()];
-        for (PhysicalOperator op : inputs) {
+        int length = inputs.size() - 1;
+        inputBags = new DataBag[length];
+        its = new Iterator[length];
+        for (int i = 0; i < length; ++i) {
+            PhysicalOperator op = inputs.get(i);
             DataBag bag = BagFactory.getInstance().newDefaultBag();
             inputBags[count] = bag;
             for (Result res = op.getNext(dummyTuple); res.returnStatus != POStatus.STATUS_EOP;
res = op
@@ -209,5 +226,35 @@ public class POCross extends PhysicalOpe
 
         return illustratorMarkup(out, out, 0);
     }
+    
+    /**
+     * Pulls the next tuple from the last input and stores it in
+     * {@code tupleOfLastBag}. Only the first n - 1 inputs are accumulated
+     * into bags; the last input is consumed one tuple at a time, and for each
+     * of its tuples every combination of the other n - 1 bags is emitted.
+     *
+     * @return true if a tuple was loaded, false if the last input reached EOP
+     * @throws ExecException if the last input reports STATUS_ERR or any
+     *         status other than OK, NULL or EOP
+     */
+    private boolean loadLastBag() throws ExecException {
+        int index = inputs.size() - 1;
+        PhysicalOperator lastInput = inputs.get(index);
+        Result resOfLastBag = lastInput.getNext(dummyTuple);
+        // Skip STATUS_NULL results. The result must be re-assigned on every
+        // iteration; otherwise a single NULL result would loop forever.
+        while (resOfLastBag.returnStatus == POStatus.STATUS_NULL) {
+            resOfLastBag = lastInput.getNext(dummyTuple);
+        }
+        switch (resOfLastBag.returnStatus) {
+        case POStatus.STATUS_EOP:
+            return false;
+        case POStatus.STATUS_OK:
+            // each time a tuple of the last bag is ejected, traverse all the
+            // combinations of the tuples from the other n - 1 bags, so that
+            // the last bag never has to be held in memory.
+            tupleOfLastBag = (Tuple) resOfLastBag.result;
+            return true;
+        case POStatus.STATUS_ERR:
+        default:
+            throw new ExecException(
+                    "Error accumulating data in the local Cross operator");
+        }
+    }
+    
+    /**
+     * Drops all per-round state (the output template, the iterators, the
+     * accumulated bags and the current tuple of the last input) so that the
+     * next call to getNext sees inputBags == null and reloads its inputs.
+     */
+    private void clearMemory() {
+        this.data = null;
+        this.its = null;
+        this.inputBags = null;
+        this.tupleOfLastBag = null;
+    }
 
 }



Mime
View raw message