pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject svn commit: r1146574 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relational...
Date Thu, 14 Jul 2011 06:10:26 GMT
Author: daijy
Date: Thu Jul 14 06:10:25 2011
New Revision: 1146574

URL: http://svn.apache.org/viewvc?rev=1146574&view=rev
Log:
PIG-1916: Nested cross

Added:
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCross.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
    pig/trunk/src/org/apache/pig/parser/AliasMasker.g
    pig/trunk/src/org/apache/pig/parser/AstPrinter.g
    pig/trunk/src/org/apache/pig/parser/AstValidator.g
    pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
    pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g
    pig/trunk/src/org/apache/pig/parser/QueryParser.g
    pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1146574&r1=1146573&r2=1146574&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Jul 14 06:10:25 2011
@@ -23,6 +23,9 @@ Trunk (unreleased changes)
 INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
+
+PIG-1916: Nested cross (zjshen via daijy)
+
 PIG-2128: Generating the jar file takes a lot of time and is unnecessary when running Pig local mode (julien)
 
 PIG-2121: e2e test harness should use ant instead of make (gates)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1146574&r1=1146573&r2=1146574&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Jul 14 06:10:25 2011
@@ -60,6 +60,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
@@ -926,6 +927,18 @@ public class MRCompiler extends PhyPlanV
     }
     
     @Override
+    public void visitCross(POCross op) throws VisitorException {
+        try{
+            nonBlocking(op);
+            phyToMROpMap.put(op, curMROp);
+        }catch(Exception e){
+            int errCode = 2034;
+            String msg = "Error compiling operator " + op.getClass().getSimpleName();
+            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    @Override
     public void visitStream(POStream op) throws VisitorException{
         try{
             nonBlocking(op);

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=1146574&r1=1146573&r2=1146574&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Thu Jul 14 06:10:25 2011
@@ -247,6 +247,10 @@ public class PhyPlanVisitor extends Plan
         //do nothing
     }
     
+    public void visitCross(POCross cross) throws VisitorException{
+        //do nothing
+    }
+    
     public void visitFRJoin(POFRJoin join) throws VisitorException {
         //do nothing
     }

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java?rev=1146574&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCross.java Thu Jul 14 06:10:25 2011
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.pen.util.ExampleTuple;
+import org.apache.pig.pen.util.LineageTracer;
+
+/**
+ * Recover this class for nested cross operation.
+ * 
+ * 
+ */
+public class POCross extends PhysicalOperator {
+
+    private static final long serialVersionUID = 1L;
+
+    protected DataBag[] inputBags;
+
+    protected Tuple[] data;
+
+    protected transient Iterator<Tuple>[] its;
+
+    public POCross(OperatorKey k) {
+        super(k);
+    }
+
+    public POCross(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        super(k, rp, inp);
+    }
+
+    public POCross(OperatorKey k, int rp) {
+        super(k, rp);
+    }
+
+    public POCross(OperatorKey k, List<PhysicalOperator> inp) {
+        super(k, inp);
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visitCross(this);
+    }
+
+    @Override
+    public String name() {
+        return getAliasString() + "POCross" + "["
+                + DataType.findTypeName(resultType) + "]" + " - "
+                + mKey.toString();
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        if (illustrator != null) {
+            ExampleTuple tOut = new ExampleTuple((Tuple) out);
+            illustrator.addData(tOut);
+            illustrator.getEquivalenceClasses().get(eqClassIndex).add(
+                    (Tuple) out);
+            LineageTracer lineageTracer = illustrator.getLineage();
+            lineageTracer.insert(tOut);
+            for (int i = 0; i < data.length; i++) {
+                lineageTracer.union(tOut, data[i]);
+            }
+            return tOut;
+        } else {
+            return (Tuple) out;
+        }
+    }
+
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        Result res = new Result();
+        int noItems = inputs.size();
+        if (inputBags == null) {
+            accumulateData();
+        }
+
+        if (its != null) {
+            // we check if we are done with processing
+            // we do that by checking if all the iterators are used up
+            boolean finished = true;
+            for (int i = 0; i < its.length; i++) {
+                if (inputBags[i].size() == 0) {
+                    finished = true;
+                    break;
+                }
+                finished &= !its[i].hasNext();
+            }
+            if (finished) {
+                res.returnStatus = POStatus.STATUS_EOP;
+                // reset inputBags, its, data to null so that in the next round
+                // of getNext, the new input data will be loaded.
+                inputBags = null;
+                its = null;
+                data = null;
+                return res;
+            }
+
+        }
+
+        if (data == null) {
+            // getNext being called for the first time or starting on new input
+            // data we instantiate the template array and start populating it
+            // with data
+            data = new Tuple[noItems];
+            for (int i = 0; i < noItems; ++i) {
+                data[i] = its[i].next();
+
+            }
+            res.result = createTuple(data);
+            res.returnStatus = POStatus.STATUS_OK;
+            return res;
+        } else {
+            for (int index = noItems - 1; index >= 0; --index) {
+                if (its[index].hasNext()) {
+                    data[index] = its[index].next();
+                    res.result = createTuple(data);
+                    res.returnStatus = POStatus.STATUS_OK;
+                    return res;
+                } else {
+                    // reset this index's iterator so cross product can be
+                    // achieved we would be resetting this way only for the
+                    // indexes from the end when the first index which needs to
+                    // be flattened has reached the last element in its
+                    // iterator, we won't come here - instead, we reset all
+                    // iterators at the beginning of this method.
+                    its[index] = (inputBags[index]).iterator();
+                    data[index] = its[index].next();
+                }
+
+            }
+        }
+
+        return null;
+    }
+
+    @SuppressWarnings("unchecked")
+    private void accumulateData() throws ExecException {
+        int count = 0;
+        inputBags = new DataBag[inputs.size()];
+
+        its = new Iterator[inputs.size()];
+        for (PhysicalOperator op : inputs) {
+            DataBag bag = BagFactory.getInstance().newDefaultBag();
+            inputBags[count] = bag;
+            for (Result res = op.getNext(dummyTuple); res.returnStatus != POStatus.STATUS_EOP; res = op
+                    .getNext(dummyTuple)) {
+                if (res.returnStatus == POStatus.STATUS_NULL)
+                    continue;
+                if (res.returnStatus == POStatus.STATUS_ERR)
+                    throw new ExecException(
+                            "Error accumulating data in the local Cross operator");
+                if (res.returnStatus == POStatus.STATUS_OK)
+                    bag.add((Tuple) res.result);
+            }
+            its[count++] = bag.iterator();
+        }
+    }
+
+    private Tuple createTuple(Tuple[] data) throws ExecException {
+        Tuple out = TupleFactory.getInstance().newTuple();
+
+        for (int i = 0; i < data.length; ++i) {
+            Tuple t = data[i];
+            int size = t.size();
+            for (int j = 0; j < size; ++j) {
+                out.append(t.get(j));
+            }
+        }
+
+        return illustratorMarkup(out, out, 0);
+    }
+
+}

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCross.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCross.java?rev=1146574&r1=1146573&r2=1146574&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCross.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCross.java Thu Jul 14 06:10:25 2011
@@ -28,12 +28,21 @@ public class LOCross extends LogicalRela
     
     private static final long serialVersionUID = 2L;
     //private static Log log = LogFactory.getLog(LOFilter.class);
-
+    
+    protected boolean nested = false;
         
     public LOCross(LogicalPlan plan) {
         super("LOCross", plan);       
     }
 
+    public boolean isNested() {
+        return nested;
+    }
+
+    public void setNested(boolean nested) {
+        this.nested = nested;
+    }
+
     @Override
     public LogicalSchema getSchema() throws FrontendException {        
         // if schema is calculated before, just return

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=1146574&r1=1146573&r2=1146574&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Thu Jul 14 06:10:25 2011
@@ -36,6 +36,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
@@ -296,131 +297,148 @@ public class LogToPhyTranslationVisitor 
     public void visit(LOCross cross) throws FrontendException {
         String scope = DEFAULT_SCOPE;
         List<Operator> inputs = cross.getPlan().getPredecessors(cross);
-        
-        POGlobalRearrange poGlobal = new POGlobalRearrange(new OperatorKey(
-                scope, nodeGen.getNextNodeId(scope)), cross
-                .getRequestedParallelisam());
-        poGlobal.setAlias(cross.getAlias());
-        POPackage poPackage = new POPackage(new OperatorKey(scope, nodeGen
-                .getNextNodeId(scope)), cross.getRequestedParallelisam());
-        poGlobal.setAlias(cross.getAlias());
-        currentPlan.add(poGlobal);
-        currentPlan.add(poPackage);
-        
-        int count = 0;
-        
-        try {
-            currentPlan.connect(poGlobal, poPackage);
-            List<Boolean> flattenLst = Arrays.asList(true, true);
+                if (cross.isNested()) {
+            POCross physOp = new POCross(new OperatorKey(scope,nodeGen.getNextNodeId(scope)), cross.getRequestedParallelisam());
+            physOp.setAlias(physOp.getAlias());
+            currentPlan.add(physOp);
+            physOp.setResultType(DataType.BAG);
+            logToPhyMap.put(cross, physOp);
+            for (Operator op : cross.getPlan().getPredecessors(cross)) {
+                PhysicalOperator from = logToPhyMap.get(op);
+                try {
+                    currentPlan.connect(from, physOp);
+                } catch (PlanException e) {
+                    int errCode = 2015;
+                    String msg = "Invalid physical operators in the physical plan" ;
+                    throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+                }
+            }
+        } else {
+            POGlobalRearrange poGlobal = new POGlobalRearrange(new OperatorKey(
+                    scope, nodeGen.getNextNodeId(scope)), cross
+                    .getRequestedParallelisam());
+            poGlobal.setAlias(cross.getAlias());
+            POPackage poPackage = new POPackage(new OperatorKey(scope, nodeGen
+                    .getNextNodeId(scope)), cross.getRequestedParallelisam());
+            poGlobal.setAlias(cross.getAlias());
+            currentPlan.add(poGlobal);
+            currentPlan.add(poPackage);
             
-            for (Operator op : inputs) {
-                PhysicalPlan fep1 = new PhysicalPlan();
-                ConstantExpression ce1 = new ConstantExpression(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),cross.getRequestedParallelisam());
-                ce1.setValue(inputs.size());
-                ce1.setResultType(DataType.INTEGER);
-                fep1.add(ce1);
-                
-                ConstantExpression ce2 = new ConstantExpression(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),cross.getRequestedParallelisam());
-                ce2.setValue(count);
-                ce2.setResultType(DataType.INTEGER);
-                fep1.add(ce2);
-                /*Tuple ce1val = TupleFactory.getInstance().newTuple(2);
-                ce1val.set(0,inputs.size());
-                ce1val.set(1,count);
-                ce1.setValue(ce1val);
-                ce1.setResultType(DataType.TUPLE);*/
-                
-                POUserFunc gfc = new POUserFunc(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),cross.getRequestedParallelisam(), Arrays.asList((PhysicalOperator)ce1,(PhysicalOperator)ce2), new FuncSpec(GFCross.class.getName()));
-                gfc.setAlias(cross.getAlias());
-                gfc.setResultType(DataType.BAG);
-                fep1.addAsLeaf(gfc);
-                gfc.setInputs(Arrays.asList((PhysicalOperator)ce1,(PhysicalOperator)ce2));
-                /*fep1.add(gfc);
-                fep1.connect(ce1, gfc);
-                fep1.connect(ce2, gfc);*/
-                
-                PhysicalPlan fep2 = new PhysicalPlan();
-                POProject feproj = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelisam());
-                feproj.setAlias(cross.getAlias());
-                feproj.setResultType(DataType.TUPLE);
-                feproj.setStar(true);
-                feproj.setOverloaded(false);
-                fep2.add(feproj);
-                List<PhysicalPlan> fePlans = Arrays.asList(fep1, fep2);
-                
-                POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelisam(), fePlans, flattenLst );
-                fe.setAlias(cross.getAlias());
-                currentPlan.add(fe);
-                currentPlan.connect(logToPhyMap.get(op), fe);
+            int count = 0;
+            
+            try {
+                currentPlan.connect(poGlobal, poPackage);
+                List<Boolean> flattenLst = Arrays.asList(true, true);
                 
-                POLocalRearrange physOp = new POLocalRearrange(new OperatorKey(
-                        scope, nodeGen.getNextNodeId(scope)), cross
-                        .getRequestedParallelisam());
-                physOp.setAlias(cross.getAlias());
-                List<PhysicalPlan> lrPlans = new ArrayList<PhysicalPlan>();
-                for(int i=0;i<inputs.size();i++){
-                    PhysicalPlan lrp1 = new PhysicalPlan();
-                    POProject lrproj1 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelisam(), i);
-                    lrproj1.setAlias(cross.getAlias());
-                    lrproj1.setOverloaded(false);
-                    lrproj1.setResultType(DataType.INTEGER);
-                    lrp1.add(lrproj1);
-                    lrPlans.add(lrp1);
+                for (Operator op : inputs) {
+                    PhysicalPlan fep1 = new PhysicalPlan();
+                    ConstantExpression ce1 = new ConstantExpression(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),cross.getRequestedParallelisam());
+                    ce1.setValue(inputs.size());
+                    ce1.setResultType(DataType.INTEGER);
+                    fep1.add(ce1);
+                    
+                    ConstantExpression ce2 = new ConstantExpression(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),cross.getRequestedParallelisam());
+                    ce2.setValue(count);
+                    ce2.setResultType(DataType.INTEGER);
+                    fep1.add(ce2);
+                    /*Tuple ce1val = TupleFactory.getInstance().newTuple(2);
+                    ce1val.set(0,inputs.size());
+                    ce1val.set(1,count);
+                    ce1.setValue(ce1val);
+                    ce1.setResultType(DataType.TUPLE);*/
+                    
+                    POUserFunc gfc = new POUserFunc(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),cross.getRequestedParallelisam(), Arrays.asList((PhysicalOperator)ce1,(PhysicalOperator)ce2), new FuncSpec(GFCross.class.getName()));
+                    gfc.setAlias(cross.getAlias());
+                    gfc.setResultType(DataType.BAG);
+                    fep1.addAsLeaf(gfc);
+                    gfc.setInputs(Arrays.asList((PhysicalOperator)ce1,(PhysicalOperator)ce2));
+                    /*fep1.add(gfc);
+                    fep1.connect(ce1, gfc);
+                    fep1.connect(ce2, gfc);*/
+                    
+                    PhysicalPlan fep2 = new PhysicalPlan();
+                    POProject feproj = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelisam());
+                    feproj.setAlias(cross.getAlias());
+                    feproj.setResultType(DataType.TUPLE);
+                    feproj.setStar(true);
+                    feproj.setOverloaded(false);
+                    fep2.add(feproj);
+                    List<PhysicalPlan> fePlans = Arrays.asList(fep1, fep2);
+                    
+                    POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelisam(), fePlans, flattenLst );
+                    fe.setAlias(cross.getAlias());
+                    currentPlan.add(fe);
+                    currentPlan.connect(logToPhyMap.get(op), fe);
+                    
+                    POLocalRearrange physOp = new POLocalRearrange(new OperatorKey(
+                            scope, nodeGen.getNextNodeId(scope)), cross
+                            .getRequestedParallelisam());
+                    physOp.setAlias(cross.getAlias());
+                    List<PhysicalPlan> lrPlans = new ArrayList<PhysicalPlan>();
+                    for(int i=0;i<inputs.size();i++){
+                        PhysicalPlan lrp1 = new PhysicalPlan();
+                        POProject lrproj1 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelisam(), i);
+                        lrproj1.setAlias(cross.getAlias());
+                        lrproj1.setOverloaded(false);
+                        lrproj1.setResultType(DataType.INTEGER);
+                        lrp1.add(lrproj1);
+                        lrPlans.add(lrp1);
+                    }
+                    
+                    physOp.setCross(true);
+                    physOp.setIndex(count++);
+                    physOp.setKeyType(DataType.TUPLE);
+                    physOp.setPlans(lrPlans);
+                    physOp.setResultType(DataType.TUPLE);
+                    
+                    currentPlan.add(physOp);
+                    currentPlan.connect(fe, physOp);
+                    currentPlan.connect(physOp, poGlobal);
                 }
-                
-                physOp.setCross(true);
-                physOp.setIndex(count++);
-                physOp.setKeyType(DataType.TUPLE);
-                physOp.setPlans(lrPlans);
-                physOp.setResultType(DataType.TUPLE);
-                
-                currentPlan.add(physOp);
-                currentPlan.connect(fe, physOp);
-                currentPlan.connect(physOp, poGlobal);
+            } catch (PlanException e1) {
+                int errCode = 2015;
+                String msg = "Invalid physical operators in the physical plan" ;
+                throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e1);
+            } catch (ExecException e) {
+                int errCode = 2058;
+                String msg = "Unable to set index on newly create POLocalRearrange.";
+                throw new VisitorException(msg, errCode, PigException.BUG, e);
             }
-        } catch (PlanException e1) {
-            int errCode = 2015;
-            String msg = "Invalid physical operators in the physical plan" ;
-            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e1);
-        } catch (ExecException e) {
-            int errCode = 2058;
-            String msg = "Unable to set index on newly create POLocalRearrange.";
-            throw new VisitorException(msg, errCode, PigException.BUG, e);
-        }
-        
-        poPackage.setKeyType(DataType.TUPLE);
-        poPackage.setResultType(DataType.TUPLE);
-        poPackage.setNumInps(count);
-        boolean inner[] = new boolean[count];
-        for (int i=0;i<count;i++) {
-            inner[i] = true;
-        }
-        poPackage.setInner(inner);
-        
-        List<PhysicalPlan> fePlans = new ArrayList<PhysicalPlan>();
-        List<Boolean> flattenLst = new ArrayList<Boolean>();
-        for(int i=1;i<=count;i++){
-            PhysicalPlan fep1 = new PhysicalPlan();
-            POProject feproj1 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelisam(), i);
-            feproj1.setAlias(cross.getAlias());
-            feproj1.setResultType(DataType.BAG);
-            feproj1.setOverloaded(false);
-            fep1.add(feproj1);
-            fePlans.add(fep1);
-            flattenLst.add(true);
-        }
-        
-        POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelisam(), fePlans, flattenLst );
-        fe.setAlias(cross.getAlias());
-        currentPlan.add(fe);
-        try{
-            currentPlan.connect(poPackage, fe);
-        }catch (PlanException e1) {
-            int errCode = 2015;
-            String msg = "Invalid physical operators in the physical plan" ;
-            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e1);
+            
+            poPackage.setKeyType(DataType.TUPLE);
+            poPackage.setResultType(DataType.TUPLE);
+            poPackage.setNumInps(count);
+            boolean inner[] = new boolean[count];
+            for (int i=0;i<count;i++) {
+                inner[i] = true;
+            }
+            poPackage.setInner(inner);
+            
+            List<PhysicalPlan> fePlans = new ArrayList<PhysicalPlan>();
+            List<Boolean> flattenLst = new ArrayList<Boolean>();
+            for(int i=1;i<=count;i++){
+                PhysicalPlan fep1 = new PhysicalPlan();
+                POProject feproj1 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelisam(), i);
+                feproj1.setAlias(cross.getAlias());
+                feproj1.setResultType(DataType.BAG);
+                feproj1.setOverloaded(false);
+                fep1.add(feproj1);
+                fePlans.add(fep1);
+                flattenLst.add(true);
+            }
+            
+            POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cross.getRequestedParallelisam(), fePlans, flattenLst );
+            fe.setAlias(cross.getAlias());
+            currentPlan.add(fe);
+            try{
+                currentPlan.connect(poPackage, fe);
+            }catch (PlanException e1) {
+                int errCode = 2015;
+                String msg = "Invalid physical operators in the physical plan" ;
+                throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e1);
+            }
+            logToPhyMap.put(cross, fe);
         }
-        logToPhyMap.put(cross, fe);
     }
     
     @Override

Modified: pig/trunk/src/org/apache/pig/parser/AliasMasker.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AliasMasker.g?rev=1146574&r1=1146573&r2=1146574&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/AliasMasker.g (original)
+++ pig/trunk/src/org/apache/pig/parser/AliasMasker.g Thu Jul 14 06:10:25 2011
@@ -438,6 +438,7 @@ nested_op : nested_proj
           | nested_sort
           | nested_distinct
           | nested_limit
+          | nested_cross
 ;
 
 nested_proj 
@@ -460,6 +461,12 @@ nested_limit 
     : ^( LIMIT nested_op_input ( INTEGER | expr ) )
 ;
 
+nested_cross : ^( CROSS nested_op_input_list )
+;
+
+nested_op_input_list : nested_op_input+
+;
+
 nested_op_input : col_ref | nested_proj
 ;
 

Modified: pig/trunk/src/org/apache/pig/parser/AstPrinter.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AstPrinter.g?rev=1146574&r1=1146573&r2=1146574&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/AstPrinter.g (original)
+++ pig/trunk/src/org/apache/pig/parser/AstPrinter.g Thu Jul 14 06:10:25 2011
@@ -415,6 +415,7 @@ nested_op : nested_proj
           | nested_sort
           | nested_distinct
           | nested_limit
+          | nested_cross
 ;
 
 nested_proj 
@@ -438,9 +439,17 @@ nested_limit 
     : ^( LIMIT { sb.append($LIMIT.text).append(" "); }  nested_op_input ( INTEGER { sb.append(" ").append($INTEGER.text); } | expr ) )
 ;
 
+nested_cross
+    : ^( CROSS { sb.append($CROSS.text).append(" "); }  nested_op_input_list )
+;
+
 nested_op_input : col_ref | nested_proj
 ;
 
+nested_op_input_list 
+    : nested_op_input ( { sb.append(", "); } nested_op_input)*
+;
+
 stream_clause 
     : ^( STREAM { sb.append($STREAM.text).append(" "); } rel { sb.append(" THROUGH "); }
         ( EXECCOMMAND { sb.append($EXECCOMMAND.text); }

Modified: pig/trunk/src/org/apache/pig/parser/AstValidator.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AstValidator.g?rev=1146574&r1=1146573&r2=1146574&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/AstValidator.g (original)
+++ pig/trunk/src/org/apache/pig/parser/AstValidator.g Thu Jul 14 06:10:25 2011
@@ -436,6 +436,7 @@ nested_op : nested_proj
           | nested_sort
           | nested_distinct
           | nested_limit
+          | nested_cross
 ;
 
 nested_proj : ^( NESTED_PROJ col_ref col_ref+ )
@@ -454,9 +455,15 @@ nested_distinct : ^( DISTINCT nested_op_
 nested_limit : ^( LIMIT nested_op_input ( INTEGER | expr ) )
 ;
 
+nested_cross : ^( CROSS nested_op_input_list )
+;
+
 nested_op_input : col_ref | nested_proj
 ;
 
+nested_op_input_list : nested_op_input+
+;
+
 stream_clause : ^( STREAM rel ( EXECCOMMAND | IDENTIFIER ) as_clause? )
 ;
 

Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1146574&r1=1146573&r2=1146574&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java Thu Jul 14 06:10:25 2011
@@ -958,6 +958,13 @@ public class LogicalPlanBuilder {
         return op;
     }
     
+    Operator buildNestedCrossOp(SourceLocation loc, LogicalPlan plan, String alias, List<Operator> inputOpList) {
+        LOCross op = new LOCross( plan );
+        op.setNested(true);
+        buildNestedOp( loc, plan, op, alias, inputOpList );
+        return op;
+    }
+    
     private void buildNestedOp(SourceLocation loc, LogicalPlan plan, LogicalRelationalOperator op,
             String alias, Operator inputOp) {
         op.setLocation( loc );
@@ -965,6 +972,16 @@ public class LogicalPlanBuilder {
         plan.add( op );
         plan.connect( inputOp, op );
     }
+    
+    private void buildNestedOp(SourceLocation loc, LogicalPlan plan, LogicalRelationalOperator op,
+            String alias, List<Operator> inputOpList) {
+        op.setLocation( loc );
+        setAlias( op, alias );
+        plan.add( op );
+        for (Operator inputOp : inputOpList) {
+            plan.connect( inputOp, op );
+        }
+    }
 
     static LOSort createNestedSortOp(LogicalPlan plan) {
         return new LOSort( plan );

Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g?rev=1146574&r1=1146573&r2=1146574&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g Thu Jul 14 06:10:25 2011
@@ -1163,6 +1163,7 @@ nested_op[String alias] returns[Operator
  | nested_sort [$alias] { $op = $nested_sort.op; }
  | nested_distinct[$alias] { $op = $nested_distinct.op; }
  | nested_limit[$alias] { $op = $nested_limit.op; }
+ | nested_cross[$alias] { $op = $nested_cross.op; }
 ;
 
 nested_proj[String alias] returns[Operator op]
@@ -1247,6 +1248,17 @@ scope GScope;
   ) )
 ;
 
+nested_cross[String alias] returns[Operator op]
+@init {
+    Operator inputOp = null;
+}
+ : ^( CROSS nested_op_input_list )
+   {
+       SourceLocation loc = new SourceLocation( (PigParserNode)$CROSS );
+       $op = builder.buildNestedCrossOp( loc, $foreach_plan::innerPlan, $alias, $nested_op_input_list.opList );
+   }
+;
+
 nested_op_input returns[Operator op]
 @init {
     LogicalExpressionPlan plan = new LogicalExpressionPlan();
@@ -1263,6 +1275,11 @@ nested_op_input returns[Operator op]
    }
 ;
 
+nested_op_input_list returns[List<Operator> opList]
+@init { $opList = new ArrayList<Operator>(); }
+ : ( nested_op_input { $opList.add( $nested_op_input.op ); } )+
+;
+
 stream_clause returns[String alias]
 @init {
     StreamingCommand cmd = null;

Modified: pig/trunk/src/org/apache/pig/parser/QueryParser.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryParser.g?rev=1146574&r1=1146573&r2=1146574&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/QueryParser.g (original)
+++ pig/trunk/src/org/apache/pig/parser/QueryParser.g Thu Jul 14 06:10:25 2011
@@ -568,6 +568,7 @@ nested_op : nested_filter
           | nested_sort
           | nested_distinct
           | nested_limit
+          | nested_cross
 ;
 
 nested_proj : col_ref PERIOD col_ref_list
@@ -590,9 +591,16 @@ nested_distinct : DISTINCT^ nested_op_in
 nested_limit : LIMIT^ nested_op_input ( INTEGER | expr )
 ;
 
+nested_cross : CROSS^ nested_op_input_list
+;
+
 nested_op_input : col_ref | nested_proj
 ;
 
+nested_op_input_list : nested_op_input ( COMMA nested_op_input )*
+        -> nested_op_input+
+;
+
 stream_clause : STREAM^ rel THROUGH! ( EXECCOMMAND | alias ) as_clause?
 ;
 

Modified: pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java?rev=1146574&r1=1146573&r2=1146574&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestForEachNestedPlanLocal.java Thu Jul 14 06:10:25 2011
@@ -26,6 +26,7 @@ import junit.framework.TestCase;
 import junit.framework.Assert;
 
 import java.util.Iterator;
+import java.util.List;
 import java.util.Random;
 import java.io.*;
 import java.text.DecimalFormat;
@@ -90,8 +91,94 @@ public class TestForEachNestedPlanLocal 
         Assert.assertEquals(3L, count[2]);
     }
 
+    @Test
+    public void testNestedCrossTwoRelations() throws Exception {
+        File[] tmpFiles = generateDataSetFilesForNestedCross();
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStringAsByteArray(new String[] {
+                "({('user1','usa','user1','usa','10'),('user1','usa','user1','usa','30'),('user1','usa','user1','china','20')})",
+                "({('user2','usa','user2','usa','20'),('user2','usa','user2','usa','20')})",
+                "({('user3','singapore','user3','usa','10'),('user3','singapore','user3','singapore','20')})",
+                "({})" });
+        pig.registerQuery("user = load '"
+                + Util.generateURI(tmpFiles[0].toString(), pig.getPigContext())
+                + "' as (uid, region);");
+        pig.registerQuery("session = load '"
+                + Util.generateURI(tmpFiles[1].toString(), pig.getPigContext())
+                + "' as (uid, region, duration);");
+        pig.registerQuery("C = cogroup user by uid, session by uid;");
+        pig.registerQuery("D = foreach C {"
+                + "crossed = cross user, session;"
+                + "generate crossed;" + "}");
+        Iterator<Tuple> expectedItr = expectedResults.iterator();
+        Iterator<Tuple> actualItr = pig.openIterator("D");
+        while (expectedItr.hasNext() && actualItr.hasNext()) {
+            Tuple expectedTuple = expectedItr.next();
+            Tuple actualTuple = actualItr.next();
+            assertEquals(expectedTuple, actualTuple);
+        }
+        assertEquals(expectedItr.hasNext(), actualItr.hasNext());
+    }
+    
+    @Test
+    public void testNestedCrossTwoRelationsComplex() throws Exception {
+        File[] tmpFiles = generateDataSetFilesForNestedCross();
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStringAsByteArray(new String[] {
+                "({('user1','usa','user1','usa','10'),('user1','usa','user1','usa','30')})",
+                "({('user2','usa','user2','usa','20')})",
+                "({('user3','singapore','user3','singapore','20')})",
+                "({})" });
+        pig.registerQuery("user = load '"
+                + Util.generateURI(tmpFiles[0].toString(), pig.getPigContext())
+                + "' as (uid, region);");
+        pig.registerQuery("session = load '"
+                + Util.generateURI(tmpFiles[1].toString(), pig.getPigContext())
+                + "' as (uid, region, duration);");
+        pig.registerQuery("C = cogroup user by uid, session by uid;");
+        pig.registerQuery("D = foreach C {"
+                + "distinct_session = distinct session;"
+                + "crossed = cross user, distinct_session;"
+                + "filtered = filter crossed by user::region == distinct_session::region;"
+                + "generate filtered;" + "}");
+        Iterator<Tuple> expectedItr = expectedResults.iterator();
+        Iterator<Tuple> actualItr = pig.openIterator("D");
+        while (expectedItr.hasNext() && actualItr.hasNext()) {
+            Tuple expectedTuple = expectedItr.next();
+            Tuple actualTuple = actualItr.next();
+            assertEquals(expectedTuple, actualTuple);
+        }
+        assertEquals(expectedItr.hasNext(), actualItr.hasNext());
+    }
 
-
+    @Test
+    public void testNestedCrossThreeRelations() throws Exception {
+        File[] tmpFiles = generateDataSetFilesForNestedCross();
+        List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStringAsByteArray(new String[] {
+                "({('user1','usa','user1','usa','10','user1','admin','male'),('user1','usa','user1','usa','30','user1','admin','male'),('user1','usa','user1','china','20','user1','admin','male')})",
+                "({('user2','usa','user2','usa','20','user2','guest','male'),('user2','usa','user2','usa','20','user2','guest','male')})",
+                "({('user3','singapore','user3','usa','10','user3','user','female'),('user3','singapore','user3','singapore','20','user3','user','female')})",
+                "({})" });
+        pig.registerQuery("user = load '"
+                + Util.generateURI(tmpFiles[0].toString(), pig.getPigContext())
+                + "' as (uid, region);");
+        pig.registerQuery("session = load '"
+                + Util.generateURI(tmpFiles[1].toString(), pig.getPigContext())
+                + "' as (uid, region, duration);");
+        pig.registerQuery("profile = load '"
+                + Util.generateURI(tmpFiles[2].toString(), pig.getPigContext())
+                + "' as (uid, role, gender);");
+        pig.registerQuery("C = cogroup user by uid, session by uid, profile by uid;");
+        pig.registerQuery("D = foreach C {"
+                + "crossed = cross user, session, profile;"
+                + "generate crossed;" + "}");
+        Iterator<Tuple> expectedItr = expectedResults.iterator();
+        Iterator<Tuple> actualItr = pig.openIterator("D");
+        while (expectedItr.hasNext() && actualItr.hasNext()) {
+            Tuple expectedTuple = expectedItr.next();
+            Tuple actualTuple = actualItr.next();
+            assertEquals(expectedTuple, actualTuple);
+        }
+        assertEquals(expectedItr.hasNext(), actualItr.hasNext());
+    }
 
     /*
     @Test
@@ -169,5 +256,35 @@ public class TestForEachNestedPlanLocal 
 
         return fp1;
     }
+    
+    private File[] generateDataSetFilesForNestedCross() throws IOException {
+        File userFile = File.createTempFile("user", "txt");
+        PrintStream userPS = new PrintStream(new FileOutputStream(userFile));
+        userPS.println("user1\tusa");
+        userPS.println("user2\tusa");
+        userPS.println("user3\tsingapore");
+        userPS.println("user4\tchina");
+        userPS.close();
+        File sessionFile = File.createTempFile("session", "txt");
+        PrintStream sessionPS = new PrintStream(new FileOutputStream(
+                sessionFile));
+        sessionPS.println("user3\tusa\t10");
+        sessionPS.println("user3\tsingapore\t20");
+        sessionPS.println("user2\tusa\t20");
+        sessionPS.println("user2\tusa\t20");
+        sessionPS.println("user1\tusa\t10");
+        sessionPS.println("user1\tusa\t30");
+        sessionPS.println("user1\tchina\t20");
+        sessionPS.close();
+        File profileFile = File.createTempFile("profile", "txt");
+        PrintStream profilePS = new PrintStream(new FileOutputStream(
+                profileFile));
+        profilePS.println("user1\tadmin\tmale");
+        profilePS.println("user2\tguest\tmale");
+        profilePS.println("user3\tuser\tfemale");
+        profilePS.println("user4\tuser\tfemale");
+        profilePS.close();
+        return new File[] { userFile, sessionFile, profileFile };
+    }
 
 }



Mime
View raw message