pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ga...@apache.org
Subject svn commit: r598827 - in /incubator/pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/impl/eval/ src/org/apache/pig/impl/logicalLayer/parser/ src/org/apache/pig/impl/mapreduceExec/ src/org/apache/pig/impl/physicalLayer/ test/org/apache/pig/test/
Date Wed, 28 Nov 2007 00:17:34 GMT
Author: gates
Date: Tue Nov 27 16:17:31 2007
New Revision: 598827

URL: http://svn.apache.org/viewvc?rev=598827&view=rev
Log:
PIG-20 Added custom comparator functions for order by.  Provided by Patrick
Hunt.


Added:
    incubator/pig/trunk/src/org/apache/pig/ComparisonFunc.java
    incubator/pig/trunk/test/org/apache/pig/test/OrdAsc.java
    incubator/pig/trunk/test/org/apache/pig/test/OrdDesc.java
    incubator/pig/trunk/test/org/apache/pig/test/OrdDescNumeric.java
    incubator/pig/trunk/test/org/apache/pig/test/TestOrderBy.java
Modified:
    incubator/pig/trunk/CHANGES.txt
    incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpec.java
    incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java
    incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/SortPartitioner.java
    incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/MapreducePlanCompiler.java
    incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POMapreduce.java

Modified: incubator/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/CHANGES.txt?rev=598827&r1=598826&r2=598827&view=diff
==============================================================================
--- incubator/pig/trunk/CHANGES.txt (original)
+++ incubator/pig/trunk/CHANGES.txt Tue Nov 27 16:17:31 2007
@@ -22,3 +22,5 @@
 	PIG-8 added binary comparator (olgan)
 
 	PIG-17 integrated with Hadoop 0.15 (olgan@)
+
+	PIG-20 Added custom comparator functions for order by (phunt via gates)

Added: incubator/pig/trunk/src/org/apache/pig/ComparisonFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/ComparisonFunc.java?rev=598827&view=auto
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/ComparisonFunc.java (added)
+++ incubator/pig/trunk/src/org/apache/pig/ComparisonFunc.java Tue Nov 27 16:17:31 2007
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.pig.data.Tuple;
+
+
+public abstract class ComparisonFunc extends WritableComparator {
+    public ComparisonFunc() {
+        super(Tuple.class);
+    }
+
+    public int compare(WritableComparable a, WritableComparable b) {
+        return compare((Tuple)a, (Tuple)b);
+    }
+
+    /**
+     * This callback method must be implemented by all subclasses. Compares 
+     * its two arguments for order. Returns a negative integer, zero, or a 
+     * positive integer as the first argument is less than, equal to, or 
+     * greater than the second. The order of elements of the tuples correspond 
+     * to the fields specified in the order by clause. 
+     * Same semantics as {@link java.util.Comparator}.
+     * 
+     * @param t1 the first Tuple to be compared.
+     * @param t2 the second Tuple to be compared.
+     * @throws IOException
+     * @see java.util.Comparator
+     */
+    abstract public int compare(Tuple t1, Tuple t2);
+}

Modified: incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpec.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpec.java?rev=598827&r1=598826&r2=598827&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpec.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/eval/EvalSpec.java Tue Nov 27 16:17:31 2007
@@ -22,6 +22,7 @@
 import java.util.Comparator;
 import java.util.List;
 
+import org.apache.pig.ComparisonFunc;
 import org.apache.pig.data.Datum;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.FunctionInstantiator;
@@ -41,6 +42,8 @@
 	transient DataCollector simpleEvalInput; 
 	protected boolean inner = false; //used only if this generate spec is used in a group by
 
+	private String comparatorFuncName;
+	private transient Comparator<Tuple> comparator;
 	
 	/*
 	 * Keep a precomputed pipeline ready if we do simple evals
@@ -51,7 +54,36 @@
 		simpleEvalInput = setupPipe(simpleEvalOutput);
 	}
 	
-	public void instantiateFunc(FunctionInstantiator instantiaor) throws IOException{};
+	public class UserComparator implements Comparator<Tuple> {
+		Comparator<Tuple> nested;
+		
+		UserComparator(Comparator<Tuple> nested) {
+			this.nested = nested;
+		}
+    	public int compare(Tuple t1, Tuple t2) {
+    		Datum d1 = simpleEval(t1);
+    		Datum d2 = simpleEval(t2);
+    		if (d1 instanceof Tuple) {
+    			return nested.compare((Tuple)d1, (Tuple)d2);
+    		} else {
+    			return nested.compare(new Tuple(d1), new Tuple(d2));
+    		}
+        }
+	}
+	
+	public void instantiateFunc(FunctionInstantiator instantiaor) throws IOException{
+		if (comparatorFuncName != null) {
+			Comparator<Tuple> userComparator = 
+				(ComparisonFunc)instantiaor.instantiateFuncFromAlias(comparatorFuncName);
+			comparator = new UserComparator(userComparator);
+		} else {
+			comparator = new Comparator<Tuple>() {
+		    	public int compare(Tuple t1, Tuple t2) {
+					return simpleEval(t1).compareTo(simpleEval(t2));
+		        }
+		    };
+		}
+	};
 	
     /**
      * set up a default data processing pipe for processing by this spec
@@ -145,17 +177,30 @@
     public abstract boolean amenableToCombiner();
 
     
+    public void setComparatorName(String name) {
+        comparatorFuncName = name;
+    }
+    
+    public String getComparatorName() {
+        return comparatorFuncName;
+    }
+    
     /**
      * Compare 2 tuples according to this spec. This is used while sorting by arbitrary (even generated) fields.
      * @return
      */
     public Comparator<Tuple> getComparator() {
-        return new Comparator<Tuple>() {
-        	
-        	public int compare(Tuple t1, Tuple t2) {
-    			return (simpleEval(t1).compareTo(simpleEval(t2)));
-            }
-        };
+		if (comparator != null)
+            return comparator;
+		else
+        {
+            comparator = new Comparator<Tuple>() {
+                public int compare(Tuple t1, Tuple t2) {
+                    return simpleEval(t1).compareTo(simpleEval(t2));
+                }
+            };
+            return comparator;
+        }
     }
     
     public void setFlatten(boolean isFlattened){

Modified: incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=598827&r1=598826&r2=598827&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Tue Nov 27 16:17:31 2007
@@ -515,7 +515,7 @@
 }
 
 
-LogicalOperator OrderClause() : {LogicalOperator op; EvalSpec sortSpec = null; ProjectSpec projSpec;}
+LogicalOperator OrderClause() : {LogicalOperator op; EvalSpec sortSpec = null; ProjectSpec projSpec; String funcName;}
 {
 	(
 	op = NestedExpr() <BY> 
@@ -530,6 +530,17 @@
 		)
 	|	(sortSpec = Star() {sortSpec = new GenerateSpec(sortSpec);})
 	)
+    (
+        <USING>  funcName = QualifiedFunction()
+        {
+            try {
+                sortSpec.setComparatorName(funcName);
+            } catch (Exception e){
+                throw new ParseException(e.getMessage());
+            }
+        }
+    )?
+
 	)
 	{
 		return new LOSort(op, sortSpec);
@@ -607,13 +618,23 @@
 }
 
 EvalSpec NestedSortOrArrange(Schema over, Map<String, EvalSpec> specs):
-{EvalSpec sortSpec; ProjectSpec projSpec; EvalSpec item; Schema subSchema = null; Token t;}
+{EvalSpec sortSpec; ProjectSpec projSpec; EvalSpec item; Schema subSchema = null; Token t; String funcName;}
 {
 	(
 	( t = <ORDER> | t = <ARRANGE> )
 	item = BaseEvalSpec(over,specs) { subSchema = item.getOutputSchemaForPipe(over); }
-	<BY> ( (projSpec = SimpleProj(subSchema) {sortSpec = projSpec;}) 
-		| sortSpec = Star() )
+	<BY> ( (projSpec = SimpleProj(subSchema) {sortSpec = projSpec;})
+		| sortSpec = Star() )     
+    (
+        <USING>  funcName = QualifiedFunction()
+        {
+            try {
+                sortSpec.setComparatorName(funcName);
+            } catch (Exception e){
+                throw new ParseException(e.getMessage());
+            }
+        }
+    )?
 	)
 	{ return copyItemAndAddSpec(item,new SortDistinctSpec(false, sortSpec)); }
 }
@@ -932,6 +953,7 @@
 		return funcName;
 	}	
 }
+
 
 /**
  * Bug 831620 - '$' support

Modified: incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java?rev=598827&r1=598826&r2=598827&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/MapReduceLauncher.java Tue Nov 27 16:17:31 2007
@@ -17,8 +17,6 @@
  */
 package org.apache.pig.impl.mapreduceExec;
 
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
@@ -31,7 +29,6 @@
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.IndexedTuple;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.eval.EvalSpec;
 import org.apache.pig.impl.io.PigFile;
 import org.apache.pig.impl.physicalLayer.POMapreduce;
@@ -74,7 +71,7 @@
             return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
 	    }
     }
-        
+
     static Random rand = new Random();
 
     /**
@@ -161,6 +158,8 @@
             	// not used starting with 0.15 conf.setInputKeyClass(Text.class);
             	// not used starting with 0.15 conf.setInputValueClass(Tuple.class);
             	conf.setOutputKeyClass(Tuple.class);
+            	if (pom.userComparator != null)
+                	conf.setOutputKeyComparatorClass(pom.userComparator);
             	conf.setOutputValueClass(IndexedTuple.class);
             	conf.set("pig.inputs", ObjectSerializer.serialize(pom.inputFileSpecs));
             

Modified: incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/SortPartitioner.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/SortPartitioner.java?rev=598827&r1=598826&r2=598827&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/SortPartitioner.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/mapreduceExec/SortPartitioner.java Tue Nov 27 16:17:31 2007
@@ -24,6 +24,7 @@
 
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Partitioner;
 import org.apache.pig.builtin.BinStorage;
@@ -33,13 +34,14 @@
 
 
 public class SortPartitioner implements Partitioner {
-	Tuple[] quantiles;
+	Tuple[] quantiles;
+	WritableComparator comparator;
 	
 	public int getPartition(WritableComparable key, Writable value,
 			int numPartitions) {
 		try{
 			Tuple keyTuple = (Tuple)key;
-			int index = Arrays.binarySearch(quantiles, keyTuple.getTupleField(0));
+			int index = Arrays.binarySearch(quantiles, keyTuple.getTupleField(0), comparator);
 			if (index < 0)
 				index = -index-1;
 			return Math.min(index, numPartitions - 1);
@@ -48,7 +50,7 @@
 		}
 	}
 
-	public void configure(JobConf job) {
+	public void configure(JobConf job) {
 		String quantilesFile = job.get("pig.quantilesFile", "");
 		if (quantilesFile.length() == 0)
 			throw new RuntimeException("Sort paritioner used but no quantiles found");
@@ -71,6 +73,8 @@
 		}catch (IOException e){
 			throw new RuntimeException(e);
 		}
+
+		comparator = job.getOutputKeyComparator();
 	}
 
 }

Modified: incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/MapreducePlanCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/MapreducePlanCompiler.java?rev=598827&r1=598826&r2=598827&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/MapreducePlanCompiler.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/MapreducePlanCompiler.java Tue Nov 27 16:17:31 2007
@@ -19,9 +19,12 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Comparator;
 import java.util.Map;
 
+import org.apache.hadoop.io.WritableComparator;
 import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.builtin.FindQuantiles;
 import org.apache.pig.impl.builtin.RandomSampleLoader;
@@ -271,6 +274,17 @@
 		sortJob.addReduceSpec(new GenerateSpec(ps));
 	
     	sortJob.reduceParallelism = loSort.getRequestedParallelism();
+    	
+    	String comparatorFuncName = loSort.getSortSpec().getComparatorName();
+    	if (comparatorFuncName != null) {
+    		try {
+    			sortJob.userComparator = 
+    				(Class<WritableComparator>)Class.forName(comparatorFuncName);
+    		} catch (ClassNotFoundException e) {
+    			throw new RuntimeException("Unable to find user comparator " + comparatorFuncName, e);
+    		}
+    	}
+
     	return sortJob;
     }
     

Modified: incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POMapreduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POMapreduce.java?rev=598827&r1=598826&r2=598827&view=diff
==============================================================================
--- incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POMapreduce.java (original)
+++ incubator/pig/trunk/src/org/apache/pig/impl/physicalLayer/POMapreduce.java Tue Nov 27 16:17:31 2007
@@ -19,7 +19,10 @@
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Comparator;
 
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.eval.EvalSpec;
@@ -40,12 +43,14 @@
     public ArrayList<FileSpec>           inputFileSpecs         = new ArrayList<FileSpec>();
     public FileSpec         outputFileSpec        = null;
     public Class           partitionFunction = null;
+    public Class<WritableComparator> userComparator = null;
     public String quantilesFile = null;
     public PigContext pigContext;
     
     public int                     mapParallelism       = -1;     // -1 means let hadoop decide
     public int                     reduceParallelism    = -1;
 
+    
     static MapReduceLauncher mapReduceLauncher = new MapReduceLauncher();
 
     

Added: incubator/pig/trunk/test/org/apache/pig/test/OrdAsc.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/OrdAsc.java?rev=598827&view=auto
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/OrdAsc.java (added)
+++ incubator/pig/trunk/test/org/apache/pig/test/OrdAsc.java Tue Nov 27 16:17:31 2007
@@ -0,0 +1,13 @@
+package org.apache.pig.test;
+
+import org.apache.pig.data.Tuple;
+import org.apache.pig.ComparisonFunc;
+
+public class OrdAsc extends ComparisonFunc {
+    // this is a simple example - more complex comparison will require
+    //   breakout of the individual values. I suggest you'll have
+    //   to convert "catch(IOException e) to RuntimeException('msg', e)"
+    public int compare(Tuple t1, Tuple t2) {
+        return t1.compareTo(t2);
+    }
+}

Added: incubator/pig/trunk/test/org/apache/pig/test/OrdDesc.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/OrdDesc.java?rev=598827&view=auto
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/OrdDesc.java (added)
+++ incubator/pig/trunk/test/org/apache/pig/test/OrdDesc.java Tue Nov 27 16:17:31 2007
@@ -0,0 +1,13 @@
+package org.apache.pig.test;
+
+import org.apache.pig.data.Tuple;
+import org.apache.pig.ComparisonFunc;
+
+public class OrdDesc extends ComparisonFunc {
+    // this is a simple example - more complex comparison will require
+    //   breakout of the individual values. I suggest you'll have
+    //   to convert "catch(IOException e) to RuntimeException('msg', e)"
+    public int compare(Tuple t1, Tuple t2) {
+        return t2.compareTo(t1);
+    }
+}

Added: incubator/pig/trunk/test/org/apache/pig/test/OrdDescNumeric.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/OrdDescNumeric.java?rev=598827&view=auto
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/OrdDescNumeric.java (added)
+++ incubator/pig/trunk/test/org/apache/pig/test/OrdDescNumeric.java Tue Nov 27 16:17:31 2007
@@ -0,0 +1,42 @@
+package org.apache.pig.test;
+
+import java.io.IOException;
+
+import org.apache.pig.data.DataAtom;
+import org.apache.pig.data.Datum;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.ComparisonFunc;
+
+public class OrdDescNumeric extends ComparisonFunc {
+    public int compare(Tuple t1, Tuple t2) {
+        try {
+            for (int i = 0; i < t1.arity(); i++) {
+                Datum d1 = t1.getField(i);
+                Datum d2 = t2.getField(i);
+                int comp;
+                if (d1 instanceof DataAtom) {
+                    comp = compare((DataAtom)d1, (DataAtom)d2);
+                } else {
+                    comp = compare((Tuple)d1, (Tuple)d2);
+                }
+                if (comp != 0) {
+                    return comp;
+                }
+            }
+            return 0;
+        } catch (IOException e) {
+            throw new RuntimeException("Error comparing keys in OrdDEscNumeric", e);
+        }
+    }
+    
+    private int compare(DataAtom a1, DataAtom a2) throws IOException {
+        double num1 = a1.numval();
+        double num2 = a2.numval();
+        if (num2 > num1) {
+            return 1;
+        } else if (num2 < num1) {
+            return -1;
+        }
+        return 0;
+    }
+}

Added: incubator/pig/trunk/test/org/apache/pig/test/TestOrderBy.java
URL: http://svn.apache.org/viewvc/incubator/pig/trunk/test/org/apache/pig/test/TestOrderBy.java?rev=598827&view=auto
==============================================================================
--- incubator/pig/trunk/test/org/apache/pig/test/TestOrderBy.java (added)
+++ incubator/pig/trunk/test/org/apache/pig/test/TestOrderBy.java Tue Nov 27 16:17:31 2007
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.text.DecimalFormat;
+import java.util.Iterator;
+
+import junit.framework.TestCase;
+
+import org.junit.Test;
+
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+
+public class TestOrderBy extends TestCase {
+    private String initString = "mapreduce";
+    private static final int DATALEN = 1024;
+    private String[][] DATA = new String[2][DATALEN];
+    
+    private PigServer pig;
+    private File tmpFile;
+
+    public TestOrderBy() throws Exception {
+        DecimalFormat myFormatter = new DecimalFormat("0000000");
+        for (int i = 0; i < DATALEN; i++) {
+            DATA[0][i] = myFormatter.format(i);
+            DATA[1][i] = myFormatter.format(DATALEN - i - 1);
+        }
+        pig = new PigServer(initString);
+    }
+    
+    protected void setUp() throws Exception {
+        tmpFile = File.createTempFile("test", "txt");
+        PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+        for(int i = 0; i < DATALEN; i++) {
+            ps.println("1\t" + DATA[1][i] + "\t" + DATA[0][i]);
+        }
+        ps.close();
+    }
+    
+    protected void tearDown() throws Exception {
+        tmpFile.delete();
+    }
+    
+    private void verify(String query, boolean descending) throws Exception {
+        pig.registerQuery(query);
+        Iterator<Tuple> it = pig.openIterator("myid");
+        int col = (descending ? 1 : 0);
+        for(int i = 0; i < DATALEN; i++) {
+            Tuple t = (Tuple)it.next();
+            int value = t.getAtomField(1).numval().intValue();
+//            System.out.println("" + i + "," + DATA[0][i] + "," + DATA[1][i] + "," + value);
+            assertEquals(Integer.parseInt(DATA[col][i]), value);
+        }
+        assertFalse(it.hasNext());
+    }
+
+    @Test
+    public void testTopLevelOrderBy_Star_NoUsing() throws Exception {
+        verify("myid = order (load 'file:" + tmpFile + "') BY *;", false);
+    }
+
+    @Test
+    public void testTopLevelOrderBy_Col1_NoUsing() throws Exception {
+        verify("myid = order (load 'file:" + tmpFile + "') BY $1;", false);
+    }
+
+    @Test
+    public void testTopLevelOrderBy_Col2_NoUsing() throws Exception {
+        verify("myid = order (load 'file:" + tmpFile + "') BY $2;", true);
+    }
+
+    @Test
+    public void testTopLevelOrderBy_Col21_NoUsing() throws Exception {
+        verify("myid = order (load 'file:" + tmpFile + "') BY $2, $1;", true);
+    }
+
+    @Test
+    public void testTopLevelOrderBy_Star_Using() throws Exception {
+        verify("myid = order (load 'file:" + tmpFile +
+            "') BY * USING org.apache.pig.test.OrdAsc;", false);
+        verify("myid = order (load 'file:" + tmpFile +
+            "') BY * USING org.apache.pig.test.OrdDesc;", true);
+        verify("myid = order (load 'file:" + tmpFile +
+            "') BY * USING org.apache.pig.test.OrdDescNumeric;", true);
+    }
+
+    @Test
+    public void testTopLevelOrderBy_Col1_Using() throws Exception {
+        verify("myid = order (load 'file:" + tmpFile +
+            "') BY $1 USING org.apache.pig.test.OrdAsc;", false);
+        verify("myid = order (load 'file:" + tmpFile +
+            "') BY $1 USING org.apache.pig.test.OrdDesc;", true);
+        verify("myid = order (load 'file:" + tmpFile +
+            "') BY $1 USING org.apache.pig.test.OrdDescNumeric;", true);
+    }
+
+    @Test
+    public void testTopLevelOrderBy_Col2_Using() throws Exception {
+        verify("myid = order (load 'file:" + tmpFile +
+            "') BY $2 USING org.apache.pig.test.OrdAsc;", true);
+        verify("myid = order (load 'file:" + tmpFile +
+            "') BY $2 USING org.apache.pig.test.OrdDesc;", false);
+        verify("myid = order (load 'file:" + tmpFile +
+            "') BY $2 USING org.apache.pig.test.OrdDescNumeric;", false);
+    }
+
+    @Test
+    public void testTopLevelOrderBy_Col21_Using() throws Exception {
+        // col2/col1 ascending - 
+        verify("myid = order (load 'file:" + tmpFile +
+            "') BY $2, $1 USING org.apache.pig.test.OrdAsc;", true);
+        verify("myid = order (load 'file:" + tmpFile +
+            "') BY $2, $1 USING org.apache.pig.test.OrdDesc;", false);
+        verify("myid = order (load 'file:" + tmpFile +
+            "') BY $2, $1 USING org.apache.pig.test.OrdDescNumeric;", false);
+    }
+
+    @Test
+    public void testNestedOrderBy_Star_NoUsing() throws Exception {
+        verify("myid = foreach (group (load 'file:" + tmpFile +
+            "') by $0) { D = ORDER $1 BY *; generate flatten(D); };", false);
+    }
+
+    @Test
+    public void testNestedOrderBy_Col1_NoUsing() throws Exception {
+        verify("myid = foreach (group (load 'file:" + tmpFile +
+            "') by $0) { D = ORDER $1 BY $1; generate flatten(D); };", false);
+    }
+
+    @Test
+    public void testNestedOrderBy_Col2_NoUsing() throws Exception {
+        verify("myid = foreach (group (load 'file:" + tmpFile +
+            "') by $0) { D = ORDER $1 BY $2; generate flatten(D); };", true);
+    }
+
+    @Test
+    public void testNestedOrderBy_Col21_NoUsing() throws Exception {
+        verify("myid = foreach (group (load 'file:" + tmpFile +
+            "') by $0) { D = ORDER $1 BY $2, $1; generate flatten(D); };", true);
+    }
+
+    @Test
+    public void testNestedOrderBy_Star_Using() throws Exception {
+        verify("myid = foreach (group (load 'file:" + tmpFile +
+            "') by $0) { D = ORDER $1 BY * USING " + 
+            "org.apache.pig.test.OrdAsc; generate flatten(D); };", false);
+        verify("myid = foreach (group (load 'file:" + tmpFile +
+            "') by $0) { D = ORDER $1 BY * USING " + 
+            "org.apache.pig.test.OrdDesc; generate flatten(D); };", true);
+        verify("myid = foreach (group (load 'file:" + tmpFile +
+            "') by $0) { D = ORDER $1 BY * USING " + 
+            "org.apache.pig.test.OrdDescNumeric; generate flatten(D); };", true);
+    }
+
+    @Test
+    public void testNestedOrderBy_Col1_Using() throws Exception {
+        verify("myid = foreach (group (load 'file:" + tmpFile +
+            "') by $0) { D = ORDER $1 BY $1 USING " + 
+            "org.apache.pig.test.OrdAsc; generate flatten(D); };", false);
+        verify("myid = foreach (group (load 'file:" + tmpFile +
+            "') by $0) { D = ORDER $1 BY $1 USING " + 
+            "org.apache.pig.test.OrdDesc; generate flatten(D); };", true);
+        verify("myid = foreach (group (load 'file:" + tmpFile +
+                "') by $0) { D = ORDER $1 BY $1 USING " + 
+                "org.apache.pig.test.OrdDescNumeric; generate flatten(D); };",
+                true);
+    }
+
+    @Test
+    public void testNestedOrderBy_Col2_Using() throws Exception {
+        verify("myid = foreach (group (load 'file:" + tmpFile +
+                "') by $0) { D = ORDER $1 BY $2 USING " +
+                "org.apache.pig.test.OrdAsc; generate flatten(D); };", true);
+        verify("myid = foreach (group (load 'file:" + tmpFile +
+                "') by $0) { D = ORDER $1 BY $2 USING " +
+                "org.apache.pig.test.OrdDesc; generate flatten(D); };", false);
+        verify("myid = foreach (group (load 'file:" + tmpFile +
+                "') by $0) { D = ORDER $1 BY $2 USING " +
+                "org.apache.pig.test.OrdDescNumeric; generate flatten(D); };",
+                false);
+    }
+
+    @Test
+    public void testNestedOrderBy_Col21_Using() throws Exception {
+        // col2/col1 ascending - 
+        verify("myid = foreach (group (load 'file:" + tmpFile +
+                "') by $0) { D = ORDER $1 BY $2, $1 USING " +
+                "org.apache.pig.test.OrdAsc; generate flatten(D); };", true);
+        verify("myid = foreach (group (load 'file:" + tmpFile +
+                "') by $0) { D = ORDER $1 BY $2, $1 USING " +
+                "org.apache.pig.test.OrdDesc; generate flatten(D); };", false);
+        verify("myid = foreach (group (load 'file:" + tmpFile +
+                "') by $0) { D = ORDER $1 BY $2, $1 USING " +
+                "org.apache.pig.test.OrdDescNumeric; generate flatten(D); };",
+                false);
+    }
+
+}



Mime
View raw message