pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From the...@apache.org
Subject svn commit: r1143614 - in /pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ src/org/apache/pig/newplan/logical/ src/...
Date Thu, 07 Jul 2011 00:27:06 GMT
Author: thejas
Date: Thu Jul  7 00:27:05 2011
New Revision: 1143614

URL: http://svn.apache.org/viewvc?rev=1143614&view=rev
Log:
PIG-1926: Sample/Limit should take scalar (azaroth via thejas)

Added:
    pig/trunk/src/org/apache/pig/newplan/logical/visitor/ScalarVariableValidator.java
    pig/trunk/test/org/apache/pig/test/TestLimitVariable.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/PigServer.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java
    pig/trunk/src/org/apache/pig/newplan/logical/DotLOPrinter.java
    pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllExpressionVisitor.java
    pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java
    pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java
    pig/trunk/src/org/apache/pig/newplan/logical/optimizer/UidResetter.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LOFilter.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLimit.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
    pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java
    pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java
    pig/trunk/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java
    pig/trunk/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java
    pig/trunk/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java
    pig/trunk/src/org/apache/pig/parser/AliasMasker.g
    pig/trunk/src/org/apache/pig/parser/AstPrinter.g
    pig/trunk/src/org/apache/pig/parser/AstValidator.g
    pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
    pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g
    pig/trunk/src/org/apache/pig/parser/QueryParser.g
    pig/trunk/test/org/apache/pig/test/TestSample.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1143614&r1=1143613&r2=1143614&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Jul  7 00:27:05 2011
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-1926: Sample/Limit should take scalar (azaroth via thejas)
+
 PIG-1950: e2e test harness needs to be able to compare to previous version of
 Pig (gates)
 

Modified: pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigServer.java?rev=1143614&r1=1143613&r2=1143614&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ pig/trunk/src/org/apache/pig/PigServer.java Thu Jul  7 00:27:05 2011
@@ -90,6 +90,7 @@ import org.apache.pig.newplan.logical.re
 import org.apache.pig.newplan.logical.relational.LogicalSchema;
 import org.apache.pig.newplan.logical.visitor.CastLineageSetter;
 import org.apache.pig.newplan.logical.visitor.ColumnAliasConversionVisitor;
+import org.apache.pig.newplan.logical.visitor.ScalarVariableValidator;
 import org.apache.pig.newplan.logical.visitor.ProjStarInUdfExpander;
 import org.apache.pig.newplan.logical.visitor.ScalarVisitor;
 import org.apache.pig.newplan.logical.visitor.SchemaAliasVisitor;
@@ -1677,6 +1678,7 @@ public class PigServer {
             
             new UnionOnSchemaSetter( lp ).visit();
             new CastLineageSetter(lp, collector).visit();
+            new ScalarVariableValidator(lp).visit();
         }
 
         private void postProcess() throws IOException {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1143614&r1=1143613&r2=1143614&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Jul  7 00:27:05 2011
@@ -1023,9 +1023,12 @@ public class MRCompiler extends PhyPlanV
     @Override
     public void visitLimit(POLimit op) throws VisitorException{
         try{
-        	
             MapReduceOper mro = compiledInputs[0];
             mro.limit = op.getLimit();
+            if (op.getLimitPlan() != null) {
+                processUDFs(op.getLimitPlan());
+                mro.limitPlan = op.getLimitPlan();
+            }
             if (!mro.isMapDone()) {
             	// if map plan is open, add a limit for optimization, eventually we
             	// will add another limit to reduce plan
@@ -1042,6 +1045,7 @@ public class MRCompiler extends PhyPlanV
                     if (!pigContext.inIllustrator) {
                         POLimit pLimit2 = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
                         pLimit2.setLimit(op.getLimit());
+                        pLimit2.setLimitPlan(op.getLimitPlan());
                         mro.reducePlan.addAsLeaf(pLimit2);
                     } else {
                         mro.reducePlan.addAsLeaf(op);
@@ -2843,7 +2847,7 @@ public class MRCompiler extends PhyPlanV
             // Look for map reduce operators which contains limit operator.
             // If so and the requestedParallelism > 1, add one additional map-reduce
             // operator with 1 reducer into the original plan
-            if (mr.limit!=-1 && mr.requestedParallelism!=1)
+            if ((mr.limit!=-1 || mr.limitPlan!=null) && mr.requestedParallelism!=1)
             {
                 opsToAdjust.add(mr);
             }
@@ -2886,6 +2890,7 @@ public class MRCompiler extends PhyPlanV
                 }
                 POLimit pLimit2 = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
                 pLimit2.setLimit(mr.limit);
+                pLimit2.setLimitPlan(mr.limitPlan);
                 limitAdjustMROp.reducePlan.addAsLeaf(pLimit2);
 
                 // If the operator we're following has global sort set, we

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=1143614&r1=1143613&r2=1143614&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Thu Jul  7 00:27:05 2011
@@ -125,6 +125,9 @@ public class MapReduceOper extends Opera
     // to add additional map reduce operator with 1 reducer after this
     long limit = -1;
 
+    // POLimit can also have an expression. See PIG-1926
+    PhysicalPlan limitPlan = null;
+    
     // Indicates that this MROper is a splitter MROper. 
     // That is, this MROper ends due to a POSPlit operator.
     private boolean splitter = false;

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java?rev=1143614&r1=1143613&r2=1143614&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java Thu Jul  7 00:27:05 2011
@@ -17,23 +17,20 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
-import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.data.DataType;
-import org.apache.pig.data.Tuple;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ComparisonOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.IdentityHashSet;
 import org.apache.pig.pen.util.ExampleTuple;
-import org.apache.pig.pen.util.LineageTracer;
 
 public class POLimit extends PhysicalOperator {
 	   /**
@@ -47,6 +44,9 @@ public class POLimit extends PhysicalOpe
     // Number of limited outputs
     long mLimit;
 
+    // The expression plan
+    PhysicalPlan expressionPlan;
+
     public POLimit(OperatorKey k) {
         this(k, -1, null);
     }
@@ -63,28 +63,59 @@ public class POLimit extends PhysicalOpe
         super(k, rp, inputs);
     }
     
-    public void setLimit(long limit)
-    {
+    public void setLimit(long limit) {
     	mLimit = limit;
     }
     
-    public long getLimit()
-    {
+    public long getLimit() {
     	return mLimit;
     }
 
+    public PhysicalPlan getLimitPlan() {
+        return expressionPlan;
+    }
+
+    public void setLimitPlan(PhysicalPlan expressionPlan) {
+        this.expressionPlan = expressionPlan;
+    }
+
     /**
      * Counts the number of tuples processed into static variable soFar, if the number of tuples processed reach the 
      * limit, return EOP; Otherwise, return the tuple 
      */
     @Override
     public Result getNext(Tuple t) throws ExecException {
-        Result res = null;
+        // if it is the first time, evaluate the expression. Otherwise reuse the computed value.
+        if (this.getLimit() < 0 && expressionPlan != null) {
+            PhysicalOperator expression = expressionPlan.getLeaves().get(0);
+            long variableLimit;
+            Result returnValue;
+            switch (expression.getResultType()) {
+            case DataType.LONG:
+                returnValue = expression.getNext(dummyLong);
+                if (returnValue.returnStatus != POStatus.STATUS_OK || returnValue.result == null)
+                    throw new RuntimeException("Unable to evaluate Limit expression: "
+                            + returnValue);
+                variableLimit = (Long) returnValue.result;
+                break;
+            case DataType.INTEGER:
+                returnValue = expression.getNext(dummyInt);
+                if (returnValue.returnStatus != POStatus.STATUS_OK || returnValue.result == null)
+                    throw new RuntimeException("Unable to evaluate Limit expression: "
+                            + returnValue);
+                variableLimit = (Integer) returnValue.result;
+                break;
+            default:
+                throw new RuntimeException("Limit requires an integer parameter");
+            }
+            if (variableLimit <= 0)
+                throw new RuntimeException("Limit requires a positive integer parameter");
+            this.setLimit(variableLimit);
+        }
         Result inp = null;
         while (true) {
             inp = processInput();
-            if (inp.returnStatus == POStatus.STATUS_EOP
-                    || inp.returnStatus == POStatus.STATUS_ERR)
+            if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR)
                 break;
             
             illustratorMarkup(inp.result, null, 0);
@@ -130,6 +161,7 @@ public class POLimit extends PhysicalOpe
             NodeIdGenerator.getGenerator().getNextNodeId(this.mKey.scope)),
             this.requestedParallelism, this.inputs);
         newLimit.mLimit = this.mLimit;
+        newLimit.expressionPlan = this.expressionPlan.clone();
         newLimit.setAlias(alias);
         return newLimit;
     }

Modified: pig/trunk/src/org/apache/pig/newplan/logical/DotLOPrinter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/DotLOPrinter.java?rev=1143614&r1=1143613&r2=1143614&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/DotLOPrinter.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/DotLOPrinter.java Thu Jul  7 00:27:05 2011
@@ -19,9 +19,12 @@ package org.apache.pig.newplan.logical;
 
 import java.io.PrintStream;
 import java.util.ArrayList;
-import java.util.List;
-import java.util.LinkedList;
 import java.util.Collection;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.newplan.BaseOperatorPlan;
 import org.apache.pig.newplan.DotPlanDumper;
@@ -33,14 +36,12 @@ import org.apache.pig.newplan.logical.re
 import org.apache.pig.newplan.logical.relational.LOForEach;
 import org.apache.pig.newplan.logical.relational.LOGenerate;
 import org.apache.pig.newplan.logical.relational.LOJoin;
+import org.apache.pig.newplan.logical.relational.LOLimit;
 import org.apache.pig.newplan.logical.relational.LOLoad;
 import org.apache.pig.newplan.logical.relational.LOSort;
 import org.apache.pig.newplan.logical.relational.LOSplitOutput;
 import org.apache.pig.newplan.logical.relational.LOStore;
 
-import java.util.HashSet;
-import java.util.Set;
-
 /**
  * This class can print a logical plan in the DOT format. It uses
  * clusters to illustrate nesting. If "verbose" is off, it will skip
@@ -133,6 +134,9 @@ public class DotLOPrinter extends DotPla
         if(op instanceof LOFilter){
             plans.add(((LOFilter)op).getFilterPlan());
         }
+        else if(op instanceof LOLimit){
+            plans.add(((LOLimit)op).getLimitPlan());
+        }
         else if(op instanceof LOForEach){
             plans.add(((LOForEach)op).getInnerPlan());
         }

Modified: pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllExpressionVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllExpressionVisitor.java?rev=1143614&r1=1143613&r2=1143614&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllExpressionVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllExpressionVisitor.java Thu Jul  7 00:27:05 2011
@@ -32,6 +32,7 @@ import org.apache.pig.newplan.logical.re
 import org.apache.pig.newplan.logical.relational.LOGenerate;
 import org.apache.pig.newplan.logical.relational.LOInnerLoad;
 import org.apache.pig.newplan.logical.relational.LOJoin;
+import org.apache.pig.newplan.logical.relational.LOLimit;
 import org.apache.pig.newplan.logical.relational.LOSort;
 import org.apache.pig.newplan.logical.relational.LOSplitOutput;
 import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
@@ -71,6 +72,15 @@ public abstract class AllExpressionVisit
     }
     
     @Override
+    public void visit(LOLimit limit) throws FrontendException {
+        currentOp = limit;
+        if (limit.getLimitPlan() != null) {
+            LogicalExpressionVisitor v = getVisitor(limit.getLimitPlan());
+            v.visit();
+        }
+    }
+ 
+    @Override
     public void visit(LOJoin join) throws FrontendException {
         currentOp = join;
         Collection<LogicalExpressionPlan> c = join.getExpressionPlanValues();

Modified: pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java?rev=1143614&r1=1143613&r2=1143614&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java Thu Jul  7 00:27:05 2011
@@ -17,31 +17,28 @@
  */
 package org.apache.pig.newplan.logical.optimizer;
 
-import java.util.ArrayList;
-import java.util.List;
 import java.io.ByteArrayOutputStream;
-import java.io.PrintStream;
-import java.io.OutputStream;
 import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.List;
 
-import org.apache.pig.newplan.DepthFirstWalker;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.OperatorPlan;
 import org.apache.pig.newplan.PlanVisitor;
-import org.apache.pig.newplan.PlanWalker;
-import org.apache.pig.newplan.ReverseDependencyOrderWalker;
 import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
 import org.apache.pig.newplan.logical.relational.LOCogroup;
 import org.apache.pig.newplan.logical.relational.LOFilter;
 import org.apache.pig.newplan.logical.relational.LOForEach;
 import org.apache.pig.newplan.logical.relational.LOGenerate;
 import org.apache.pig.newplan.logical.relational.LOJoin;
+import org.apache.pig.newplan.logical.relational.LOLimit;
 import org.apache.pig.newplan.logical.relational.LOSort;
 import org.apache.pig.newplan.logical.relational.LOSplitOutput;
 import org.apache.pig.newplan.logical.relational.LogicalPlan;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.MultiMap;
 
 /**
  * A visitor mechanism printing out the logical plan.
@@ -169,6 +166,9 @@ public class LogicalPlanPrinter extends 
         if(node instanceof LOFilter){
             sb.append(planString(((LOFilter)node).getFilterPlan()));
         }
+        else if(node instanceof LOLimit){
+            sb.append(planString(((LOLimit)node).getLimitPlan()));
+        }
         else if(node instanceof LOForEach){
             sb.append(planString(((LOForEach)node).getInnerPlan()));        
         }

Modified: pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java?rev=1143614&r1=1143613&r2=1143614&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java Thu Jul  7 00:27:05 2011
@@ -164,6 +164,11 @@ public class SchemaResetter extends Logi
     @Override
     public void visit(LOLimit loLimit) throws FrontendException {
         loLimit.resetSchema();
+        if (loLimit.getLimitPlan() != null) {
+            FieldSchemaResetter fsResetter = new FieldSchemaResetter(
+                    loLimit.getLimitPlan());
+            fsResetter.visit();
+        }
         loLimit.getSchema();
     }
     

Modified: pig/trunk/src/org/apache/pig/newplan/logical/optimizer/UidResetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/UidResetter.java?rev=1143614&r1=1143613&r2=1143614&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/optimizer/UidResetter.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/optimizer/UidResetter.java Thu Jul  7 00:27:05 2011
@@ -152,6 +152,11 @@ public class UidResetter extends Logical
     @Override
     public void visit(LOLimit loLimit) throws FrontendException {
         loLimit.resetUid();
+        if (loLimit.getLimitPlan() != null) {
+            ExpressionUidResetter uidResetter = new ExpressionUidResetter(
+                    loLimit.getLimitPlan());
+            uidResetter.visit();
+    }
     }
     
     @Override

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOFilter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOFilter.java?rev=1143614&r1=1143613&r2=1143614&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOFilter.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOFilter.java Thu Jul  7 00:27:05 2011
@@ -26,6 +26,7 @@ public class LOFilter extends LogicalRel
 
     private static final long serialVersionUID = 2L;
     private LogicalExpressionPlan filterPlan;
+    private boolean isSample;
         
     public LOFilter(LogicalPlan plan) {
         super("LOFilter", plan);       
@@ -36,6 +37,16 @@ public class LOFilter extends LogicalRel
         this.filterPlan = filterPlan;
     }
     
+    public LOFilter(LogicalPlan plan, boolean sample) {
+        this(plan);
+        isSample = sample;
+    }
+
+    public LOFilter(LogicalPlan plan, LogicalExpressionPlan filterPlan, boolean sample) {
+        this(plan, filterPlan);
+        isSample = sample;
+    }
+    
     public LogicalExpressionPlan getFilterPlan() {
         return filterPlan;
     }
@@ -44,6 +55,10 @@ public class LOFilter extends LogicalRel
         this.filterPlan = filterPlan;
     }
     
+    public boolean isSample() {
+        return isSample;
+    }
+    
     @Override
     public LogicalSchema getSchema() throws FrontendException {
         if (schema!=null)

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLimit.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLimit.java?rev=1143614&r1=1143613&r2=1143614&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLimit.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLimit.java Thu Jul  7 00:27:05 2011
@@ -20,18 +20,27 @@ package org.apache.pig.newplan.logical.r
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
 
 public class LOLimit extends LogicalRelationalOperator {
+    private static final long serialVersionUID = 2L;
+    private static final long NULL_LIMIT = -1;
 
-    private long mLimit;
+    private long mLimit = NULL_LIMIT;
+    private LogicalExpressionPlan mlimitPlan;
     
-    private static final long serialVersionUID = 2L;
-    //private static Log log = LogFactory.getLog(LOFilter.class);
+    public LOLimit(LogicalPlan plan) {
+        super("LOLimit", plan);
+    }
 
-        
     public LOLimit(LogicalPlan plan, long limit) {
         super("LOLimit", plan);
-        mLimit = limit;
+        this.setLimit(limit);
+    }
+
+    public LOLimit(LogicalPlan plan, LogicalExpressionPlan limitPlan) {
+        super("LOLimit", plan);
+        this.setLimitPlan(limitPlan);
     }
 
     public long getLimit() {
@@ -39,7 +48,15 @@ public class LOLimit extends LogicalRela
     }
 
     public void setLimit(long limit) {
-        mLimit = limit;
+        this.mLimit = limit;
+    }
+    
+    public LogicalExpressionPlan getLimitPlan() {
+        return mlimitPlan;
+    }
+
+    public void setLimitPlan(LogicalExpressionPlan mlimitPlan) {
+        this.mlimitPlan = mlimitPlan;
     }
     
     @Override
@@ -64,9 +81,12 @@ public class LOLimit extends LogicalRela
     
     @Override
     public boolean isEqual(Operator other) throws FrontendException{
-        if (other != null && other instanceof LOLimit && ((LOLimit)other).getLimit() == mLimit)
-            return checkEquality((LogicalRelationalOperator)other);
-        else
+        if (other != null && other instanceof LOLimit) {
+            LOLimit otherLimit = (LOLimit) other;
+            if (this.getLimit() != NULL_LIMIT && this.getLimit() == otherLimit.getLimit()
+                    || this.getLimitPlan() != null && this.getLimitPlan().isEqual(otherLimit.getLimitPlan()))
+                return checkEquality(otherLimit);
+        }
             return false;
     }
     

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=1143614&r1=1143613&r2=1143614&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Thu Jul  7 00:27:05 2011
@@ -48,13 +48,13 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage.PackageType;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage.PackageType;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
@@ -70,6 +70,7 @@ import org.apache.pig.impl.plan.VisitorE
 import org.apache.pig.impl.util.CompilerUtils;
 import org.apache.pig.impl.util.LinkedMultiMap;
 import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.Utils;
 import org.apache.pig.newplan.DependencyOrderWalker;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.OperatorPlan;
@@ -80,7 +81,6 @@ import org.apache.pig.newplan.logical.Ut
 import org.apache.pig.newplan.logical.expression.ExpToPhyTranslationVisitor;
 import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
 import org.apache.pig.newplan.logical.expression.ProjectExpression;
-import org.apache.pig.impl.util.Utils;
 
 public class LogToPhyTranslationVisitor extends LogicalRelationalNodesVisitor {
     
@@ -147,7 +147,6 @@ public class LogToPhyTranslationVisitor 
 //        System.err.println("Exiting Load");
     }
     
-    
     @Override
     public void visit(LONative loNative) throws FrontendException{     
         String scope = DEFAULT_SCOPE;
@@ -331,8 +330,6 @@ public class LogToPhyTranslationVisitor 
                 ce1.setValue(ce1val);
                 ce1.setResultType(DataType.TUPLE);*/
                 
-                
-
                 POUserFunc gfc = new POUserFunc(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),cross.getRequestedParallelisam(), Arrays.asList((PhysicalOperator)ce1,(PhysicalOperator)ce2), new FuncSpec(GFCross.class.getName()));
                 gfc.setAlias(cross.getAlias());
                 gfc.setResultType(DataType.BAG);
@@ -607,6 +604,7 @@ public class LogToPhyTranslationVisitor 
     /**
      * This function takes in a List of LogicalExpressionPlan and converts them to 
      * a list of PhysicalPlans
+     * 
      * @param plans
      * @return
      * @throws FrontendException 
@@ -1240,22 +1238,39 @@ public class LogToPhyTranslationVisitor 
     @Override
     public void visit(LOLimit loLimit) throws FrontendException {
         String scope = DEFAULT_SCOPE;
-        POLimit physOp = new POLimit(new OperatorKey(scope,nodeGen.getNextNodeId(scope)), loLimit.getRequestedParallelisam());
-        physOp.setLimit(loLimit.getLimit());
-        physOp.setAlias(loLimit.getAlias());
-        currentPlan.add(physOp);
-        physOp.setResultType(DataType.BAG);
-        logToPhyMap.put(loLimit, physOp);
+        POLimit poLimit = new POLimit(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),
+                loLimit.getRequestedParallelisam());
+        poLimit.setLimit(loLimit.getLimit());
+        poLimit.setAlias(loLimit.getAlias());
+        poLimit.setResultType(DataType.BAG);
+        currentPlan.add(poLimit);
+        logToPhyMap.put(loLimit, poLimit);
+
+        if (loLimit.getLimitPlan() != null) {
+            // add expression plan to POLimit
+            currentPlans.push(currentPlan);
+            currentPlan = new PhysicalPlan();
+            PlanWalker childWalker = new ReverseDependencyOrderWalkerWOSeenChk(loLimit.getLimitPlan());
+            pushWalker(childWalker);
+            currentWalker.walk(new ExpToPhyTranslationVisitor(currentWalker.getPlan(), childWalker, loLimit,
+                    currentPlan, logToPhyMap));
+            poLimit.setLimitPlan(currentPlan);
+            popWalker();
+            currentPlan = currentPlans.pop();
+        }
+
         Operator op = loLimit.getPlan().getPredecessors(loLimit).get(0);
 
         PhysicalOperator from = logToPhyMap.get(op);
         try {
-            currentPlan.connect(from, physOp);
+            currentPlan.connect(from, poLimit);
         } catch (PlanException e) {
             int errCode = 2015;
             String msg = "Invalid physical operators in the physical plan" ;
             throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
         }
+        
+        translateSoftLinks(loLimit);
     }
     
     @Override
@@ -1359,7 +1374,6 @@ public class LogToPhyTranslationVisitor 
         translateSoftLinks(loSplitOutput);
 //        System.err.println("Exiting Filter");
     }
-
     /**
      * updates plan with check for empty bag and if bag is empty to flatten a bag
      * with as many null's as dictated by the schema
@@ -1372,7 +1386,6 @@ public class LogToPhyTranslationVisitor 
         try {
             inputSchema = ((LogicalRelationalOperator) joinInput).getSchema();
          
-          
             if(inputSchema == null) {
                 int errCode = 1109;
                 String msg = "Input (" + ((LogicalRelationalOperator) joinInput).getAlias() + ") " +

Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java?rev=1143614&r1=1143613&r2=1143614&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java Thu Jul  7 00:27:05 2011
@@ -49,12 +49,11 @@ import org.apache.pig.newplan.logical.re
 import org.apache.pig.newplan.logical.relational.LOStore;
 import org.apache.pig.newplan.logical.relational.LOStream;
 import org.apache.pig.newplan.logical.relational.LOUnion;
-import org.apache.pig.newplan.logical.relational.LogicalPlan;
 import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
 import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
 import org.apache.pig.newplan.logical.relational.LogicalSchema;
-import org.apache.pig.newplan.logical.relational.SchemaNotDefinedException;
 import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
+import org.apache.pig.newplan.logical.relational.SchemaNotDefinedException;
 
 /**
  * Helper class used by ColumnMapKeyPrune to figure out what columns can be pruned.
@@ -296,7 +295,16 @@ public class ColumnPruneHelper {
         @Override
         public void visit(LOLimit limit) throws FrontendException {
             Set<Long> output = setOutputUids(limit);
-            limit.annotate(INPUTUIDS, output);
+                                    
+            // the input uids contain all the output uids and
+            // the projections in the limit expression
+            Set<Long> input = new HashSet<Long>(output);
+            
+            LogicalExpressionPlan exp = limit.getLimitPlan();
+            if (exp != null)
+                collectUids(limit, exp, input);
+            
+            limit.annotate(INPUTUIDS, input);
         }
         
         @Override
@@ -417,8 +425,7 @@ public class ColumnPruneHelper {
         public void visit(LOForEach foreach) throws FrontendException {
             Set<Long> output = setOutputUids(foreach);
             
-            LogicalPlan innerPlan = foreach.getInnerPlan();
-            LOGenerate gen = (LOGenerate)innerPlan.getSinks().get(0);
+            LOGenerate gen = OptimizerUtils.findGenerate(foreach);
             gen.annotate(OUTPUTUIDS, output);
             
             visit(gen);

Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java?rev=1143614&r1=1143613&r2=1143614&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java Thu Jul  7 00:27:05 2011
@@ -46,6 +46,7 @@ import org.apache.pig.newplan.logical.re
 import org.apache.pig.newplan.logical.relational.LOGenerate;
 import org.apache.pig.newplan.logical.relational.LOInnerLoad;
 import org.apache.pig.newplan.logical.relational.LOJoin;
+import org.apache.pig.newplan.logical.relational.LOLimit;
 import org.apache.pig.newplan.logical.relational.LOLoad;
 import org.apache.pig.newplan.logical.relational.LOSort;
 import org.apache.pig.newplan.logical.relational.LOSplit;
@@ -220,6 +221,10 @@ public class ColumnPruneVisitor extends 
     }
     
     @Override
+    public void visit(LOLimit limit) throws FrontendException {
+    }
+    
+    @Override
     public void visit(LOSplitOutput splitOutput) throws FrontendException {
     }
     

Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java?rev=1143614&r1=1143613&r2=1143614&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/LimitOptimizer.java Thu Jul  7 00:27:05 2011
@@ -22,7 +22,8 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.util.Pair;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
 import org.apache.pig.newplan.logical.relational.LOCogroup;
 import org.apache.pig.newplan.logical.relational.LOCross;
 import org.apache.pig.newplan.logical.relational.LODistinct;
@@ -38,8 +39,6 @@ import org.apache.pig.newplan.logical.re
 import org.apache.pig.newplan.logical.relational.LOUnion;
 import org.apache.pig.newplan.logical.relational.LogicalPlan;
 import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
-import org.apache.pig.newplan.Operator;
-import org.apache.pig.newplan.OperatorPlan;
 import org.apache.pig.newplan.optimizer.Rule;
 import org.apache.pig.newplan.optimizer.Transformer;
 
@@ -124,7 +123,9 @@ public class LimitOptimizer extends Rule
                     .get(0);
                 currentPlan.removeAndReconnect(limit);
                 currentPlan.insertBetween(prepredecessor, limit, pred);
-            } else if (pred instanceof LOCross || pred instanceof LOUnion) {
+            } else if (limit.getLimitPlan() == null) {
+                // TODO selectively enable optimizations for variable limit
+                if (pred instanceof LOCross || pred instanceof LOUnion) {
                 // Limit can be duplicated, and the new instance pushed in front
                 // of an operator for the following operators
                 // (that is, if you have X->limit, you can transform that to
@@ -189,3 +190,4 @@ public class LimitOptimizer extends Rule
         }
     }
 }
+}

Modified: pig/trunk/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java?rev=1143614&r1=1143613&r2=1143614&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/visitor/LineageFindRelVisitor.java Thu Jul  7 00:27:05 2011
@@ -381,8 +381,10 @@ public class LineageFindRelVisitor exten
     }
     
     @Override
-    public void visit(LOLimit relOp) throws FrontendException{
-        mapToPredLoadFunc(relOp);
+    public void visit(LOLimit loLimit) throws FrontendException{
+        mapToPredLoadFunc(loLimit);
+        if (loLimit.getLimitPlan() != null)
+            visitExpression(loLimit.getLimitPlan());
     }
     
     @Override

Added: pig/trunk/src/org/apache/pig/newplan/logical/visitor/ScalarVariableValidator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/visitor/ScalarVariableValidator.java?rev=1143614&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/visitor/ScalarVariableValidator.java (added)
+++ pig/trunk/src/org/apache/pig/newplan/logical/visitor/ScalarVariableValidator.java Thu Jul  7 00:27:05 2011
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.visitor;
+
+import org.apache.pig.PigException;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.newplan.DependencyOrderWalker;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PlanWalker;
+import org.apache.pig.newplan.ReverseDependencyOrderWalker;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionVisitor;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOLimit;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
+
+/**
+ * This validator checks that scalar variables are used correctly in logical
+ * operators. It validates an expression by making sure that it contains no
+ * projection. Currently it covers Limit and Sample (see PIG-1926).
+ * 
+ */
+public class ScalarVariableValidator extends LogicalRelationalNodesVisitor {
+    public static final String ERR_MSG_SCALAR = "Expression in Limit/Sample should be scalar";
+
+    public ScalarVariableValidator(OperatorPlan plan) throws FrontendException {
+        super(plan, new DependencyOrderWalker(plan));
+    }
+
+    @Override
+    public void visit(LOLimit limit) throws FrontendException {
+        LogicalExpressionPlan expression = limit.getLimitPlan();
+        if (expression != null) {
+            ProjectFinder pf = new ProjectFinder(expression,
+                    new ReverseDependencyOrderWalker(expression));
+            pf.visit();
+            if (pf.found()) {
+                int errCode = 1131;
+                throw new VisitorException(limit, ERR_MSG_SCALAR, errCode,
+                        PigException.INPUT);
+            }
+        }
+    }
+
+    @Override
+    public void visit(LOFilter filter) throws FrontendException {
+        LogicalExpressionPlan expression = filter.getFilterPlan();
+        if (expression != null) {
+            // if it is a Sample, the expression must be scalar
+            if (filter.isSample()) {
+                ProjectFinder pf = new ProjectFinder(expression,
+                        new ReverseDependencyOrderWalker(expression));
+                pf.visit();
+                if (pf.found()) {
+                    int errCode = 1131;
+                    throw new VisitorException(filter, ERR_MSG_SCALAR, errCode,
+                            PigException.INPUT);
+                }
+            }
+        }
+    }
+
+    public static class ProjectFinder extends LogicalExpressionVisitor {
+        private boolean foundProject;
+
+        public boolean found() {
+            return foundProject;
+        }
+
+        protected ProjectFinder(OperatorPlan p, PlanWalker walker)
+                throws FrontendException {
+            super(p, walker);
+        }
+
+        @Override
+        public void visit(ProjectExpression project) {
+            foundProject = true;
+        }
+    }
+}

Modified: pig/trunk/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java?rev=1143614&r1=1143613&r2=1143614&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/visitor/TypeCheckingRelVisitor.java Thu Jul  7 00:27:05 2011
@@ -447,18 +447,44 @@ public class TypeCheckingRelVisitor exte
     }
 
     @Override
-    public void visit(LOLimit op) throws VisitorException {
-        op.resetSchema();
+    public void visit(LOLimit limit) throws FrontendException {
+        limit.resetSchema();
+        LogicalExpressionPlan expressionPlan = limit.getLimitPlan();
+        if (expressionPlan != null) {
+            // Check that the inner plan has only 1 output port
+            if (expressionPlan.getSources().size() > 1) {
+                int errCode = 1057;
+                String msg = "Limit's expression plan can only have one output";
+                msgCollector.collect(msg, MessageType.Error);
+                throwTypeCheckerException(limit, msg, errCode, PigException.INPUT, null);
+            }
+
+            // visit the limit expression
+            visitExpressionPlan(expressionPlan, limit);
+
+            // check limit expression type
+            byte innerCondType = ((LogicalExpression) expressionPlan.getSources().get(0))
+                    .getType();
+            // cast to long if it is a bytearray
+            if (innerCondType == DataType.BYTEARRAY)
+                insertAtomicCastForInnerPlan(expressionPlan, limit, DataType.LONG);
+            // else it must be an int or a long
+            else if (innerCondType != DataType.LONG && innerCondType != DataType.INTEGER) {
+                int errCode = 1058;
+                String msg = "Limit's expression must evaluate to Long or Integer. Found: "
+                        + DataType.findTypeName(innerCondType);
+                msgCollector.collect(msg, MessageType.Error);
+                throwTypeCheckerException(limit, msg, errCode, PigException.INPUT, null);
+            }
+        }
         try {
             // Compute the schema
-            op.getSchema() ;
-        }
-        catch (FrontendException fe) {
+            limit.getSchema();
+        } catch (FrontendException fe) {
             int errCode = 1055;
-            String msg = "Problem while reading"
-                + " schemas from inputs of Limit" ;
+            String msg = "Problem while reading schemas from inputs of Limit";
             msgCollector.collect(msg, MessageType.Error) ;
-            throwTypeCheckerException(op, msg, errCode, PigException.INPUT, fe) ;
+            throwTypeCheckerException(limit, msg, errCode, PigException.INPUT, fe);
         }
     }
 

Modified: pig/trunk/src/org/apache/pig/parser/AliasMasker.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AliasMasker.g?rev=1143614&r1=1143613&r2=1143614&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/AliasMasker.g (original)
+++ pig/trunk/src/org/apache/pig/parser/AliasMasker.g Thu Jul  7 00:27:05 2011
@@ -344,11 +344,11 @@ bin_expr 
 ;
 
 limit_clause 
-    : ^( LIMIT rel ( INTEGER | LONGINTEGER ) )
+    : ^( LIMIT rel ( INTEGER | LONGINTEGER | expr ) )
 ;
 
 sample_clause 
-    : ^( SAMPLE rel DOUBLENUMBER )    
+    :	 ^( SAMPLE rel ( DOUBLENUMBER | expr ) )
 ;
 
 order_clause 
@@ -457,7 +457,7 @@ nested_distinct 
 ;
 
 nested_limit 
-    : ^( LIMIT nested_op_input INTEGER )
+    : ^( LIMIT nested_op_input ( INTEGER | expr ) )
 ;
 
 nested_op_input : col_ref | nested_proj

Modified: pig/trunk/src/org/apache/pig/parser/AstPrinter.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AstPrinter.g?rev=1143614&r1=1143613&r2=1143614&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/AstPrinter.g (original)
+++ pig/trunk/src/org/apache/pig/parser/AstPrinter.g Thu Jul  7 00:27:05 2011
@@ -316,11 +316,11 @@ bin_expr 
 
 limit_clause 
     : ^( LIMIT { sb.append($LIMIT.text).append(" "); } rel 
-        ( INTEGER { sb.append(" ").append($INTEGER.text); } | LONGINTEGER { sb.append(" ").append($LONGINTEGER.text); } ) )
+        ( INTEGER { sb.append(" ").append($INTEGER.text); } | LONGINTEGER { sb.append(" ").append($LONGINTEGER.text); } | expr ) )
 ;
 
 sample_clause 
-    : ^( SAMPLE { sb.append($SAMPLE.text).append(" "); } rel DOUBLENUMBER { sb.append(" ").append($DOUBLENUMBER.text); } )    
+    : ^( SAMPLE { sb.append($SAMPLE.text).append(" "); } rel ( DOUBLENUMBER { sb.append(" ").append($DOUBLENUMBER.text); } | expr ) )    
 ;
 
 order_clause 
@@ -435,7 +435,7 @@ nested_distinct 
 ;
 
 nested_limit 
-    : ^( LIMIT { sb.append($LIMIT.text).append(" "); }  nested_op_input INTEGER { sb.append(" ").append($INTEGER.text); } )
+    : ^( LIMIT { sb.append($LIMIT.text).append(" "); }  nested_op_input ( INTEGER { sb.append(" ").append($INTEGER.text); } | expr ) )
 ;
 
 nested_op_input : col_ref | nested_proj

Modified: pig/trunk/src/org/apache/pig/parser/AstValidator.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AstValidator.g?rev=1143614&r1=1143613&r2=1143614&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/AstValidator.g (original)
+++ pig/trunk/src/org/apache/pig/parser/AstValidator.g Thu Jul  7 00:27:05 2011
@@ -331,10 +331,10 @@ pound_proj : ^( POUND ( QUOTEDSTRING | N
 bin_expr : ^( BIN_EXPR cond expr expr )
 ;
 
-limit_clause : ^( LIMIT rel ( INTEGER | LONGINTEGER ) )
+limit_clause : ^( LIMIT rel ( INTEGER | LONGINTEGER | expr ) )
 ;
 
-sample_clause : ^( SAMPLE rel DOUBLENUMBER )
+sample_clause : ^( SAMPLE rel ( DOUBLENUMBER | expr ) )
 ;
 
 order_clause : ^( ORDER rel order_by_clause func_clause? )
@@ -451,7 +451,7 @@ nested_sort : ^( ORDER nested_op_input  
 nested_distinct : ^( DISTINCT nested_op_input )
 ;
 
-nested_limit : ^( LIMIT nested_op_input INTEGER )
+nested_limit : ^( LIMIT nested_op_input ( INTEGER | expr ) )
 ;
 
 nested_op_input : col_ref | nested_proj

Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1143614&r1=1143613&r2=1143614&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java Thu Jul  7 00:27:05 2011
@@ -148,6 +148,14 @@ public class LogicalPlanBuilder {
         return new LOFilter( plan );
     }
 
+    LOLimit createLimitOp() {
+        return new LOLimit( plan );
+    }
+    
+    LOFilter createSampleOp() {
+        return new LOFilter( plan, true );
+    }
+    
     String buildFilterOp(SourceLocation loc, LOFilter op, String alias, String inputAlias, LogicalExpressionPlan expr) {
         op.setFilterPlan( expr );
         return buildOp( loc, op, alias, inputAlias, null );
@@ -163,6 +171,11 @@ public class LogicalPlanBuilder {
         return buildOp( loc, op, alias, inputAlias, null );
     }
     
+    String buildLimitOp(SourceLocation loc, LOLimit op, String alias, String inputAlias, LogicalExpressionPlan expr) {
+        op.setLimitPlan(expr);
+        return buildOp(loc, op, alias, inputAlias, null);
+    }
+    
     String buildSampleOp(SourceLocation loc, String alias, String inputAlias, double value,
             SourceLocation valLoc) {
         LogicalExpressionPlan filterPlan = new LogicalExpressionPlan();
@@ -171,10 +184,17 @@ public class LogicalPlanBuilder {
         konst.setLocation( valLoc );
         UserFuncExpression udf = new UserFuncExpression( filterPlan, new FuncSpec( RANDOM.class.getName() ) );
         new LessThanExpression( filterPlan, udf, konst );
-        LOFilter filter = new LOFilter( plan );
+        LOFilter filter = new LOFilter( plan, true );
         return buildFilterOp( loc, filter, alias, inputAlias, filterPlan );
     }
     
+    String buildSampleOp(SourceLocation loc, LOFilter filter, String alias, String inputAlias,
+            LogicalExpressionPlan samplePlan, LogicalExpression expr) {
+        UserFuncExpression udf = new UserFuncExpression( samplePlan, new FuncSpec( RANDOM.class.getName() ) );
+        new LessThanExpression( samplePlan, udf, expr );
+        return buildFilterOp( loc, filter, alias, inputAlias, samplePlan );
+    }
+    
     String buildUnionOp(SourceLocation loc, String alias, List<String> inputAliases, boolean onSchema) {
         LOUnion op = new LOUnion( plan, onSchema );
         return buildOp( loc, op, alias, inputAliases, null );
@@ -907,6 +927,10 @@ public class LogicalPlanBuilder {
         return new LOFilter( plan );
     }
     
+    static LOLimit createNestedLimitOp(LogicalPlan plan) {
+        return new LOLimit ( plan );
+    }
+    
     // Build operator for foreach inner plan.
     Operator buildNestedFilterOp(SourceLocation loc, LOFilter op, LogicalPlan plan, String alias, 
             Operator inputOp, LogicalExpressionPlan expr) {
@@ -927,6 +951,13 @@ public class LogicalPlanBuilder {
         return op;
     }
     
+    Operator buildNestedLimitOp(SourceLocation loc, LOLimit op, LogicalPlan plan, String alias, 
+            Operator inputOp, LogicalExpressionPlan expr) {
+        op.setLimitPlan( expr );
+        buildNestedOp( loc, plan, op, alias, inputOp );
+        return op;
+    }
+    
     private void buildNestedOp(SourceLocation loc, LogicalPlan plan, LogicalRelationalOperator op,
             String alias, Operator inputOp) {
         op.setLocation( loc );

Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g?rev=1143614&r1=1143613&r2=1143614&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g Thu Jul  7 00:27:05 2011
@@ -79,6 +79,7 @@ import org.apache.pig.newplan.logical.re
 import org.apache.pig.newplan.logical.relational.LOFilter;
 import org.apache.pig.newplan.logical.relational.LOForEach;
 import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LOLimit;
 import org.apache.pig.newplan.logical.relational.LOJoin;
 import org.apache.pig.newplan.logical.relational.LOSort;
 import org.apache.pig.newplan.logical.relational.LOSplitOutput;
@@ -882,25 +883,47 @@ bin_expr[LogicalExpressionPlan plan] ret
 ;
 
 limit_clause returns[String alias]
- : ^( LIMIT rel INTEGER  )
+scope GScope;
+@init { 
+    $GScope::currentOp = builder.createLimitOp();
+    LogicalExpressionPlan exprPlan = new LogicalExpressionPlan();
+}
+ :  ^( LIMIT rel ( INTEGER
    {
-       $alias = builder.buildLimitOp( new SourceLocation( (PigParserNode)$LIMIT ), $statement::alias,
-           $statement::inputAlias, Long.valueOf( $INTEGER.text ) );
+       $alias = builder.buildLimitOp( new SourceLocation( (PigParserNode)$LIMIT ), 
+         $statement::alias, $statement::inputAlias, Long.valueOf( $INTEGER.text ) );
    }
- | ^( LIMIT rel LONGINTEGER )
+ | LONGINTEGER
    {
-       $alias = builder.buildLimitOp( new SourceLocation( (PigParserNode)$LIMIT ), $statement::alias,
-           $statement::inputAlias, builder.parseLong( $LONGINTEGER.text ) );
+       $alias = builder.buildLimitOp( new SourceLocation( (PigParserNode)$LIMIT ),
+         $statement::alias, $statement::inputAlias, builder.parseLong( $LONGINTEGER.text ) );
    }
+ | expr[exprPlan]
+   {
+       $alias = builder.buildLimitOp( new SourceLocation( (PigParserNode)$LIMIT ),
+           (LOLimit)$GScope::currentOp, $statement::alias, $statement::inputAlias, exprPlan);
+   }
+ ) )
 ;
 
 sample_clause returns[String alias]
- : ^( SAMPLE rel DOUBLENUMBER )
+scope GScope;
+@init { 
+    $GScope::currentOp = builder.createSampleOp();
+    LogicalExpressionPlan exprPlan = new LogicalExpressionPlan();
+}
+ : ^( SAMPLE rel ( DOUBLENUMBER
    {
        $alias = builder.buildSampleOp( new SourceLocation( (PigParserNode)$SAMPLE ), $statement::alias,
            $statement::inputAlias, Double.valueOf( $DOUBLENUMBER.text ),
            new SourceLocation( (PigParserNode)$DOUBLENUMBER ) );
    }
+ | expr[exprPlan]
+   {
+       $alias = builder.buildSampleOp( new SourceLocation( (PigParserNode)$SAMPLE ),
+           (LOFilter)$GScope::currentOp, $statement::alias, $statement::inputAlias, exprPlan, $expr.expr);
+   }
+  ) )
 ;
 
 order_clause returns[String alias]
@@ -1203,15 +1226,25 @@ nested_distinct[String alias] returns[Op
 ;
 
 nested_limit[String alias] returns[Operator op]
+scope GScope;
 @init {
     Operator inputOp = null;
+    LogicalExpressionPlan exprPlan = new LogicalExpressionPlan();
+    $GScope::currentOp = builder.createNestedLimitOp( $foreach_plan::innerPlan );
 }
- : ^( LIMIT nested_op_input INTEGER )
+ : ^( LIMIT nested_op_input ( INTEGER 
    {
        SourceLocation loc = new SourceLocation( (PigParserNode)$LIMIT );
        $op = builder.buildNestedLimitOp( loc, $foreach_plan::innerPlan, $alias, $nested_op_input.op, 
            Integer.valueOf( $INTEGER.text ) );
    }
+ | expr[exprPlan] 
+   {
+       SourceLocation loc = new SourceLocation( (PigParserNode)$LIMIT );
+       $op = builder.buildNestedLimitOp( loc, (LOLimit)$GScope::currentOp, $foreach_plan::innerPlan, $alias,
+           $nested_op_input.op, exprPlan);
+   }
+  ) )
 ;
 
 nested_op_input returns[Operator op]

Modified: pig/trunk/src/org/apache/pig/parser/QueryParser.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryParser.g?rev=1143614&r1=1143613&r2=1143614&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/QueryParser.g (original)
+++ pig/trunk/src/org/apache/pig/parser/QueryParser.g Thu Jul  7 00:27:05 2011
@@ -464,10 +464,10 @@ neg_expr : MINUS cast_expr
         -> ^( NEG cast_expr )
 ;
 
-limit_clause : LIMIT^ rel ( INTEGER | LONGINTEGER )
+limit_clause : LIMIT^ rel ( INTEGER | LONGINTEGER | expr )
 ;
 
-sample_clause : SAMPLE^ rel DOUBLENUMBER
+sample_clause : SAMPLE^ rel ( DOUBLENUMBER | expr )
 ;
 
 order_clause : ORDER^ rel BY! order_by_clause ( USING! func_clause )?
@@ -587,7 +587,7 @@ nested_sort : ORDER^ nested_op_input BY!
 nested_distinct : DISTINCT^ nested_op_input
 ;
 
-nested_limit : LIMIT^ nested_op_input INTEGER
+nested_limit : LIMIT^ nested_op_input ( INTEGER | expr )
 ;
 
 nested_op_input : col_ref | nested_proj

Added: pig/trunk/test/org/apache/pig/test/TestLimitVariable.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestLimitVariable.java?rev=1143614&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestLimitVariable.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestLimitVariable.java Thu Jul  7 00:27:05 2011
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.logical.visitor.ScalarVariableValidator;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestLimitVariable {
+    private static String[] data = { "1\t11", "2\t3", "3\t10", "4\t11", "5\t10", "6\t15" };
+    private static File inputFile;
+    private static MiniCluster cluster;
+    private static PigServer pigServer;
+
+    @BeforeClass
+    public static void oneTimeSetUp() throws Exception {
+        inputFile = Util.createFile(data);
+        cluster = MiniCluster.buildCluster();
+        Util.copyFromLocalToCluster(cluster, inputFile.getAbsolutePath(), inputFile.getName());
+        inputFile.delete();
+    }
+
+    @Before
+    public void setUp() throws ExecException {
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    }
+
+    @AfterClass
+    public static void oneTimeTearDown() throws IOException {
+        Util.deleteFile(cluster, inputFile.getName());
+        cluster.shutDown();
+    }
+
+    @Test
+    public void testLimitVariable1() throws IOException {
+        String query = 
+            "a = load '" + inputFile.getName() + "';" + 
+            "b = group a all;" + 
+            "c = foreach b generate COUNT(a) as sum;" + 
+            "d = order a by $0 DESC;" + 
+            "e = limit d c.sum/2;" // return top half of the tuples
+            ;
+
+        Util.registerMultiLineQuery(pigServer, query);
+        Iterator<Tuple> it = pigServer.openIterator("e");
+
+        List<Tuple> expectedRes = Util.getTuplesFromConstantTupleStrings(new String[] {
+                "(6,15)", "(5,10)", "(4,11)" });
+        Util.checkQueryOutputs(it, expectedRes);
+    }
+    
+    @Test
+    public void testLimitVariable2() throws IOException {
+        String query = 
+            "a = load '" + inputFile.getName() + "' as (id, num);" +
+            "b = filter a by id == 2;" + // only 1 tuple returned (2,3)
+            "c = order a by id ASC;" +
+            "d = limit c b.num;" + // test bytearray to long implicit cast
+            "e = limit c b.num * 2;" // return all (6) tuples
+            ;
+        
+        Util.registerMultiLineQuery(pigServer, query);
+        Iterator<Tuple> itD = pigServer.openIterator("d");
+        List<Tuple> expectedResD = Util.getTuplesFromConstantTupleStrings(new String[] {
+                "(1,11)", "(2,3)", "(3,10)" });
+        Util.checkQueryOutputs(itD, expectedResD);
+
+        Iterator<Tuple> itE = pigServer.openIterator("e");
+        List<Tuple> expectedResE = Util.getTuplesFromConstantTupleStrings(new String[] {
+                "(1,11)", "(2,3)", "(3,10)", "(4,11)", "(5,10)", "(6,15)" });
+        Util.checkQueryOutputs(itE, expectedResE);
+    }
+    
+    @Test(expected=FrontendException.class)
+    public void testLimitVariableException1() throws Throwable {
+        String query = 
+            "a = load '" + inputFile.getName() + "';" + 
+            "b = group a all;" + 
+            "c = foreach b generate COUNT(a) as sum;" + 
+            "d = order a by $0 DESC;" + 
+            "e = limit d $0;" // reference to non scalar context is not allowed
+            ;
+
+        Util.registerMultiLineQuery(pigServer, query);
+        try {
+            pigServer.openIterator("e");
+        } catch (FrontendException fe) {
+            Util.checkMessageInException(fe, ScalarVariableValidator.ERR_MSG_SCALAR);
+            throw fe;
+        }
+    }
+    
+    @Test(expected=FrontendException.class)
+    public void testLimitVariableException2() throws Throwable {
+        String query = 
+            // num is a chararray
+            "a = load '" + inputFile.getName() + "' as (id:int, num:chararray);" + 
+            "b = filter a by num == '3';" + 
+            "c = foreach b generate num as falsenum;" + 
+            "d = order a by id DESC;" + 
+            "e = limit d c.falsenum;" // expression must be Long or Integer
+            ;
+        
+        Util.registerMultiLineQuery(pigServer, query);
+        try {
+            pigServer.openIterator("e");
+        } catch (FrontendException fe) {
+            Util.checkMessageInException(fe,
+                    "Limit's expression must evaluate to Long or Integer");
+            throw fe;
+        }
+    }
+    
+    @Test
+    public void testNestedLimitVariable1() throws Throwable {
+        String query = 
+            // does not work without schema because Util returns typed tuples
+            "a = load '" + inputFile.getName() + "' as (id:int, num:int);" + 
+            "b = group a by num;" + 
+            "c = foreach b generate COUNT(a) as ntup;" +
+            "d = group c all;" +
+            "e = foreach d generate MIN(c.ntup) AS min;" +
+            "f = foreach b {" +
+            " g = order a by id ASC;" +
+            " h = limit g e.min;" +
+            " generate FLATTEN(h);" +
+            "}"
+            ;
+        
+        Util.registerMultiLineQuery(pigServer, query);
+        Iterator<Tuple> it = pigServer.openIterator("f");
+
+        List<Tuple> expectedRes = Util.getTuplesFromConstantTupleStrings(new String[] {
+                "(1,11)", "(2,3)", "(3,10)", "(6,15)" });
+        Util.checkQueryOutputsAfterSort(it, expectedRes);
+    }
+}

Modified: pig/trunk/test/org/apache/pig/test/TestSample.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestSample.java?rev=1143614&r1=1143613&r2=1143614&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestSample.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestSample.java Thu Jul  7 00:27:05 2011
@@ -20,11 +20,14 @@ package org.apache.pig.test;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.Iterator;
 
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.logical.visitor.ScalarVariableValidator;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -103,4 +106,44 @@ public class TestSample {
     {
         verify("myid = sample (load '"+ tmpfilepath + "') 0.5;", DATALEN/3, DATALEN*2/3);
     }
+    
+    @Test
+    public void testSample_VariableNone() throws Exception {
+        verify("a = LOAD '" + tmpfilepath + "'; " +
+                "b = GROUP a all;" +
+                "c = FOREACH b GENERATE COUNT(a) AS count;" +
+        		"myid = SAMPLE a (c.count - c.count);", 0, 0);
+}
+    
+    @Test
+    public void testSample_VariableAll() throws Exception {
+        // Sample rate is the scalar expression c.count / c.count; verify is passed DATALEN twice.
+        final String script = "a = LOAD '" + tmpfilepath + "'; " + "b = GROUP a all;" +
+                "c = FOREACH b GENERATE COUNT(a) AS count;" + "myid = SAMPLE a (c.count / c.count);";
+        verify(script, DATALEN, DATALEN);
+    }
+    
+    @Test
+    public void testSample_VariableSome() throws Exception {
+        // Sample rate is c.count / (2.0 * c.count); same bounds as the constant-0.5 sample test.
+        final String script = "a = LOAD '" + tmpfilepath + "'; " + "b = GROUP a all;" +
+                "c = FOREACH b GENERATE COUNT(a) AS count;" + "myid = SAMPLE a (c.count / (2.0 * c.count) );";
+        verify(script, DATALEN/3, DATALEN*2/3);
+    }
+    
+    @Test(expected=FrontendException.class)
+    public void testSampleScalarException() throws IOException {
+        // "sample a $0": reference to non scalar context is not allowed. The exception caught
+        // from openIterator is checked for ERR_MSG_SCALAR and rethrown for @Test(expected=...).
+        final String loadStmt = "a = load '" + tmpfilepath + "';";
+        final String sampleStmt = "b = sample a $0;";
+        Util.registerMultiLineQuery(pig, loadStmt + sampleStmt);
+
+        try {
+            pig.openIterator("b");
+        } catch (FrontendException expected) {
+            Util.checkMessageInException(expected, ScalarVariableValidator.ERR_MSG_SCALAR);
+            throw expected;
+        }
+    }
 }



Mime
View raw message