pig-commits mailing list archives

From xu...@apache.org
Subject svn commit: r1733627 [14/18] - in /pig/branches/spark: ./ bin/ conf/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/math/ contrib/piggybank/java/src/mai...
Date Fri, 04 Mar 2016 18:17:47 GMT
Modified: pig/branches/spark/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/nightly.conf?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/tests/nightly.conf (original)
+++ pig/branches/spark/test/e2e/pig/tests/nightly.conf Fri Mar  4 18:17:39 2016
@@ -689,7 +689,24 @@ store c into ':OUTPATH:';\,
                         store d into ':OUTPATH:'; #,
             'java_params' => ['-Dpig.exec.mapPartAgg=true']
             
-            },            
+            },
+            
+            {
+            #PIG-4707 Streaming and empty input
+
+            'num' => 6,
+            'pig' => q# a = load ':INPATH:/singlefile/studenttab10k' as (name: chararray, age: int, gpa: float); 
+                        b = group a by name;
+                        c = foreach b generate flatten(a);
+                        d = stream c through `cat` as (name, age, gpa);
+                        e = filter d by name == 'nonexistent';
+                        SPLIT e into f if gpa > 2, g otherwise;
+                        store f into ':OUTPATH:.1'; 
+                        store g into ':OUTPATH:.2'; 
+                        #,
+            'java_params' => ['-Dpig.exec.mapPartAgg=true']
+            
+            },         
         
             ],
         },
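
For reference, the PIG-4707 scenario that new test 6 encodes (STREAM over an empty relation, followed by SPLIT) can be reproduced standalone. A minimal sketch, with literal paths standing in for the harness ':INPATH:'/':OUTPATH:' macros:

    -- placeholder paths; the e2e harness substitutes :INPATH:/:OUTPATH:
    a = load '/tmp/studenttab10k' as (name: chararray, age: int, gpa: float);
    b = filter a by name == 'nonexistent';          -- yields an empty relation
    c = stream b through `cat` as (name, age, gpa); -- PIG-4707: must not fail on empty input
    SPLIT c into d if gpa > 2, e otherwise;
    store d into '/tmp/out.1';
    store e into '/tmp/out.2';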
@@ -1380,7 +1397,8 @@ store g into ':OUTPATH:';\,
 		{
 		'name' => 'Union',
 		'tests' => [
-			{
+			{ 
+			# Simple store
 			'num' => 1,
 			'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
 b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
@@ -1389,7 +1407,8 @@ d = foreach b generate name, age;
 e = union c, d;
 store e into ':OUTPATH:';\,
 			},
-			{
+			{ 
+			# Union + Groupby + Combiner
             'num' => 2,
             'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
 b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa);
@@ -1399,15 +1418,20 @@ e = foreach d generate group, SUM(c.age)
 store e into ':OUTPATH:';\,
             },
             {
+            # Union + Groupby + Secondary key partitioner
             'num' => 3,
             'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
 b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa);
 c = union a, b;
 d = group c by name;
-e = foreach d { f = order c by $1,$2; generate group, f; };
-store e into ':OUTPATH:';\,
+d1 = group c by name; -- Two separate groupbys to ensure secondary key partitioner
+e = foreach d { f = order c by age, gpa ; g = limit f 1; generate g; };
+h = foreach d1 { i = order c by age asc, gpa desc; j = limit i 1; generate j; };
+store e into ':OUTPATH:.1';
+store h into ':OUTPATH:.2';\,
             },
             {
+            # Union + Orderby
             'num' => 4,
             'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
 b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa);
@@ -1417,6 +1441,7 @@ store d into ':OUTPATH:';\,
             'sortArgs' => ['-t', '	', '-k', '1,1'],
             },
             {
+            # Simple split + Union
             'num' => 5,
             'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
 b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa);
@@ -1427,6 +1452,7 @@ store a2 into ':OUTPATH:.1';
 store d into ':OUTPATH:.2';\,
             },
             {
+            # Union + Join
             'num' => 6,
             'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
 b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa);
@@ -1436,6 +1462,7 @@ e = join c by name, d by name PARALLEL 2
 store e into ':OUTPATH:';\,
             },
             {
+            # Union + Replicate Join left
             'num' => 7,
             'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
 b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa);
@@ -1445,6 +1472,7 @@ e = join c by name, d by name using 'rep
 store e into ':OUTPATH:';\,
             },
             {
+            # Union + Replicate Join right
             'num' => 8,
             'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
 b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa);
@@ -1454,6 +1482,7 @@ e = join d by name, c by name using 'rep
 store e into ':OUTPATH:';\,
             },
             {
+            # Union + Skewed Join left
             'num' => 9,
             'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
 b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa);
@@ -1463,6 +1492,7 @@ e = join c by name, d by name using 'ske
 store e into ':OUTPATH:';\,
             },
             {
+            # Union + Skewed Join right
             'num' => 10,
             'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
 b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa);
@@ -1486,6 +1516,7 @@ i = foreach i generate group, SUM(h.age)
 store i into ':OUTPATH:';\,
             },
             {
+            # Union + operators
             'num' => 12,
             'pig' => q\a = load ':INPATH:/singlefile/studentnulltab10k' as (name, age:int, gpa:double);
 b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age:int, gpa:double);
@@ -1496,6 +1527,7 @@ e = filter d by (name matches '.*MIKE.*'
 store e into ':OUTPATH:';\,
             },
             {
+            # Union + Groupby + Replicate join
             'num' => 13,
             'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
 b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa);
@@ -1503,22 +1535,24 @@ c = union a, b;
 d = group c by name;
 e = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
 f = join d by group, e by name using 'replicated';
-store f into ':OUTPATH:';\,
+g = foreach f generate group, flatten(c), name, age, registration, contributions;
+store g into ':OUTPATH:';\,
             },
-            {  ## Secondary Key
+            {  
+            # Group by with Secondary Key + Union
             'num' => 14,
             'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age, gpa);
 b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age, gpa);
 c = group a by name;
 d = foreach c {
-    sorted = order a by name,age;
+    sorted = order a by name,age,gpa;
     lmt = limit sorted 1;
     generate lmt as c1;
 };
 e = foreach d generate flatten(c1) as (name:chararray, age, gpa);
 f = group b by name;
 g = foreach f {
-    sorted = order b by name,age;
+    sorted = order b by name,age,gpa;
     lmt = limit sorted 1;
     generate lmt as f1;
 };
@@ -1529,6 +1563,7 @@ store j into ':OUTPATH:';\,
             'sortArgs' => ['-t', '	', '-k', '1,1'],
             },
             {
+            # Union + Cross
             'num' => 15,
             'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa:float);
 b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa:float);
@@ -1537,6 +1572,30 @@ d = cross a, c;
 e = union b, d;
 store e into ':OUTPATH:';\,
             },
+            { 
+            # Union + Distinct
+            'num' => 16,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa);
+c = union a, b;
+d = distinct c;
+store d into ':OUTPATH:';\,
+            },
+            {
+            # Union + Groupby + FILTER
+            'num' => 17,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa:float);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa:float);
+c = group a by name;
+d = group b by name;
+e = union c, d;
+e = foreach e generate $0, $1 as groupbag;
+f = foreach e {
+     g = order $1 by age asc, gpa desc;  
+     h = filter g by (gpa == 0 ? true : false);
+     generate group, h; };
+store f into ':OUTPATH:';\,
+            }
 		]
 		},
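
As the inline comment in test 3 above notes, the script groups twice so that the secondary key partitioner is exercised: Pig can rewrite a nested ORDER inside a FOREACH into a secondary sort key on the shuffle rather than sorting each bag in memory. A sketch of the pattern, with placeholder paths:

    a = load '/tmp/studenttab10k' as (name, age, gpa);
    b = group a by name;
    -- the nested order-by becomes a secondary sort key during the shuffle
    c = foreach b { s = order a by age asc, gpa desc; t = limit s 1; generate group, flatten(t); };
    store c into '/tmp/out';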
 		{
@@ -2189,12 +2248,12 @@ store D into ':OUTPATH:';\,
  			{
 				'num' => 9,
 				'pig' =>q\a = load ':INPATH:/singlefile/studentnulltab10k';
-b = order a by $0, $1;
+b = order a by $0, $1, $2;
 c = limit b 1000/10;
 store c into ':OUTPATH:';\,
 
 				'verify_pig_script' => q\a = load ':INPATH:/singlefile/studentnulltab10k';
-b = order a by $0, $1;
+b = order a by $0, $1, $2;
 c = limit b 100;
 store c into ':OUTPATH:';\,
 
@@ -3054,6 +3113,64 @@ e = join a by name full outer, b by name
 store e into ':OUTPATH:';\,
 
                         },
+                # right outer join with fixed memory
+                        {
+                        'num' => 11,
+                        'java_params' => ['-Dpig.skewedjoin.reduce.maxtuple=100 -Dpig.skewedjoin.reduce.mem=516947966'],
+                        'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
+b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+e = join a by name right outer, b by name using 'skewed' parallel 8;
+store e into ':OUTPATH:';\,
+                        'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
+b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+e = join a by name right outer, b by name ;
+store e into ':OUTPATH:';\,
+
+                        },
+                # right outer join with empty left relation
+                        {
+                        'num' => 12,
+                        'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
+b = filter a by name=='abc';
+e = join b by name right outer, a by name using 'skewed' parallel 8;
+store e into ':OUTPATH:';\,
+                        'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
+b = foreach a generate (null, null, null, name, age, gpa);
+c = foreach b generate flatten($0);
+store c into ':OUTPATH:';\,
+
+                        },
+                        # left outer join with fixed memory
+                        {
+                        'num' => 13,
+                        'java_params' => ['-Dpig.skewedjoin.reduce.maxtuple=100 -Dpig.skewedjoin.reduce.mem=516947966'],
+                        'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
+b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+b = filter b by name < 'b';
+e = join a by name left outer, b by name using 'skewed' parallel 8;
+store e into ':OUTPATH:';\,
+                        'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
+b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+b = filter b by name < 'b';
+e = join a by name left outer, b by name ;
+store e into ':OUTPATH:';\,
+                        },
+                        # full outer join with fixed memory
+                        {
+                        'num' => 14,
+                        'java_params' => ['-Dpig.skewedjoin.reduce.maxtuple=100 -Dpig.skewedjoin.reduce.mem=516947966'],
+                        'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
+b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+b = filter b by name > 'm';
+e = join a by name full outer, b by name using 'skewed' parallel 8;
+store e into ':OUTPATH:';\,
+                        'verify_pig_script' => q\a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
+b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
+b = filter b by name > 'm';
+e = join a by name full outer, b by name ;
+store e into ':OUTPATH:';\,
+
+                        },
                 ]
 
             },
@@ -3440,7 +3557,9 @@ store b into ':OUTPATH:';\,
             'tests' => [
                     {
                     # test reading and writing out files with .bz2 extension
+                    # relying on Hadoop's BZip2Codec (Hadoop 0.23/2.X and later)
                     'num' => 1,
+                    'java_params' => ['-Dpig.bzip.use.hadoop.inputformat=true'],
                     'pig' => q\
 a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
 store a into ':OUTPATH:.intermediate.bz2';
@@ -3450,7 +3569,33 @@ store b into ':OUTPATH:';\,
                     },
                     {
                     # test reading and writing with .bz extension
+                    # relying on Hadoop's BZip2Codec (Hadoop 0.23/2.X and later)
                     'num' => 2,
+                    'java_params' => ['-Dpig.bzip.use.hadoop.inputformat=true'],
+                    'pig' => q\
+a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
+store a into ':OUTPATH:.intermediate.bz';
+b = load ':OUTPATH:.intermediate.bz';
+store b into ':OUTPATH:';\,
+                    'notmq' => 1,
+                    },
+                    {
+                    # test reading and writing out files with .bz2 extension
+                    # using Bzip2TextInputFormat.
+                    'num' => 3,
+                    'java_params' => ['-Dpig.bzip.use.hadoop.inputformat=false'],
+                    'pig' => q\
+a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
+store a into ':OUTPATH:.intermediate.bz2';
+b = load ':OUTPATH:.intermediate.bz2';
+store b into ':OUTPATH:';\,
+                    'notmq' => 1,
+                    },
+                    {
+                    # test reading and writing with .bz extension
+                    # using Bzip2TextInputFormat.
+                    'num' => 4,
+                    'java_params' => ['-Dpig.bzip.use.hadoop.inputformat=false'],
                     'pig' => q\
 a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
 store a into ':OUTPATH:.intermediate.bz';
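
The four bzip cases differ only in pig.bzip.use.hadoop.inputformat, which picks between Hadoop's BZip2Codec (tests 1-2) and Pig's own Bzip2TextInputFormat (tests 3-4) when reading .bz/.bz2 input. The flag should also be settable per script; a sketch under that assumption, with placeholder paths:

    -- assumption: the property can be set via the set command, like other job properties
    set pig.bzip.use.hadoop.inputformat false
    a = load '/tmp/studenttab10k' using PigStorage() as (name, age, gpa);
    store a into '/tmp/out.bz2';  -- the .bz2 extension selects compressed output
    b = load '/tmp/out.bz2';
    store b into '/tmp/final';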
@@ -3509,6 +3654,18 @@ f = foreach e generate AVG(d.age) as avg
 y = foreach a generate age/c.avg, age/f.avg;
 store y into ':OUTPATH:';\,
                     },
+                    {
+                    # test scalar with split
+                    'num' => 5,
+                    'pig' => q\
+a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age, gpa);
+b = group a all;
+c = foreach b generate AVG(a.age) as avg, COUNT(a.age) as cnt;
+d = foreach c generate avg;
+e = group d by $0;
+f = foreach e generate group, c.avg, c.cnt;
+store f into ':OUTPATH:';\,
+                    },
                 ]
             },
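
Scalar test 5 leans on scalar projection (c.avg, c.cnt): a field of a one-row relation can be referenced as a value inside another FOREACH, and Pig fails the job at runtime if the referenced relation turns out to have more than one row. The pattern in isolation, paths illustrative:

    a = load '/tmp/studenttab10k' using PigStorage() as (name, age, gpa);
    b = group a all;
    c = foreach b generate AVG(a.age) as avg, COUNT(a.age) as cnt;
    -- c.avg and c.cnt are scalars; the group-all guarantees c has exactly one row
    y = foreach a generate name, age / c.avg, c.cnt;
    store y into '/tmp/out';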
             {
@@ -3873,6 +4030,42 @@ store b into ':OUTPATH:';\,
                 ]
             },
             {
+            'name' => 'JavaScriptUDFs',
+            'tests' => [
+                    {
+                    # test double square
+                    'num' => 1,
+                    'pig' => q\
+register ':SCRIPTHOMEPATH:/js/scriptingudf.js' using javascript as myfuncs;
+a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age:int, gpa:double);
+b = foreach a generate myfuncs.square(gpa);
+store b into ':OUTPATH:';\,
+                    'verify_pig_script' => q\
+a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age:int, gpa:double);
+b = foreach a generate gpa * gpa;
+store b into ':OUTPATH:';\,
+                    },
+                ]
+            },
+            {
+            'name' => 'GroovyUDFs',
+            'tests' => [
+                    {
+                    # test integer square
+                    'num' => 1,
+                    'pig' => q\
+register ':SCRIPTHOMEPATH:/groovy/scriptingudf.groovy' using groovy as myfuncs;
+a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age:int, gpa:double);
+b = foreach a generate myfuncs.square(age);
+store b into ':OUTPATH:';\,
+                    'verify_pig_script' => q\
+a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name, age:int, gpa:double);
+b = foreach a generate age * age;
+store b into ':OUTPATH:';\,
+                    },
+                ]
+            },
+            {
             'name' => 'StreamingPythonUDFs',
             'tests' => [
                     {
@@ -4910,34 +5103,39 @@ store C into ':OUTPATH:';\,
                     {
                         # PIG-2286
                         'num' => 1,
+                        'floatpostprocess' => 1,
+                        'delimiter' => '	',
                         'pig' => q?A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name, age:double, gpa:double);
                                 B = group A all;
-                                C = foreach B generate group, COR(A.age, A.gpa);
+                                C = foreach B generate group, flatten(COR(A.age, A.gpa));
                                 store C into ':OUTPATH:';?,
                         'verify_pig_script' => q?set pig.exec.nocombiner true
                                 A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name, age:double ,gpa:double);
                                 B = group A all;
-                                C = foreach B generate group, COR(A.age, A.gpa);
+                                C = foreach B generate group, flatten(COR(A.age, A.gpa));
                                 store C into ':OUTPATH:';?,
                     }, {
                         # PIG-2286, with 3 inputs to COR
                         'num' => 2,
+                        'floatpostprocess' => 1,
+                        'delimiter' => '	',
                         'pig' => q?A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name, age:double ,gpa:double);
                                 B = foreach A generate age, gpa, gpa*gpa as gpa2;
                                 C = group B all;
-                                D = foreach C generate group, COR(B.age, B.gpa, B.gpa2);
+                                D = foreach C generate group, flatten(COR(B.age, B.gpa, B.gpa2));
                                 store D into ':OUTPATH:';?,
                         'verify_pig_script' => q?set pig.exec.nocombiner true
                                 A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name, age:double ,gpa:double);
                                 B = foreach A generate age, gpa, gpa*gpa as gpa2;
                                 C = group B all;
-                                D = foreach C generate group, COR(B.age, B.gpa, B.gpa2);
+                                D = foreach C generate group, flatten(COR(B.age, B.gpa, B.gpa2));
                                 store D into ':OUTPATH:';?,
                     }, {
                         # PIG-2385
                         'num' => 3,
                         'pig_params' => ['-M'],
                         'floatpostprocess' => 1,
+                        'delimiter' => '	',
                         'pig' => q?A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int,gpa:double);
                                 Z = group A all;
                                 Z1 = foreach Z generate AVG(A.gpa) as avg;
@@ -5075,6 +5273,30 @@ store C into ':OUTPATH:';\,
                                 C = load ':INPATH:/singlefile/votertab10k'as (name:chararray, age:int, reg:chararray, contrib:float);
                                 D = join C by name, B by name;
                                 store D into ':OUTPATH:';",
+                    },{
+                        'num' => 4,
+                        'pig' => "set pig.optimizer.rules.disabled PushUpFilter;
+                                define bb BuildBloom('Hash.JENKINS_HASH', 'fixed', '128', '3');
+                                A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int, gpa:double);
+                                B = filter A by name == 'alice allen';
+                                C = group B all;
+                                D = foreach C generate bb(B.name);
+                                store D into ':HDFSTMP:/mybloom_4';
+                                exec;
+                                define bloom Bloom(':HDFSTMP:/mybloom_4');
+                                E = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int, gpa:double);
+                                F = LOAD ':INPATH:/singlefile/studenttab10k' AS (name:chararray, age:int, gpa:double);
+                                G = union E, F;
+                                -- PushUpFilter is disabled to avoid filter being pushed before union
+                                H = filter G by bloom(name);
+                                store H into ':OUTPATH:';",
+                        'notmq' => 1,
+                        'verify_pig_script' => "
+                                A = LOAD ':INPATH:/singlefile/studenttab10k' AS (name, age:int ,gpa:double);
+                                B = LOAD ':INPATH:/singlefile/studenttab10k' AS (name, age:int ,gpa:double);
+                                C = UNION A,B;
+                                D = filter C by name == 'alice allen';
+                                store D into ':OUTPATH:';",
                     }
                 ],
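
New test 4 splits BuildBloom and Bloom across an exec barrier: the first batch materializes the serialized filter on HDFS, and the second reads it back by path. The handshake in isolation, with placeholder paths:

    define bb BuildBloom('Hash.JENKINS_HASH', 'fixed', '128', '3');
    A = load '/tmp/studenttab10k' as (name:chararray, age:int, gpa:double);
    B = filter A by name == 'alice allen';
    C = group B all;
    D = foreach C generate bb(B.name);   -- a single row holding the serialized filter
    store D into '/tmp/mybloom';
    exec;                                -- run the batch so the filter exists before Bloom reads it
    define bloom Bloom('/tmp/mybloom');
    E = load '/tmp/studenttab10k' as (name:chararray, age:int, gpa:double);
    F = filter E by bloom(name);
    store F into '/tmp/out';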
             },{
@@ -5605,6 +5827,119 @@ store a into ':OUTPATH:';\,
 								\,
 					}
                 ]
+            },
+            {
+                'name' => 'HiveUDF',
+                'tests' => [
+                        {
+                # HiveUDF extends UDF
+                'num' => 1,
+                'pig' => q\
+                        define sin HiveUDF('sin');
+                        A = LOAD ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
+                        B = foreach A generate sin(gpa);
+                        store B into ':OUTPATH:';\,
+                'verify_pig_script' => q\
+                        A = LOAD ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
+                        B = foreach A generate SIN(gpa);
+                        store B into ':OUTPATH:';\,
+                        },
+                        {
+                # HiveUDF extends GenericUDF
+                'num' => 2,
+                'pig' => q\
+                        define upper HiveUDF('upper');
+                        A = LOAD ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
+                        B = foreach A generate upper(name);
+                        store B into ':OUTPATH:';\,
+                'verify_pig_script' => q\
+                        A = LOAD ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
+                        B = foreach A generate UPPER(name);
+                        store B into ':OUTPATH:';\,
+                        },
+                        {
+                # HiveUDTF
+                'num' => 3,
+                'pig' => q\
+                        define explode HiveUDTF('explode');
+                        A = LOAD ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:chararray, gpa:chararray);
+                        B = foreach A generate TOBAG(name, age, gpa) as b;
+                        C = foreach B generate flatten(explode(b));
+                        store C into ':OUTPATH:';\,
+                'verify_pig_script' => q\
+                        A = LOAD ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:chararray, gpa:chararray);
+                        B = foreach A generate TOBAG(name, age, gpa) as b;
+                        C = foreach B generate flatten(b);
+                        store C into ':OUTPATH:';\,
+                        },
+                        {
+                # HiveUDAF extends GenericUDAF, with null handling
+                'num' => 4,
+                'pig' => q\
+                        define avg HiveUDAF('avg');
+                        A = LOAD ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age:int, gpa:double);
+                        B = group A by name;
+                        C = foreach B generate group, avg(A.age);
+                        store C into ':OUTPATH:';\,
+                'verify_pig_script' => q\
+                        A = LOAD ':INPATH:/singlefile/studentnulltab10k' as (name:chararray, age:int, gpa:double);
+                        B = group A by name;
+                        C = foreach B generate group, AVG(A.age);
+                        store C into ':OUTPATH:';\,
+                        },
+                        {
+                # HiveUDAF extends UDAF
+                'num' => 5,
+                'pig' => q\
+                        define percentile HiveUDAF('percentile');
+                        A = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:long, gpa:double);
+                        B = foreach A generate name, age, 0.5 as perc;
+                        C = group B by name;
+                        D = foreach C generate group, percentile(B.(age, perc));
+                        store D into ':OUTPATH:';\,
+                'verify_pig_script' => q\
+                        register :FUNCPATH:/datafu.jar
+                        define Quartile datafu.pig.stats.Quantile('0.5');
+                        A = LOAD ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:long, gpa:double);
+                        B = group A by name;
+                        C = foreach B {
+                            sorted = order A by age;
+                            generate group, flatten(Quartile(sorted.age));
+                        };
+                        store C into ':OUTPATH:';\,
+                        },
+                        {
+                # Constant folding and ship jars
+                'num' => 6,
+                'pig' => q#
+                        sh echo -e "zach young\nzach zipper" > names.txt
+                        define in_file HiveUDF('in_file', '(null, "names.txt")');
+                        A = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:long, gpa:double);
+                        B = foreach A generate in_file(name, 'names.txt');
+                        store B into ':OUTPATH:';#,
+                'verify_pig_script' => q#register :PIGGYBANKJAR:
+                        sh echo -e "zach young\nzach zipper" > names.txt
+                        rmf :INPATH:/singlefile/names.txt
+                        fs -put names.txt :INPATH:/singlefile/names.txt
+                        define LookupInFiles org.apache.pig.piggybank.evaluation.string.LookupInFiles();
+                        A = LOAD ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:long, gpa:double);
+                        B = foreach A generate LookupInFiles(name, ':INPATH:/singlefile/names.txt');
+                        C = foreach B generate (boolean)$0;
+                        store C into ':OUTPATH:';
+                        fs -rm :INPATH:/singlefile/names.txt#
+                        },
+                        {
+                # Custom Hive UDF and MapredContext
+                'num' => 7,
+                'pig' => q\set mapred.max.split.size '100000000'
+                        register :FUNCPATH:/testudf.jar;
+                        define DummyContextUDF HiveUDF('org.apache.pig.test.udf.evalfunc.DummyContextUDF');
+                        A = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age:int, gpa:double);
+                        B = foreach A generate DummyContextUDF(age);
+                        store B into ':OUTPATH:';\,
+                'expected_err_regex' => "Encountered Warning UDF_WARNING_1 4610 time.*",
+                        }
+                ]
             }
         ],
     },
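
One construct in the new HiveUDF group worth calling out: test 5 hands two columns to Hive's percentile UDAF through Pig's multi-column bag projection, B.(age, perc). In isolation, with a placeholder path:

    A = load '/tmp/studenttab10k' as (name:chararray, age:long, gpa:double);
    B = foreach A generate name, age, 0.5 as perc;
    C = group B by name;
    -- B.(age, perc) projects a bag of (age, perc) tuples out of each group's bag
    D = foreach C generate group, B.(age, perc);
    store D into '/tmp/out';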

Modified: pig/branches/spark/test/e2e/pig/tests/streaming.conf
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/streaming.conf?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/tests/streaming.conf (original)
+++ pig/branches/spark/test/e2e/pig/tests/streaming.conf Fri Mar  4 18:17:39 2016
@@ -39,13 +39,14 @@ $cfg = {
 			'num' => 1,
                         'pig' => q#
 A = load ':INPATH:/singlefile/studenttab10k';
-B = foreach A generate $2, $1, $0;
-C = stream B through `awk 'BEGIN {FS = "\t"; OFS = "\t"} {print $3, $2, $1}'`;
+B = foreach A generate $2, $1;
+C = stream B through `awk 'BEGIN {FS = "\t"; OFS = "\t"} {print $2, $1}'`;
 store C into ':OUTPATH:';#,
                         'pig_win' => q#
+DEFINE CMD `awk -F "\\\t" "{print $2, $1}"` output(stdout using PigStreaming(' '));
 A = load ':INPATH:/singlefile/studenttab10k';
-B = foreach A generate $2, $1, $0;
-C = stream B through `awk "BEGIN {FS = \\\\"\t\\\\"; OFS = \\\\"\t\\\\"} {print $3, $2, $1}"`;
+B = foreach A generate $2, $1;
+C = stream B through CMD;
 store C into ':OUTPATH:';#,
                         'sql' => "select name, age, gpa from studenttab10k;",	
                         },
@@ -54,16 +55,15 @@ store C into ':OUTPATH:';#,
                         'num' => 2,
                         'pig' => q#
 A = load ':INPATH:/singlefile/studenttab10k';
-B = foreach A generate $2, $1, $0;
-C = stream B through `awk 'BEGIN {FS = "\t"; OFS = "\t"} {print $3, $2, $1}'` as (name, age, gpa);
-D = foreach C generate name, age;
-store D into ':OUTPATH:';#,
+B = foreach A generate $2, $1;
+C = stream B through `awk 'BEGIN {FS = "\t"; OFS = "\t"} {print $2, $1}'` as (age, gpa);
+store C into ':OUTPATH:';#,
                         'pig_win' => q#
+DEFINE CMD `awk -F "\\\t" "{print $2, $1}"` output(stdout using PigStreaming(' '));
 A = load ':INPATH:/singlefile/studenttab10k';
-B = foreach A generate $2, $1, $0;
-C = stream B through `awk "BEGIN {FS = \\\\"\t\\\\"; OFS = \\\\"\t\\\\"} {print $3, $2, $1}"` as (name, age, gpa);
-D = foreach C generate name, age;
-store D into ':OUTPATH:';#,
+B = foreach A generate $2, $1;
+C = stream B through CMD as (age, gpa);
+store C into ':OUTPATH:';#,
                         'sql' => "select name, age from studenttab10k;",
                         },
                         {
@@ -432,7 +432,7 @@ C = stream B through CMD1;
 D = stream C through CMD2;
 store D into ':OUTPATH:';#,
 			'pig_win' => q#
-define CMD1 `perl -ne "print $_;print STDERR "stderr $_";"`;
+define CMD1 `perl -ne "print $_;print STDERR 'stderr $_';"`;
 define CMD2 `Split.pl 3` input(stdin using PigStreaming(',')) ship(':SCRIPTHOMEPATH:/Split.pl');
 A = load ':INPATH:/singlefile//studenttab10k';
 B = stream A through CMD1;
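
The Windows variants above route each external command through a DEFINE so the command string survives only one layer of quoting, with PigStreaming pinning the delimiter. The same DEFINE shape with an input spec and a shipped script, as in the Split.pl case (paths illustrative):

    define CMD `Split.pl 3` input(stdin using PigStreaming(',')) ship('/scripts/Split.pl');
    A = load '/tmp/studenttab10k';
    B = stream A through CMD;
    store B into '/tmp/out';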

Modified: pig/branches/spark/test/e2e/pig/tests/turing_jython.conf
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/tests/turing_jython.conf?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/tests/turing_jython.conf (original)
+++ pig/branches/spark/test/e2e/pig/tests/turing_jython.conf Fri Mar  4 18:17:39 2016
@@ -527,6 +527,7 @@ result = P.bind({'in1':input1, 'in2':inp
 	}, {
 #11.22 	1 	illustrate() on a complex query 	 
 		'num' => 2
+		,'execonly' => 'mapred,local' #TODO: PIG-3993: Illustrate is yet to be implemented in Tez
 		,'pig' => q\#!/usr/bin/python
 from org.apache.pig.scripting import Pig
 

Modified: pig/branches/spark/test/e2e/pig/udfs/java/build.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/e2e/pig/udfs/java/build.xml?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/e2e/pig/udfs/java/build.xml (original)
+++ pig/branches/spark/test/e2e/pig/udfs/java/build.xml Fri Mar  4 18:17:39 2016
@@ -31,6 +31,9 @@
         <fileset dir="${pig.dir}">
             <include name="pig*-core-*.jar"/>
         </fileset>
+        <fileset dir="${pig.dir}/lib" erroronmissingdir="false">
+            <include name="*.jar"/>
+        </fileset>
         <fileset dir="${hadoop.common.dir}" erroronmissingdir="false">
             <include name="hadoop-common*.jar"/>
         </fileset>

Modified: pig/branches/spark/test/org/apache/pig/TestMain.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/TestMain.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/TestMain.java (original)
+++ pig/branches/spark/test/org/apache/pig/TestMain.java Fri Mar  4 18:17:39 2016
@@ -39,6 +39,9 @@ import org.apache.pig.tools.parameters.P
 import org.apache.pig.tools.pigstats.PigStats;
 import org.junit.Test;
 
+import java.nio.charset.Charset;
+import com.google.common.io.Files;
+
 public class TestMain {
     private Log log = LogFactory.getLog(TestMain.class);
 
@@ -126,6 +129,29 @@ public class TestMain {
         }
     }
 
+    @Test
+    public void testlog4jConf() throws Exception {
+        Properties properties = Main.log4jConfAsProperties(null);
+        assertTrue(properties.isEmpty());
+        properties = Main.log4jConfAsProperties("");
+        assertTrue(properties.isEmpty());
+        // Test for non-existent file
+        properties = Main.log4jConfAsProperties("non-existing-" + System.currentTimeMillis());
+        assertTrue(properties.isEmpty());
+
+        // Create tmp file in under build/test/classes
+        File tmpFile = File.createTempFile("pig-log4jconf", ".properties", new File("build/test/classes"));
+        tmpFile.deleteOnExit();
+        Files.write("A=B", tmpFile, Charset.forName("UTF-8"));
+        // Read it as a resource
+        properties = Main.log4jConfAsProperties(tmpFile.getName());
+        assertEquals("B", properties.getProperty("A"));
+        // Read it as a file
+        properties = Main.log4jConfAsProperties(tmpFile.getAbsolutePath());
+        assertEquals("B", properties.getProperty("A"));
+    }
+
+
     public static class TestNotificationListener2 extends TestNotificationListener {
         protected boolean hadArgs = false;
         public TestNotificationListener2() {}

Modified: pig/branches/spark/test/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/TestInputSizeReducerEstimator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/TestInputSizeReducerEstimator.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/TestInputSizeReducerEstimator.java (original)
+++ pig/branches/spark/test/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/TestInputSizeReducerEstimator.java Fri Mar  4 18:17:39 2016
@@ -38,30 +38,26 @@ public class TestInputSizeReducerEstimat
     @Test
     public void testGetInputSizeFromFs() throws Exception {
         long size = 2L * 1024 * 1024 * 1024;
+        POLoad load1 = createPOLoadWithSize(size, new PigStorage());
+        POLoad load2 = createPOLoadWithSize(size, new PigStorageWithStatistics());
         Assert.assertEquals(size, InputSizeReducerEstimator.getTotalInputFileSize(
-                CONF, Lists.newArrayList(createPOLoadWithSize(size, new PigStorage())),
-                new org.apache.hadoop.mapreduce.Job(CONF)));
+                CONF, Lists.newArrayList(load1), new org.apache.hadoop.mapreduce.Job(CONF)));
 
         Assert.assertEquals(size, InputSizeReducerEstimator.getTotalInputFileSize(
-                CONF,
-                Lists.newArrayList(createPOLoadWithSize(size, new PigStorageWithStatistics())),
-                new org.apache.hadoop.mapreduce.Job(CONF)));
+                CONF, Lists.newArrayList(load2), new org.apache.hadoop.mapreduce.Job(CONF)));
 
         Assert.assertEquals(size * 2, InputSizeReducerEstimator.getTotalInputFileSize(
-                CONF,
-                Lists.newArrayList(
-                        createPOLoadWithSize(size, new PigStorage()),
-                        createPOLoadWithSize(size, new PigStorageWithStatistics())),
-                        new org.apache.hadoop.mapreduce.Job(CONF)));
+                CONF, Lists.newArrayList(load1, load2), new org.apache.hadoop.mapreduce.Job(CONF)));
 
         // Negative test - PIG-3754
-        POLoad poLoad = createPOLoadWithSize(size, new PigStorage());
-        poLoad.setLFile(new FileSpec("hbase://users", null));
+        load1.setLFile(new FileSpec("hbase://users", null));
 
-        Assert.assertEquals(-1, InputSizeReducerEstimator.getTotalInputFileSize(
-                CONF,
-                Collections.singletonList(poLoad),
-                new org.apache.hadoop.mapreduce.Job(CONF)));
+        Assert.assertEquals(0, InputSizeReducerEstimator.getTotalInputFileSize(
+                CONF, Collections.singletonList(load1), new org.apache.hadoop.mapreduce.Job(CONF)));
+
+        // Skip non-hdfs input - PIG-4679
+        Assert.assertEquals(size, InputSizeReducerEstimator.getTotalInputFileSize(
+                CONF, Lists.newArrayList(load1, load2), new org.apache.hadoop.mapreduce.Job(CONF)));
     }
 
     @Test

Modified: pig/branches/spark/test/org/apache/pig/builtin/TestAvroStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/builtin/TestAvroStorage.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/builtin/TestAvroStorage.java (original)
+++ pig/branches/spark/test/org/apache/pig/builtin/TestAvroStorage.java Fri Mar  4 18:17:39 2016
@@ -34,8 +34,11 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.Iterator;
 
+import com.google.common.io.Closeables;
 import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericContainer;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.Record;
 import org.apache.avro.generic.GenericDatumReader;
@@ -52,12 +55,12 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
-import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
 import org.apache.pig.builtin.mock.Storage.Data;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.impl.util.avro.AvroBagWrapper;
@@ -131,8 +134,8 @@ public class TestAvroStorage {
     }
 
     @BeforeClass
-    public static void setup() throws ExecException, IOException {
-        pigServerLocal = new PigServer(ExecType.LOCAL);
+    public static void setup() throws Exception {
+        pigServerLocal = new PigServer(Util.getLocalTestMode());
         Util.deleteDirectory(new File(outbasedir));
         generateInputFiles();
     }
@@ -426,6 +429,21 @@ public class TestAvroStorage {
     }
 
     @Test
+    public void testLoadRecordsSpecifyFullSchemaFromClass() throws Exception {
+      final String input = basedir + "data/avro/uncompressed/records.avro";
+      final String check = basedir + "data/avro/uncompressed/recordsAsOutputByPig.avro";
+      testAvroStorage(true, basedir + "code/pig/identity.pig",
+          ImmutableMap.of(
+               "INFILE",            input,
+               "OUTFILE",           createOutputName(),
+               "AVROSTORAGE_IN_2",  "-c org.apache.pig.builtin.avro.code.java.RecordPojo",
+               "AVROSTORAGE_OUT_1", "''",
+               "AVROSTORAGE_OUT_2", "-c org.apache.pig.builtin.avro.code.java.RecordPojo")
+        );
+      verifyResults(createOutputName(),check);
+    }
+
+    @Test
     public void testLoadRecordsSpecifyFullSchemaFromFile() throws Exception {
       final String input = basedir + "data/avro/uncompressed/records.avro";
       final String check = basedir + "data/avro/uncompressed/recordsAsOutputByPig.avro";
@@ -828,7 +846,7 @@ public class TestAvroStorage {
 
     @Test
     public void testRetrieveDataFromMap() throws Exception {
-        pigServerLocal = new PigServer(ExecType.LOCAL);
+        pigServerLocal = new PigServer(Util.getLocalTestMode());
         Data data = resetData(pigServerLocal);
         Map<String, String> mapv1 = new HashMap<String, String>();
         mapv1.put("key1", "v11");
@@ -906,6 +924,96 @@ public class TestAvroStorage {
         assertEquals("bar", v);
     }
 
+    @Test
+    public void testAvroMapWrapper() throws Exception {
+        final Map<CharSequence, Object> m = new HashMap<CharSequence, Object>();
+        for (String fn : avroSchemas) {
+            final String avro = basedir + "data/avro/uncompressed/" + fn + ".avro";
+            int i = 0;
+            for (GenericContainer r : readAvroData(avro)) {
+                m.put(new Utf8(fn + i), r);
+                i += 1;
+            }
+        }
+        final AvroMapWrapper amw = new AvroMapWrapper(m);
+        // Test out all the interfaces the AvroMapWrapper supports
+        for (Object o : amw.values()) {
+            assertTrue(isValidPigObject(o));
+        }
+        for (CharSequence k : amw.keySet()) {
+            assertTrue(isValidPigObject(k));
+            assertTrue(isValidPigObject(amw.get(k)));
+        }
+        for (Map.Entry<CharSequence, Object> e : amw.entrySet()) {
+            assertTrue(isValidPigObject(e.getKey()));
+            assertTrue(isValidPigObject(e.getValue()));
+        }
+    }
+
+    private boolean isValidPigObject(Object o) {
+        if (o == null) {
+            return true;
+        }
+        switch (DataType.findType(o)) {
+            case DataType.TUPLE:
+                for (Object inner : ((Tuple) o).getAll()) {
+                    if (!isValidPigObject(inner)) {
+                        return false;
+                    }
+                }
+                return true;
+            case DataType.BAG:
+                final Iterator<Tuple> bi = ((DataBag) o).iterator();
+                while (bi.hasNext()) {
+                    if (!isValidPigObject(bi.next())) {
+                        return false;
+                    }
+                }
+                return true;
+            case DataType.MAP:
+                for (Object inner : ((Map) o).values()) {
+                    if (!isValidPigObject(inner)) {
+                        return false;
+                    }
+                }
+                return true;
+            case DataType.BIGDECIMAL:
+            case DataType.BIGINTEGER:
+            case DataType.BOOLEAN:
+            case DataType.BYTE:
+            case DataType.BYTEARRAY:
+            case DataType.CHARARRAY:
+            case DataType.DATETIME:
+            case DataType.DOUBLE:
+            case DataType.FLOAT:
+            case DataType.GENERIC_WRITABLECOMPARABLE:
+            case DataType.INTEGER:
+            case DataType.LONG:
+                return true;
+            case DataType.ERROR:
+            default:
+                return false;
+        }
+    }
+
+    private List<GenericContainer> readAvroData(String path) throws IOException {
+        final FileSystem fs = FileSystem.getLocal(new Configuration());
+        final Path filePath = new Path(path);
+        assertTrue("File path " + filePath + " does not exist!", fs.exists(filePath));
+        final GenericDatumReader<GenericContainer> reader = new GenericDatumReader<GenericContainer>();
+        final DataFileStream<GenericContainer> in = new DataFileStream<GenericContainer>(fs.open(filePath), reader);
+        final List<GenericContainer> avroData = new ArrayList<GenericContainer>();
+        try {
+            while (in.hasNext()) {
+                GenericContainer obj = in.next();
+                avroData.add(obj);
+            }
+        } finally {
+            Closeables.closeQuietly(in);
+        }
+        return avroData;
+    }
+
     private void testAvroStorage(boolean expectedToSucceed, String scriptFile, Map<String,String> parameterMap) throws IOException {
         pigServerLocal.setBatchOn();
 

Modified: pig/branches/spark/test/org/apache/pig/builtin/TestPluckTuple.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/builtin/TestPluckTuple.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/builtin/TestPluckTuple.java (original)
+++ pig/branches/spark/test/org/apache/pig/builtin/TestPluckTuple.java Fri Mar  4 18:17:39 2016
@@ -44,7 +44,7 @@ public class TestPluckTuple {
     }
 
     @Test
-    public void testSchema() throws Exception {
+    public void testStartsWith() throws Exception {
         String query = "a = load 'a' as (x:int,y:chararray,z:long);" +
                        "b = load 'b' as (x:int,y:chararray,z:long);" +
                        "c = join a by x, b by x;" +
@@ -55,6 +55,39 @@ public class TestPluckTuple {
     }
 
     @Test
+    public void testNegativeStartsWith() throws Exception {
+        String query = "a = load 'a' as (x:int,y:chararray,z:long);" +
+                       "b = load 'b' as (x:int,y:chararray,z:long);" +
+                       "c = join a by x, b by x;" +
+                       "define pluck PluckTuple('a::','false');" +
+                       "d = foreach c generate flatten(pluck(*));";
+        pigServer.registerQuery(query);
+        assertTrue(Schema.equals(pigServer.dumpSchema("b"), pigServer.dumpSchema("d"), false, true));
+    }
+
+    @Test
+    public void testPatternMatches() throws Exception {
+        String query = "a1 = load 'a1' as (x:int,y:chararray,z:long);" +
+                "a2 = load 'a2' as (x:int,y:chararray,z:long);" +
+                "b = join a1 by x, a2 by x;" +
+                "define pluck PluckTuple('a[2|3]::.*');" +
+                "c = foreach b generate flatten(pluck(*));";
+        pigServer.registerQuery(query);
+        assertTrue(Schema.equals(pigServer.dumpSchema("a2"), pigServer.dumpSchema("c"), false, true));
+    }
+
+    @Test
+    public void testNegativePatternMatches() throws Exception {
+        String query = "a1 = load 'a1' as (x:int,y:chararray,z:long);" +
+                "a2 = load 'a2' as (x:int,y:chararray,z:long);" +
+                "b = join a1 by x, a2 by x;" +
+                "define pluck PluckTuple('a[2|3]::.*','false');" +
+                "c = foreach b generate flatten(pluck(*));";
+        pigServer.registerQuery(query);
+        assertTrue(Schema.equals(pigServer.dumpSchema("a1"), pigServer.dumpSchema("c"), false, true));
+    }
+
+    @Test
     public void testOutput() throws Exception {
         Data data = resetData(pigServer);
 
@@ -87,4 +120,62 @@ public class TestPluckTuple {
         assertEquals(exp2, it.next());
         assertFalse(it.hasNext());
     }
+
+    @Test
+    public void testTwoPluckTuples() throws Exception {
+        Data data = resetData(pigServer);
+
+        data.set("a",
+            Utils.getSchemaFromString("xa:int,yb:chararray,zc:long"),
+            tuple(1, "hey", 3L),
+            tuple(2, "woah", 4L)
+            );
+
+        String query = "a = load 'a' using mock.Storage();" +
+            "define pluck1 PluckTuple('.a');" +
+            "define pluck2 PluckTuple('.b');" +
+            "b = foreach a generate flatten(pluck1(*)), flatten(pluck2(*));";
+        pigServer.registerQuery(query);
+        Iterator<Tuple> it = pigServer.openIterator("b");
+        assertTrue(it.hasNext());
+        assertEquals(tuple(1,"hey"), it.next());
+        assertTrue(it.hasNext());
+        assertEquals(tuple(2,"woah"), it.next());
+        assertFalse(it.hasNext());
+    }
+
+    @Test
+    public void testNegativeOutput() throws Exception {
+        Data data = resetData(pigServer);
+
+        Tuple exp1 = tuple(1, "sasf", 5L);
+        Tuple exp2 = tuple(2, "woah", 6L);
+
+        data.set("a",
+            Utils.getSchemaFromString("x:int,y:chararray,z:long"),
+            tuple(1, "hey", 2L),
+            tuple(2, "woah", 3L),
+            tuple(3, "c", 4L)
+            );
+        data.set("b",
+            Utils.getSchemaFromString("x:int,y:chararray,z:long"),
+            exp1,
+            exp2,
+            tuple(4, "c", 7L)
+            );
+
+        String query = "a = load 'a' using mock.Storage();" +
+            "b = load 'b' using mock.Storage();" +
+            "c = join a by x, b by x;" +
+            "define pluck PluckTuple('a::','false');" +
+            "d = foreach c generate flatten(pluck(*));";
+        pigServer.registerQuery(query);
+        Iterator<Tuple> it = pigServer.openIterator("d");
+        assertTrue(it.hasNext());
+        assertEquals(exp1, it.next());
+        assertTrue(it.hasNext());
+        assertEquals(exp2, it.next());
+        assertFalse(it.hasNext());
+    }
+
 }
\ No newline at end of file
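
The new TestPluckTuple cases cover regex prefixes and the negative ('false') mode; the equivalent in a script, where the 'a::' prefix comes from join disambiguation (paths illustrative):

    a = load '/tmp/left'  as (x:int, y:chararray, z:long);
    b = load '/tmp/right' as (x:int, y:chararray, z:long);
    c = join a by x, b by x;                    -- columns become a::x ... b::z
    define pluck   PluckTuple('a::');           -- keep columns matching the prefix
    define unpluck PluckTuple('a::', 'false');  -- negative mode: keep the rest
    d = foreach c generate flatten(pluck(*));   -- a's columns
    e = foreach c generate flatten(unpluck(*)); -- b's columns
    store d into '/tmp/out.1';
    store e into '/tmp/out.2';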

Modified: pig/branches/spark/test/org/apache/pig/builtin/TestTOP.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/builtin/TestTOP.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/builtin/TestTOP.java (original)
+++ pig/branches/spark/test/org/apache/pig/builtin/TestTOP.java Fri Mar  4 18:17:39 2016
@@ -33,25 +33,34 @@ import org.junit.Test;
 public class TestTOP {
     private static TupleFactory tupleFactory_ = TupleFactory.getInstance();
     private static BagFactory bagFactory_ = BagFactory.getInstance();
-    private static Tuple inputTuple_ = tupleFactory_.newTuple(3);
-    private static DataBag dBag_ = bagFactory_.newDefaultBag();
+    private static Tuple inputTuple_ = null;
 
     @BeforeClass
     public static void setup() throws ExecException {
+        inputTuple_ = fillTuple(0, 100);
+    }
+    
+    public static Tuple fillTuple(int start, int stop) throws ExecException {
+        Tuple tuple = tupleFactory_.newTuple(3);
+        
+        DataBag dBag = bagFactory_.newDefaultBag();
+
         // set N = 10 i.e retain top 10 tuples
-        inputTuple_.set(0, 10);
+        tuple.set(0, 10);
         // compare tuples by field number 1
-        inputTuple_.set(1, 1);
+        tuple.set(1, 1);
         // set the data bag containing the tuples
-        inputTuple_.set(2, dBag_);
+        tuple.set(2, dBag);
 
         // generate tuples of the form (group-1, 1), (group-2, 2) ...
-        for (long i = 0; i < 100; i++) {
+        for (long i = start; i < stop; i++) {
             Tuple nestedTuple = tupleFactory_.newTuple(2);
             nestedTuple.set(0, "group-" + i);
             nestedTuple.set(1, i);
-            dBag_.add(nestedTuple);
+            dBag.add(nestedTuple);
         }
+        
+        return tuple;
     }
 
     @Test
@@ -66,6 +75,26 @@ public class TestTOP {
         assertEquals(outBag.size(), 10L);
         checkItemsLT(outBag, 1, 10);
     }
+
+    @Test
+    public void testTOPAccumulator() throws Exception {
+        Tuple firstTuple = fillTuple(0, 50);
+        Tuple secondTuple = fillTuple(50, 100);
+        
+        TOP top = new TOP("DESC");
+        top.accumulate(firstTuple);
+        top.accumulate(secondTuple);
+        DataBag outBag = top.getValue();
+        assertEquals(outBag.size(), 10L);
+        checkItemsGT(outBag, 1, 89);
+
+        top = new TOP("ASC");
+        top.accumulate(firstTuple);
+        top.accumulate(secondTuple);
+        outBag = top.getValue();
+        assertEquals(outBag.size(), 10L);
+        checkItemsLT(outBag, 1, 10);
+    }
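
(The tuples fillTuple builds mirror TOP's calling convention, TOP(n, column, bag); invoked from Pig Latin the same call looks like the sketch below, path illustrative.)

    a = load '/tmp/groups' as (name:chararray, value:long);
    b = group a all;
    -- TOP(10, 1, a): the 10 tuples of bag a with the largest field 1 (value)
    c = foreach b generate flatten(TOP(10, 1, a));
    store c into '/tmp/out';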
 
     @Test
     public void testTopAlgebraic() throws IOException {

Modified: pig/branches/spark/test/org/apache/pig/builtin/mock/TestMockStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/builtin/mock/TestMockStorage.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/builtin/mock/TestMockStorage.java (original)
+++ pig/branches/spark/test/org/apache/pig/builtin/mock/TestMockStorage.java Fri Mar  4 18:17:39 2016
@@ -17,13 +17,15 @@
  */
 package org.apache.pig.builtin.mock;
 
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertTrue;
-import static junit.framework.Assert.assertFalse;
-import static junit.framework.Assert.fail;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
 import static org.apache.pig.builtin.mock.Storage.resetData;
 import static org.apache.pig.builtin.mock.Storage.schema;
 import static org.apache.pig.builtin.mock.Storage.tuple;
+import static org.apache.pig.builtin.mock.Storage.bag;
+import static org.apache.pig.builtin.mock.Storage.map;
 
 import java.util.HashSet;
 import java.util.List;
@@ -47,7 +49,9 @@ public class TestMockStorage {
     data.set("foo",
         tuple("a"),
         tuple("b"),
-        tuple("c")
+        tuple("c"),
+        tuple(map("d","e", "f","g")),
+        tuple(bag(tuple("h"),tuple("i")))
         );
 
     pigServer.registerQuery("A = LOAD 'foo' USING mock.Storage();");
@@ -57,8 +61,10 @@ public class TestMockStorage {
     assertEquals(tuple("a"), out.get(0));
     assertEquals(tuple("b"), out.get(1));
     assertEquals(tuple("c"), out.get(2));
+    assertEquals(tuple(map("f", "g", "d", "e" )), out.get(3));
+    assertEquals(tuple(bag(tuple("h"),tuple("i"))), out.get(4));
   }
-  
+
   @Test
   public void testMockSchema() throws Exception {
     PigServer pigServer = new PigServer(Util.getLocalTestMode());

Modified: pig/branches/spark/test/org/apache/pig/impl/builtin/TestStreamingUDF.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/impl/builtin/TestStreamingUDF.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/impl/builtin/TestStreamingUDF.java (original)
+++ pig/branches/spark/test/org/apache/pig/impl/builtin/TestStreamingUDF.java Fri Mar  4 18:17:39 2016
@@ -29,6 +29,7 @@ import java.util.List;
 import org.apache.pig.PigServer;
 import org.apache.pig.builtin.mock.Storage.Data;
 import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.test.MiniGenericCluster;
@@ -46,6 +47,7 @@ import org.junit.runner.RunWith;
 @RunWith(OrderedJUnit4Runner.class)
 @TestOrder({
     "testPythonUDF_onCluster",
+    "testPythonUDF_withBytearrayAndBytes_onCluster",
     "testPythonUDF__allTypes",
     "testPythonUDF__withBigDecimal",
     "testPythonUDF",
@@ -111,6 +113,41 @@ public class TestStreamingUDF {
         assertEquals(expected0, actual0);
         assertEquals(expected1, actual1);
     }
+    
+    @Test
+    public void testPythonUDF_withBytearrayAndBytes_onCluster() throws Exception {
+        pigServerMapReduce = new PigServer(cluster.getExecType(), cluster.getProperties());
+
+        
+        String[] pythonScript = {
+            "from pig_util import outputSchema",
+            "import os",
+            "@outputSchema('f:bytearray')",
+            "def foo(bar):",
+            "    return bytearray(os.urandom(1000))"
+        };
+        
+        Util.createLocalInputFile( "pyfilewBaB.py", pythonScript);
+
+        String[] input = {
+            "field1"
+        };
+        Util.createLocalInputFile("testTupleBaB", input);
+        Util.copyFromLocalToCluster(cluster, "testTupleBaB", "testTupleBaB");
+
+        pigServerMapReduce.registerQuery("REGISTER 'pyfilewBaB.py' USING streaming_python AS pf;");
+        pigServerMapReduce.registerQuery("A = LOAD 'testTupleBaB' as (b:chararray);");
+        pigServerMapReduce.registerQuery("B = FOREACH A generate pf.foo(b);");
+
+        Iterator<Tuple> iter = pigServerMapReduce.openIterator("B");
+        assertTrue(iter.hasNext());
+        Object result = iter.next().get(0);
+
+        //Mostly we're happy we got a result w/o throwing an exception, but we'll
+        //do a basic check.
+        assertTrue(result instanceof DataByteArray);
+        assertEquals(1000, ((DataByteArray)result).size());
+    }
 
     @Test
     public void testPythonUDF() throws Exception {
@@ -159,7 +196,6 @@ public class TestStreamingUDF {
         };
         Util.createLocalInputFile( "pyfileNL.py", pythonScript);
 
-        
         Data data = resetData(pigServerLocal);
         Tuple t0 = tf.newTuple(2);
         t0.set(0, "field10");

Modified: pig/branches/spark/test/org/apache/pig/impl/streaming/TestPigStreamingUDF.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/impl/streaming/TestPigStreamingUDF.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/impl/streaming/TestPigStreamingUDF.java (original)
+++ pig/branches/spark/test/org/apache/pig/impl/streaming/TestPigStreamingUDF.java Fri Mar  4 18:17:39 2016
@@ -369,6 +369,18 @@ public class TestPigStreamingUDF {
     }
     
     @Test
+    public void testDeserialize__emptyMap() throws IOException {
+        byte[] input = "|[_|]_|_".getBytes();
+        FieldSchema fs = new FieldSchema("", DataType.MAP);
+        PigStreamingUDF sp = new PigStreamingUDF(fs);
+
+        Map<String, String> expectedOutput = new TreeMap<String, String>();
+
+        Object out = sp.deserialize(input, 0, input.length);
+        Assert.assertEquals(tf.newTuple(expectedOutput), out);
+    }
+
+    @Test
     public void testDeserialize__bug() throws Exception {
         byte[] input = "|(_|-_|,_32|,_987654321098765432|,_987654321098765432|)_|_".getBytes();
 

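A note on the input string: reading the test data in this file, "|[_" and "|]_" appear to bracket a map, "|(_" and "|)_" a tuple, "|,_" to separate fields, and "|_" to terminate the value; that reading is inferred from the test strings, not from a protocol spec. Driving the deserializer directly then looks like:

    // Sketch, mirroring the new empty-map case: an empty frame decodes to a
    // single-field tuple wrapping an empty Map. Imports as in the test class.
    byte[] input = "|[_|]_|_".getBytes();
    PigStreamingUDF deser = new PigStreamingUDF(new FieldSchema("", DataType.MAP));
    Tuple out = (Tuple) deser.deserialize(input, 0, input.length);
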
Modified: pig/branches/spark/test/org/apache/pig/impl/streaming/TestStreamingUDFOutputHandler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/impl/streaming/TestStreamingUDFOutputHandler.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/impl/streaming/TestStreamingUDFOutputHandler.java (original)
+++ pig/branches/spark/test/org/apache/pig/impl/streaming/TestStreamingUDFOutputHandler.java Fri Mar  4 18:17:39 2016
@@ -65,6 +65,20 @@ public class TestStreamingUDFOutputHandl
         Assert.assertEquals(tf.newTuple("abc\ndef\nghi\njkl"), t);
     }
     
+    @Test
+    public void testGetValue__earlyNewLine() throws Exception{
+        FieldSchema fs = new FieldSchema("", DataType.CHARARRAY);
+        String data = "\na|_\n";
+        
+        PigStreamingUDF deserializer = new PigStreamingUDF(fs);
+        OutputHandler outty = new StreamingUDFOutputHandler(deserializer);
+        outty.bindTo(null, getIn(data), 0, 0);
+        
+        Tuple t = outty.getNext();
+        
+        Assert.assertEquals(tf.newTuple("\na"), t);
+    }
+    
     private BufferedPositionedInputStream getIn(String input) throws UnsupportedEncodingException {
         InputStream stream = new ByteArrayInputStream(input.getBytes("UTF-8"));
         BufferedPositionedInputStream result = new BufferedPositionedInputStream(stream);

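The early-newline case works because StreamingUDFOutputHandler frames records on the "|_" end marker rather than on newlines, so a payload that starts with '\n' is data, not a boundary. A small sketch using the same classes and calls as the test:

    // Sketch: bind the handler to a raw byte stream and pull one record.
    // The leading '\n' survives because only "|_" ends the record.
    PigStreamingUDF deser = new PigStreamingUDF(new FieldSchema("", DataType.CHARARRAY));
    OutputHandler handler = new StreamingUDFOutputHandler(deser);
    InputStream raw = new ByteArrayInputStream("\na|_\n".getBytes("UTF-8"));
    handler.bindTo(null, new BufferedPositionedInputStream(raw), 0, 0);
    Tuple t = handler.getNext();               // tuple("\na")
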
Modified: pig/branches/spark/test/org/apache/pig/newplan/logical/optimizer/TestImplicitSplitOnTuple.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/newplan/logical/optimizer/TestImplicitSplitOnTuple.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/newplan/logical/optimizer/TestImplicitSplitOnTuple.java (original)
+++ pig/branches/spark/test/org/apache/pig/newplan/logical/optimizer/TestImplicitSplitOnTuple.java Fri Mar  4 18:17:39 2016
@@ -56,7 +56,7 @@ public class TestImplicitSplitOnTuple {
                 "D2 = FOREACH tuplified GENERATE tuplify.memberId as memberId, tuplify.shopId as shopId, score AS score;"+
                 "J = JOIN D1 By shopId, D2 by shopId;"+
                 "K = FOREACH J GENERATE D1::memberId AS member_id1, D2::memberId AS member_id2, D1::shopId as shop;"+
-                "L = ORDER K by shop;"+
+                "L = ORDER K by shop, member_id1, member_id2;"+
                 "STORE L into 'output' using mock.Storage;");
         List<Tuple> list = data.get("output");
         assertEquals("list: "+list, 20, list.size());

Modified: pig/branches/spark/test/org/apache/pig/test/TestAccumulator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestAccumulator.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestAccumulator.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestAccumulator.java Fri Mar  4 18:17:39 2016
@@ -49,7 +49,7 @@ public class TestAccumulator {
     private static final String INPUT_FILE2 = "AccumulatorInput2.txt";
     private static final String INPUT_FILE3 = "AccumulatorInput3.txt";
     private static final String INPUT_FILE4 = "AccumulatorInput4.txt";
-    private static final String INPUT_DIR = "build/test/data";
+    private static final String INPUT_DIR = Util.getTestDirectory(TestAccumulator.class);
 
     private static PigServer pigServer;
     private static Properties properties;
@@ -88,7 +88,7 @@ public class TestAccumulator {
     }
 
     private static void createFiles() throws IOException {
-        new File(INPUT_DIR).mkdir();
+        new File(INPUT_DIR).mkdirs();
 
         PrintWriter w = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE1));
 
@@ -558,6 +558,21 @@ public class TestAccumulator {
         }
     }
 
+    // PIG-4365
+    @Test
+    public void testAccumWithTOP() throws IOException{
+        pigServer.registerQuery("A = load '" + INPUT_FILE3 + "' as (id:int, v:double);");
+        pigServer.registerQuery("B = group A all;");
+        pigServer.registerQuery("D = foreach B { C = TOP(5, 0, A); generate flatten(C); }");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("D");
+    
+        List<Tuple> expected = Util.getTuplesFromConstantTupleStrings(
+                new String[] {"(200,1.1)", "(200,2.1)", "(300,3.3)", "(400,null)", "(400,null)" });
+        
+        Util.checkQueryOutputsAfterSort(iter, expected);
+    }
+
     @Test
     public void testAccumWithMultiBuiltin() throws IOException{
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (id:int, c:chararray);");

Modified: pig/branches/spark/test/org/apache/pig/test/TestAssert.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestAssert.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestAssert.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestAssert.java Fri Mar  4 18:17:39 2016
@@ -116,13 +116,13 @@ public class TestAssert {
       try {
           pigServer.openIterator("A");
       } catch (FrontendException fe) {
-          if (!Util.isSparkExecType(Util.getLocalTestMode())) {
+          if (pigServer.getPigContext().getExecType().toString().startsWith("TEZ")
+                  || pigServer.getPigContext().getExecType().toString().startsWith("SPARK")) {
               Assert.assertTrue(fe.getCause().getMessage().contains(
-                      "Job terminated with anomalous status FAILED"));
-          }
-          else {
+                      "Assertion violated: i should be greater than 1"));
+          } else {
               Assert.assertTrue(fe.getCause().getMessage().contains(
-                      "i should be greater than 1"));
+                      "Job terminated with anomalous status FAILED"));
           }
       }
   }
@@ -148,13 +148,13 @@ public class TestAssert {
       try {
           pigServer.openIterator("A");
       } catch (FrontendException fe) {
-          if (!Util.isSparkExecType(Util.getLocalTestMode())) {
+          if (pigServer.getPigContext().getExecType().toString().startsWith("TEZ")
+                  || pigServer.getPigContext().getExecType().toString().startsWith("SPARK")) {
               Assert.assertTrue(fe.getCause().getMessage().contains(
-                      "Job terminated with anomalous status FAILED"));
-          }
-          else {
+                      "Assertion violated: i should be greater than 1"));
+          } else {
               Assert.assertTrue(fe.getCause().getMessage().contains(
-                      "i should be greater than 1"));
+                      "Job terminated with anomalous status FAILED"));
           }
       }
   }

Modified: pig/branches/spark/test/org/apache/pig/test/TestBZip.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestBZip.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestBZip.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestBZip.java Fri Mar  4 18:17:39 2016
@@ -28,6 +28,7 @@ import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Properties;
@@ -42,19 +43,50 @@ import org.apache.hadoop.mapreduce.Input
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.test.utils.CloseAwareFSDataInputStream;
+import org.apache.pig.test.utils.CloseAwareOutputStream;
 import org.apache.tools.bzip2r.CBZip2InputStream;
 import org.apache.tools.bzip2r.CBZip2OutputStream;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
 
+@RunWith(Parameterized.class)
 public class TestBZip {
     private static Properties properties;
     private static MiniGenericCluster cluster;
 
+    @Parameters(name = "pig.bzip.use.hadoop.inputformat = {0}.")
+    public static Iterable<Object[]> data() {
+        if ( HadoopShims.isHadoopYARN() ) {
+            return Arrays.asList(new Object[][] {
+                { false  },
+                { true   }
+            });
+        } else {
+            return Arrays.asList(new Object[][] {
+                { false }
+            });
+        }
+    }
+
+    public TestBZip (Boolean useBzipFromHadoop) {
+        properties = cluster.getProperties();
+        properties.setProperty("pig.bzip.use.hadoop.inputformat", useBzipFromHadoop.toString());
+    }
+
+    @Rule
+    public TemporaryFolder folder = new TemporaryFolder();
+
     @BeforeClass
     public static void oneTimeSetUp() throws Exception {
         cluster = MiniGenericCluster.buildCluster();
@@ -73,10 +105,9 @@ public class TestBZip {
     public void testBzipInPig() throws Exception {
         PigServer pig = new PigServer(cluster.getExecType(), properties);
 
-        File in = File.createTempFile("junit", ".bz2");
-        in.deleteOnExit();
+        File in = folder.newFile("junit-in.bz2");
 
-        File out = File.createTempFile("junit", ".bz2");
+        File out = folder.newFile("junit-out.bz2");
         out.delete();
         String clusterOutput = Util.removeColon(out.getAbsolutePath());
 
@@ -121,9 +152,6 @@ public class TestBZip {
         for (int j = 1; j < 100; j++) {
             assertEquals(new Integer(j), map.get(j));
         }
-
-        in.delete();
-        Util.deleteFile(cluster, clusterOutput);
     }
 
    /**
@@ -133,10 +161,9 @@ public class TestBZip {
     public void testBzipInPig2() throws Exception {
         PigServer pig = new PigServer(cluster.getExecType(), properties);
 
-        File in = File.createTempFile("junit", ".bz2");
-        in.deleteOnExit();
+        File in = folder.newFile("junit-in.bz2");
 
-        File out = File.createTempFile("junit", ".bz2");
+        File out = folder.newFile("junit-out.bz2");
         out.delete();
         String clusterOutput = Util.removeColon(out.getAbsolutePath());
 
@@ -181,9 +208,6 @@ public class TestBZip {
         for (int j = 1; j < 100; j++) {
             assertEquals(new Integer(j), map.get(j));
         }
-
-        in.delete();
-        out.delete();
     }
 
     //see PIG-2391
@@ -197,10 +221,9 @@ public class TestBZip {
         };
 
         // bzip compressed input
-        File in = File.createTempFile("junit", ".bz2");
+        File in = folder.newFile("junit-in.bz2");
         String compressedInputFileName = in.getAbsolutePath();
         String clusterCompressedFilePath = Util.removeColon(compressedInputFileName);
-        in.deleteOnExit();
 
         try {
             CBZip2OutputStream cos =
@@ -230,7 +253,6 @@ public class TestBZip {
                 it2.next();
             }
         } finally {
-            in.delete();
             Util.deleteFile(cluster, "intermediate.bz");
             Util.deleteFile(cluster, "final.bz");
         }
@@ -249,9 +271,8 @@ public class TestBZip {
         };
 
         // bzip compressed input
-        File in = File.createTempFile("junit", ".bz2");
+        File in = folder.newFile("junit-in.bz2");
         String compressedInputFileName = in.getAbsolutePath();
-        in.deleteOnExit();
         String clusterCompressedFilePath = Util.removeColon(compressedInputFileName);
 
         String unCompressedInputFileName = "testRecordDelims-uncomp.txt";
@@ -291,7 +312,6 @@ public class TestBZip {
             assertFalse(it2.hasNext());
 
         } finally {
-            in.delete();
             Util.deleteFile(cluster, unCompressedInputFileName);
             Util.deleteFile(cluster, clusterCompressedFilePath);
         }
@@ -305,10 +325,9 @@ public class TestBZip {
      public void testEmptyBzipInPig() throws Exception {
         PigServer pig = new PigServer(cluster.getExecType(), properties);
 
-        File in = File.createTempFile("junit", ".tmp");
-        in.deleteOnExit();
+        File in = folder.newFile("junit-in.tmp");
 
-        File out = File.createTempFile("junit", ".bz2");
+        File out = folder.newFile("junit-out.bz2");
         out.delete();
         String clusterOutputFilePath = Util.removeColon(out.getAbsolutePath());
 
@@ -336,10 +355,6 @@ public class TestBZip {
 
         pig.registerQuery("B = load '" + Util.encodeEscape(clusterOutputFilePath) + "';");
         pig.openIterator("B");
-
-        in.delete();
-        Util.deleteFile(cluster, clusterOutputFilePath);
-
     }
 
     /**
@@ -347,8 +362,7 @@ public class TestBZip {
      */
     @Test
     public void testEmptyBzip() throws Exception {
-        File tmp = File.createTempFile("junit", ".tmp");
-        tmp.deleteOnExit();
+        File tmp = folder.newFile("junit.tmp");
         CBZip2OutputStream cos = new CBZip2OutputStream(new FileOutputStream(
                 tmp));
         cos.close();
@@ -358,7 +372,25 @@ public class TestBZip {
                 fs.open(new Path(tmp.getAbsolutePath())), -1, tmp.length());
         assertEquals(-1, cis.read(new byte[100]));
         cis.close();
-        tmp.delete();
+    }
+
+    @Test
+    public void testInnerStreamGetsClosed() throws Exception {
+        File tmp = folder.newFile("junit.tmp");
+
+        CloseAwareOutputStream out = new CloseAwareOutputStream(new FileOutputStream(tmp));
+        CBZip2OutputStream cos = new CBZip2OutputStream(out);
+        assertFalse(out.isClosed());
+        cos.close();
+        assertTrue(out.isClosed());
+
+        FileSystem fs = FileSystem.getLocal(new Configuration(false));
+        Path path = new Path(tmp.getAbsolutePath());
+        CloseAwareFSDataInputStream in = new CloseAwareFSDataInputStream(fs.open(path));
+        CBZip2InputStream cis = new CBZip2InputStream(in, -1, tmp.length());
+        assertFalse(in.isClosed());
+        cis.close();
+        assertTrue(in.isClosed());
     }
 
     /**
@@ -451,6 +483,7 @@ public class TestBZip {
             props.put(entry.getKey(), entry.getValue());
         }
         props.setProperty(MRConfiguration.MAX_SPLIT_SIZE, Integer.toString(splitSize));
+        props.setProperty("pig.noSplitCombination", "true");
         PigServer pig = new PigServer(cluster.getExecType(), props);
         FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(props));
         fs.delete(new Path(outputFile), true);
@@ -464,7 +497,7 @@ public class TestBZip {
                 numPartFiles++;
             }
         }
-        assertEquals(true, numPartFiles > 0);
+        assertEquals(true, numPartFiles > 1);
 
         // verify record count to verify we read bzip data correctly
         Util.registerMultiLineQuery(pig, script);
@@ -480,26 +513,32 @@ public class TestBZip {
                 "1\t2\r3\t4"
         };
 
-        String inputFileName = "input.txt";
-        Util.createInputFile(cluster, inputFileName, inputData);
-
-        PigServer pig = new PigServer(cluster.getExecType(), properties);
-
-        pig.setBatchOn();
-        pig.registerQuery("a = load '" +  inputFileName + "';");
-        pig.registerQuery("store a into 'output.bz2';");
-        pig.registerQuery("store a into 'output';");
-        pig.executeBatch();
+        try {
+            String inputFileName = "input.txt";
+            Util.createInputFile(cluster, inputFileName, inputData);
 
-        FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
-                pig.getPigContext().getProperties()));
-        FileStatus[] outputFiles = fs.listStatus(new Path("output"),
-                Util.getSuccessMarkerPathFilter());
-        assertTrue(outputFiles[0].getLen() > 0);
+            PigServer pig = new PigServer(cluster.getExecType(), properties);
 
-        outputFiles = fs.listStatus(new Path("output.bz2"),
-                Util.getSuccessMarkerPathFilter());
-        assertTrue(outputFiles[0].getLen() > 0);
+            pig.setBatchOn();
+            pig.registerQuery("a = load '" +  inputFileName + "';");
+            pig.registerQuery("store a into 'output.bz2';");
+            pig.registerQuery("store a into 'output';");
+            pig.executeBatch();
+
+            FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
+                    pig.getPigContext().getProperties()));
+            FileStatus[] outputFiles = fs.listStatus(new Path("output"),
+                    Util.getSuccessMarkerPathFilter());
+            assertTrue(outputFiles[0].getLen() > 0);
+
+            outputFiles = fs.listStatus(new Path("output.bz2"),
+                    Util.getSuccessMarkerPathFilter());
+            assertTrue(outputFiles[0].getLen() > 0);
+        } finally {
+            Util.deleteFile(cluster, "input.txt");
+            Util.deleteFile(cluster, "output.bz2");
+            Util.deleteFile(cluster, "output");
+        }
     }
 
     @Test
@@ -511,34 +550,41 @@ public class TestBZip {
         String inputFileName = "input2.txt";
         Util.createInputFile(cluster, inputFileName, inputData);
 
-        PigServer pig = new PigServer(cluster.getExecType(), properties);
-        PigContext pigContext = pig.getPigContext();
-        pigContext.getProperties().setProperty( "output.compression.enabled", "true" );
-        pigContext.getProperties().setProperty( "output.compression.codec", "org.apache.hadoop.io.compress.BZip2Codec" );
-
-        pig.setBatchOn();
-        pig.registerQuery("a = load '" +  inputFileName + "';");
-        pig.registerQuery("store a into 'output2.bz2';");
-        pig.registerQuery("store a into 'output2';");
-        pig.executeBatch();
-
-        FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
-                pig.getPigContext().getProperties()));
-        FileStatus[] outputFiles = fs.listStatus(new Path("output2"),
-                Util.getSuccessMarkerPathFilter());
-        assertTrue(outputFiles[0].getLen() > 0);
-
-        outputFiles = fs.listStatus(new Path("output2.bz2"),
-                Util.getSuccessMarkerPathFilter());
-        assertTrue(outputFiles[0].getLen() > 0);
+        try {
+            PigServer pig = new PigServer(cluster.getExecType(), properties);
+            PigContext pigContext = pig.getPigContext();
+            pigContext.getProperties().setProperty( "output.compression.enabled", "true" );
+            pigContext.getProperties().setProperty( "output.compression.codec", "org.apache.hadoop.io.compress.BZip2Codec" );
+
+            pig.setBatchOn();
+            pig.registerQuery("a = load '" +  inputFileName + "';");
+            pig.registerQuery("store a into 'output2.bz2';");
+            pig.registerQuery("store a into 'output2';");
+            pig.executeBatch();
+
+            FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
+                    pig.getPigContext().getProperties()));
+            FileStatus[] outputFiles = fs.listStatus(new Path("output2"),
+                    Util.getSuccessMarkerPathFilter());
+            assertTrue(outputFiles[0].getLen() > 0);
+
+            outputFiles = fs.listStatus(new Path("output2.bz2"),
+                    Util.getSuccessMarkerPathFilter());
+            assertTrue(outputFiles[0].getLen() > 0);
+        } finally {
+            Util.deleteFile(cluster,"input2.txt");
+            Util.deleteFile(cluster,"output2.bz2");
+            Util.deleteFile(cluster,"output2");
+        }
     }
 
     /**
-     * Tests that Pig throws an Exception when the input files to be loaded are actually
-     * a result of concatenating 2 or more bz2 files. Pig should not silently ignore part
-     * of the input data.
+     * Tests that Pig's Bzip2TextInputFormat throws an IOException when the input files to be loaded are actually
+     * a result of concatenating 2 or more bz2 files. It should not silently ignore part
+     * of the input data. When hadoop's TextInputFormat is used (PIG-3251), it should
+     * successfully read the concatenated bzip file to the end.
      */
-    @Test (expected=IOException.class)
+    @Test
     public void testBZ2Concatenation() throws Exception {
         String[] inputData1 = new String[] {
                 "1\ta",
@@ -556,14 +602,12 @@ public class TestBZip {
         };
 
         // bzip compressed input file1
-        File in1 = File.createTempFile("junit", ".bz2");
+        File in1 = folder.newFile("junit-in1.bz2");
         String compressedInputFileName1 = in1.getAbsolutePath();
-        in1.deleteOnExit();
 
         // file2
-        File in2 = File.createTempFile("junit", ".bz2");
+        File in2 = folder.newFile("junit-in2.bz2");
         String compressedInputFileName2 = in2.getAbsolutePath();
-        in1.deleteOnExit();
 
         String unCompressedInputFileName = "testRecordDelims-uncomp.txt";
         Util.createInputFile(cluster, unCompressedInputFileName, inputDataMerged);
@@ -603,19 +647,29 @@ public class TestBZip {
             // pig script to read compressed concatenated input
             script = "a = load '" + Util.encodeEscape(compressedInputFileName1) +"';";
             pig.registerQuery(script);
-            Iterator<Tuple> it2 = pig.openIterator("a");
 
-            while(it1.hasNext()) {
-                Tuple t1 = it1.next();
-                Tuple t2 = it2.next();
-                assertEquals(t1, t2);
+            try {
+              Iterator<Tuple> it2 = pig.openIterator("a");
+              while(it1.hasNext()) {
+                  Tuple t1 = it1.next();
+                  Tuple t2 = it2.next();
+                  assertEquals(t1, t2);
+              }
+
+              assertFalse(it2.hasNext());
+
+              // When pig.bzip.use.hadoop.inputformat=true, it should successfully read the concatenated bzip file
+              assertEquals("IOException should be thrown when pig's own Bzip2TextInputFormat is used",
+                           properties.getProperty("pig.bzip.use.hadoop.inputformat"),
+                           "true");
+
+            } catch (IOException e) {
+                assertEquals("IOException should only be thrown when pig's own Bzip2TextInputFormat is used",
+                             properties.getProperty("pig.bzip.use.hadoop.inputformat"),
+                             "false");
             }
 
-            assertFalse(it2.hasNext());
-
         } finally {
-            in1.delete();
-            in2.delete();
             Util.deleteFile(cluster, unCompressedInputFileName);
         }
 
@@ -625,11 +679,12 @@ public class TestBZip {
      * Concatenate the contents of src file to the contents of dest file
      */
     private void catInto(String src, String dest) throws IOException {
-        BufferedWriter out = new BufferedWriter(new FileWriter(dest, true));
-        BufferedReader in = new BufferedReader(new FileReader(src));
-        String str;
-        while ((str = in.readLine()) != null) {
-            out.write(str);
+        FileOutputStream out = new FileOutputStream(new File(dest) , true);
+        FileInputStream in = new FileInputStream(new File(src));
+        byte[] buffer = new byte[4096];
+        int bytesread;
+        while ((bytesread = in.read(buffer)) != -1) {
+            out.write(buffer,0, bytesread);
         }
         in.close();
         out.close();
@@ -658,20 +713,26 @@ public class TestBZip {
         pw.println(inputScript);
         pw.close();
 
-        PigServer pig = new PigServer(cluster.getExecType(), properties);
-
-        FileInputStream fis = new FileInputStream(inputScriptName);
-        pig.registerScript(fis);
+        try {
+            PigServer pig = new PigServer(cluster.getExecType(), properties);
 
-        FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
-                pig.getPigContext().getProperties()));
-        FileStatus[] outputFiles = fs.listStatus(new Path("output3"),
-                Util.getSuccessMarkerPathFilter());
-        assertTrue(outputFiles[0].getLen() > 0);
+            FileInputStream fis = new FileInputStream(inputScriptName);
+            pig.registerScript(fis);
 
-        outputFiles = fs.listStatus(new Path("output3.bz2"),
-                Util.getSuccessMarkerPathFilter());
-        assertTrue(outputFiles[0].getLen() > 0);
+            FileSystem fs = FileSystem.get(ConfigurationUtil.toConfiguration(
+                    pig.getPigContext().getProperties()));
+            FileStatus[] outputFiles = fs.listStatus(new Path("output3"),
+                    Util.getSuccessMarkerPathFilter());
+            assertTrue(outputFiles[0].getLen() > 0);
+
+            outputFiles = fs.listStatus(new Path("output3.bz2"),
+                    Util.getSuccessMarkerPathFilter());
+            assertTrue(outputFiles[0].getLen() > 0);
+        } finally {
+            Util.deleteFile(cluster, "input3.txt");
+            Util.deleteFile(cluster, "output3.bz2");
+            Util.deleteFile(cluster, "output3");
+        }
     }
 
 }

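One more note, on the catInto() rewrite in this file: the old Reader/Writer loop copied line-by-line, dropping line terminators and decoding the bz2 payload as text, which corrupts a binary stream; the byte-buffer loop copies it verbatim. An equivalent standalone sketch (the helper name and try-with-resources style are choices beyond this patch):

    // Hypothetical helper equivalent to the patched catInto(): append src to
    // dest byte-for-byte so compressed files survive concatenation intact.
    private static void appendBytes(String src, String dest) throws IOException {
        try (FileInputStream in = new FileInputStream(src);
             FileOutputStream out = new FileOutputStream(dest, true)) { // true = append
            byte[] buf = new byte[4096];
            int n;
            while ((n = in.read(buf)) != -1) {
                out.write(buf, 0, n);
            }
        }
    }
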

