pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From the...@apache.org
Subject svn commit: r1150955 - in /pig/trunk: ./ src/org/apache/pig/parser/ test/org/apache/pig/test/
Date Mon, 25 Jul 2011 23:28:57 GMT
Author: thejas
Date: Mon Jul 25 23:28:55 2011
New Revision: 1150955

URL: http://svn.apache.org/viewvc?rev=1150955&view=rev
Log:
PIG-1904: Default split destination (azaroth via thejas)

Added:
    pig/trunk/test/org/apache/pig/test/TestSplit.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/parser/AliasMasker.g
    pig/trunk/src/org/apache/pig/parser/AstPrinter.g
    pig/trunk/src/org/apache/pig/parser/AstValidator.g
    pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
    pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g
    pig/trunk/src/org/apache/pig/parser/QueryLexer.g
    pig/trunk/src/org/apache/pig/parser/QueryParser.g

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1150955&r1=1150954&r2=1150955&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Jul 25 23:28:55 2011
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-1904: Default split destination (azaroth via thejas)
+
 PIG-2125: Make Pig work with hadoop .NEXT (daijy)
 
 PIG-2143: Make PigStorage optionally store schema; improve docs. (dvryaboy) 

Modified: pig/trunk/src/org/apache/pig/parser/AliasMasker.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AliasMasker.g?rev=1150955&r1=1150954&r2=1150955&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/AliasMasker.g (original)
+++ pig/trunk/src/org/apache/pig/parser/AliasMasker.g Mon Jul 25 23:28:55 2011
@@ -479,13 +479,17 @@ mr_clause 
 ;
 
 split_clause 
-    : ^( SPLIT rel split_branch split_branch+ )
+    : ^( SPLIT rel split_branch+ split_otherwise? )
 ;
 
 split_branch
     : ^( SPLIT_BRANCH alias cond )
 ;
 
+split_otherwise 
+    : ^( OTHERWISE alias ) 
+;
+
 col_ref : alias_col_ref | dollar_col_ref
 ;
 

Modified: pig/trunk/src/org/apache/pig/parser/AstPrinter.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AstPrinter.g?rev=1150955&r1=1150954&r2=1150955&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/AstPrinter.g (original)
+++ pig/trunk/src/org/apache/pig/parser/AstPrinter.g Mon Jul 25 23:28:55 2011
@@ -464,13 +464,17 @@ mr_clause 
 
 split_clause 
     : ^( SPLIT  { sb.append($SPLIT.text).append(" "); }
-        rel { sb.append(" INTO "); } split_branch ( { sb.append(", "); } split_branch)+ )
+        rel { sb.append(" INTO "); } split_branch ( { sb.append(", "); } split_branch )* split_otherwise? )
 ;
 
 split_branch
     : ^( SPLIT_BRANCH alias { sb.append(" IF "); } cond )    
 ;
 
+split_otherwise 
+    : ^( OTHERWISE { sb.append($OTHERWISE.text).append(" "); } alias ) 
+;
+
 col_ref : alias_col_ref | dollar_col_ref
 ;
 

Modified: pig/trunk/src/org/apache/pig/parser/AstValidator.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/AstValidator.g?rev=1150955&r1=1150954&r2=1150955&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/AstValidator.g (original)
+++ pig/trunk/src/org/apache/pig/parser/AstValidator.g Mon Jul 25 23:28:55 2011
@@ -470,7 +470,7 @@ stream_clause : ^( STREAM rel ( EXECCOMM
 mr_clause : ^( MAPREDUCE QUOTEDSTRING path_list? store_clause load_clause EXECCOMMAND? )
 ;
 
-split_clause : ^( SPLIT rel split_branch+ )
+split_clause : ^( SPLIT rel split_branch+ split_otherwise? )
 ;
 
 split_branch
@@ -480,6 +480,12 @@ split_branch
    }
 ;
 
+split_otherwise 	: ^( OTHERWISE alias )
+   {
+       aliases.add( $alias.name );
+   }
+;
+
 col_ref : alias_col_ref | dollar_col_ref
 ;
 

Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java?rev=1150955&r1=1150954&r2=1150955&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanBuilder.java Mon Jul 25 23:28:55 2011
@@ -60,6 +60,8 @@ import org.apache.pig.newplan.logical.ex
 import org.apache.pig.newplan.logical.expression.LessThanExpression;
 import org.apache.pig.newplan.logical.expression.LogicalExpression;
 import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.NotExpression;
+import org.apache.pig.newplan.logical.expression.OrExpression;
 import org.apache.pig.newplan.logical.expression.ProjectExpression;
 import org.apache.pig.newplan.logical.expression.UserFuncExpression;
 import org.apache.pig.newplan.logical.relational.LOCogroup;
@@ -85,6 +87,7 @@ import org.apache.pig.newplan.logical.re
 import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
 import org.apache.pig.newplan.logical.relational.LogicalSchema;
 import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
+import org.apache.pig.newplan.logical.rules.OptimizerUtils;
 import org.apache.pig.newplan.logical.visitor.ProjectStarExpander;
 
 public class LogicalPlanBuilder {
@@ -215,6 +218,48 @@ public class LogicalPlanBuilder {
         return buildOp ( loc, op, alias, inputAlias, null );
     }
     
+    String buildSplitOtherwiseOp(SourceLocation loc, LOSplitOutput op, String alias, String inputAlias)
+            throws ParserValidationException, PlanGenerationFailureException {
+        LogicalExpressionPlan splitPlan = new LogicalExpressionPlan();
+        Operator losplit = lookupOperator(inputAlias);
+        LogicalExpression currentExpr = null;
+        for (Operator losplitoutput : plan.getSuccessors(losplit)) {
+            // take all the LOSplitOutput and negate their filter plans
+            LogicalExpressionPlan fragment = ((LOSplitOutput) losplitoutput)
+                    .getFilterPlan();
+            try {
+                if (OptimizerUtils.planHasNonDeterministicUdf(fragment))
+                    throw new ParserValidationException(
+                            intStream, loc, new FrontendException(op,
+                                    "Can not use Otherwise in Split with an expression containing a @Nondeterministic UDF", 1131));
+            } catch (FrontendException e) {
+                e.printStackTrace();
+                throw new PlanGenerationFailureException(intStream, loc, e);
+            }
+            LogicalExpression root = null;
+            try {
+                // get the root expression of the filter plan in LOSplitOutput and copy it
+                root = ((LogicalExpression) fragment.getSources().get(0))
+                        .deepCopy(splitPlan);
+            } catch (FrontendException e) {
+                e.printStackTrace();
+                throw new PlanGenerationFailureException(intStream, loc, e);
+            }
+            if (root == null)
+                throw new PlanGenerationFailureException(intStream, loc,
+                        new FrontendException(op,
+                                "Could not retrieve LogicalExpression for LOSplitOutput " + losplitoutput, 2048));
+            if (currentExpr == null)
+                currentExpr = root;
+            else
+                currentExpr = new OrExpression(splitPlan, currentExpr, root);
+        }
+        // using De Morgan's law (!A && !B) == !(A || B)
+        currentExpr = new NotExpression(splitPlan, currentExpr);
+        op.setFilterPlan(splitPlan);
+        return buildOp(loc, op, alias, inputAlias, null);
+    }
+    
     String buildCrossOp(SourceLocation loc, String alias, List<String> inputAliases, String partitioner) {
         LOCross op = new LOCross( plan );
         return buildOp ( loc, op, alias, inputAliases, partitioner );

Modified: pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g?rev=1150955&r1=1150954&r2=1150955&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g (original)
+++ pig/trunk/src/org/apache/pig/parser/LogicalPlanGenerator.g Mon Jul 25 23:28:55 2011
@@ -1326,7 +1326,7 @@ split_clause
           SourceLocation loc = new SourceLocation( (PigParserNode)$SPLIT );
           $statement::inputAlias = builder.buildSplitOp( loc, $statement::inputAlias );
       } 
-      split_branch+
+      split_branch+ split_otherwise?
     )
 ;
 
@@ -1344,6 +1344,19 @@ scope GScope;
    }
 ;
 
+split_otherwise throws PlanValidationException, PlanGenerationFailureException
+scope GScope;
+@init {
+    $GScope::currentOp = builder.createSplitOutputOp();
+}
+ : ^( OTHERWISE alias ) 
+  {
+       SourceLocation loc = new SourceLocation( (PigParserNode)$alias.start );
+       builder.buildSplitOtherwiseOp( loc, (LOSplitOutput)$GScope::currentOp, $alias.name,
+           $statement::inputAlias);
+  }
+;
+
 col_ref[LogicalExpressionPlan plan] returns[LogicalExpression expr]
  : alias_col_ref[$plan] { $expr = $alias_col_ref.expr; }
  | dollar_col_ref[$plan] { $expr = $dollar_col_ref.expr; }

Modified: pig/trunk/src/org/apache/pig/parser/QueryLexer.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryLexer.g?rev=1150955&r1=1150954&r2=1150955&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/QueryLexer.g (original)
+++ pig/trunk/src/org/apache/pig/parser/QueryLexer.g Mon Jul 25 23:28:55 2011
@@ -105,6 +105,9 @@ INTO : 'INTO'
 IF : 'IF'
 ;
 
+OTHERWISE : 'OTHERWISE'
+;
+
 ALL : 'ALL'
 ;
 

Modified: pig/trunk/src/org/apache/pig/parser/QueryParser.g
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/parser/QueryParser.g?rev=1150955&r1=1150954&r2=1150955&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/parser/QueryParser.g (original)
+++ pig/trunk/src/org/apache/pig/parser/QueryParser.g Mon Jul 25 23:28:55 2011
@@ -608,14 +608,18 @@ stream_clause : STREAM^ rel THROUGH! ( E
 mr_clause : MAPREDUCE^ QUOTEDSTRING ( LEFT_PAREN! path_list RIGHT_PAREN! )? store_clause load_clause EXECCOMMAND?
 ;
 
-split_clause : SPLIT rel INTO split_branch ( COMMA split_branch )+
-            -> ^( SPLIT rel split_branch+ )
+split_clause : SPLIT rel INTO split_branch ( ( COMMA split_branch )+ | ( ( COMMA split_branch )* COMMA split_otherwise ) )
+            -> ^( SPLIT rel split_branch+ split_otherwise?)
 ;
 
 split_branch : alias IF cond
             -> ^( SPLIT_BRANCH alias cond )
 ;
 
+split_otherwise : alias OTHERWISE
+            -> ^( OTHERWISE alias )
+;
+
 col_ref : alias_col_ref | dollar_col_ref
 ;
 

Added: pig/trunk/test/org/apache/pig/test/TestSplit.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestSplit.java?rev=1150955&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestSplit.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestSplit.java Mon Jul 25 23:28:55 2011
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestSplit {
+    private static final String[] data = new String[] { "1", "2", "3", "4", "5", "6" };
+    private static File file;
+    private PigServer pig;
+
+    @BeforeClass
+    public static void oneTimeSetUp() throws IOException {
+        file = Util.createLocalInputFile("split_input", data);
+    }
+
+    @Before
+    public void setUp() throws ExecException {
+        pig = new PigServer(ExecType.LOCAL);
+    }
+
+    @Test
+    public void testSplit1() throws IOException {
+        String query = 
+            "a = load '" + file.getAbsolutePath() + "' as (id:int);" + 
+            "split a into b if id > 3, c if id < 3, d otherwise;"
+            ;
+
+        Util.registerMultiLineQuery(pig, query);
+        Iterator<Tuple> it = pig.openIterator("d");
+
+        List<Tuple> expectedRes = Util.getTuplesFromConstantTupleStrings(new String[] { "(3)" });
+        Util.checkQueryOutputs(it, expectedRes);
+    }
+    
+    @Test
+    public void testSplit2() throws IOException {
+        String query = 
+            "a = load '" + file.getAbsolutePath() + "' as (id:int);" + 
+            "split a into b if id % 2 == 0, d otherwise;"
+            ;
+
+        Util.registerMultiLineQuery(pig, query);
+        Iterator<Tuple> it = pig.openIterator("d");
+
+        List<Tuple> expectedRes = Util.getTuplesFromConstantTupleStrings(new String[] { "(1)", "(3)", "(5)" });
+        Util.checkQueryOutputsAfterSort(it, expectedRes);
+    }
+    
+    @Test(expected=FrontendException.class)
+    public void testSplitNondeterministic() throws IOException {
+        String query = 
+            "a = load '" + file.getAbsolutePath() + "' as (id:int);" + 
+            "split a into b if RANDOM() < 0.5, d otherwise;"
+            ;
+
+        try {
+            Util.registerMultiLineQuery(pig, query);
+        } catch (FrontendException fe) {
+            Util.checkMessageInException(fe,
+                    "Can not use Otherwise in Split with an expression containing a @Nondeterministic UDF");
+            throw fe;
+        }
+
+    }
+}
+    



Mime
View raw message