pig-commits mailing list archives

From xu...@apache.org
Subject svn commit: r1733627 [15/18] - in /pig/branches/spark: ./ bin/ conf/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/math/ contrib/piggybank/java/src/mai...
Date Fri, 04 Mar 2016 18:17:47 GMT
Modified: pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestBuiltin.java Fri Mar  4 18:17:39 2016
@@ -18,12 +18,15 @@
 package org.apache.pig.test;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
 import java.io.File;
 import java.io.IOException;
 import java.lang.reflect.Method;
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -37,15 +40,21 @@ import java.util.Properties;
 import java.util.Random;
 import java.util.Set;
 import java.util.StringTokenizer;
-import java.math.BigDecimal;
-import java.math.BigInteger;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import org.apache.pig.Accumulator;
 import org.apache.pig.Algebraic;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.LoadFunc;
+import org.apache.pig.PigConstants;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.builtin.ARITY;
 import org.apache.pig.builtin.AddDuration;
 import org.apache.pig.builtin.BagSize;
@@ -135,6 +144,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestBuiltin {
+    private static final Log LOG = LogFactory.getLog(TestBuiltin.class);
     private static PigServer pigServer;
     private static Properties properties;
     private static MiniGenericCluster cluster;
@@ -142,6 +152,8 @@ public class TestBuiltin {
     private TupleFactory tupleFactory = TupleFactory.getInstance();
     private BagFactory bagFactory = DefaultBagFactory.getInstance();
 
+    private static Tuple NULL_INPUT_TUPLE;
+
     // some inputs
     private static Integer[] intInput = { 3, 1, 2, 4, 5, 7, null, 6, 8, 9, 10 };
     private static Long[] intAsLong = { 3L, 1L, 2L, 4L, 5L, 7L, null, 6L, 8L, 9L, 10L };
@@ -178,6 +190,10 @@ public class TestBuiltin {
     // a bag of inputs of that type
     private static HashMap<String, Tuple> inputMap = new HashMap<String, Tuple>();
 
+    // A mapping between a type name (example: "Integer") and tuples containing
+    // a bag of inputs of that type for accumulator functions
+    private static HashMap<String, Tuple[]> inputMapForAccumulate = new HashMap<String, Tuple[]>();
+
     // A mapping between name of Aggregate function and the input type of its
     // argument
     private static HashMap<String, String> allowedInput = new HashMap<String, String>();
@@ -217,6 +233,9 @@ public class TestBuiltin {
         // first set up EvalFuncMap and expectedMap
         setupEvalFuncMap();
 
+        // a single-field tuple holding null, used by the null-input checks below
+        NULL_INPUT_TUPLE = TupleFactory.getInstance().newTuple(1);
+        NULL_INPUT_TUPLE.set(0, null);
+
         expectedMap.put("SUM", new Double(55));
         expectedMap.put("DoubleSum", new Double(170.567391834593));
         expectedMap.put("IntSum", new Long(55));
@@ -258,8 +277,8 @@ public class TestBuiltin {
         // set up allowedInput
         for (String[] aggGroups : aggs) {
             int i = 0;
-            for (String agg: aggGroups) {                
-                allowedInput.put(agg, inputTypeAsString[i++]);    
+            for (String agg: aggGroups) {
+                allowedInput.put(agg, inputTypeAsString[i++]);
             }
         }
 
@@ -339,6 +358,19 @@ public class TestBuiltin {
         inputMap.put("String", Util.loadNestTuple(TupleFactory.getInstance().newTuple(1), stringInput));
         inputMap.put("DateTime", Util.loadNestTuple(TupleFactory.getInstance().newTuple(1), datetimeInput));
 
+        // set up input hash for accumulate
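+        // inputs are split into multiple bag tuples so that accumulate()
+        // gets exercised across several calls in the accumulator tests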
+        inputMapForAccumulate.put("Integer", Util.splitCreateBagOfTuples(intInput,3));
+        inputMapForAccumulate.put("IntegerAsLong", Util.splitCreateBagOfTuples(intAsLong,3));
+        inputMapForAccumulate.put("Long", Util.splitCreateBagOfTuples(longInput,3));
+        inputMapForAccumulate.put("Float", Util.splitCreateBagOfTuples(floatInput,3));
+        inputMapForAccumulate.put("FloatAsDouble", Util.splitCreateBagOfTuples(floatAsDouble,3));
+        inputMapForAccumulate.put("Double", Util.splitCreateBagOfTuples(doubleInput,3));
+        inputMapForAccumulate.put("BigDecimal", Util.splitCreateBagOfTuples(bigDecimalInput,3));
+        inputMapForAccumulate.put("BigInteger", Util.splitCreateBagOfTuples(bigIntegerInput,3));
+        inputMapForAccumulate.put("ByteArray", Util.splitCreateBagOfTuples(ByteArrayInput,3));
+        inputMapForAccumulate.put("ByteArrayAsDouble", Util.splitCreateBagOfTuples(baAsDouble,3));
+        inputMapForAccumulate.put("String", Util.splitCreateBagOfTuples(stringInput,3));
+        inputMapForAccumulate.put("DateTime", Util.splitCreateBagOfTuples(datetimeInput,3));
     }
 
     @BeforeClass
@@ -380,7 +412,7 @@ public class TestBuiltin {
         Tuple t3 = TupleFactory.getInstance().newTuple(2);
         t3.set(0, new DateTime("2007-03-05T03:05:03.000Z"));
         t3.set(1, "P1D");
-        
+
         assertEquals(func1.exec(t1), new DateTime("2009-01-07T01:07:02.000Z"));
         assertEquals(func1.exec(t2), new DateTime("2008-02-06T02:07:02.000Z"));
         assertEquals(func1.exec(t3), new DateTime("2007-03-06T03:05:03.000Z"));
@@ -403,12 +435,42 @@ public class TestBuiltin {
         DateTime dt2 = func2.exec(t2);
         assertEquals(dt2.compareTo(new DateTime("2009-01-07T01:07:01.000Z")), 0);
 
+        // ToDate should also accept a space separator and partial date/time forms
+        Tuple t2space = TupleFactory.getInstance().newTuple(1);
+        t2space.set(0, "2009-01-07 01:07:01.000Z");
+        DateTime dt2space = func2.exec(t2space);
+        assertEquals(dt2space.compareTo(new DateTime("2009-01-07T01:07:01.000Z")), 0);
+
+        Tuple t2dateOnly = TupleFactory.getInstance().newTuple(1);
+        t2dateOnly.set(0, "2015-05-29");
+        DateTime dt2dateOnly = func2.exec(t2dateOnly);
+        assertEquals(dt2dateOnly.compareTo(new DateTime("2015-05-29")), 0);
+
+        Tuple t2dateSpaceHour = TupleFactory.getInstance().newTuple(1);
+        t2dateSpaceHour.set(0, "2015-05-29 11");
+        DateTime dt2dateSpaceHour = func2.exec(t2dateSpaceHour);
+        assertEquals(dt2dateSpaceHour.compareTo(new DateTime("2015-05-29T11")), 0);
+
+        Tuple t2dateSpaceHourMin = TupleFactory.getInstance().newTuple(1);
+        t2dateSpaceHourMin.set(0, "2015-05-29 11:38");
+        DateTime dt2dateSpaceHourMin = func2.exec(t2dateSpaceHourMin);
+        assertEquals(dt2dateSpaceHourMin.compareTo(new DateTime("2015-05-29T11:38")), 0);
+
+        Tuple t2dateSpaceHourMinSec = TupleFactory.getInstance().newTuple(1);
+        t2dateSpaceHourMinSec.set(0, "2015-05-29 11:38:39");
+        DateTime dt2dateSpaceHourMinSec = func2.exec(t2dateSpaceHourMinSec);
+        assertEquals(dt2dateSpaceHourMinSec.compareTo(new DateTime("2015-05-29T11:38:39")), 0);
+
         Tuple t3 = TupleFactory.getInstance().newTuple(1);
         t3.set(0, "2009-01-07T01:07:01.000+08:00");
         DateTime dt3 = func2.exec(t3);
         assertEquals(dt3.compareTo(new DateTime("2009-01-07T01:07:01.000+08:00", DateTimeZone.forID("+08:00"))), 0);
 
-        ToDate2ARGS func3 = new ToDate2ARGS();        
+        Tuple t3space = TupleFactory.getInstance().newTuple(1);
+        t3space.set(0, "2009-01-07 01:07:01.000+08:00");
+        DateTime dt3space = func2.exec(t3space);
+        assertEquals(dt3space.compareTo(new DateTime("2009-01-07T01:07:01.000+08:00", DateTimeZone.forID("+08:00"))), 0);
+
+        ToDate2ARGS func3 = new ToDate2ARGS();
         Tuple t4 = TupleFactory.getInstance().newTuple(2);
         t4.set(0, "2009.01.07 AD at 01:07:01");
         t4.set(1, "yyyy.MM.dd G 'at' HH:mm:ss");
@@ -420,8 +482,8 @@ public class TestBuiltin {
         t5.set(1, "yyyy.MM.dd G 'at' HH:mm:ss Z");
         DateTime dt5 = func3.exec(t5);
         assertEquals(dt5.compareTo(new DateTime("2009-01-07T01:07:01.000+08:00")), 0);
-        
-        ToDate3ARGS func4 = new ToDate3ARGS();        
+
+        ToDate3ARGS func4 = new ToDate3ARGS();
         Tuple t6 = TupleFactory.getInstance().newTuple(3);
         t6.set(0, "2009.01.07 AD at 01:07:01");
         t6.set(1, "yyyy.MM.dd G 'at' HH:mm:ss");
@@ -465,13 +527,13 @@ public class TestBuiltin {
         t12.set(1, "yyyy.MM.dd G 'at' HH:mm:ss Z");
         String dtStr4 = func6.exec(t12);
         assertEquals(dtStr4, "2009.01.07 AD at 01:07:01 +0800");
-        
+
         ToMilliSeconds func7 = new ToMilliSeconds();
         Tuple t13 = TupleFactory.getInstance().newTuple(1);
         t13.set(0, new DateTime(1231290421000L));
         Long ut2 = func7.exec(t13);
         assertEquals(ut2.longValue(), 1231290421000L);
-        
+
         // Null handling
         t1.set(0, null);
         assertEquals(func1.exec(t1), null);
@@ -907,6 +969,9 @@ public class TestBuiltin {
             } else {
                 assertEquals(msg, (Double)output, (Double)getExpected(avgTypes[k]), 0.00001);
             }
+
+            // Check null input
+            assertNull(avg.exec(NULL_INPUT_TUPLE));
         }
     }
 
@@ -1375,6 +1440,9 @@ public class TestBuiltin {
             else {
                 assertEquals(msg, (Double)output, (Double)getExpected(sumTypes[k]), 0.00001);
             }
+
+            // Check null input
+            assertNull(sum.exec(NULL_INPUT_TUPLE));
         }
     }
 
@@ -1421,7 +1489,7 @@ public class TestBuiltin {
             else if (inputType == "BigDecimal")
                 assertEquals(msg, ((BigDecimal) output).toPlainString(), ((BigDecimal)getExpected(sumTypes[k])).toPlainString());
             else if (inputType == "BigInteger")
-                assertEquals(msg, ((BigInteger) output).toString(), ((BigInteger)getExpected(sumTypes[k])).toString()); 
+                assertEquals(msg, ((BigInteger) output).toString(), ((BigInteger)getExpected(sumTypes[k])).toString());
             else {
               assertEquals(msg, (Double)output, (Double)getExpected(sumTypes[k]), 0.00001);
             }
@@ -1439,28 +1507,10 @@ public class TestBuiltin {
 
             String msg = "[Testing " + minTypes[k] + " on input type: " + getInputType(minTypes[k]) + " ( (output) " +
                            output + " == " + getExpected(minTypes[k]) + " (expected) )]";
+            assertForInputType(inputType, msg, getExpected(minTypes[k]), output);
 
-            if (inputType == "ByteArray") {
-              assertEquals(msg, output, getExpected(minTypes[k]));
-            } else if (inputType == "Long") {
-                assertEquals(msg, output, getExpected(minTypes[k]));
-            } else if (inputType == "Integer") {
-                assertEquals(msg, output, getExpected(minTypes[k]));
-            } else if (inputType == "Double") {
-                assertEquals(msg, output, getExpected(minTypes[k]));
-            } else if (inputType == "Float") {
-                assertEquals(msg, output, getExpected(minTypes[k]));
-            } else if (inputType == "String") {
-                assertEquals(msg, output, getExpected(minTypes[k]));
-            } else if (inputType == "BigDecimal") {
-                assertEquals(msg, ((BigDecimal) output).toPlainString(),  ((BigDecimal) getExpected(minTypes[k])).toPlainString());
-            } else if (inputType == "BigInteger") {
-                assertEquals(msg, ((BigInteger) output).toString(), ((BigInteger) getExpected(minTypes[k])).toString());
-
-            } else if (inputType == "DateTime") {
-                // Compare millis so that we dont have to worry about TZ
-                assertEquals(msg, ((DateTime)output).getMillis(), ((DateTime)getExpected(minTypes[k])).getMillis());
-            }
+            // Check null input
+            assertNull(min.exec(NULL_INPUT_TUPLE));
         }
     }
 
@@ -1469,7 +1519,7 @@ public class TestBuiltin {
     public void testMINIntermediate() throws Exception {
 
         String[] minTypes = {"MINIntermediate", "LongMinIntermediate", "IntMinIntermediate", "FloatMinIntermediate",
-                             "BigDecimalMinIntermediate", "BigIntegerMinIntermediate", 
+                             "BigDecimalMinIntermediate", "BigIntegerMinIntermediate",
                              "StringMinIntermediate", "DateTimeMinIntermediate"};
         for (int k = 0; k < minTypes.length; k++) {
             EvalFunc<?> min = evalFuncMap.get(minTypes[k]);
@@ -1479,28 +1529,7 @@ public class TestBuiltin {
 
             String msg = "[Testing " + minTypes[k] + " on input type: " + getInputType(minTypes[k]) + " ( (output) " +
                            ((Tuple)output).get(0) + " == " + getExpected(minTypes[k]) + " (expected) )]";
-
-            if (inputType == "ByteArray") {
-              assertEquals(msg, ((Tuple)output).get(0), getExpected(minTypes[k]));
-            } else if (inputType == "Long") {
-                assertEquals(msg, ((Tuple)output).get(0), getExpected(minTypes[k]));
-            } else if (inputType == "Integer") {
-                assertEquals(msg, ((Tuple)output).get(0), getExpected(minTypes[k]));
-            } else if (inputType == "Double") {
-                assertEquals(msg, ((Tuple)output).get(0), getExpected(minTypes[k]));
-            } else if (inputType == "Float") {
-                assertEquals(msg, ((Tuple)output).get(0), getExpected(minTypes[k]));
-            } else if (inputType == "BigDecimal") {
-                assertEquals(msg, ((BigDecimal)((Tuple)output).get(0)).toPlainString(), ((BigDecimal)getExpected(minTypes[k])).toPlainString());
-            } else if (inputType == "BigInteger") {
-                assertEquals(msg, ((BigInteger)((Tuple)output).get(0)).toString(), ((BigInteger)getExpected(minTypes[k])).toString());
-                System.out.println("xxx: here");
-            } else if (inputType == "String") {
-                assertEquals(msg, ((Tuple)output).get(0), getExpected(minTypes[k]));
-            } else if (inputType == "DateTime") {
-                // Compare millis so that we dont have to worry about TZ
-                assertEquals(msg, ((DateTime)((Tuple)output).get(0)).getMillis(), ((DateTime)getExpected(minTypes[k])).getMillis());
-            }
+            assertForInputType(inputType, msg, getExpected(minTypes[k]), ((Tuple)output).get(0));
         }
     }
 
@@ -1515,27 +1544,23 @@ public class TestBuiltin {
 
             String msg = "[Testing " + minTypes[k] + " on input type: " + getInputType(minTypes[k]) + " ( (output) " +
                            output + " == " + getExpected(minTypes[k]) + " (expected) )]";
+            assertForInputType(inputType, msg, getExpected(minTypes[k]), output);
+        }
+    }
 
-            if (inputType == "ByteArray") {
-              assertEquals(msg, output, getExpected(minTypes[k]));
-            } else if (inputType == "Long") {
-                assertEquals(msg, output, getExpected(minTypes[k]));
-            } else if (inputType == "Integer") {
-                assertEquals(msg, output, getExpected(minTypes[k]));
-            } else if (inputType == "Double") {
-                assertEquals(msg, output, getExpected(minTypes[k]));
-            } else if (inputType == "Float") {
-                assertEquals(msg, output, getExpected(minTypes[k]));
-            } else if (inputType == "BigDecimal") {
-                assertEquals(msg, ((BigDecimal)output).toPlainString(), ((BigDecimal)getExpected(minTypes[k])).toPlainString());
-            } else if (inputType == "BigInteger") {
-                assertEquals(msg, ((BigInteger)output).toString(), ((BigInteger)getExpected(minTypes[k])).toString());
-            } else if (inputType == "String") {
-                assertEquals(msg, output, getExpected(minTypes[k]));
-            } else if (inputType == "DateTime") {
-                // Compare millis so that we dont have to worry about TZ
-                assertEquals(msg, ((DateTime)output).getMillis(), ((DateTime)getExpected(minTypes[k])).getMillis());
-            }
+    @Test
+    public void testMINAccumulate() throws Exception {
+        String[] minTypes = {"MIN", "LongMin", "IntMin", "FloatMin", "BigDecimalMin", "BigIntegerMin", "StringMin", "DateTimeMin"};
+        for (int k = 0; k < minTypes.length; k++) {
+            Accumulator<?> min = (Accumulator<?>)evalFuncMap.get(minTypes[k]);
+            String inputType = getInputType(minTypes[k]);
+            Tuple[] tuples = inputMapForAccumulate.get(inputType);
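+            // feed the input one chunk at a time through the Accumulator interface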
+            for (Tuple tup : tuples) {
+                min.accumulate(tup);
+            }
+            Object output = min.getValue();
+            String msg = "[Testing " + minTypes[k] + " accumulate on input type: " + getInputType(minTypes[k]) + " ( (output) " +
+                           output + " == " + getExpected(minTypes[k]) + " (expected) )]";
+            assertForInputType(inputType, msg, getExpected(minTypes[k]), output);
         }
     }
 
@@ -1551,31 +1576,13 @@ public class TestBuiltin {
 
             String msg = "[Testing " + maxTypes[k] + " on input type: " + getInputType(maxTypes[k]) + " ( (output) " +
                            output + " == " + getExpected(maxTypes[k]) + " (expected) )]";
+            assertForInputType(inputType, msg, getExpected(maxTypes[k]), output);
 
-            if (inputType == "ByteArray") {
-              assertEquals(msg, output, getExpected(maxTypes[k]));
-            } else if (inputType == "Long") {
-                assertEquals(msg, output, getExpected(maxTypes[k]));
-            } else if (inputType == "Integer") {
-                assertEquals(msg, output, getExpected(maxTypes[k]));
-            } else if (inputType == "Double") {
-                assertEquals(msg, output, getExpected(maxTypes[k]));
-            } else if (inputType == "Float") {
-                assertEquals(msg, output, getExpected(maxTypes[k]));
-            } else if (inputType == "BigDecimal") {
-                assertEquals(msg, ((BigDecimal)output).toPlainString(), ((BigDecimal)getExpected(maxTypes[k])).toPlainString());
-            } else if (inputType == "BigInteger") {
-                assertEquals(msg, ((BigInteger)output).toString(), ((BigInteger)getExpected(maxTypes[k])).toString());
-            } else if (inputType == "String") {
-                assertEquals(msg, output, getExpected(maxTypes[k]));
-            } else if (inputType == "DateTime") {
-                // Compare millis so that we dont have to worry about TZ
-                assertEquals(msg, ((DateTime)output).getMillis(), ((DateTime)getExpected(maxTypes[k])).getMillis());
-            }
+            // Check null input
+            assertNull(max.exec(NULL_INPUT_TUPLE));
         }
     }
 
-
     @Test
     public void testMAXIntermed() throws Exception {
 
@@ -1590,27 +1597,7 @@ public class TestBuiltin {
 
             String msg = "[Testing " + maxTypes[k] + " on input type: " + getInputType(maxTypes[k]) + " ( (output) " +
                            ((Tuple)output).get(0) + " == " + getExpected(maxTypes[k]) + " (expected) )]";
-
-            if (inputType == "ByteArray") {
-              assertEquals(msg, ((Tuple)output).get(0), getExpected(maxTypes[k]));
-            } else if (inputType == "Long") {
-                assertEquals(msg, ((Tuple)output).get(0), getExpected(maxTypes[k]));
-            } else if (inputType == "Integer") {
-                assertEquals(msg, ((Tuple)output).get(0), getExpected(maxTypes[k]));
-            } else if (inputType == "Double") {
-                assertEquals(msg, ((Tuple)output).get(0), getExpected(maxTypes[k]));
-            } else if (inputType == "Float") {
-                assertEquals(msg, ((Tuple)output).get(0), getExpected(maxTypes[k]));
-            } else if (inputType == "BigDecimal") {
-                assertEquals(msg, ((BigDecimal)((Tuple)output).get(0)).toPlainString(), ((BigDecimal)getExpected(maxTypes[k])).toPlainString());
-            } else if (inputType == "BigInteger") {
-                assertEquals(msg, ((BigInteger)((Tuple)output).get(0)).toString(), ((BigInteger)getExpected(maxTypes[k])).toString());
-            } else if (inputType == "String") {
-                assertEquals(msg, ((Tuple)output).get(0), getExpected(maxTypes[k]));
-            } else if (inputType == "DateTime") {
-                // Compare millis so that we dont have to worry about TZ
-                assertEquals(msg, ((DateTime)((Tuple)output).get(0)).getMillis(), ((DateTime)getExpected(maxTypes[k])).getMillis());
-            }
+            assertForInputType(inputType, msg, getExpected(maxTypes[k]), ((Tuple)output).get(0));
         }
     }
 
@@ -1626,32 +1613,29 @@ public class TestBuiltin {
 
             String msg = "[Testing " + maxTypes[k] + " on input type: " + getInputType(maxTypes[k]) + " ( (output) " +
                            output + " == " + getExpected(maxTypes[k]) + " (expected) )]";
-
-            if (inputType == "ByteArray") {
-              assertEquals(msg, output, getExpected(maxTypes[k]));
-            } else if (inputType == "Long") {
-                assertEquals(msg, output, getExpected(maxTypes[k]));
-            } else if (inputType == "Integer") {
-                assertEquals(msg, output, getExpected(maxTypes[k]));
-            } else if (inputType == "Double") {
-                assertEquals(msg, output, getExpected(maxTypes[k]));
-            } else if (inputType == "Float") {
-                assertEquals(msg, output, getExpected(maxTypes[k]));
-            } else if (inputType == "BigDecimal") {
-                assertEquals(msg, ((BigDecimal)output).toPlainString(), ((BigDecimal)getExpected(maxTypes[k])).toPlainString());
-            } else if (inputType == "BigInteger") {
-                assertEquals(msg, ((BigInteger)output).toString(), ((BigInteger)getExpected(maxTypes[k])).toString());
-            } else if (inputType == "String") {
-                assertEquals(msg, output, getExpected(maxTypes[k]));
-            } else if (inputType == "DateTime") {
-                // Compare millis so that we dont have to worry about TZ
-                assertEquals(msg, ((DateTime)output).getMillis(), ((DateTime)getExpected(maxTypes[k])).getMillis());
-            }
+            assertForInputType(inputType, msg, getExpected(maxTypes[k]), output);
         }
 
     }
 
     @Test
+    public void testMAXAccumulate() throws Exception {
+        String[] maxTypes = {"MAX", "LongMax", "IntMax", "FloatMax", "BigDecimalMax", "BigIntegerMax", "StringMax", "DateTimeMax"};
+        for (int k = 0; k < maxTypes.length; k++) {
+            Accumulator<?> max = (Accumulator<?>)evalFuncMap.get(maxTypes[k]);
+            String inputType = getInputType(maxTypes[k]);
+            Tuple[] tuples = inputMapForAccumulate.get(inputType);
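+            // feed the input one chunk at a time through the Accumulator interface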
+            for (Tuple tup : tuples) {
+                max.accumulate(tup);
+            }
+            Object output = max.getValue();
+
+            String msg = "[Testing " + maxTypes[k] + " accumulate on input type: " + getInputType(maxTypes[k]) + " ( (output) " +
+                           output + " == " + getExpected(maxTypes[k]) + " (expected) )]";
+            assertForInputType(inputType, msg, getExpected(maxTypes[k]), output);
+        }
+    }
+
+    @Test
     public void testMathFuncs() throws Exception {
         Random generator = new Random();
         generator.setSeed(System.currentTimeMillis());
@@ -1939,7 +1923,7 @@ public class TestBuiltin {
         t3.set(0, null);
         t3.set(1, "^\\/search\\/iy\\/(.*?)\\/.*");
         t3.set(2, 2);
-        
+
         Tuple t4 = tupleFactory.newTuple(3);
         t4.set(0,"this is a match");
         t4.set(1, "this is a (.+?)");
@@ -2185,7 +2169,7 @@ public class TestBuiltin {
         }
         assertTrue("null in tobag result", s.contains(null));
     }
-        
+
     @Test
     public void testTOBAGSupportsTuplesInInput() throws IOException {
         String[][] expected = {
@@ -2401,7 +2385,7 @@ public class TestBuiltin {
         assertTrue(msg, res.equals(exp));
 
     }
-    
+
     /**
      * End-to-end testing of the CONCAT() builtin function for vararg parameters
      * @throws Exception
@@ -2412,17 +2396,17 @@ public class TestBuiltin {
         Util.createLocalInputFile(input, new String[]{"dummy"});
         PigServer pigServer = new PigServer(Util.getLocalTestMode());
         pigServer.registerQuery("A = LOAD '"+input+"' as (x:chararray);");
-        
+
         pigServer.registerQuery("B = foreach A generate CONCAT('a', CONCAT('b',CONCAT('c','d')));");
         Iterator<Tuple> its = pigServer.openIterator("B");
         Tuple t = its.next();
         assertEquals("abcd",t.get(0));
-        
+
         pigServer.registerQuery("B = foreach A generate CONCAT('a', 'b', 'c', 'd');");
         its = pigServer.openIterator("B");
         t = its.next();
         assertEquals("abcd",t.get(0));
-        
+
         pigServer.registerQuery("B = foreach A generate CONCAT('a', CONCAT('b','c'), 'd');");
         its = pigServer.openIterator("B");
         t = its.next();
@@ -2787,11 +2771,11 @@ public class TestBuiltin {
         assertTrue(rt.get(0).equals("456"));
         rt = i.next();
         assertTrue(rt.get(0).equals("789"));
-        
+
         // Check when delim specified
         Tuple t4 = tf.newTuple(2);
         t4.set(0, "123|456|78\"9");
-        t4.set(1, "|");        
+        t4.set(1, "|");
         b = f.exec(t4);
         assertTrue(b.size()==3);
         i = b.iterator();
@@ -2804,7 +2788,7 @@ public class TestBuiltin {
 
         b = f.exec(t2);
         assertTrue(b==null);
-        
+
         b = f.exec(t3);
         assertTrue(b==null);
     }
@@ -2846,7 +2830,7 @@ public class TestBuiltin {
         result = d.exec(t);
         assertEquals(2, result.size());
     }
-    
+
     //see PIG-2331
     @Test
     public void testURIwithCurlyBrace() throws Exception {
@@ -2887,6 +2871,29 @@ public class TestBuiltin {
         return expectedMap.get(expectedFor);
     }
 
+    /**
+     * Asserts expected vs. actual using a comparison appropriate to the input
+     * type: plain string form for BigDecimal/BigInteger, milliseconds for
+     * DateTime, and plain equality otherwise.
+     */
+    private void assertForInputType(String inputType, String assertMsg, Object expected, Object actual) {
+        if ("BigDecimal".equals(inputType)) {
+            assertEquals(assertMsg, ((BigDecimal)expected).toPlainString(), ((BigDecimal)actual).toPlainString());
+        } else if ("BigInteger".equals(inputType)) {
+            assertEquals(assertMsg, ((BigInteger)expected).toString(), ((BigInteger)actual).toString());
+        } else if ("DateTime".equals(inputType)) {
+            // Compare millis so that we don't have to worry about TZ
+            assertEquals(assertMsg, ((DateTime)expected).getMillis(), ((DateTime)actual).getMillis());
+        } else {
+            // ByteArray, Long, Integer, Double, Float, String: plain equality
+            assertEquals(assertMsg, expected, actual);
+        }
+    }
+
     @Test
     public void testKeySet() throws Exception {
         Map<String, Object> m = new HashMap<String, Object>();
@@ -2933,6 +2940,41 @@ public class TestBuiltin {
         assertEquals(resultList.get(1), "hadoop");
     }
 
+    /**
+     * Tests that VALUESET preserves the schema when the map's value type is primitive.
+     */
+    @Test
+    public void testValueSetOutputSchemaPrimitiveType() throws FrontendException {
+        Schema inputSchema = new Schema();
+        Schema charArraySchema = new Schema(new FieldSchema(null, DataType.CHARARRAY));
+        FieldSchema mapSchema = new FieldSchema(null, charArraySchema, DataType.MAP);
+        inputSchema.add(mapSchema);
+
+        Schema tupleSchema = new Schema(new FieldSchema(null, charArraySchema, DataType.TUPLE));
+        Schema expectedSchema = new Schema(new FieldSchema(null, tupleSchema, DataType.BAG));
+
+        VALUESET vs = new VALUESET();
+        assertEquals(expectedSchema, vs.outputSchema(inputSchema));
+    }
+
+    /**
+     * Tests that VALUESET preserves the schema when the map's value type is complex.
+     */
+    @Test
+    public void testValueSetOutputSchemaComplexType() throws FrontendException {
+        Schema inputSchema = new Schema();
+        Schema tupleSchema = Schema.generateNestedSchema(DataType.TUPLE, DataType.CHARARRAY);
+        Schema bagSchema = new Schema(new FieldSchema(null, tupleSchema, DataType.BAG));
+        FieldSchema mapSchema = new FieldSchema(null, bagSchema, DataType.MAP);
+        inputSchema.add(mapSchema);
+
+        Schema tupleOfBagSchema = new Schema(new FieldSchema(null, bagSchema, DataType.TUPLE));
+        Schema expectedSchema = new Schema(new FieldSchema(null, tupleOfBagSchema, DataType.BAG));
+
+        VALUESET vs = new VALUESET();
+        assertEquals(expectedSchema, vs.outputSchema(inputSchema));
+    }
+
     @SuppressWarnings("unchecked")
     @Test
     public void testValueList() throws Exception {
@@ -2958,6 +3000,40 @@ public class TestBuiltin {
         assertEquals((String)resultList.get(2), "hadoop");
     }
 
+    /**
+     * Tests that VALUELIST preserves the schema when the map's value type is primitive.
+     */
+    @Test
+    public void testValueListOutputSchemaPrimitiveType() throws FrontendException {
+        Schema inputSchema = new Schema();
+        Schema charArraySchema = new Schema(new FieldSchema(null, DataType.CHARARRAY));
+        FieldSchema mapSchema = new FieldSchema(null, charArraySchema, DataType.MAP);
+        inputSchema.add(mapSchema);
+
+        Schema tupleSchema = new Schema(new FieldSchema(null, charArraySchema, DataType.TUPLE));
+        Schema expectedSchema = new Schema(new FieldSchema(null, tupleSchema, DataType.BAG));
+
+        VALUELIST vl = new VALUELIST();
+        assertEquals(expectedSchema, vl.outputSchema(inputSchema));
+    }
+
+    /**
+     * Tests that VALUELIST preserves the schema when the map's value type is complex.
+     */
+    @Test
+    public void testValueListOutputSchemaComplexType() throws FrontendException {
+        Schema inputSchema = new Schema();
+        Schema tupleSchema = Schema.generateNestedSchema(DataType.TUPLE, DataType.CHARARRAY);
+        Schema bagSchema = new Schema(new FieldSchema(null, tupleSchema, DataType.BAG));
+        FieldSchema mapSchema = new FieldSchema(null, bagSchema, DataType.MAP);
+        inputSchema.add(mapSchema);
+
+        Schema tupleOfBagSchema = new Schema(new FieldSchema(null, bagSchema, DataType.TUPLE));
+        Schema expectedSchema = new Schema(new FieldSchema(null, tupleOfBagSchema, DataType.BAG));
+
+        VALUELIST vl = new VALUELIST();
+        assertEquals(expectedSchema, vl.outputSchema(inputSchema));
+    }
 
     @SuppressWarnings("unchecked")
     @Test
@@ -3015,12 +3091,12 @@ public class TestBuiltin {
         Long years = func1.exec(t);
         System.out.println("Years: " + years.toString());
         Assert.assertEquals(years.longValue(), 7L);
-        
+
         MonthsBetween func2 = new MonthsBetween();
         Long months = func2.exec(t);
         System.out.println("Months: " + months.toString());
         Assert.assertEquals(months.longValue(),84L);
-        
+
         WeeksBetween func3 = new WeeksBetween();
         Long weeks = func3.exec(t);
         System.out.println("Weeks: " + weeks.toString());
@@ -3058,7 +3134,7 @@ public class TestBuiltin {
         t1.set(0, ToDate.extractDateTime("2010-04-15T08:11:33.020Z"));
         Tuple t2 = TupleFactory.getInstance().newTuple(1);
         t2.set(0, ToDate.extractDateTime("2010-04-15T08:11:33.020+08:00"));
-        
+
         GetYear func1 = new GetYear();
         Integer year = func1.exec(t1);
         assertEquals(year.intValue(), 2010);
@@ -3070,31 +3146,31 @@ public class TestBuiltin {
         assertEquals(month.intValue(), 4);
         month = func2.exec(t2);
         assertEquals(month.intValue(), 4);
-        
+
         GetDay func3 = new GetDay();
         Integer day = func3.exec(t1);
         assertEquals(day.intValue(), 15);
         day = func3.exec(t2);
         assertEquals(day.intValue(), 15);
-        
+
         GetHour func4 = new GetHour();
         Integer hour = func4.exec(t1);
         assertEquals(hour.intValue(), 8);
         hour = func4.exec(t2);
         assertEquals(hour.intValue(), 8);
-        
+
         GetMinute func5 = new GetMinute();
         Integer minute = func5.exec(t1);
         assertEquals(minute.intValue(), 11);
         minute = func5.exec(t2);
         assertEquals(minute.intValue(), 11);
-        
+
         GetSecond func6 = new GetSecond();
         Integer second = func6.exec(t1);
         assertEquals(second.intValue(), 33);
         second = func6.exec(t2);
         assertEquals(second.intValue(), 33);
-        
+
         GetMilliSecond func7 = new GetMilliSecond();
         Integer milli = func7.exec(t1);
         assertEquals(milli.intValue(), 20);
@@ -3106,13 +3182,13 @@ public class TestBuiltin {
         assertEquals(weekyear.intValue(), 2010);
         weekyear = func8.exec(t2);
         assertEquals(weekyear.intValue(), 2010);
-        
+
         GetWeek func9 = new GetWeek();
         Integer week = func9.exec(t1);
         assertEquals(week.intValue(), 15);
         week = func9.exec(t2);
         assertEquals(week.intValue(), 15);
-        
+
         // Null handling
         t1.set(0, null);
         assertEquals(func1.exec(t1), null);
@@ -3124,7 +3200,7 @@ public class TestBuiltin {
         assertEquals(func7.exec(t1), null);
         assertEquals(func8.exec(t1), null);
         assertEquals(func9.exec(t1), null);
-        
+
     }
 
     @Test
@@ -3139,16 +3215,134 @@ public class TestBuiltin {
         pigServer.registerQuery("A = load '" + inputFileName + "' as (name);");
         pigServer.registerQuery("B = foreach A generate name, UniqueID();");
         Iterator<Tuple> iter = pigServer.openIterator("B");
-        iter.next().get(1).equals("0-0");
-        iter.next().get(1).equals("0-1");
-        iter.next().get(1).equals("0-2");
-        iter.next().get(1).equals("0-3");
-        iter.next().get(1).equals("0-4");
-        iter.next().get(1).equals("1-0");
-        iter.next().get(1).equals("1-1");
-        iter.next().get(1).equals("1-1");
-        iter.next().get(1).equals("1-2");
-        iter.next().get(1).equals("1-3");
-        iter.next().get(1).equals("1-4");
+        assertEquals(iter.next().get(1),"0-0");
+        assertEquals(iter.next().get(1),"0-1");
+        assertEquals(iter.next().get(1),"0-2");
+        assertEquals(iter.next().get(1),"0-3");
+        assertEquals(iter.next().get(1),"0-4");
+        assertEquals(iter.next().get(1),"1-0");
+        assertEquals(iter.next().get(1),"1-1");
+        assertEquals(iter.next().get(1),"1-2");
+        assertEquals(iter.next().get(1),"1-3");
+        assertEquals(iter.next().get(1),"1-4");
+    }
+
+    @Test
+    public void testRANDOMWithJob() throws Exception {
+        Util.resetStateForExecModeSwitch();
+        String inputFileName = "testRANDOM.txt";
+        Util.createInputFile(cluster, inputFileName, new String[]
+            {"1\n2\n3\n4\n5\n1\n2\n3\n4\n5\n"});
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+        // running with two mappers
+        pigServer.getPigContext().getProperties().setProperty("mapred.max.split.size", "10");
+        pigServer.getPigContext().getProperties().setProperty("pig.noSplitCombination", "true");
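+        // with a 10-byte max split size, the 20-byte input should yield two
+        // map tasks of 5 rows each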
+        pigServer.registerQuery("A = load '" + inputFileName + "' as (name);");
+        pigServer.registerQuery("B = foreach A generate name, RANDOM();");
+        Iterator<Tuple> iter = pigServer.openIterator("B");
+        double [] mapper1 = new double[5];
+        double [] mapper2 = new double[5];
+        for( int i = 0; i < 5; i++ ){
+            mapper1[i] = (Double) iter.next().get(1);
+            if( i != 0 ) {
+                // making sure consecutive values are not identical
+                assertNotEquals(mapper1[i-1], mapper1[i], 0.0001);
+            }
+        }
+        for( int i = 0; i < 5; i++ ){
+            mapper2[i] = (Double) iter.next().get(1);
+            if( i != 0 ) {
+                // making sure consecutive values are not identical
+                assertNotEquals(mapper2[i-1], mapper2[i], 0.0001);
+            }
+        }
+        // making sure different mappers are creating different random values
+        for( int i = 0; i < 5; i++ ){
+            assertNotEquals(mapper1[i], mapper2[i], 0.0001);
+        }
+    }
+
+
+    @Test
+    public void testRANDOM() throws Exception {
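+        // Simulate the task-side job configuration from which RANDOM derives
+        // its seed (job id plus task index).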
+        Configuration conf = new Configuration();
+        PigMapReduce.sJobConfInternal.set(conf);
+        PigMapReduce.sJobConfInternal.get().set(MRConfiguration.JOB_ID,"job_1111_111");
+        PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, "0");
+
+        org.apache.pig.builtin.RANDOM.resetSeedUniquifier();
+        org.apache.pig.builtin.RANDOM r = new org.apache.pig.builtin.RANDOM();
+        double [] tmpresult = new double [5];
+
+        for( int i = 0; i < 5 ; i++ ) {
+            tmpresult[i] = r.exec(null).doubleValue();
+            LOG.info("Return value of RANDOM(): " + tmpresult[i]);
+            if( i != 0 ) {
+                // making sure RANDOM isn't returning some fixed number
+                assertNotEquals(tmpresult[i-1], tmpresult[i], 0.0001);
+            }
+        }
+
+        // with a different task id, RANDOM should return different numbers
+        org.apache.pig.builtin.RANDOM.resetSeedUniquifier();
+        PigMapReduce.sJobConfInternal.get().set(MRConfiguration.JOB_ID,"job_1111_111");
+        PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, "1");
+        r = new org.apache.pig.builtin.RANDOM();
+        for( int i = 0; i < 5 ; i++ ) {
+            assertNotEquals(tmpresult[i], r.exec(null).doubleValue(), 0.0001);
+        }
+
+        // with a different job id, RANDOM should return completely different numbers
+        org.apache.pig.builtin.RANDOM.resetSeedUniquifier();
+        PigMapReduce.sJobConfInternal.get().set(MRConfiguration.JOB_ID,"job_1111_112");
+        PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, "0");
+        r = new org.apache.pig.builtin.RANDOM();
+        for( int i = 0; i < 5 ; i++ ) {
+            assertNotEquals(tmpresult[i], r.exec(null).doubleValue(), 0.0001);
+        }
+
+        // with the same job id and task id, RANDOM should return the exact
+        // same sequence of pseudo-random numbers
+        org.apache.pig.builtin.RANDOM.resetSeedUniquifier();
+        PigMapReduce.sJobConfInternal.get().set(MRConfiguration.JOB_ID,"job_1111_111");
+        PigMapReduce.sJobConfInternal.get().set(PigConstants.TASK_INDEX, "0");
+        r = new org.apache.pig.builtin.RANDOM();
+        for( int i = 0; i < 5 ; i++ ) {
+            assertEquals(tmpresult[i], r.exec(null).doubleValue(), 0.0001 );
+        }
+
+        // When initialized again, RANDOM should return different random values
+        // even when the job id and task id match. This covers the case where
+        // RANDOM is called more than once in the user's script, e.g.:
+        // B = FOREACH A generate RANDOM(), RANDOM();
+        r = new org.apache.pig.builtin.RANDOM();
+        for( int i = 0; i < 5 ; i++ ) {
+            assertNotEquals(tmpresult[i], r.exec(null).doubleValue(), 0.0001 );
+        }
+    }
+
+    @Test
+    public void testToMapSchema() throws Exception {
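+        // The map's value schema should be kept only when every value has the
+        // same type; mixed or untyped values should fall back to plain map[].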
+        PigServer pigServer = new PigServer(Util.getLocalTestMode(), new Properties());
+        pigServer.registerQuery("A = load '1.txt' as (a0:chararray, a1:int, a2:double, a3);");
+        pigServer.registerQuery("B = foreach A generate [a0,a1];");
+        Schema s = pigServer.dumpSchema("B");
+        Assert.assertEquals(s.toString(), "{map[int]}");
+        pigServer.registerQuery("B = foreach A generate [a0,a1,a0,a2];");
+        s = pigServer.dumpSchema("B");
+        Assert.assertEquals(s.toString(), "{map[]}");
+        pigServer.registerQuery("B = foreach A generate [a0,a3];");
+        s = pigServer.dumpSchema("B");
+        Assert.assertEquals(s.toString(), "{map[]}");
+        pigServer.registerQuery("A = load '1.txt' as (a:{(a0:chararray, a1:int)});");
+        pigServer.registerQuery("B = foreach A generate TOMAP(a);");
+        s = pigServer.dumpSchema("B");
+        Assert.assertEquals(s.toString(), "{map[int]}");
+        pigServer.registerQuery("A = load '1.txt' as (a:{(a0, a1, a2, a3:int)});");
+        pigServer.registerQuery("B = foreach A generate TOMAP(a);");
+        s = pigServer.dumpSchema("B");
+        Assert.assertEquals(s.toString(), "{map[]}");
+
     }
 }

Modified: pig/branches/spark/test/org/apache/pig/test/TestCubeOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestCubeOperator.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestCubeOperator.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestCubeOperator.java Fri Mar  4 18:17:39 2016
@@ -143,34 +143,6 @@ public class TestCubeOperator {
     }
 
     @Test
-    public void testRollupHIIBasic() throws IOException {
-        // basic correctness test
-        String query = "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray,z:long);"
-                + "b = cube a by rollup(x,y) pivot 1;"
-                + "c = foreach b generate flatten(group) as (type,location), COUNT_STAR(cube) as count, SUM(cube.z) as total;"
-                + "store c into 'output' using mock.Storage();";
-        Util.registerMultiLineQuery(pigServer, query);
-
-        Set<Tuple> expected = ImmutableSet.of(
-                tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 18)),
-                tf.newTuple(Lists.newArrayList("cat", "naples", (long) 1, (long) 9)),
-                tf.newTuple(Lists.newArrayList("cat", null, (long) 2, (long) 27)),
-                tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 12)),
-                tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 14)),
-                tf.newTuple(Lists.newArrayList("dog", "naples", (long) 1, (long) 5)),
-                tf.newTuple(Lists.newArrayList("dog", null, (long) 3, (long) 31)),
-                tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, (long) 4)),
-                tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, (long) 1)),
-                tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 5)),
-                tf.newTuple(Lists.newArrayList(null, null, (long) 7, (long) 63)));
-
-        List<Tuple> out = data.get("output");
-        for (Tuple tup : out) {
-            assertTrue(expected + " contains " + tup, expected.contains(tup));
-        }
-    }
-
-    @Test
     public void testCubeAndRollup() throws IOException {
         // basic correctness test
         String query = "a = load 'input2' USING mock.Storage() as (v:chararray,w:chararray,x:chararray,y:chararray,z:long);"
@@ -203,38 +175,6 @@ public class TestCubeOperator {
     }
 
     @Test
-    public void testCubeAndRollupHII() throws IOException {
-        // basic correctness test
-        String query = "a = load 'input2' USING mock.Storage() as (v:chararray,w:chararray,x:chararray,y:chararray,z:long);"
-                + "b = cube a by cube(v,w), rollup(x,y) pivot 1;"
-                + "c = foreach b generate flatten(group) as (type,location,color,category), COUNT_STAR(cube) as count, SUM(cube.z) as total;"
-                + "store c into 'output' using mock.Storage();";
-        Util.registerMultiLineQuery(pigServer, query);
-
-        Set<Tuple> expected = ImmutableSet
-                .of(tf.newTuple(Lists.newArrayList("dog", "miami", "white", "pet", (long) 1,
-                        (long) 5)), tf.newTuple(Lists.newArrayList("dog", null, "white", "pet",
-                        (long) 1, (long) 5)), tf.newTuple(Lists.newArrayList(null, "miami",
-                        "white", "pet", (long) 1, (long) 5)), tf.newTuple(Lists.newArrayList(null,
-                        null, "white", "pet", (long) 1, (long) 5)), tf.newTuple(Lists.newArrayList(
-                        "dog", "miami", "white", null, (long) 1, (long) 5)), tf.newTuple(Lists
-                        .newArrayList("dog", null, "white", null, (long) 1, (long) 5)), tf
-                        .newTuple(Lists.newArrayList(null, "miami", "white", null, (long) 1,
-                                (long) 5)), tf.newTuple(Lists.newArrayList(null, null, "white",
-                        null, (long) 1, (long) 5)), tf.newTuple(Lists.newArrayList("dog", "miami",
-                        null, null, (long) 1, (long) 5)), tf.newTuple(Lists.newArrayList("dog",
-                        null, null, null, (long) 1, (long) 5)), tf.newTuple(Lists.newArrayList(
-                        null, "miami", null, null, (long) 1, (long) 5)), tf.newTuple(Lists
-                        .newArrayList(null, null, null, null, (long) 1, (long) 5)));
-
-        List<Tuple> out = data.get("output");
-        for (Tuple tup : out) {
-            assertTrue(expected + " contains " + tup, expected.contains(tup));
-        }
-
-    }
-
-    @Test
     public void testCubeMultipleIAliases() throws IOException {
         // test for input alias to cube being assigned multiple times
         String query = "a = load 'input' USING mock.Storage() as (x:chararray,y:chararray,z:long);"
@@ -636,36 +576,6 @@ public class TestCubeOperator {
     }
 
     @Test
-    public void testRollupHIIAfterCogroup() throws IOException {
-        // test for cubing on co-grouped relation
-        String query = "a = load 'input1' USING mock.Storage() as (a1:chararray,b1,c1,d1); "
-                + "b = load 'input' USING mock.Storage() as (a2,b2,c2:long,d2:chararray);"
-                + "c = cogroup a by a1, b by d2;"
-                + "d = foreach c generate flatten(a), flatten(b);"
-                + "e = cube d by rollup(a2,b2) pivot 1;"
-                + "f = foreach e generate flatten(group), COUNT(cube) as count, SUM(cube.c2) as total;"
-                + "store f into 'output' using mock.Storage();";
-
-        Util.registerMultiLineQuery(pigServer, query);
-
-        Set<Tuple> expected = ImmutableSet.of(
-                tf.newTuple(Lists.newArrayList("cat", "miami", (long) 1, (long) 18)),
-                tf.newTuple(Lists.newArrayList("cat", null, (long) 1, (long) 18)),
-                tf.newTuple(Lists.newArrayList("dog", "miami", (long) 1, (long) 12)),
-                tf.newTuple(Lists.newArrayList("dog", "tampa", (long) 1, (long) 14)),
-                tf.newTuple(Lists.newArrayList("dog", null, (long) 2, (long) 26)),
-                tf.newTuple(Lists.newArrayList("turtle", "tampa", (long) 1, (long) 4)),
-                tf.newTuple(Lists.newArrayList("turtle", "naples", (long) 1, (long) 1)),
-                tf.newTuple(Lists.newArrayList("turtle", null, (long) 2, (long) 5)),
-                tf.newTuple(Lists.newArrayList(null, null, (long) 5, (long) 49)));
-
-        List<Tuple> out = data.get("output");
-        for (Tuple tup : out) {
-            assertTrue(expected + " contains " + tup, expected.contains(tup));
-        }
-    }
-
-    @Test
     public void testExplainCube() throws IOException {
         // test for explain
         String query = "a = load 'input' USING mock.Storage() as (a1:chararray,b1:chararray,c1:long); "
@@ -686,19 +596,6 @@ public class TestCubeOperator {
 
         Util.registerMultiLineQuery(pigServer, query);
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        PrintStream ps = new PrintStream(baos);
-        pigServer.explain("b", ps);
-        assertTrue(baos.toString().contains("RollupDimensions"));
-    }
-
-    @Test
-    public void testExplainRollupHII() throws IOException {
-        // test for explain
-        String query = "a = load 'input' USING mock.Storage() as (a1:chararray,b1:chararray,c1:long); "
-                + "b = cube a by rollup(a1,b1) pivot 1;";
-
-        Util.registerMultiLineQuery(pigServer, query);
-        ByteArrayOutputStream baos = new ByteArrayOutputStream();
         PrintStream ps = new PrintStream(baos);
         pigServer.explain("b", ps);
         assertTrue(baos.toString().contains("RollupDimensions"));

Modified: pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java Fri Mar  4 18:17:39 2016
@@ -250,6 +250,7 @@ public class TestEvalPipeline {
             return sb.toString();
         }
 
+        @Override
         public Schema outputSchema(Schema input) {
             try {
             Schema stringSchema = new Schema(new Schema.FieldSchema(null, DataType.CHARARRAY));
@@ -265,6 +266,8 @@ public class TestEvalPipeline {
         @Override
         public Map<String, Object> exec(Tuple input) throws IOException {
 
+            super.reporter.progress();
+
             TupleFactory tupleFactory = TupleFactory.getInstance();
             ArrayList<Object> objList = new ArrayList<Object>();
             objList.add(new Integer(1));
@@ -294,6 +297,7 @@ public class TestEvalPipeline {
             return myMap;
         }
 
+        @Override
         public Schema outputSchema(Schema input) {
             return new Schema(new Schema.FieldSchema(null, DataType.MAP));
         }

Modified: pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java Fri Mar  4 18:17:39 2016
@@ -21,6 +21,8 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.PrintStream;
+import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -58,7 +60,6 @@ import org.apache.pig.impl.util.ObjectSe
 import org.apache.pig.test.utils.Identity;
 import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.Assume;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -173,9 +174,9 @@ public class TestEvalPipeline2 {
         // if the conversion happens when minimum conditions for conversion
         // such as expected number of bytes are met.
         String[] input = {
-                    "asdf\t12\t1.1\t231\t234",
-                    "sa\t1231\t123.4\t12345678\t1234.567",
-                    "asdff\t1232123\t1.45345\t123456789\t123456789.9"
+                    "asdf\t12\t1.1\t231\t234\t3024123\t3.2492",
+                    "sa\t1231\t123.4\t12345678\t1234.567\t5081123453\t9.181817",
+                    "asdff\t1232123\t1.45345\t123456789\t123456789.9\t1234567\t1.234567"
                     };
 
         Util.createInputFile(cluster, "table_bs_ac", input);
@@ -187,7 +188,7 @@ public class TestEvalPipeline2 {
         pigServer.store("a", output, BinStorage.class.getName());
 
         pigServer.registerQuery("b = load '" + output + "' using BinStorage('Utf8StorageConverter') "
-                + "as (name: int, age: int, gpa: float, lage: long, dgpa: double);");
+                + "as (name: int, age: int, gpa: float, lage: long, dgpa: double, bi:biginteger, bd:bigdecimal);");
 
         Iterator<Tuple> it = pigServer.openIterator("b");
 
@@ -206,6 +207,8 @@ public class TestEvalPipeline2 {
         Assert.assertTrue((Float)tup.get(2) == 1.1F);
         Assert.assertTrue((Long)tup.get(3) == 231L);
         Assert.assertTrue((Double)tup.get(4) == 234.0);
+        Assert.assertEquals((BigInteger)tup.get(5), new BigInteger("3024123"));
+        Assert.assertEquals((BigDecimal)tup.get(6), new BigDecimal("3.2492"));
 
         //tuple 2
         tup = it.next();
@@ -214,6 +217,8 @@ public class TestEvalPipeline2 {
         Assert.assertTrue((Float)tup.get(2) == 123.4F);
         Assert.assertTrue((Long)tup.get(3) == 12345678L);
         Assert.assertTrue((Double)tup.get(4) == 1234.567);
+        Assert.assertEquals((BigInteger)tup.get(5), new BigInteger("5081123453"));
+        Assert.assertEquals((BigDecimal)tup.get(6), new BigDecimal("9.181817"));
 
         //tuple 3
         tup = it.next();
@@ -222,6 +227,8 @@ public class TestEvalPipeline2 {
         Assert.assertTrue((Float)tup.get(2) == 1.45345F);
         Assert.assertTrue((Long)tup.get(3) == 123456789L);
         Assert.assertTrue((Double)tup.get(4) == 1.234567899E8);
+        Assert.assertEquals((BigInteger)tup.get(5), new BigInteger("1234567"));
+        Assert.assertEquals((BigDecimal)tup.get(6), new BigDecimal("1.234567"));
 
         Util.deleteFile(cluster, "table");
     }
@@ -823,7 +830,7 @@ public class TestEvalPipeline2 {
             pigServer.openIterator("c");
         } catch (Exception e) {
             PigException pe = LogUtils.getPigException(e);
-            Util.checkStrContainsSubStr(pe.getMessage(), "Incompatable schema");
+            Util.checkStrContainsSubStr(pe.getMessage(), "Incompatible schema");
             return;
         }
         Assert.fail();
@@ -1428,7 +1435,6 @@ public class TestEvalPipeline2 {
     // See PIG-1826
     @Test
     public void testNonStandardData() throws Exception{
-        Assume.assumeTrue("Skip this test for TEZ. See PIG-3994", Util.isMapredExecType(cluster.getExecType()));
         String[] input1 = {
                 "0",
         };
@@ -1441,7 +1447,10 @@ public class TestEvalPipeline2 {
             pigServer.openIterator("b");
             Assert.fail();
         } catch (Exception e) {
-            String message = e.getCause().getCause().getMessage();
+            // Tez does not construct exceptions from the stack trace, as it
+            // will have multiple ones, so e.getCause().getCause() will be null.
+            Throwable cause = e.getCause().getCause() == null ? e.getCause() : e.getCause().getCause();
+            String message = cause.getMessage();
             Assert.assertTrue(message.contains(ArrayList.class.getName()));
         }
     }
@@ -1449,7 +1458,6 @@ public class TestEvalPipeline2 {
     // See PIG-1826
     @Test
     public void testNonStandardDataWithoutFetch() throws Exception{
-        Assume.assumeTrue("Skip this test for TEZ. See PIG-3994", Util.isMapredExecType(cluster.getExecType()));
         Properties props = pigServer.getPigContext().getProperties();
         props.setProperty(PigConfiguration.PIG_OPT_FETCH, "false");
         String[] input1 = {
@@ -1690,4 +1698,26 @@ public class TestEvalPipeline2 {
             pigServer.getPigContext().getProperties().remove("pig.exec.reducers.bytes.per.reducer");
         }
     }
+
+    // see PIG-4392
+    @Test
+    public void testRankWithEmptyReduce() throws Exception {
+        Util.createInputFile(cluster, "table_testRankWithEmptyReduce", new String[]{"1\t2\t3", "4\t5\t6", "7\t8\t9"});
+        pigServer.setDefaultParallel(4);
+
+        pigServer.registerQuery("d = load 'table_testRankWithEmptyReduce' as (a:int, b:int, c:int);");
+        pigServer.registerQuery("e = rank d by a parallel 4;");
+
+        Iterator<Tuple> iter = pigServer.openIterator("e");
+
+        Collection<String> results = new HashSet<String>();
+        results.add("(1,1,2,3)");
+        results.add("(2,4,5,6)");
+        results.add("(3,7,8,9)");
+
+        Assert.assertTrue(results.contains(iter.next().toString()));
+        Assert.assertTrue(results.contains(iter.next().toString()));
+        Assert.assertTrue(results.contains(iter.next().toString()));
+        Assert.assertFalse(iter.hasNext());
+    }
 }

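A note on the exception handling change in testNonStandardData above: Tez surfaces a shallower cause chain than MapReduce, so the test now falls back to e.getCause() when the second-level cause is missing. Below is a minimal sketch that generalizes the same idea to a chain of any depth; the CauseChain class is illustrative and not part of this commit.

    // Sketch: walk to the deepest cause instead of hard-coding the depth.
    public final class CauseChain {
        private CauseChain() {}

        public static Throwable rootCause(Throwable t) {
            Throwable current = t;
            // Stop on self-referential causes to avoid looping forever.
            while (current.getCause() != null && current.getCause() != current) {
                current = current.getCause();
            }
            return current;
        }
    }
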
Modified: pig/branches/spark/test/org/apache/pig/test/TestFRJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestFRJoin.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestFRJoin.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestFRJoin.java Fri Mar  4 18:17:39 2016
@@ -46,9 +46,8 @@ import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.test.utils.TestHelper;
-import org.junit.After;
 import org.junit.AfterClass;
-import org.junit.Before;
+import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TestFRJoin {
@@ -61,8 +60,8 @@ public class TestFRJoin {
         pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
     }
 
-    @Before
-    public void setUp() throws Exception {
+    @BeforeClass
+    public static void oneTimeSetup() throws Exception {
         int LOOP_SIZE = 2;
         String[] input = new String[2 * LOOP_SIZE];
         int k = 0;
@@ -85,13 +84,9 @@ public class TestFRJoin {
 
     @AfterClass
     public static void oneTimeTearDown() throws Exception {
-        cluster.shutDown();
-    }
-
-    @After
-    public void tearDown() throws Exception {
         Util.deleteFile(cluster, INPUT_FILE);
         Util.deleteFile(cluster, INPUT_FILE2);
+        cluster.shutDown();
     }
 
     public static class FRJoin extends EvalFunc<DataBag> {
@@ -442,7 +437,7 @@ public class TestFRJoin {
         Map<String, Tuple> hashJoin = new HashMap<String, Tuple>();
         {
             pigServer.registerQuery("C = join A by $0 left, B by $0 using 'replicated';");
-            pigServer.registerQuery("D = join A by $1 left, B by $1 using 'replicated';");
+            pigServer.registerQuery("D = join A by $1 left, B by $1 using 'repl';");
             pigServer.registerQuery("E = union C,D;");
             Iterator<Tuple> iter = pigServer.openIterator("E");
 
@@ -475,14 +470,14 @@ public class TestFRJoin {
     public void testFRJoinOut9() throws IOException {
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:int);");
         pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (x:int,y:int);");
+        pigServer.registerQuery("C = UNION A, B;");
+        pigServer.registerQuery("D = FILTER C BY x == 1;");
         DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance()
                 .newDefaultBag();
         Map<String, Tuple> hashFRJoin = new HashMap<String, Tuple>();
         Map<String, Tuple> hashJoin = new HashMap<String, Tuple>();
         {
-            pigServer.registerQuery("C = join A by $0 left, B by $0 using 'repl';");
-            pigServer.registerQuery("D = join A by $1 left, B by $1 using 'repl';");
-            pigServer.registerQuery("E = union C,D;");
+            pigServer.registerQuery("E = join C by $0 left, D by $0 using 'repl';");
             Iterator<Tuple> iter = pigServer.openIterator("E");
 
             while (iter.hasNext()) {
@@ -494,9 +489,7 @@ public class TestFRJoin {
             }
         }
         {
-            pigServer.registerQuery("C = join A by $0 left, B by $0;");
-            pigServer.registerQuery("D = join A by $1 left, B by $1;");
-            pigServer.registerQuery("E = union C,D;");
+            pigServer.registerQuery("E = join C by $0 left, D by $0;");
             Iterator<Tuple> iter = pigServer.openIterator("E");
             while (iter.hasNext()) {
                 Tuple tuple = iter.next();

Modified: pig/branches/spark/test/org/apache/pig/test/TestGrunt.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestGrunt.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestGrunt.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestGrunt.java Fri Mar  4 18:17:39 2016
@@ -1434,6 +1434,24 @@ public class TestGrunt {
     }
 
     @Test
+    public void testScriptWithSingleQuoteInsideCommentInGenerate() throws Throwable {
+        // the query has no store or dump, but when -check is used
+        // all statements should be validated
+        String query = "a = load 'foo.pig' as (s1:chararray, s2:chararray);\n" +
+        "b = foreach a generate s1,\n" +
+        "--comment should be ignored even if it has a single quote ' in the line \n" +
+        " s2;\n";
+        ArrayList<String> msgs = new ArrayList<String>();
+        validate(query, true, msgs.toArray(new String[0]));
+        query = "a = load 'foo.pig' as (s1:chararray, s2:chararray);\n" +
+        "b = foreach a generate s1,\n" +
+        "/* comment should be ignored even if it has a single quote ' in the line \n" +
+        "*/ \n" +
+        " s2;\n";
+        validate(query, true, msgs.toArray(new String[0]));
+    }
+
+    @Test
     public void testDebugOn() throws Throwable {
 
         String strCmd = "set debug on\n";

Modified: pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestHBaseStorage.java Fri Mar  4 18:17:39 2016
@@ -38,7 +38,6 @@ import org.apache.hadoop.hbase.client.Re
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
@@ -51,7 +50,6 @@ import org.apache.pig.data.TupleFactory;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
-import org.junit.Assume;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -837,7 +835,6 @@ public class TestHBaseStorage {
      */
     @Test
     public void testMergeJoin() throws IOException {
-        Assume.assumeTrue("Skip this test for TEZ. See PIG-4315", pig.getPigContext().getExecType().equals(ExecType.LOCAL));
         prepareTable(TESTTABLE_1, true, DataFormat.HBaseBinary);
         prepareTable(TESTTABLE_2, true, DataFormat.HBaseBinary);
         pig.registerQuery("a = load 'hbase://" + TESTTABLE_1 + "' using "
@@ -1080,7 +1077,7 @@ public class TestHBaseStorage {
             long col_a_ts = getColTimestamp(result, TESTCOLUMN_A);
             long col_b_ts = getColTimestamp(result, TESTCOLUMN_B);
             long col_c_ts = getColTimestamp(result, TESTCOLUMN_C);
-            
+
             Assert.assertEquals(timestamp, col_a_ts);
             Assert.assertEquals(timestamp, col_b_ts);
             Assert.assertEquals(timestamp, col_c_ts);
@@ -1127,7 +1124,7 @@ public class TestHBaseStorage {
             long col_a_ts = getColTimestamp(result, TESTCOLUMN_A);
             long col_b_ts = getColTimestamp(result, TESTCOLUMN_B);
             long col_c_ts = getColTimestamp(result, TESTCOLUMN_C);
-            
+
             Assert.assertEquals(timestamp, col_a_ts);
             Assert.assertEquals(timestamp, col_b_ts);
             Assert.assertEquals(timestamp, col_c_ts);
@@ -1174,7 +1171,7 @@ public class TestHBaseStorage {
             long col_a_ts = getColTimestamp(result, TESTCOLUMN_A);
             long col_b_ts = getColTimestamp(result, TESTCOLUMN_B);
             long col_c_ts = getColTimestamp(result, TESTCOLUMN_C);
-            
+
             Assert.assertEquals(timestamp, col_a_ts);
             Assert.assertEquals(timestamp, col_b_ts);
             Assert.assertEquals(timestamp, col_c_ts);
@@ -1219,7 +1216,7 @@ public class TestHBaseStorage {
             long col_a_ts = getColTimestamp(result, TESTCOLUMN_A);
             long col_b_ts = getColTimestamp(result, TESTCOLUMN_B);
             long col_c_ts = getColTimestamp(result, TESTCOLUMN_C);
-            
+
             Assert.assertEquals("00".substring(v.length()) + v, rowKey);
             Assert.assertEquals(i, col_a);
             Assert.assertEquals(i + 0.0, col_b, 1e-6);
@@ -1274,13 +1271,27 @@ public class TestHBaseStorage {
      * @throws ParseException
      */
     @Test
-    public void testNoWAL() throws IOException, ParseException {
+    public void testNoWAL() throws Exception {
         HBaseStorage hbaseStorage = new HBaseStorage(TESTCOLUMN_A, "-noWAL");
 
         Object key = "somekey";
         byte type = DataType.CHARARRAY;
-        Assert.assertFalse(hbaseStorage.createPut(key, type).getWriteToWAL());
-        Assert.assertFalse(hbaseStorage.createDelete(key, type, System.currentTimeMillis()).getWriteToWAL());
+        Put put = hbaseStorage.createPut(key, type);
+        Delete delete = hbaseStorage.createDelete(key, type, System.currentTimeMillis());
+        boolean hasDurabilityMethod = false;
+        try {
+            put.getClass().getMethod("getDurability");
+            hasDurabilityMethod = true;
+        } catch (NoSuchMethodException e) {
+        }
+        if (hasDurabilityMethod) { // HBase version 0.95+
+            Object skipWal = Class.forName("org.apache.hadoop.hbase.client.Durability").getField("SKIP_WAL").get(put);
+            Assert.assertEquals(put.getClass().getMethod("getDurability").invoke(put), skipWal);
+            Assert.assertEquals(delete.getClass().getMethod("getDurability").invoke(delete), skipWal);
+        } else {
+            Assert.assertFalse(put.getWriteToWAL());
+            Assert.assertFalse(delete.getWriteToWAL());
+        }
     }
 
     /**
@@ -1289,13 +1300,27 @@ public class TestHBaseStorage {
      * @throws ParseException
      */
     @Test
-    public void testWIthWAL() throws IOException, ParseException {
+    public void testWIthWAL() throws Exception {
         HBaseStorage hbaseStorage = new HBaseStorage(TESTCOLUMN_A);
 
         Object key = "somekey";
         byte type = DataType.CHARARRAY;
-        Assert.assertTrue(hbaseStorage.createPut(key, type).getWriteToWAL());
-        Assert.assertTrue(hbaseStorage.createDelete(key, type, System.currentTimeMillis()).getWriteToWAL());
+        Put put = hbaseStorage.createPut(key, type);
+        Delete delete = hbaseStorage.createDelete(key, type, System.currentTimeMillis());
+        boolean hasDurabilityMethod = false;
+        try {
+            put.getClass().getMethod("getDurability");
+            hasDurabilityMethod = true;
+        } catch (NoSuchMethodException e) {
+        }
+        if (hasDurabilityMethod) { // HBase version 0.95+
+            Object skipWal = Class.forName("org.apache.hadoop.hbase.client.Durability").getField("SKIP_WAL").get(put);
+            Assert.assertNotEquals(put.getClass().getMethod("getDurability").invoke(put), skipWal);
+            Assert.assertNotEquals(delete.getClass().getMethod("getDurability").invoke(delete), skipWal);
+        } else {
+            Assert.assertTrue(put.getWriteToWAL());
+            Assert.assertTrue(delete.getWriteToWAL());
+        }
     }
 
     /**
@@ -1551,7 +1576,7 @@ public class TestHBaseStorage {
      */
     private static long getColTimestamp(Result result, String colName) {
         byte[][] colArray = Bytes.toByteArrays(colName.split(":"));
-        return result.getColumnLatestCell(colArray[0], colArray[1]).getTimestamp();
+        return result.getColumnLatest(colArray[0], colArray[1]).getTimestamp();
     }
 
 }

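The testNoWAL and testWIthWAL changes above probe for the Durability API (HBase 0.95+) by reflection, so a single test body runs against both old and new client jars. A minimal sketch of that probe, with an illustrative helper name that is not part of this commit:

    // Sketch: true if the target exposes a public no-arg method of this name.
    public final class ApiProbe {
        private ApiProbe() {}

        public static boolean hasMethod(Object target, String name) {
            try {
                target.getClass().getMethod(name);
                return true;
            } catch (NoSuchMethodException e) {
                return false;
            }
        }
    }

Usage mirrors the tests: hasMethod(put, "getDurability") selects the Durability assertions, and the deprecated getWriteToWAL() path is kept as the fallback.
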
Modified: pig/branches/spark/test/org/apache/pig/test/TestHBaseStorageParams.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestHBaseStorageParams.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestHBaseStorageParams.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestHBaseStorageParams.java Fri Mar  4 18:17:39 2016
@@ -19,6 +19,7 @@ package org.apache.pig.test;
 import org.apache.commons.cli.ParseException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.client.Scan;
 import org.apache.pig.backend.hadoop.hbase.HBaseStorage;
 import org.apache.pig.impl.util.UDFContext;
 import org.junit.Assert;
@@ -26,6 +27,7 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.util.Properties;
 
 public class TestHBaseStorageParams {
@@ -77,6 +79,23 @@ public class TestHBaseStorageParams {
       doColumnParseTest(storage, "foo:a", "foo:b ", " foo:c,d");
     }
 
+    /**
+     * Assert that -maxResultsPerColumnFamily actually gets set on Scan
+     */
+    @Test
+    public void testSetsMaxResultsPerColumnFamily() throws Exception {
+        Field scanField = HBaseStorage.class.getDeclaredField("scan");
+        scanField.setAccessible(true);
+
+        HBaseStorage storageNoMax = new HBaseStorage("", "");
+        Scan scan = (Scan)scanField.get(storageNoMax);
+        Assert.assertEquals(-1, scan.getMaxResultsPerColumnFamily());
+
+        HBaseStorage storageWithMax = new HBaseStorage("", "-maxResultsPerColumnFamily 123");
+        scan = (Scan)scanField.get(storageWithMax);
+        Assert.assertEquals(123, scan.getMaxResultsPerColumnFamily());
+    }
+
     private void doColumnParseTest(HBaseStorage storage, String... names) {
       Assert.assertEquals("Wrong column count",
         names.length, storage.getColumnInfoList().size());

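testSetsMaxResultsPerColumnFamily above reads HBaseStorage's private scan field via Field.setAccessible(true) because the Scan object has no public accessor. A small sketch of that pattern; the FieldPeek name is illustrative, not part of this commit.

    import java.lang.reflect.Field;

    // Sketch: read a private field declared directly on the given class.
    // Note that getDeclaredField does not search superclasses.
    public final class FieldPeek {
        private FieldPeek() {}

        public static Object read(Class<?> declaringClass, Object target,
                                  String fieldName) throws Exception {
            Field f = declaringClass.getDeclaredField(fieldName);
            f.setAccessible(true);
            return f.get(target);
        }
    }
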
Modified: pig/branches/spark/test/org/apache/pig/test/TestJobSubmission.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestJobSubmission.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestJobSubmission.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestJobSubmission.java Fri Mar  4 18:17:39 2016
@@ -251,7 +251,7 @@ abstract public class TestJobSubmission
         jc=jcc.compile(mrPlan, "Test");
         job = jc.getWaitingJobs().get(0);
 
-        Util.assertParallelValues(-1, -1, -1, 1, job.getJobConf());
+        Util.assertParallelValues(-1, -1, 1, 1, job.getJobConf());
 
         util.deleteTable(Bytes.toBytesBinary("test_table"));
         // In HBase 0.90.1 and above we can use util.shutdownMiniHBaseCluster()

Modified: pig/branches/spark/test/org/apache/pig/test/TestLimitVariable.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestLimitVariable.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestLimitVariable.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestLimitVariable.java Fri Mar  4 18:17:39 2016
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.Tuple;
@@ -64,20 +65,24 @@ public class TestLimitVariable {
 
     @Test
     public void testLimitVariable1() throws IOException {
+        pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_EXEC_MAP_PARTAGG, "" + true);
         String query =
-            "a = load '" + inputFile.getName() + "';" +
+            "a = load '" + inputFile.getName() + "' as (f1:int, f2:int);" +
             "b = group a all;" +
             "c = foreach b generate COUNT(a) as sum;" +
             "d = order a by $0 DESC;" +
-            "e = limit d c.sum/2;" // return top half of the tuples
+            "e = limit d c.sum/2;" + // return top half of the tuples
+            "f = group e all;" +
+            "g = foreach f generate AVG(e.$0), SUM(e.$1);"
             ;
 
         Util.registerMultiLineQuery(pigServer, query);
-        Iterator<Tuple> it = pigServer.openIterator("e");
+        Iterator<Tuple> it = pigServer.openIterator("g");
 
         List<Tuple> expectedRes = Util.getTuplesFromConstantTupleStrings(new String[] {
-                "(6,15)", "(5,10)", "(4,11)" });
+                "(5.0,36)"});
         Util.checkQueryOutputs(it, expectedRes);
+        pigServer.getPigContext().getProperties().remove(PigConfiguration.PIG_EXEC_MAP_PARTAGG);
     }
 
     @Test

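The testLimitVariable1 change above sets PIG_EXEC_MAP_PARTAGG before the query and removes it afterwards so later tests see the default. A try/finally variant of that cleanup, sketched under the assumption that the withProperty helper name is illustrative (PigServer and PigConfiguration are the real Pig classes):

    import java.util.Properties;

    import org.apache.pig.PigServer;

    public final class PropertyScope {
        private PropertyScope() {}

        // Sketch: run an action with a Pig property set, restoring the
        // default even when the action throws.
        public static void withProperty(PigServer pig, String key, String value,
                                        Runnable action) {
            Properties props = pig.getPigContext().getProperties();
            props.setProperty(key, value);
            try {
                action.run();
            } finally {
                props.remove(key);
            }
        }
    }

Usage, mirroring the test: withProperty(pigServer, PigConfiguration.PIG_EXEC_MAP_PARTAGG, "true", () -> { /* query and assertions */ });
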
Modified: pig/branches/spark/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestLoaderStorerShipCacheFiles.java Fri Mar  4 18:17:39 2016
@@ -50,10 +50,9 @@ public abstract class TestLoaderStorerSh
             hadoopVersion = "23";
         }
         String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde", 
-                "hive-shims-0." + hadoopVersion, "hive-shims-common-0", "hive-shims-common-secure",
-                "kryo"};
+                "hive-shims-0." + hadoopVersion, "hive-shims-common", "kryo"};
 
-        checkPlan(pp, expectedJars, 7, pigServer.getPigContext());
+        checkPlan(pp, expectedJars, 6, pigServer.getPigContext());
     }
 
     @Test
@@ -67,10 +66,9 @@ public abstract class TestLoaderStorerSh
             hadoopVersion = "23";
         }
         String[] expectedJars = new String[] {"hive-common", "hive-exec", "hive-serde", 
-                "hive-shims-0." + hadoopVersion, "hive-shims-common-0", "hive-shims-common-secure",
-                "kryo"};
+                "hive-shims-0." + hadoopVersion, "hive-shims-common", "kryo"};
 
-        checkPlan(pp, expectedJars, 7, pigServer.getPigContext());
+        checkPlan(pp, expectedJars, 6, pigServer.getPigContext());
     }
 
     @Test

Modified: pig/branches/spark/test/org/apache/pig/test/TestLogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestLogicalPlanBuilder.java Fri Mar  4 18:17:39 2016
@@ -1204,13 +1204,13 @@ public class TestLogicalPlanBuilder {
         try {
             buildPlan( query + "c = foreach b generate group as mygroup:{t: (myname, myage)}, COUNT(a) as mycount;");
         } catch (AssertionFailedError e) {
-            Assert.assertTrue(e.getMessage().contains("Incompatable field schema"));
+            Assert.assertTrue(e.getMessage().contains("Incompatible field schema"));
         }
 
         try {
             buildPlan( query + "c = foreach b generate flatten(group) as (myname, myage, mygpa), COUNT(a) as mycount;");
         } catch (AssertionFailedError e) {
-            Assert.assertTrue(e.getMessage().contains("Incompatable schema"));
+            Assert.assertTrue(e.getMessage().contains("Incompatible schema"));
         }
     }
 

Modified: pig/branches/spark/test/org/apache/pig/test/TestMRCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMRCompiler.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestMRCompiler.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestMRCompiler.java Fri Mar  4 18:17:39 2016
@@ -39,6 +39,7 @@ import org.apache.pig.FuncSpec;
 import org.apache.pig.IndexableLoadFunc;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.fetch.FetchOptimizer;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.LimitAdjuster;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompilerException;
@@ -130,7 +131,8 @@ import org.junit.runner.RunWith;
     "testUDFInMergedCoGroup",
     "testUDFInMergedJoin",
     "testSchemaInStoreForDistinctLimit",
-    "testStorerLimit"})
+    "testStorerLimit",
+    "testFetchOptimizerSideEffect"})
 public class TestMRCompiler {
     static MiniCluster cluster;
 
@@ -1280,5 +1282,27 @@ public class TestMRCompiler {
         POStore store = (POStore)firstMrOper.reducePlan.getLeaves().get(0);
         assertEquals(store.getStoreFunc().getClass().getName(), "org.apache.pig.impl.io.InterStorage");
     }
+
+    // See PIG-4538
+    @Test
+    public void testFetchOptimizerSideEffect() throws Exception{
+        String query = "in1 = LOAD 'data.txt' AS (ident:chararray);" +
+            "in2 = LOAD 'data.txt' AS (ident:chararray);" +
+            "in3 = LOAD 'data.txt';" +
+            "joined = JOIN in1 BY ident LEFT OUTER, in2 BY ident;" +
+            "store joined into 'output';";
+        PhysicalPlan pp = Util.buildPp(pigServer, query);
+        MROperPlan mp = Util.buildMRPlan(pp, pc);
+        // isPlanFetchable should not have the side effect of
+        // setting parentPlan on the operators
+        FetchOptimizer.isPlanFetchable(pc, pp);
+        MapReduceOper op = mp.getLeaves().get(0);
+        PhysicalOperator store = op.reducePlan.getLeaves().get(0);
+        POForEach foreach = (POForEach)op.reducePlan.getPredecessors(store).get(0);
+        PhysicalOperator project = foreach.getInputPlans().get(0).getRoots().get(0);
+        Field parentPlan = PhysicalOperator.class.getDeclaredField("parentPlan");
+        parentPlan.setAccessible(true);
+        assertTrue(parentPlan.get(project)==null);
+    }
 }
 

Modified: pig/branches/spark/test/org/apache/pig/test/TestMergeJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMergeJoin.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestMergeJoin.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestMergeJoin.java Fri Mar  4 18:17:39 2016
@@ -52,6 +52,7 @@ public class TestMergeJoin {
 
     private static final String INPUT_FILE = "testMergeJoinInput.txt";
     private static final String INPUT_FILE2 = "testMergeJoinInput2.txt";
+    private static final String INPUT_FILE3 = "testMergeJoinInput3.txt";
     private PigServer pigServer;
     private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
 
@@ -78,6 +79,19 @@ public class TestMergeJoin {
         Util.createInputFile(cluster, INPUT_FILE, input);
         
         Util.createInputFile(cluster, INPUT_FILE2, new String[]{"2"});
+
+        String[] input3 = new String[LOOP_SIZE];
+        for (int i = 0; i<= LOOP_SIZE-1; i++) {
+            input3[i] = "(" + (i + 1) + ")\t + {";
+            for(int j=1;j<=LOOP_SIZE;j++) {
+                input3[i] = input3[i] + "(" + j + ")";
+                if (j!=LOOP_SIZE) {
+                    input3[i] = input3[i] + ",";
+                }
+            }
+            input3[i] = input3[i] + "}";
+        }
+        Util.createInputFile(cluster, INPUT_FILE3, input3);
     }
 
     @AfterClass
@@ -91,6 +105,7 @@ public class TestMergeJoin {
     public void tearDown() throws Exception {
         Util.deleteFile(cluster, INPUT_FILE);
         Util.deleteFile(cluster, INPUT_FILE2);
+        Util.deleteFile(cluster, INPUT_FILE3);
     }
 
     @Test
@@ -146,6 +161,61 @@ public class TestMergeJoin {
     }
 
     @Test
+    public void testMergeJoinWithReplicatedJoin() throws IOException{
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (f1:int,f2:int);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (f1:int,f2:int);");
+        pigServer.registerQuery("C = LOAD '" + INPUT_FILE + "' as (f1:int,f2:int);");
+        DataBag dbMergeJoin = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+        {
+            pigServer.registerQuery("D = join A by f1, B by f1 using 'replicated';");
+            pigServer.registerQuery("E = join D by A::f1, C by f1 using 'merge';");
+            Iterator<Tuple> iter = pigServer.openIterator("E");
+
+            while(iter.hasNext()) {
+                dbMergeJoin.add(iter.next());
+            }
+        }
+        {
+            pigServer.registerQuery("D = join A by f1, B by f1 using 'replicated';");
+            pigServer.registerQuery("E = join D by A::f1, C by f1;");
+            Iterator<Tuple> iter = pigServer.openIterator("E");
+
+            while(iter.hasNext()) {
+                dbshj.add(iter.next());
+            }
+        }
+        Assert.assertEquals(dbshj.size(), dbMergeJoin.size());
+        Assert.assertEquals(true, TestHelper.compareBags(dbMergeJoin, dbshj));
+    }
+
+    @Test
+    public void testMergeJoinWithForeachFlatten() throws IOException{
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (f1:int,f2:int);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE3 + "' as (t:(f1:int), b:{(f1:int)});");
+        pigServer.registerQuery("C = foreach B generate flatten(t) as f1:int, flatten(b);");
+        pigServer.registerQuery("D = join C by f1, A by f1 using 'merge';");
+        DataBag dbMergeJoin = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+        {
+            pigServer.registerQuery("D = join C by f1, A by f1 using 'merge';");
+            Iterator<Tuple> iter = pigServer.openIterator("D");
+
+            while(iter.hasNext()) {
+                dbMergeJoin.add(iter.next());
+            }
+        }
+        {
+            pigServer.registerQuery("D = join C by f1, A by f1;");
+            Iterator<Tuple> iter = pigServer.openIterator("D");
+
+            while(iter.hasNext()) {
+                dbshj.add(iter.next());
+            }
+        }
+        Assert.assertEquals(dbshj.size(), dbMergeJoin.size());
+        Assert.assertEquals(true, TestHelper.compareBags(dbMergeJoin, dbshj));
+    }
+
+    @Test
     public void testMergeJoinOnMultiFields() throws IOException{
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "';");
         pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "';");
@@ -380,6 +450,33 @@ public class TestMergeJoin {
     }
 
     @Test
+    public void testMergeJoinWithUDF() throws Exception{
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (x:int,y:double);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (x:int,y:double);");
+        pigServer.registerQuery("A = FOREACH A GENERATE x, ABS(y) AS y;");
+
+        DataBag dbMergeJoin = BagFactory.getInstance().newDefaultBag(), dbshj = BagFactory.getInstance().newDefaultBag();
+        {
+            pigServer.registerQuery("C = JOIN A BY x, B BY x USING 'merge';");
+            Iterator<Tuple> iter = pigServer.openIterator("C");
+
+            while(iter.hasNext()) {
+                dbMergeJoin.add(iter.next());
+            }
+        }
+        {
+            pigServer.registerQuery("C = JOIN A BY x, B BY x;");
+            Iterator<Tuple> iter = pigServer.openIterator("C");
+
+            while(iter.hasNext()) {
+                dbshj.add(iter.next());
+            }
+        }
+        Assert.assertEquals(dbMergeJoin.size(), dbshj.size());
+        Assert.assertEquals(true, TestHelper.compareBags(dbMergeJoin, dbshj));
+    }
+
+    @Test
     public void testMergeJoin3Way() throws IOException{
         try {
             pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, n);");
@@ -417,24 +514,6 @@ public class TestMergeJoin {
     }
 
     @Test
-    public void testMergeFailWithOrderUDF() throws Exception{
-        String query = "A = LOAD '" + INPUT_FILE + "' as (id, name, n);\n" +
-                "B = LOAD '" + INPUT_FILE + "' as (id, name);\n" +
-                "A = FOREACH A GENERATE LOWER($0) as id;\n" +
-                "C = ORDER B by $0 parallel 5;\n" +
-                "D = join A by id, C by id using 'merge';\n" +
-                "store D into '/dev/null/1';";
-        // verify that this fails parsing sanity checks.
-        try {
-            Util.buildPp(pigServer, query);
-        } catch (Throwable t) {
-            // expected to fail.
-            return;
-        }
-        Assert.fail("Allowed a Merge Join despite a UDF");
-    }       
-
-    @Test
     public void testMergeJoinFailure2() throws IOException{
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, n);");
         pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (id, name);");

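The merge join tests above repeat one collect-and-compare loop: run the join with the 'merge' hint, rerun it with the default strategy, and assert that TestHelper.compareBags finds the two result bags equal. A sketch of that loop factored into a helper; the JoinParity name is illustrative, while PigServer, BagFactory, DataBag, and Tuple are the real Pig classes used in the tests.

    import java.util.Iterator;

    import org.apache.pig.PigServer;
    import org.apache.pig.data.BagFactory;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;

    public final class JoinParity {
        private JoinParity() {}

        // Sketch: drain an alias into a bag so two join strategies can be
        // compared tuple-for-tuple.
        public static DataBag collect(PigServer pig, String alias) throws Exception {
            DataBag bag = BagFactory.getInstance().newDefaultBag();
            Iterator<Tuple> it = pig.openIterator(alias);
            while (it.hasNext()) {
                bag.add(it.next());
            }
            return bag;
        }
    }
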

