pig-commits mailing list archives

From: xu...@apache.org
Subject: svn commit: r1733627 [2/18] - in /pig/branches/spark: ./ bin/ conf/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/math/ contrib/piggybank/java/src/main...
Date: Fri, 04 Mar 2016 18:17:47 GMT
Modified: pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVExcelStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVExcelStorage.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVExcelStorage.java (original)
+++ pig/branches/spark/contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/TestCSVExcelStorage.java Fri Mar  4 18:17:39 2016
@@ -25,8 +25,6 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Properties;
 
-import junit.framework.Assert;
-
 import org.apache.commons.lang.StringUtils;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
@@ -35,13 +33,16 @@ import org.apache.pig.builtin.mock.Stora
 import org.apache.pig.builtin.mock.Storage.Data;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.tools.parameters.ParseException;
 import org.apache.pig.test.Util;
-
+import org.apache.pig.tools.parameters.ParseException;
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.apache.pig.builtin.mock.Storage.resetData;
+import static org.apache.pig.builtin.mock.Storage.tuple;
+
 public class TestCSVExcelStorage  {
 
     Properties props = new Properties();
@@ -71,7 +72,7 @@ public class TestCSVExcelStorage  {
             "\"Mac \"\"the knife\"\"\",Cohen,30",
             "\"Conrad\nEmil\",Dinger,40",
                 "Emil,\"\nDinger\",40",
-                "Quote problem,\"My \"\"famous\"\"\nsong\",60",
+                "Quote problem,\"My \"\"famous\"\"\nsong\",",
             "1st Field,\"A poem that continues\nfor several lines\ndo we\n(even with \r)handle that?\",Good,Fairy",
     };
 
@@ -85,7 +86,7 @@ public class TestCSVExcelStorage  {
             add(Util.createTuple(new String[] {"Mac \"the knife\"", "Cohen", "30"}));
             add(Util.createTuple(new String[] {"Conrad\nEmil", "Dinger", "40"}));
             add(Util.createTuple(new String[] {"Emil", "\nDinger", "40"}));
-            add(Util.createTuple(new String[] {"Quote problem", "My \"famous\"\nsong", "60"}));
+            add(Util.createTuple(new String[] {"Quote problem", "My \"famous\"\nsong", ""}));
             add(Util.createTuple(new String[] {"1st Field", "A poem that continues\nfor several lines\ndo we\n(even with \n)handle that?", "Good", "Fairy"}));
         }
     };
@@ -103,7 +104,7 @@ public class TestCSVExcelStorage  {
             add(Util.createTuple(new String[] {"Emil"}));
             add(Util.createTuple(new String[] {"Dinger,40"}));  // Trailing double quote after Emil eats rest of line
             add(Util.createTuple(new String[] {"Quote problem", "My \"famous\""}));
-            add(Util.createTuple(new String[] {"song,60"}));
+            add(Util.createTuple(new String[] {"song,"}));
             add(Util.createTuple(new String[] {"1st Field", "A poem that continues"}));
             add(Util.createTuple(new String[] {"for several lines"}));
             add(Util.createTuple(new String[] {"do we"}));
@@ -322,7 +323,7 @@ public class TestCSVExcelStorage  {
         return f.getAbsolutePath().replaceAll("\\\\", "/");
     }
 
-    // Comprehensive loader test: uses several datatypes; skips the header; 
+    // Comprehensive loader test: uses several datatypes; skips the header;
     //                            handles missing/extra fields; handles quotes, commas, newlines
     @Test
     public void load() throws IOException, ParseException {
@@ -330,7 +331,7 @@ public class TestCSVExcelStorage  {
 
         pig.registerQuery(
             "data = load '" + dataDir + testFile + "' " +
-            "using org.apache.pig.piggybank.storage.CSVExcelStorage(',', 'YES_MULTILINE', 'UNIX', 'SKIP_INPUT_HEADER') " + 
+            "using org.apache.pig.piggybank.storage.CSVExcelStorage(',', 'YES_MULTILINE', 'UNIX', 'SKIP_INPUT_HEADER') " +
             "AS (" + schema + ");"
         );
 
@@ -365,10 +366,10 @@ public class TestCSVExcelStorage  {
 
         pig.registerQuery(
             "data = load '" + dataDir + input + "' " +
-            "using org.apache.pig.piggybank.storage.CSVExcelStorage(',', 'YES_MULTILINE', 'UNIX', 'SKIP_INPUT_HEADER') " + 
+            "using org.apache.pig.piggybank.storage.CSVExcelStorage(',', 'YES_MULTILINE', 'UNIX', 'SKIP_INPUT_HEADER') " +
             "AS (" + schema + ");"
         );
-        pig.store("data", dataDir + output, 
+        pig.store("data", dataDir + output,
                   "org.apache.pig.piggybank.storage.CSVExcelStorage(',', 'YES_MULTILINE', 'UNIX', 'WRITE_OUTPUT_HEADER')");
 
         // Read it back
@@ -415,10 +416,10 @@ public class TestCSVExcelStorage  {
 
          pig.registerQuery(
             "data = load '" + dataDir + input + "' " +
-            "using PigStorage('|')" + 
+            "using PigStorage('|')" +
             "AS (" + schema + ");"
         );
-        pig.store("data", dataDir + output, 
+        pig.store("data", dataDir + output,
                   "org.apache.pig.piggybank.storage.CSVExcelStorage(',', 'YES_MULTILINE', 'UNIX', 'SKIP_OUTPUT_HEADER')");
 
         pig.registerQuery(
@@ -432,9 +433,17 @@ public class TestCSVExcelStorage  {
             "(\"(1,)\",\"(1,(2,))\",\"{(1,),(3,)}\",\"{(1,{(,3),(,5)}),(6,{(7,),(9,)})}\",\"{b=2, a=null}\",\"{d=null, a={b=null, c=2}}\")"
         };
 
-        Assert.assertEquals(StringUtils.join(expected, "\n"), StringUtils.join(data, "\n"));
+        String[] expectedJDK8 = {
+                "(\"(1,2)\",\"(1,(2,3))\",\"{(1,2),(3,4)}\",\"{(1,{(2,3),(4,5)}),(6,{(7,8),(9,0)})}\",\"{a=1, b=2}\",\"{a={b=1, c=2}, d={e=3, f=4}}\")",
+                "(\"(1,)\",\"(1,(2,))\",\"{(1,),(3,)}\",\"{(1,{(,3),(,5)}),(6,{(7,),(9,)})}\",\"{a=null, b=2}\",\"{a={b=null, c=2}, d=null}\")"
+            };
+
+        String actual = StringUtils.join(data, "\n");
+        Assert.assertTrue("Failed to match. Output was " + actual,
+                StringUtils.join(expected, "\n").equals(actual)
+                        || StringUtils.join(expectedJDK8, "\n").equals(actual));
     }
-    
+
     // Test that STORE stores CR (\r) quoted/unquoted in yes_multiline/no_multiline
     @Test
     public void storeCR() throws IOException {
@@ -478,4 +487,71 @@ public class TestCSVExcelStorage  {
         Assert.assertEquals(expectedNoMultiline, actual);
     }
 
+    // Test to validate that each CSV file gets the correct header if they are run at the same time (PIG-4689)
+    @Test
+    public void storeTwoFilesWithDifferentHeaders() throws IOException, ParseException {
+        pig.setBatchOn(); // Very important to reproduce this bug
+
+        Storage.Data data = resetData(pig);
+
+        String fooOutFileName = createOutputFileName();
+        data.set(
+                "foo",
+                "foo_1:chararray",
+                tuple("A")
+        );
+        pig.registerQuery(
+                "foo = LOAD 'foo' USING mock.Storage();"
+        );
+        pig.registerQuery(
+                "STORE foo INTO '" + fooOutFileName + "' " +
+                "USING org.apache.pig.piggybank.storage.CSVExcelStorage(',', 'YES_MULTILINE', 'UNIX', 'WRITE_OUTPUT_HEADER');"
+        );
+
+        String barOutFileName = createOutputFileName();
+        data.set(
+                "bar",
+                "bar_1:chararray, bar_2:chararray",
+                tuple("B","C")
+        );
+        pig.registerQuery(
+                "bar = LOAD 'bar' USING mock.Storage();"
+        );
+        pig.registerQuery(
+                "STORE bar INTO '" + barOutFileName + "' " +
+                "USING org.apache.pig.piggybank.storage.CSVExcelStorage(',', 'YES_MULTILINE', 'UNIX', 'WRITE_OUTPUT_HEADER');"
+        );
+
+        pig.executeBatch();
+
+        // -----
+
+        pig.registerQuery(
+                "fooCsv = load '" + fooOutFileName + "' ;"
+        );
+
+        Iterator<Tuple> fooCsv = pig.openIterator("fooCsv");
+        String[] expectedFooCsv = {
+                // header should be written because we used the 'WRITE_OUTPUT_HEADER' argument
+                "(foo_1)",
+                "(A)"
+        };
+
+        Assert.assertEquals(StringUtils.join(expectedFooCsv, "\n"), StringUtils.join(fooCsv, "\n"));
+
+        // -----
+
+        pig.registerQuery(
+                "barCsv = load '" + barOutFileName + "' ;"
+        );
+        Iterator<Tuple> barCsv = pig.openIterator("barCsv");
+        String[] expectedbarCsv = {
+                // header should be written because we used the 'WRITE_OUTPUT_HEADER' argument
+                "(bar_1,bar_2)",
+                "(B,C)"
+        };
+
+        Assert.assertEquals(StringUtils.join(expectedbarCsv, "\n"), StringUtils.join(barCsv, "\n"));
+    }
+
 }
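
The relaxed assertion above accepts two expected renderings because java.util.HashMap makes
no iteration-order guarantee, and the order actually observed differs between JDK 7 and JDK 8.
A minimal sketch of that effect, assuming only that the stored values are rendered from plain
HashMaps (the class and map contents below are illustrative, not taken from the test):

    import java.util.HashMap;
    import java.util.Map;

    public class MapOrderDemo {
        public static void main(String[] args) {
            Map<String, Integer> m = new HashMap<String, Integer>();
            m.put("a", 1);
            m.put("b", 2);
            m.put("d", 4);
            // toString() follows iteration order, which is unspecified for HashMap and
            // changed between JDK 7 and JDK 8, so a test should not compare against a
            // single fixed rendering of the map.
            System.out.println(m);
        }
    }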

Modified: pig/branches/spark/ivy.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/ivy.xml?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/ivy.xml (original)
+++ pig/branches/spark/ivy.xml Fri Mar  4 18:17:39 2016
@@ -80,6 +80,14 @@
       conf="compile->master"/>
     <dependency org="org.apache.httpcomponents" name="httpcore" rev="${httpcomponents.version}"
       conf="compile->master"/>
+    <dependency org="nl.basjes.parse.httpdlog" name="httpdlog-pigloader" rev="${basjes-httpdlog-pigloader.version}"
+      conf="compile->master"/>
+    <dependency org="nl.basjes.parse.httpdlog" name="httpdlog-inputformat" rev="${basjes-httpdlog-pigloader.version}"
+      conf="compile->master"/>
+    <dependency org="nl.basjes.parse.httpdlog" name="httpdlog-parser" rev="${basjes-httpdlog-pigloader.version}"
+      conf="compile->master"/>
+    <dependency org="nl.basjes.parse" name="parser-core" rev="${basjes-httpdlog-pigloader.version}"
+      conf="compile->master"/>
     <dependency org="commons-configuration" name="commons-configuration" rev="${commons-configuration.version}"
       conf="hadoop23->master"/>
     <dependency org="commons-collections" name="commons-collections" rev="${commons-collections.version}"
@@ -88,12 +96,8 @@
       conf="hadoop23->master"/>
     <dependency org="javax.ws.rs" name="jsr311-api" rev="${jsr311-api.version}"
       conf="hadoop23->master"/>
-    <dependency org="org.mortbay.jetty" name="jetty" rev="${jetty.version}"
-      conf="hadoop23->master"/>
     <dependency org="com.google.protobuf" name="protobuf-java" rev="${protobuf-java.version}"
       conf="hadoop23->master"/>
-    <dependency org="org.mortbay.jetty" name="jetty-util" rev="${jetty-util.version}"
-      conf="hadoop23->master"/>
     <dependency org="javax.inject" name="javax.inject" rev="${javax-inject.version}"
       conf="hadoop23->master"/>
     <dependency org="javax.xml.bind" name="jaxb-api" rev="${jaxb-api.version}"
@@ -172,8 +176,10 @@
     <dependency org="org.apache.hadoop" name="hadoop-mapreduce-client-hs" 
       rev="${hadoop-mapreduce.version}" conf="hadoop23->master"/>
     <dependency org="org.mortbay.jetty" name="jetty" rev="${jetty.version}"
-      conf="compile->master"/>
-    <dependency org="org.mortbay.jetty" name="jetty-util" rev="${jetty-util.version}"
+      conf="compile->master">
+      <artifact name="jetty" ext="jar" />
+    </dependency>
+    <dependency org="org.mortbay.jetty" name="jetty-util" rev="${jetty.version}"
       conf="compile->master"/>
     <dependency org="commons-logging" name="commons-logging" rev="${commons-logging.version}"
       conf="compile->master;checkstyle->master"/>
@@ -190,12 +196,14 @@
       conf="hadoop20->default;checkstyle->master">
       <exclude org="org.codehaus.jackson" module="jackson-core-asl"/>
       <exclude org="org.codehaus.jackson" module="jackson-mapper-asl"/>
+      <exclude org="io.netty" module="netty"/>
     </dependency>
     <dependency org="org.apache.avro" name="avro-mapred" rev="${avro.version}"
       conf="hadoop23->default;checkstyle->master">
       <artifact name="avro-mapred" type="jar" m:classifier="hadoop2"/>
       <exclude org="org.codehaus.jackson" module="jackson-core-asl"/>
       <exclude org="org.codehaus.jackson" module="jackson-mapper-asl"/>
+      <exclude org="io.netty" module="netty"/>
     </dependency>
     <dependency org="org.apache.avro" name="trevni-core" rev="${avro.version}"
       conf="compile->default;checkstyle->master">
@@ -234,6 +242,8 @@
       conf="releaseaudit->default"/>
     <dependency org="org.codehaus.groovy" name="groovy-all" rev="${groovy.version}"
       conf="compile->master"/>
+    <dependency org="org.apache.ivy" name="ivy" rev="${ivy.version}"
+      conf="compile->master"/>
     <dependency org="org.codehaus.jackson" name="jackson-mapper-asl" rev="${jackson.version}"
       conf="compile->master"/>
     <dependency org="org.codehaus.jackson" name="jackson-core-asl" rev="${jackson.version}"
@@ -249,15 +259,12 @@
     <dependency org="org.antlr" name="antlr" rev="${antlr.version}" conf="compile->master"/>
     <dependency org="org.antlr" name="antlr-runtime" rev="${antlr.version}" conf="compile->default"/>
     <dependency org="org.antlr" name="ST4" rev="${stringtemplate.version}" conf="compile->default"/>
-    <dependency org="org.apache.zookeeper" name="zookeeper" rev="${zookeeper.version}" conf="compile->master">
-    	<exclude org="org.jboss.netty" module="netty"/>
-    </dependency>
-
-    <!-- dependency org="org.jboss.netty" name="netty" rev="3.2.2.Final" conf="test->master"/ -->
+    <dependency org="org.apache.zookeeper" name="zookeeper" rev="${zookeeper.version}" conf="compile->master"/>
+    <dependency org="io.netty" name="netty" rev="${netty.version}" conf="test->master"/>
     <dependency org="dk.brics.automaton" name="automaton" rev="1.11-8" conf="compile->default"/>
 
     <dependency org="org.jruby" name="jruby-complete" rev="${jruby.version}" conf="compile->master"/>
-    <!-- dependency org="asm" name="asm" rev="${asm.version}" conf="compile->default"/ -->
+    <dependency org="asm" name="asm" rev="${asm.version}" conf="compile->default"/>
 
     <!-- HBase dependency in format for releases up to 0.94 (including) -->
     <dependency org="org.apache.hbase" name="hbase" rev="${hbase94.version}" conf="hbase94->master">
@@ -358,10 +365,14 @@
       <exclude org="asm" module="asm"/>
     </dependency>
 
+    <dependency org="org.htrace" name="htrace-core" rev="3.0.4" conf="hadoop23->master"/>
+    <dependency org="org.apache.htrace" name="htrace-core" rev="${htrace.version}" conf="hadoop23->master"/>
+    <dependency org="org.fusesource.leveldbjni" name="leveldbjni-all" rev="${leveldbjni.version}"
+      conf="hadoop23->master"/>
     <dependency org="org.cloudera.htrace" name="htrace-core" rev="2.00" conf="hbase95->master">
       <artifact name="htrace-core" type="jar"/>
     </dependency>
-    <dependency org="org.fusesource.leveldbjni" name="leveldbjni-all" rev="${leveldbjni.version}" conf="hadoop23->master"/>
+    <dependency org="com.lmax" name="disruptor" rev="3.3.0" conf="hbase95->master"/>
 
     <!-- for TestHBaseStorage -->
     <dependency org="com.github.stephenc.high-scale-lib" name="high-scale-lib" rev="${high-scale-lib.version}"
@@ -417,8 +428,6 @@
       conf="compile->master" />
     <dependency org="org.apache.hive.shims" name="hive-shims-common" rev="${hive.version}" changing="true"
       conf="compile->master" />
-    <dependency org="org.apache.hive.shims" name="hive-shims-common-secure" rev="${hive.version}" changing="true"
-      conf="compile->master" />
     <dependency org="org.apache.hive.shims" name="hive-shims-0.23" rev="${hive.version}" changing="true"
       conf="hadoop23->master" />
     <dependency org="org.apache.hive.shims" name="hive-shims-0.20S" rev="${hive.version}" changing="true"
@@ -427,6 +436,8 @@
       conf="test->master" />
     <dependency org="com.esotericsoftware.kryo" name="kryo" rev="${kryo.version}"
       conf="compile->master" />
+    <dependency org="org.apache.commons" name="commons-lang3" rev="${commons-lang3.version}"
+      conf="compile->master" />
 
     <dependency org="org.vafer" name="jdeb" rev="${jdeb.version}"
       conf="compile->master">
@@ -467,13 +478,18 @@
        conf="hadoop23->master"/>
     <dependency org="org.apache.tez" name="tez-mapreduce" rev="${tez.version}"
        conf="hadoop23->master"/>
+    <dependency org="org.apache.tez" name="tez-yarn-timeline-history-with-acls" rev="${tez.version}"
+       conf="hadoop23->master"/>
     <dependency org="org.apache.commons" name="commons-collections4" rev="${commons-collections4.version}"
       conf="hadoop23->master"/>
     <dependency org="org.codehaus.jettison" name="jettison" rev="${jettison.version}"
       conf="hadoop23->master"/>
     <dependency org="org.apache.commons" name="commons-math3" rev="${commons-math3.version}"
       conf="hadoop23->master"/>
-
+    <dependency org="org.apache.curator" name="curator-framework" rev="${curator.version}"
+      conf="hadoop23->master"/>
+    <dependency org="org.apache.curator" name="curator-client" rev="${curator.version}"
+      conf="hadoop23->master"/>
   </dependencies>
 </ivy-module>
 

Modified: pig/branches/spark/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/pig/branches/spark/ivy/libraries.properties?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/ivy/libraries.properties (original)
+++ pig/branches/spark/ivy/libraries.properties Fri Mar  4 18:17:39 2016
@@ -20,6 +20,7 @@ apacherat.version=0.8
 asm.version=3.2
 automaton.version=1.11-8
 avro.version=1.7.5
+basjes-httpdlog-pigloader.version=2.4
 commons-beanutils.version=1.7.0
 commons-cli.version=1.2
 commons-codec.version=1.4
@@ -37,20 +38,20 @@ jersey.version=1.8
 checkstyle.version=4.2
 ivy.version=2.2.0
 jasper.version=6.1.14
-groovy.version=1.8.6
+groovy.version=2.4.5
 guava.version=11.0
 jersey-core.version=1.8
 hadoop-core.version=1.0.4
 hadoop-test.version=1.0.4
-hadoop-common.version=2.5.0
-hadoop-hdfs.version=2.5.0
-hadoop-mapreduce.version=2.5.0
+hadoop-common.version=2.6.0
+hadoop-hdfs.version=2.6.0
+hadoop-mapreduce.version=2.6.0
 hbase94.version=0.94.1
-hbase95.version=0.96.0-${hbase.hadoop.version}
+hbase95.version=0.98.12-${hbase.hadoop.version}
 hsqldb.version=1.8.0.10
-hive.version=0.14.0
+hive.version=1.2.1
 httpcomponents.version=4.1
-jackson.version=1.8.8
+jackson.version=1.9.13
 jackson-pig-3039-test.version=1.9.9
 javacc.version=4.2
 javax-inject.version=1
@@ -60,9 +61,8 @@ jdeb.version=0.8
 jdiff.version=1.0.9
 jettison.version=1.3.4
 jetty.version=6.1.26
-jetty-util.version=6.1.26
 jline.version=1.0
-joda-time.version=2.5
+joda-time.version=2.8.2
 jopt.version=4.1
 json-simple.version=1.1
 junit.version=4.11
@@ -73,7 +73,7 @@ rhino.version=1.7R2
 antlr.version=3.4
 stringtemplate.version=4.0.4
 log4j.version=1.2.16
-netty.version=3.2.2
+netty.version=3.6.6.Final
 rats-lib.version=0.5.1
 slf4j-api.version=1.6.1
 slf4j-log4j12.version=1.6.1
@@ -94,8 +94,11 @@ jsr311-api.version=1.1.1
 mockito.version=1.8.4
 jansi.version=1.9
 asm.version=3.3.1
-snappy-java.version=1.1.1.7
-tez.version=0.5.3
+snappy-java.version=1.1.0.1
+tez.version=0.7.0
 parquet-pig-bundle.version=1.2.3
 snappy.version=0.2
 leveldbjni.version=1.8
+curator.version=2.6.0
+htrace.version=3.1.0-incubating
+commons-lang3.version=3.1

Modified: pig/branches/spark/ivy/pig-template.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/ivy/pig-template.xml?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/ivy/pig-template.xml (original)
+++ pig/branches/spark/ivy/pig-template.xml Fri Mar  4 18:17:39 2016
@@ -175,5 +175,15 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.apache.ivy</groupId>
+      <artifactId>ivy</artifactId>
+      <version>2.2.0</version>
+    </dependency>
+    <dependency>
+      <groupId>org.codehaus.groovy</groupId>
+      <artifactId>groovy-all</artifactId>
+      <version>1.8.6</version>
+    </dependency>
   </dependencies>
 </project>

Modified: pig/branches/spark/ivy/piggybank-template.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/ivy/piggybank-template.xml?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/ivy/piggybank-template.xml (original)
+++ pig/branches/spark/ivy/piggybank-template.xml Fri Mar  4 18:17:39 2016
@@ -76,6 +76,11 @@
       </exclusions>
     </dependency>
     <dependency>
+      <groupId>nl.basjes.parse.httpdlog</groupId>
+      <artifactId>httpdlog-pigloader</artifactId>
+      <version>2.2</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.pig</groupId>
       <artifactId>pig</artifactId>
       <version>@version</version>

Modified: pig/branches/spark/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java (original)
+++ pig/branches/spark/lib-src/bzip2/org/apache/tools/bzip2r/CBZip2InputStream.java Fri Mar  4 18:17:39 2016
@@ -223,6 +223,10 @@ public class CBZip2InputStream extends I
 
     @Override
     public int read() throws IOException {
+        if (this.innerBsStream == null) {
+            throw new IOException("stream closed");
+        }
+
         if (streamEnd) {
             return -1;
         } else {
@@ -264,6 +268,18 @@ public class CBZip2InputStream extends I
         }
     }
 
+    @Override
+    public void close() throws IOException {
+        if (this.innerBsStream == null) {
+            return;
+        }
+        try {
+            innerBsStream.close();
+        } finally {
+            this.innerBsStream = null;
+        }
+    }
+
     /**
      * getPos is used by the caller to know when the processing of the current 
      * {@link InputSplit} is complete. In this method, as we read each bzip
@@ -291,7 +307,6 @@ public class CBZip2InputStream extends I
             magic4 = bsGetUChar();
             if (magic1 != 'B' || magic2 != 'Z' || 
                     magic3 != 'h' || magic4 < '1' || magic4 > '9') {
-                bsFinishedWithStream();
                 streamEnd = true;
                 return;
             }
@@ -308,7 +323,6 @@ public class CBZip2InputStream extends I
     
     private void initBlock(boolean searchForMagic) throws IOException {
         if (readCount >= readLimit) {
-            bsFinishedWithStream();
             streamEnd = true;
             return;
         }
@@ -408,7 +422,6 @@ public class CBZip2InputStream extends I
         	throw new IOException("Encountered additional bytes in the filesplit past the crc block. "
         			+ "Loading of concatenated bz2 files is not supported");
         }
-        bsFinishedWithStream();
         streamEnd = true;
     }
 
@@ -424,14 +437,6 @@ public class CBZip2InputStream extends I
         cadvise("CRC error");
     }
 
-    private void bsFinishedWithStream() {
-        if (this.innerBsStream != null) {
-            if (this.innerBsStream != System.in) {
-                this.innerBsStream = null;
-            }
-        }
-    }
-
     private void bsSetStream(FSDataInputStream f) {
         innerBsStream = f;
         bsLive = 0;
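
The close() added above follows the common guarded, idempotent-close pattern: the wrapped
stream reference is cleared so that a second close() is a no-op and a read() after close()
fails fast instead of dereferencing null. A minimal sketch of the same pattern (the class and
method names below are illustrative, not the actual Pig types):

    import java.io.Closeable;
    import java.io.IOException;
    import java.io.InputStream;

    class GuardedStream implements Closeable {
        private InputStream inner;

        GuardedStream(InputStream inner) {
            this.inner = inner;
        }

        int readByte() throws IOException {
            if (inner == null) {
                throw new IOException("stream closed"); // fail fast after close()
            }
            return inner.read();
        }

        @Override
        public void close() throws IOException {
            if (inner == null) {
                return; // already closed: second close() is a no-op
            }
            try {
                inner.close();
            } finally {
                inner = null; // drop the reference even if close() throws
            }
        }
    }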

Modified: pig/branches/spark/shims/test/hadoop20/org/apache/pig/test/MiniCluster.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/shims/test/hadoop20/org/apache/pig/test/MiniCluster.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/shims/test/hadoop20/org/apache/pig/test/MiniCluster.java (original)
+++ pig/branches/spark/shims/test/hadoop20/org/apache/pig/test/MiniCluster.java Fri Mar  4 18:17:39 2016
@@ -25,7 +25,9 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.pig.ExecType;
+import org.apache.pig.backend.hadoop.executionengine.Launcher;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
 
 public class MiniCluster extends MiniGenericCluster {
     private static final File CONF_DIR = new File("build/classes");
@@ -95,4 +97,8 @@ public class MiniCluster extends MiniGen
         if (m_mr != null) { m_mr.shutdown(); }
             m_mr = null;
     }
+
+    static public Launcher getLauncher() {
+        return new MapReduceLauncher();
+    }
 }

Modified: pig/branches/spark/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java (original)
+++ pig/branches/spark/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java Fri Mar  4 18:17:39 2016
@@ -120,6 +120,7 @@ public class TezMiniCluster extends Mini
             Configuration tez_conf = new Configuration(false);
             // TODO PIG-3659 - Remove this once memory management is fixed
             tez_conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_IO_SORT_MB, "20");
+            tez_conf.set(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, "false");
             tez_conf.set("tez.lib.uris", "hdfs:///tez,hdfs:///tez/lib");
             // Set to a lower value so that tests don't get stuck for long because of 1 AM running at a time
             tez_conf.set(TezConfiguration.TEZ_SESSION_AM_DAG_SUBMIT_TIMEOUT_SECS, "20");

Modified: pig/branches/spark/src/docs/src/documentation/content/xdocs/basic.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/basic.xml?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/docs/src/documentation/content/xdocs/basic.xml (original)
+++ pig/branches/spark/src/docs/src/documentation/content/xdocs/basic.xml Fri Mar  4 18:17:39 2016
@@ -844,7 +844,7 @@ This will cause an error …</source>
    <title>Example: Outer Bag</title>
    <p>In this example A is a relation or bag of tuples. You can think of this bag as an outer bag.</p>
 <source>
-A = LOAD 'data' as (f1:int, f2:int, f3;int);
+A = LOAD 'data' as (f1:int, f2:int, f3:int);
 DUMP A;
 (1,2,3)
 (4,2,1)
@@ -1629,7 +1629,7 @@ a: Schema for a unknown
 <p>Having a deterministic schema is very powerful; however, sometimes it comes at the cost of performance. Consider the following example:</p>  
   
 <source>
-A = load ‘input’ as (x, y, z);
+A = load 'input' as (x, y, z);
 B = foreach A generate x+y;
 </source>
 
@@ -5120,7 +5120,7 @@ X = FILTER A BY (f1 matches '.*apache.*'
 <source>
 A = load 'students' as (name:chararray, age:int, gpa:float);
 B = foreach A generate (name, age);
-store B into ‘results’;
+store B into 'results';
 
 Input (students):
 joe smith  20  3.5
@@ -5138,7 +5138,7 @@ Output (results):
 <source>
 A = load 'students' as (name:chararray, age:int, gpa:float);
 B = foreach A generate {(name, age)}, {name, age};
-store B into ‘results’;
+store B into 'results';
 
 Input (students):
 joe smith  20  3.5
@@ -5156,7 +5156,7 @@ Output (results):
 <source>
 A = load 'students' as (name:chararray, age:int, gpa:float);
 B = foreach A generate [name, gpa];
-store B into ‘results’;
+store B into 'results';
 
 Input (students):
 joe smith  20  3.5
@@ -7522,7 +7522,7 @@ DUMP A;
 <source>
 A = LOAD 'myfile.txt' AS (f1:int, f2:int, f3:int);
 
-A = LOAD 'myfile.txt' USING PigStorage(‘\t’) AS (f1:int, f2:int, f3:int);
+A = LOAD 'myfile.txt' USING PigStorage('\t') AS (f1:int, f2:int, f3:int);
 
 DESCRIBE A;
 a: {f1: int,f2: int,f3: int}
@@ -8922,8 +8922,8 @@ B = FOREACH A GENERATE myFunc($0);
    
    
    <!-- =========================================================================== -->
-   <section id="register">
-   <title>REGISTER</title>
+   <section id="register-jar">
+   <title>REGISTER (a jar/script)</title>
    <p>Registers a JAR file so that the UDFs in the file can be used.</p>
    
    <section>
@@ -8995,7 +8995,222 @@ register jars/*.jar
 </source>
    </section>
    </section>
-   </section>  
 
+      <!-- =========================================================================== -->
+      <section id="register-artifact">
+
+        <title>REGISTER (an artifact)</title>
+
+        <p>
+          Instead of figuring out the dependencies manually, downloading them and registering each jar using the above
+          <a href="#register-jar">register command</a>, you can specify the artifact's coordinates and expect Pig to automatically
+          fetch the required dependencies, download them, and register them.
+        </p>
+
+        <!-- Command Syntax-->
+        <section>
+          <title>Syntax</title>
+          <p>
+            To download an Artifact (and its dependencies), you need to specify the artifact's group, module and version following
+            the syntax shown below. This command will download the Jar specified and all its dependencies and load it into the
+            classpath.
+          </p>
+          <table>
+            <tr>
+              <td>
+                <p>REGISTER ivy://group:module:version?querystring</p>
+              </td>
+            </tr>
+          </table>
+        </section>
+
+        <!-- Terms -->
+        <section>
+          <title>Terms</title>
+          <table>
+            <tr>
+              <td>
+                <p>group</p>
+              </td>
+              <td>
+                <p>Which module group the module comes from. Translates directly to a Maven groupId or an Ivy Organization.</p>
+              </td>
+            </tr>
+            <tr>
+              <td>
+                <p>module</p>
+              </td>
+              <td>
+                <p>The name of the module to load. Translates directly to a Maven artifactId or an Ivy artifact.</p>
+              </td>
+            </tr>
+            <tr>
+              <td>
+                <p>version</p>
+              </td>
+              <td>
+                <p>The version of the module to use. You can specify a specific version or use "+" or "*" to use the latest version.</p>
+              </td>
+            </tr>
+            <tr>
+              <td>
+                <p>querystring</p>
+              </td>
+              <td>
+                <p>A string of "&amp;" separated key-value pairs that control the fetch, for example to exclude all or specific dependencies.</p>
+              </td>
+            </tr>
+          </table>
+        </section>
+
+        <section>
+          <title>Usage</title>
+
+          <p>
+            The Register artifact command is an extension to the above register command used to <a href="#register-jar">register a
+            jar</a>. In addition to registering a jar from a local system or from hdfs, you can now specify the coordinates of the
+            artifact and pig will download the artifact (and its dependencies if needed) from the configured repository.
+          </p>
+
+          <section>
+            <title>Parameters Supported in the Query String</title>
+
+            <ul>
+              <li>
+                <strong>Transitive</strong>
+                <p>
+                  Transitive specifies whether the dependencies of the artifact should be fetched along with it. By setting transitive to
+                  false in the querystring, you can tell Pig to register only the artifact without its dependencies. This will
+                  download only the artifact specified and will not download the dependencies of the jar. The default value of
+                  transitive is true.
+                </p>
+                <strong>Syntax</strong>
+                <table>
+                  <tr>
+                    <td>
+                      <p>REGISTER ivy://org:module:version?transitive=false</p>
+                    </td>
+                  </tr>
+                </table>
+              </li>
+              <li>
+                <strong>Exclude</strong>
+                <p>
+                  While registering an artifact, if you wish to exclude some dependencies you can specify them using the exclude
+                  key. For example, if you want to use a specific version of a dependent jar that does not match the version fetched
+                  automatically, you can exclude such dependencies by specifying a comma-separated list of
+                  dependencies and register the dependent jar separately.
+                </p>
+                <strong>Syntax</strong>
+                <table>
+                  <tr>
+                    <td>
+                      <p>REGISTER ivy://org:module:version?exclude=org:mod,org:mod,...</p>
+                    </td>
+                  </tr>
+                </table>
+              </li>
+              <li>
+                <strong>Classifier</strong>
+                <p>
+                  Some maven dependencies need classifiers in order to be able to resolve. You can specify them using a classifier
+                  key.
+                </p>
+                <strong>Syntax</strong>
+                <table>
+                  <tr>
+                    <td>
+                      <p>REGISTER ivy://org:module:version?classifier=value</p>
+                    </td>
+                  </tr>
+                </table>
+              </li>
+            </ul>
+          </section>
+
+          <section>
+            <title>Other properties</title>
+
+            <ul>
+              <li>
+                <p>
+                  An optional Pig property, pig.artifacts.download.location, can be used to configure the location where the
+                  artifacts should be downloaded. By default, they will be downloaded to ~/.groovy/grapes.
+                </p>
+              </li>
+
+              <li>
+                <p>
+                  This command can be used alongside, or in place of, the <a href="#register-jar">register jar</a> command wherever it is used,
+                  including in macros.<br></br>
+                </p>
+              </li>
+
+              <li>
+                <p>
+                  Group/Organization and Version are optional fields. In such cases you can leave them blank.<br></br>
+                </p>
+              </li>
+
+              <li>
+                <p>
+                  The repositories can be configured using an ivysettings file. Pig will search for an ivysettings.xml file
+                  in the following locations, in order: PIG_CONF_DIR > PIG_HOME > Classpath<br></br>
+                </p>
+              </li>
+            </ul>
+          </section>
+        </section>
+
+        <!-- Examples-->
+        <section>
+          <title>Examples</title>
+
+          <ul>
+            <li>
+              <p>Registering an Artifact and all its dependencies.</p>
+              <source>
+                -- Both are the same<br></br>
+                REGISTER ivy://org.apache.avro:avro:1.5.1<br></br>
+                REGISTER ivy://org.apache.avro:avro:1.5.1?transitive=true</source>
+            </li>
+
+            <li>
+              <p>Registering an artifact without getting its dependencies.</p>
+              <source>
+               REGISTER ivy://org.apache.avro:avro:1.5.1?transitive=false</source>
+            </li>
+
+            <li>
+              <p>Registering the latest artifact.</p>
+              <source>
+                -- Both of the following syntaxes work.<br></br>
+                REGISTER ivy://org.apache.avro:avro:+<br></br>
+                REGISTER ivy://org.apache.avro:avro:*</source>
+            </li>
+
+            <li>
+              <p>Registering an artifact by excluding specific dependencies.</p>
+              <source>
+                REGISTER ivy://org.apache.pig:pig:0.10.0?exclude=commons-cli:commons-cli,commons-codec:commons-codec</source>
+            </li>
+
+            <li>
+              <p>Specifying a classifier</p>
+              <source>
+                REGISTER ivy://net.sf.json-lib:json-lib:2.4?classifier=jdk15</source>
+            </li>
+
+            <li>
+              <p>Registering an artifact without a group or organization. Just skip them.</p>
+              <source>
+                REGISTER ivy://:module:</source>
+            </li>
+          </ul>
+        </section>
+      </section>
+
+      <!-- =========================================================================== -->
+    </section>
   </body>
 </document>

Modified: pig/branches/spark/src/docs/src/documentation/content/xdocs/func.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/func.xml?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/docs/src/documentation/content/xdocs/func.xml (original)
+++ pig/branches/spark/src/docs/src/documentation/content/xdocs/func.xml Fri Mar  4 18:17:39 2016
@@ -167,10 +167,10 @@ DUMP C;
                <p>AVG </p>
             </td>
             <td>
-               <p>long </p>
+               <p>double </p>
             </td>
             <td>
-               <p>long </p>
+               <p>double </p>
             </td>
             <td>
                <p>double </p>
@@ -294,6 +294,87 @@ team_parkyearslist = FOREACH (GROUP team
   </section>
 </section>
 
+<section id="bloom">
+  <title>Bloom</title>
+  <p>Bloom filters are a common way to select a limited set of records before
+    moving data for a join or other heavy weight operation.</p>
+
+  <section>
+    <title>Syntax</title>
+    <table>
+      <tr>
+        <td>
+          <p>BuildBloom(String hashType, String mode, String vectorSize, String nbHash)</p>
+        </td>
+      </tr>
+      <tr>
+        <td>
+          <p>Bloom(String filename)</p>
+        </td>
+      </tr>
+  </table></section>
+
+  <section>
+    <title>Terms</title>
+    <table>
+      <tr>
+	<td><p>hashtype</p></td>
+        <td><p>The type of hash function to use. Valid values for the hash functions are 'jenkins' and 'murmur'.</p></td>
+      </tr>
+      <tr>
+	<td><p>mode</p></td>
+        <td><p>Will be ignored, though by convention it should be "fixed" or "fixedsize"</p></td>
+      </tr>
+      <tr>
+	<td><p>vectorSize</p></td>
+        <td><p>The number of bits in the bloom filter.</p></td>
+      </tr>
+      <tr>
+	<td><p>nbHash</p></td>
+        <td><p>The number of hash functions used in constructing the bloom filter.</p></td>
+      </tr>
+      <tr>
+	<td><p>filename</p></td>
+        <td><p>File containing the serialized Bloom filter.</p></td>
+      </tr>
+    </table>
+    <p>See <a href="http://en.wikipedia.org/wiki/Bloom_filter">Bloom Filter</a> for
+      a discussion of how to select the number of bits and the number of hash
+      functions.
+    </p>
+  </section>
+
+  <section>
+    <title>Usage</title>
+    <p>Bloom filters are a common way to select a limited set of records before
+      moving data for a join or other heavy weight operation. For example, if
+      one wanted to join a very large data set L with a smaller set S, and it
+      was known that the number of keys in L that will match with S is small,
+      building a bloom filter on S and then applying it to L before the join
+      can greatly reduce the number of records from L that have to be moved
+      from the map to the reduce, thus speeding the join.
+    </p>
+    <p>The implementation uses Hadoop's bloom filters
+      (org.apache.hadoop.util.bloom.BloomFilter) internally.
+    </p>
+  </section>
+  <section>
+    <title>Examples</title>
+<source>
+  define bb BuildBloom('128', '3', 'jenkins');
+  small = load 'S' as (x, y, z);
+  grpd = group small all;
+  fltrd = foreach grpd generate bb(small.x);
+  store fltrd into 'mybloom';
+  exec;
+  define bloom Bloom('mybloom');
+  large = load 'L' as (a, b, c);
+  flarge = filter large by bloom(a);
+  joined = join small by x, flarge by a;
+  store joined into 'results';
+</source>
+  </section>
+</section>
    
    <!-- ++++++++++++++++++++++++++++++++++++++++++++++ --> 
    <section id="concat">
@@ -634,8 +715,8 @@ SSN = load 'ssn.txt' using PigStorage()
 
 SSN_NAME = load 'students.txt' using PigStorage() as (ssn:long, name:chararray);
 
-/* do a left outer join of SSN with SSN_Name */
-X = JOIN SSN by ssn LEFT OUTER, SSN_NAME by ssn;
+/* do a cogroup of SSN with SSN_Name */
+X = COGROUP SSN by ssn, SSN_NAME by ssn;
 
 /* only keep those ssn's for which there is no name */
 Y = filter X by IsEmpty(SSN_NAME);
@@ -915,7 +996,8 @@ DUMP X;
    
    <section id="plucktuple">
      <title>PluckTuple</title>
-     <p>Allows the user to specify a string prefix, and then filter for the columns in a relation that begin with that prefix.</p>
+     <p>Allows the user to specify a string prefix or a regex pattern, and then filter for the columns in a relation that begin with that prefix or match that regex pattern. Optionally, include the flag 'false' to filter
+      for columns that do not match that prefix or regex pattern.</p>
      
      <section>
        <title>Syntax</title>
@@ -923,6 +1005,7 @@ DUMP X;
          <tr>
            <td>
              <p>DEFINE pluck PluckTuple(expression1)</p>
+             <p>DEFINE pluck PluckTuple(expression1,expression3)</p>
              <p>pluck(expression2)</p>
            </td>
          </tr>
@@ -937,7 +1020,7 @@ DUMP X;
              <p>expression1</p>
            </td>
            <td>
-             <p>A prefix to pluck by</p>
+             <p>A prefix to pluck by or a regex pattern to pluck by</p>
            </td>
 	 </tr>
 	 <tr>
@@ -948,6 +1031,14 @@ DUMP X;
              <p>The fields to apply the pluck to, usually '*'</p>
            </td>
 	 </tr>
+    <tr>
+           <td>
+             <p>expression3</p>
+           </td>
+           <td>
+             <p>A boolean flag to indicate whether to include or exclude matching columns</p>
+           </td>
+   </tr>
        </table>
      </section>
      
@@ -964,6 +1055,10 @@ describe c;
 c: {a::x: bytearray,a::y: bytearray,b::x: bytearray,b::y: bytearray}
 describe d;
 d: {plucked::a::x: bytearray,plucked::a::y: bytearray}
+DEFINE pluckNegative PluckTuple('a::','false');
+d = foreach c generate FLATTEN(pluckNegative(*));
+describe d;
+d: {plucked::b::x: bytearray,plucked::b::y: bytearray}
 </source>
      </section>
    </section>
@@ -1369,23 +1464,20 @@ DUMP B;
 <p>To work with gzip compressed files, input/output files need to have a .gz extension. Gzipped files cannot be split across multiple maps; this means that the number of maps created is equal to the number of part files in the input location.</p>
 
 <source>
-A = load ‘myinput.gz’;
-store A into ‘myoutput.gz’; 
+A = load 'myinput.gz';
+store A into 'myoutput.gz';
 </source>
 
 <p>To work with bzip compressed files, the input/output files need to have a .bz or .bz2 extension. Because the compression is block-oriented, bzipped files can be split across multiple maps.</p>
 
 <source>
-A = load ‘myinput.bz’;
-store A into ‘myoutput.bz’; 
+A = load 'myinput.bz';
+store A into 'myoutput.bz';
 </source>
 
-<p>Note: PigStorage and TextLoader correctly read compressed files as long as they are NOT CONCATENATED FILES generated in this manner: </p>
+<p>Note: PigStorage and TextLoader correctly read compressed files as long as they are NOT CONCATENATED bz/bz2 FILES generated in this manner: </p>
   <ul>
       <li>
-         <p>cat *.gz > text/concat.gz</p>
-      </li>
-      <li>
          <p>cat *.bz > text/concat.bz </p>
       </li>
       <li>
@@ -1393,7 +1485,7 @@ store A into ‘myoutput.bz’;
       </li>
    </ul>
 <p></p>
-<p>If you use concatenated gzip or bzip files with your Pig jobs, you will NOT see a failure but the results will be INCORRECT.</p>
+<p>If you use concatenated bzip files with your Pig jobs, you will NOT see a failure but the results will be INCORRECT.</p>
 <p></p>
 
 </section>
@@ -1525,7 +1617,7 @@ dump X;
    <table>
        <tr>
             <td>
-               <p>JsonLoader( [‘schema’] ) </p>
+               <p>JsonLoader( ['schema'] ) </p>
             </td>
         </tr> 
         <tr>
@@ -1648,11 +1740,11 @@ STORE X INTO 'output' USING PigDump();
                <p id="pigstorage-options">'options'</p>
             </td>
             <td>
-               <p>A string that contains space-separated options (‘optionA  optionB  optionC’)</p>
+               <p>A string that contains space-separated options ('optionA  optionB  optionC')</p>
                <p>Currently supported options are:</p>
                <ul>
-                    <li>(‘schema’) - Stores the schema of the relation using a hidden JSON file.</li>
-                    <li>(‘noschema’) - Ignores a stored schema during the load.</li>
+                    <li>('schema') - Stores the schema of the relation using a hidden JSON file.</li>
+                    <li>('noschema') - Ignores a stored schema during the load.</li>
                     <li>('tagsource') - (deprecated, Use tagPath instead) Add a first column indicates the input file of the record.</li>
                     <li>('tagPath') - Add a first column indicates the input path of the record.</li>
                     <li>('tagFile') - Add a first column indicates the input file name of the record.</li>
@@ -1863,6 +1955,8 @@ A = LOAD 'data' USING TextLoader();
                     less than this value</li>
                 <li>-timestamp=timestamp Return cell values that have a creation timestamp equal to
                     this value</li>
+                <li>-includeTimestamp=Record will include the timestamp after the rowkey on store (rowkey, timestamp, ...)</li>
+                <li>-includeTombstone=Record will include a tombstone marker on store after the rowKey and timestamp (if included) (rowkey, [timestamp,] tombstone, ...)</li>
                </ul>
             </td>
          </tr>
@@ -1928,7 +2022,7 @@ STORE A INTO 'hbase://users_table' USING
    <table>
        <tr>
             <td>
-               <p>Avrostorage(['schema|record name'], ['options'])</p>
+               <p>AvroStorage(['schema|record name'], ['options'])</p>
             </td>
          </tr>
    </table>
@@ -5934,7 +6028,7 @@ In this example, student names (type cha
  <source>
 A = load 'students' as (name:chararray, age:int, gpa:float);
 B = foreach A generate TOMAP(name, gpa);
-store B into ‘results’;
+store B into 'results';
 
 Input (students)
 joe smith 20 3.5
@@ -6042,6 +6136,74 @@ bottomResults = FOREACH D {
 </section>
 </section>
 <!-- End Other Functions -->
-
+<!-- ======================================================== -->
+<!-- ======================================================== -->
+<!-- Other Functions -->
+<section id="hive-udf">
+<title>Hive UDF</title>
+<p>Pig can invoke all types of Hive UDFs, including UDF, GenericUDF, UDAF, GenericUDAF and GenericUDTF. Depending on the Hive UDF you want to use, you need to declare it in Pig with HiveUDF (handles UDF and GenericUDF), HiveUDAF (handles UDAF and GenericUDAF), or HiveUDTF (handles GenericUDTF).</p>
+  <section>
+    <title>Syntax</title>
+    <p>HiveUDF, HiveUDAF, HiveUDTF share the same syntax.</p>
+    <table>
+      <tr>
+        <td>
+          <p>HiveUDF(name[, constant parameters])</p>
+        </td>
+      </tr>
+    </table>
+  </section>
+  <section>
+    <title>Terms</title>
+    <table>
+      <tr>
+        <td>
+          <p>name</p>
+        </td>
+        <td>
+          <p>Hive UDF name. This can be a fully qualified class name of the Hive UDF/UDTF/UDAF class, or a short name registered in the Hive FunctionRegistry (as most Hive builtin UDFs are)</p>
+        </td>
+      </tr>
+      <tr>
+        <td>
+          <p>constant parameters</p>
+        </td>
+        <td>
+          <p>Optional tuple representing constant parameters of a Hive UDF/UDTF/UDAF. If a Hive UDF requires a constant parameter, there is no other way Pig can pass that information to Hive, since the Pig schema does not carry information about whether a parameter is constant or not. A null item in the tuple means the field is not a constant; a non-null item represents a constant field. The data type of the item is determined by the Pig constant parser.</p>
+        </td>
+      </tr>
+    </table>
+  </section>
+  <section>
+  <title>Example</title>
+  <p>HiveUDF</p>
+  <source>
+define sin HiveUDF('sin');
+A = LOAD 'student' as (name:chararray, age:int, gpa:double);
+B = foreach A generate sin(gpa);
+  </source>
+  <p>HiveUDTF</p>
+  <source>
+define explode HiveUDTF('explode');
+A = load 'mydata' as (a0:{(b0:chararray)});
+B = foreach A generate flatten(explode(a0));
+  </source>
+  <p>HiveUDAF</p>
+  <source>
+define avg HiveUDAF('avg');
+A = LOAD 'student' as (name:chararray, age:int, gpa:double);
+B = group A by name;
+C = foreach B generate group, avg(A.age);
+  </source>
+  </section>
+  <p>HiveUDAF with constant parameter</p>
+<source>
+define in_file HiveUDF('in_file', '(null, "names.txt")');
+A = load 'student' as (name:chararray, age:long, gpa:double);
+B = foreach A generate in_file(name, 'names.txt');
+</source>
+<p>In this example, we pass (null, "names.txt") to the constructor of the UDF in_file, meaning the first parameter is regular and the second parameter is a constant. names.txt can be double quoted (unlike other Pig syntax), or quoted in \'. Note we need to pass 'names.txt' again in line 3. This looks redundant, but it is needed to fill the semantic gap between Pig and Hive: we need to pass the constant in the data pipeline in line 3, just as for a Pig UDF. Initialization code in a Hive UDF takes an ObjectInspector, which captures both the data type and whether or not the parameter is a constant. However, initialization code in Pig takes a schema, which only captures the former. We need an additional mechanism (the constructor parameter) to convey the latter.</p>
+<p>Note: A few Hive 0.14 UDFs contain bugs which affect Pig and are fixed in Hive 1.0. Here is a list: compute_stats, context_ngrams, count, ewah_bitmap, histogram_numeric, collect_list, collect_set, ngrams, case, in, named_struct, stack, percentile_approx.</p>
+</section>
   </body>
 </document>

Modified: pig/branches/spark/src/docs/src/documentation/content/xdocs/perf.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/perf.xml?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/docs/src/documentation/content/xdocs/perf.xml (original)
+++ pig/branches/spark/src/docs/src/documentation/content/xdocs/perf.xml Fri Mar  4 18:17:39 2016
@@ -47,10 +47,11 @@
   <section id="auto-parallelism">
     <title>Automatic parallelism</title>
     <p>Just like MapReduce, if user specify "parallel" in their Pig statement, or user define default_parallel in Tez mode, Pig will honor it (the only exception is if user specify a parallel which is apparently too low, Pig will override it) </p>
-    <p>If user specify neither "parallel" or "default_parallel", Pig will use automatic parallelism. In MapReduce, Pig submit one MapReduce job a time and before submiting a job, Pig has chance to automatically set reduce parallelism based on the size of input file. On the contrary, Tez submit a DAG as a unit and automatic parallelism is managed in two parts</p>
+    <p>If the user specifies neither "parallel" nor "default_parallel", Pig will use automatic parallelism. In MapReduce, Pig submits one MapReduce job at a time, and before submitting a job, Pig has a chance to automatically set the reduce parallelism based on the size of the input file. In contrast, Tez submits a DAG as a unit, and automatic parallelism is managed in three parts</p>
     <ul>
     <li>Before submiting a DAG, Pig estimate parallelism of each vertex statically based on the input file size of the DAG and the complexity of the pipeline of each vertex</li>
-    <li>At runtime, Tez adjust vertex parallelism dynamically based on the input data volume of the vertex. Note currently Tez can only decrease the parallelism dynamically not increase. So in step 1, Pig overestimate the parallelism</li>
+    <li>As the DAG progresses, Pig adjusts the parallelism of vertices with the best knowledge available at that moment (Pig grace parallelism)</li>
+    <li>At runtime, Tez adjusts vertex parallelism dynamically based on the input data volume of the vertex. Note that currently Tez can only decrease the parallelism dynamically, not increase it. So in steps 1 and 2, Pig overestimates the parallelism</li>
     </ul>
     <p>The following parameter control the behavior of automatic parallelism in Tez (share with MapReduce):</p>
 <source>
@@ -492,7 +493,7 @@ Gtab = .... aggregation function
 STORE Gtab INTO '/user/vxj/finalresult2';
 </source>
 
-<p>To make the script works, add the exec statement.  </p>
+<p>To make the script work, add the exec statement. </p>
 
 <source>
 A = LOAD '/user/xxx/firstinput' USING PigStorage();
@@ -517,6 +518,11 @@ Ftab = group ....
 Gtab = .... aggregation function
 STORE Gtab INTO '/user/vxj/finalresult2';
 </source>
+
+<p>If the STORE and LOAD both have exactly matching file paths, Pig will recognize the implicit dependency
+and launch two different MapReduce jobs/Tez DAGs, with the second job depending on the output of the first one.
+exec is not required to be specified in that case.</p>
+
 </section>
 </section>
 </section>
@@ -978,11 +984,11 @@ B = GROUP A BY t PARALLEL 18;
 <p>In this example all the MapReduce jobs that get launched use 20 reducers.</p>
 <source>
 SET default_parallel 20;
-A = LOAD ‘myfile.txt’ USING PigStorage() AS (t, u, v);
+A = LOAD 'myfile.txt' USING PigStorage() AS (t, u, v);
 B = GROUP A BY t;
 C = FOREACH B GENERATE group, COUNT(A.t) as mycount;
 D = ORDER C BY mycount;
-STORE D INTO ‘mysortedcount’ USING PigStorage();
+STORE D INTO 'mysortedcount' USING PigStorage();
 </source>
 </section>
 
@@ -1286,9 +1292,9 @@ C = JOIN A BY a1, B BY b1, C BY c1 USING
 <li>Data must come directly from either a Load or an Order statement.</li>
 <li>There may be filter statements and foreach statements between the sorted data source and the join statement. The foreach statement should meet the following conditions: 
   <ul>
-    <li>There should be no UDFs in the foreach statement. </li>
     <li>The foreach statement should not change the position of the join keys. </li>
-    <li>There should be no transformation on the join keys which will change the sort order. </li>
+    <li>There should be no transformation on the join keys which will change the sort order.</li>
+    <li>UDFs also have to adhere to the previous condition and should not transform the JOIN keys in a way that would change the sort order.</li>
   </ul>
 </li>
 <li>Data must be sorted on join keys in ascending (ASC) order on both sides.</li>

Modified: pig/branches/spark/src/docs/src/documentation/content/xdocs/pig-index.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/pig-index.xml?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/docs/src/documentation/content/xdocs/pig-index.xml (original)
+++ pig/branches/spark/src/docs/src/documentation/content/xdocs/pig-index.xml Fri Mar  4 18:17:39 2016
@@ -819,7 +819,7 @@
 
 <p><a href="func.html#regex-extract-all">REGEX_EXTRACT_ALL</a> function</p>
 
-<p><a href="basic.html#register">REGISTER</a> statement</p>
+<p><a href="basic.html#register-jar">REGISTER</a> statement</p>
 
 <p>regular expressions. <em>See</em> pattern matching</p>
 

Modified: pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml (original)
+++ pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml Fri Mar  4 18:17:39 2016
@@ -249,7 +249,7 @@ grunt>
 
 A = load 'passwd' using PigStorage(':');  -- load the passwd file 
 B = foreach A generate $0 as id;  -- extract the user IDs 
-store B into ‘id.out’;  -- write the results to a file name id.out
+store B into 'id.out';  -- write the results to a file name id.out
 </source>
 
 <p><strong>Local Mode</strong></p>

Modified: pig/branches/spark/src/docs/src/documentation/content/xdocs/tabs.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/tabs.xml?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/docs/src/documentation/content/xdocs/tabs.xml (original)
+++ pig/branches/spark/src/docs/src/documentation/content/xdocs/tabs.xml Fri Mar  4 18:17:39 2016
@@ -32,6 +32,6 @@
   -->
   <tab label="Project" href="http://hadoop.apache.org/pig/" type="visible" /> 
   <tab label="Wiki" href="http://wiki.apache.org/pig/" type="visible" /> 
-  <tab label="Pig 0.14.0 Documentation" dir="" type="visible" /> 
+  <tab label="Pig 0.16.0 Documentation" dir="" type="visible" /> 
 
 </tabs>

Modified: pig/branches/spark/src/docs/src/documentation/content/xdocs/udf.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/udf.xml?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/docs/src/documentation/content/xdocs/udf.xml (original)
+++ pig/branches/spark/src/docs/src/documentation/content/xdocs/udf.xml Fri Mar  4 18:17:39 2016
@@ -1752,7 +1752,7 @@ function helloworld() {
     return 'Hello, World';
 }
     
-complex.outputSchema = "word:chararray,num:long";
+complex.outputSchema = "(word:chararray,num:long)";
 function complex(word){
     return {word:word, num:word.length};
 }
@@ -1760,8 +1760,8 @@ function complex(word){
  
 <p>This Pig script registers the JavaScript UDF (udf.js).</p>
 <source>
- register ‘udf.js’ using javascript as myfuncs; 
-A = load ‘data’ as (a0:chararray, a1:int);
+register 'udf.js' using javascript as myfuncs; 
+A = load 'data' as (a0:chararray, a1:int);
 B = foreach A generate myfuncs.helloworld(), myfuncs.complex(a0);
 ... ... 
 </source>

Modified: pig/branches/spark/src/org/apache/pig/EvalFunc.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/EvalFunc.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/EvalFunc.java (original)
+++ pig/branches/spark/src/org/apache/pig/EvalFunc.java Fri Mar  4 18:17:39 2016
@@ -362,4 +362,11 @@ public abstract class EvalFunc<T>  {
     public boolean allowCompileTimeCalculation() {
         return false;
     }
+
+    public boolean needEndOfAllInputProcessing() {
+        return false;
+    }
+
+    public void setEndOfAllInput(boolean endOfAllInput) {
+    }
 }
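A hypothetical UDF (not part of this patch) showing how the two new hooks are meant to be used together: returning true from needEndOfAllInputProcessing() asks the runtime to call setEndOfAllInput(true) once all input has been fed to the UDF.

    import java.io.IOException;
    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.Tuple;

    public class RunningCount extends EvalFunc<Long> {
        private long seen = 0;

        @Override
        public Long exec(Tuple input) throws IOException {
            return ++seen;
        }

        @Override
        public boolean needEndOfAllInputProcessing() {
            return true;   // request the end-of-input callback
        }

        @Override
        public void setEndOfAllInput(boolean endOfAllInput) {
            if (endOfAllInput) {
                log.info("saw " + seen + " tuples");   // 'log' is inherited from EvalFunc
            }
        }
    }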

Modified: pig/branches/spark/src/org/apache/pig/JVMReuseImpl.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/JVMReuseImpl.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/JVMReuseImpl.java (original)
+++ pig/branches/spark/src/org/apache/pig/JVMReuseImpl.java Fri Mar  4 18:17:39 2016
@@ -22,29 +22,69 @@ import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigCombiner;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.classification.InterfaceStability;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.util.SpillableMemoryManager;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
 
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
 public class JVMReuseImpl {
 
     private static Log LOG = LogFactory.getLog(JVMReuseImpl.class);
 
     public void cleanupStaticData() {
+
+        // Calling Pig builtin ones directly without reflection for optimization
+        // and to reduce probability of NPE in PIG-4418
+        SpillableMemoryManager.staticDataCleanup();
+        PhysicalOperator.staticDataCleanup();
+        PigContext.staticDataCleanup();
+        PigGenericMapReduce.staticDataCleanup();
+        PigStatusReporter.staticDataCleanup();
+        PigCombiner.Combine.staticDataCleanup();
+
+        String className = null;
+        String msg = null;
         List<Method> staticCleanupMethods = JVMReuseManager.getInstance()
                 .getStaticDataCleanupMethods();
         for (Method m : staticCleanupMethods) {
             try {
-                String msg = "Invoking method " + m.getName() + " in class "
-                        + m.getDeclaringClass().getName() + " for static data cleanup";
-                if (m.getDeclaringClass().getName().startsWith("org.apache.pig")) {
+                className = m.getDeclaringClass() == null ? "anonymous" : m.getDeclaringClass().getName();
+                msg = "Invoking method " + m.getName() + " in class "
+                        + className + " for static data cleanup";
+                if (className.startsWith("org.apache.pig")) {
                     LOG.debug(msg);
                 } else {
                     LOG.info(msg);
                 }
                 m.invoke(null);
+                msg = null;
             } catch (Exception e) {
-                throw new RuntimeException("Error while invoking method "
-                        + m.getName() + " in class " + m.getDeclaringClass().getName()
-                        + " for static data cleanup", e);
+                LOG.error("Exception while calling static methods:"
+                        + getMethodNames(staticCleanupMethods) + ". " + msg, e);
+                throw new RuntimeException("Error while " + msg, e);
+            }
+        }
+    }
+
+    private String getMethodNames(List<Method> staticCleanupMethods) {
+        StringBuilder sb = new StringBuilder();
+        for (Method m : staticCleanupMethods) {
+            if (m == null) {
+                sb.append("null,");
+            } else {
+                sb.append(m.getDeclaringClass() == null ? "anonymous" : m.getDeclaringClass().getName());
+                sb.append(".").append(m.getName()).append(",");
             }
         }
+        if (sb.length() > 1) {
+            sb.deleteCharAt(sb.length() - 1);
+        }
+        return sb.toString();
     }
 }

Modified: pig/branches/spark/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/Main.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/Main.java (original)
+++ pig/branches/spark/src/org/apache/pig/Main.java Fri Mar  4 18:17:39 2016
@@ -28,8 +28,10 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
+import java.io.Reader;
 import java.io.StringReader;
-import java.io.Writer;
+import java.net.URL;
+import java.nio.charset.Charset;
 import java.text.ParseException;
 import java.util.AbstractList;
 import java.util.ArrayList;
@@ -86,6 +88,10 @@ import org.joda.time.Period;
 import org.joda.time.PeriodType;
 import org.joda.time.format.PeriodFormat;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import com.google.common.io.Closeables;
+
 /**
  * Main class for Pig engine.
  */
@@ -742,22 +748,7 @@ public class Main {
             logLevel = Level.toLevel(logLevelString, Level.INFO);
         }
 
-        Properties props = new Properties();
-        FileReader propertyReader = null;
-        if (log4jconf != null) {
-            try {
-                propertyReader = new FileReader(log4jconf);
-                props.load(propertyReader);
-            }
-            catch (IOException e)
-            {
-                System.err.println("Warn: Cannot open log4j properties file, use default");
-            }
-            finally
-            {
-                if (propertyReader != null) try {propertyReader.close();} catch(Exception e) {}
-            }
-        }
+        final Properties props = log4jConfAsProperties(log4jconf);
         if (props.size() == 0) {
             props.setProperty("log4j.logger.org.apache.pig", logLevel.toString());
             if((logLevelString = System.getProperty("pig.logfile.level")) == null){
@@ -790,8 +781,37 @@ public class Main {
         pigContext.setDefaultLogLevel(logLevel);
     }
 
+   @VisibleForTesting
+   static Properties log4jConfAsProperties(String log4jconf) {
+       final Properties properties = new Properties();
+       if (!Strings.isNullOrEmpty(log4jconf)) {
+           Reader propertyReader = null;
+           try {
+               final File file = new File(log4jconf);
+               if (file.exists()) {
+                   propertyReader = new FileReader(file);
+                   properties.load(propertyReader);
+                   log.info("Loaded log4j properties from file: " + file);
+               } else {
+                   final URL resource = Main.class.getClassLoader().getResource(log4jconf);
+                   if (resource != null) {
+                       propertyReader = new InputStreamReader(resource.openStream(), Charset.forName("UTF-8"));
+                       properties.load(propertyReader);
+                       log.info("Loaded log4j properties from resource: " +  resource);
+                   } else {
+                       log.warn("No file or resource found by the name: " + log4jconf);
+                   }
+               }
+           } catch (IOException e)  {
+               log.warn("Cannot open log4j properties file " + log4jconf + ", using default");
+           } finally {
+               Closeables.closeQuietly(propertyReader);
+           }
+       }
+       return properties;
+  }
 
-    private static List<String> fetchRemoteParamFiles(List<String> paramFiles, Properties properties)
+  private static List<String> fetchRemoteParamFiles(List<String> paramFiles, Properties properties)
             throws IOException {
         List<String> paramFiles2 = new ArrayList<String>();
         for (String param: paramFiles) {
@@ -895,7 +915,6 @@ public class Main {
             System.out.println("            GroupByConstParallelSetter - Force parallel 1 for \"group all\" statement");
             System.out.println("            PartitionFilterOptimizer - Pushdown partition filter conditions to loader implementing LoadMetaData");
             System.out.println("            PredicatePushdownOptimizer - Pushdown filter predicates to loader implementing LoadPredicatePushDown");
-            System.out.println("            RollupHIIOptimizer - Apply Rollup HII optimization");
             System.out.println("            All - Disable all optimizations");
             System.out.println("        All optimizations listed here are enabled by default. Optimization values are case insensitive.");
             System.out.println("    -v, -verbose - Print all error messages to screen");
@@ -943,10 +962,12 @@ public class Main {
             System.out.println("    Miscellaneous:");
             System.out.println("        exectype=mapreduce|tez|local; default is mapreduce. This property is the same as -x switch");
             System.out.println("        pig.additional.jars.uris=<comma seperated list of jars>. Used in place of register command.");
-            System.out.println("        udf.import.list=<comma seperated list of imports>. Used to avoid package names in UDF.");
+            System.out.println("        udf.import.list=<Colon seperated list of imports>. Used to avoid package names in UDF.");
             System.out.println("        stop.on.failure=true|false; default is false. Set to true to terminate on the first error.");
             System.out.println("        pig.datetime.default.tz=<UTC time offset>. e.g. +08:00. Default is the default timezone of the host.");
             System.out.println("            Determines the timezone used to handle datetime datatype and UDFs. ");
+            System.out.println("        pig.artifacts.download.location=<path to download artifacts>; default is ~/.groovy/grapes");
+            System.out.println("            Determines the location to download the artifacts when registering jars using ivy coordinates.");
             System.out.println("Additionally, any Hadoop property can be specified.");
     }
 

Modified: pig/branches/spark/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/PigConfiguration.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/branches/spark/src/org/apache/pig/PigConfiguration.java Fri Mar  4 18:17:39 2016
@@ -60,6 +60,22 @@ public class PigConfiguration {
      * This key is used to enable or disable union optimization in tez. True by default
      */
     public static final String PIG_TEZ_OPT_UNION = "pig.tez.opt.union";
+    /**
+     * These keys are used to enable or disable the tez union optimization for
+     * specific StoreFuncs, so that the optimization is only applied to StoreFuncs
+     * that do not hardcode part file names and honor mapreduce.output.basename,
+     * and is turned off for those that do not. Refer to PIG-4649
+     */
+    public static final String PIG_TEZ_OPT_UNION_SUPPORTED_STOREFUNCS = "pig.tez.opt.union.supported.storefuncs";
+    public static final String PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS = "pig.tez.opt.union.unsupported.storefuncs";
+
+    /**
+     * For LoadFuncs specified here, Pig reads from the datasource only once during a sort,
+     * instead of loading once for sampling and again for partitioning.
+     * Used to avoid hitting external non-filesystem datasources like HBase and Accumulo twice.
+     * Honored only by Pig on Tez now.
+     */
+    public static final String PIG_SORT_READONCE_LOADFUNCS = "pig.sort.readonce.loadfuncs";
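A sketch of populating these list-valued keys (the class names chosen below are illustrative, not shipped defaults):

    import java.util.Properties;
    import org.apache.pig.PigConfiguration;

    public class TezStoreLoadFuncSettings {
        public static Properties exampleSettings() {
            Properties props = new Properties();
            // Comma-separated, fully qualified StoreFunc class names that hardcode
            // part file names, so the union optimization is skipped for them.
            props.setProperty(PigConfiguration.PIG_TEZ_OPT_UNION_UNSUPPORTED_STOREFUNCS,
                    "com.example.MyPartFileNamingStorage");
            // LoadFuncs that should be read only once during sort (e.g. HBase-backed).
            props.setProperty(PigConfiguration.PIG_SORT_READONCE_LOADFUNCS,
                    "org.apache.pig.backend.hadoop.hbase.HBaseStorage");
            return props;
        }
    }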
 
     /**
      * Boolean value to enable or disable partial aggregation in map. Disabled by default
@@ -97,6 +113,13 @@ public class PigConfiguration {
     public static final String PIG_SKEWEDJOIN_REDUCE_MEMUSAGE = "pig.skewedjoin.reduce.memusage";
 
     /**
+     * Memory available (in bytes) in the reducer, used when calculating the memory available for skewed join.
+     * By default, it is set to Runtime.getRuntime().maxMemory(). Override it only
+     * for debugging purposes.
+     */
+    public static final String PIG_SKEWEDJOIN_REDUCE_MEM = "pig.skewedjoin.reduce.mem";
+
+    /**
      * This key used to control the maximum size loaded into
      * the distributed cache when doing fragment-replicated join
      */
@@ -117,6 +140,25 @@ public class PigConfiguration {
      * This key is used to configure auto parallelism in tez. Default is true.
      */
     public static final String PIG_TEZ_AUTO_PARALLELISM = "pig.tez.auto.parallelism";
+    /**
+     * This key is used to configure grace parallelism in tez. Default is true.
+     */
+    public static final String PIG_TEZ_GRACE_PARALLELISM = "pig.tez.grace.parallelism";
+
+    /**
+     * This key is used to configure compression for the pig input splits which
+     * are not FileSplit. Default is false
+     */
+    public static final String PIG_COMPRESS_INPUT_SPLITS = "pig.compress.input.splits";
+    public static final boolean PIG_COMPRESS_INPUT_SPLITS_DEFAULT = false;
+
+    /**
+     * Serialize input splits to disk if the total size of the input splits exceeds a
+     * threshold, to avoid hitting the default RPC transfer size limit of 64MB.
+     * Default is 33554432 (32MB)
+     */
+    public static final String PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD = "pig.tez.input.splits.mem.threshold";
+    public static final int PIG_TEZ_INPUT_SPLITS_MEM_THRESHOLD_DEFAULT = 33554432;
 
     // Pig UDF profiling settings
     /**
@@ -268,6 +310,26 @@ public class PigConfiguration {
     public static final String PIG_USER_CACHE_LOCATION = "pig.user.cache.location";
 
     /**
+     * Replication factor for files in pig jar cache
+     */
+    public static final String PIG_USER_CACHE_REPLICATION = "pig.user.cache.replication";
+
+    /**
+     * Boolean value used to enable or disable error handling for storers
+     */
+    public static final String PIG_ALLOW_STORE_ERRORS = "pig.allow.store.errors";
+
+    /**
+     * Controls the minimum number of errors
+     */
+    public static final String PIG_ERRORS_MIN_RECORDS = "pig.errors.min.records";
+
+    /**
+     * Set the threshold for percentage of errors
+     */
+    public static final String PIG_ERROR_THRESHOLD_PERCENT = "pig.error.threshold.percent";
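A sketch of wiring the three error-handling keys together; the values, and how the minimum-record count interacts with the percentage threshold, are assumptions for illustration only:

    import java.util.Properties;
    import org.apache.pig.PigConfiguration;

    public class StoreErrorToleranceSettings {
        public static Properties exampleSettings() {
            Properties props = new Properties();
            // Turn on error handling for storers.
            props.setProperty(PigConfiguration.PIG_ALLOW_STORE_ERRORS, "true");
            // Illustrative values: tolerate some minimum number of bad records
            // and cap the error rate with a percentage threshold.
            props.setProperty(PigConfiguration.PIG_ERRORS_MIN_RECORDS, "100");
            props.setProperty(PigConfiguration.PIG_ERROR_THRESHOLD_PERCENT, "1");
            return props;
        }
    }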
+    
+    /**
      * Comma-delimited entries of commands/operators that must be disallowed.
      * This is a security feature to be used by administrators to block use of
      * commands by users. For eg, an admin might like to block all filesystem
@@ -295,6 +357,16 @@ public class PigConfiguration {
      */
     public static final String PIG_DATETIME_DEFAULT_TIMEZONE = "pig.datetime.default.tz";
 
+    /**
+     * Use Hadoop's TextInputFormat for reading bzip input instead of Pig's Bzip2TextInputFormat. True by default
+     * (only valid for Hadoop 0.23/2.X)
+     */
+    public static final String PIG_BZIP_USE_HADOOP_INPUTFORMAT = "pig.bzip.use.hadoop.inputformat";
+
+    /**
+     * This key is used to set the download location when registering an artifact using an ivy coordinate
+     */
+    public static final String PIG_ARTIFACTS_DOWNLOAD_LOCATION = "pig.artifacts.download.location";
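A sketch of overriding the artifact cache location (the path is a placeholder); a script would then pull its dependencies in with a REGISTER statement using an ivy coordinate, as mentioned in the Main.java usage text earlier in this mail.

    import java.util.Properties;
    import org.apache.pig.PigConfiguration;

    public class IvyDownloadLocationSettings {
        public static Properties exampleSettings() {
            Properties props = new Properties();
            // Download ivy-registered artifacts here instead of ~/.groovy/grapes.
            props.setProperty(PigConfiguration.PIG_ARTIFACTS_DOWNLOAD_LOCATION,
                    "/tmp/pig-artifact-cache");
            return props;
        }
    }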
 
     // Pig on Tez runtime settings
     /**

Modified: pig/branches/spark/src/org/apache/pig/PigConstants.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/PigConstants.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/PigConstants.java (original)
+++ pig/branches/spark/src/org/apache/pig/PigConstants.java Fri Mar  4 18:17:39 2016
@@ -59,45 +59,4 @@ public class PigConstants {
     public static final String TIME_UDFS_ELAPSED_TIME_COUNTER = "approx_microsecs";
 
     public static final String TASK_INDEX = "mapreduce.task.index";
-
-    /**
-     * This parameter is used to check if the rollup is optimizable or not after going
-     * through the RollupHIIOptimizer
-     */
-    public static final String PIG_HII_ROLLUP_OPTIMIZABLE = "pig.hii.rollup.optimizable";
-
-    /**
-     * This parameter stores the value of the pivot position. If the rollup is not optimizable
-     * this value will be -1; If the rollup is optimizable: if the user did specify the pivot
-     * in the rollup clause, this parameter will get that value; if the user did not specify
-     * the pivot in the rollup clause, this parameter will get the value of the median position
-     * of the fields in the rollup clause
-     */
-    public static final String PIG_HII_ROLLUP_PIVOT = "pig.hii.rollup.pivot";
-
-    /**
-     * This parameter stores the index of the first field involves in the rollup (or the first field
-     * involves in the rollup after changing the position of rollup to the end in case of having cube)
-     */
-    public static final String PIG_HII_ROLLUP_FIELD_INDEX = "pig.hii.rollup.field.index";
-
-    /**
-     * This parameter stores the index of the first field involves in the rollup before
-     * changing the position of rollup to the end in case of having cube
-     */
-    public static final String PIG_HII_ROLLUP_OLD_FIELD_INDEX = "pig.hii.rollup.old.field.index";
-
-    /**
-     * This parameter stores the size of total fields which involve in the CUBE clause. For example, we
-     * have two CUBE clause:
-     * B = CUBE A BY CUBE(year, month, day), ROLLUP(hour, minute, second);
-     * B = CUBE A BY ROLLUP(year, month, day, hour, minute, second);
-     * So this parameter will be 6 at both cases.
-     */
-    public static final String PIG_HII_NUMBER_TOTAL_FIELD = "pig.hii.number.total.field";
-
-    /**
-     * This parameter stores the number of algebraic functions that used after rollup.
-     */
-    public static final String PIG_HII_NUMBER_ALGEBRAIC = "pig.hii.number.algebraic";
 }
\ No newline at end of file

Modified: pig/branches/spark/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/PigServer.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/PigServer.java (original)
+++ pig/branches/spark/src/org/apache/pig/PigServer.java Fri Mar  4 18:17:39 2016
@@ -44,6 +44,7 @@ import java.util.concurrent.atomic.Atomi
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.compress.BZip2Codec;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.pig.backend.datastorage.ContainerDescriptor;
@@ -61,6 +62,7 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileLocalizer.FetchFileRet;
+import org.apache.pig.impl.io.compress.BZip2CodecWithExtensionBZ;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.streaming.StreamingCommand;
@@ -228,6 +230,7 @@ public class PigServer {
 
         this.filter = new BlackAndWhitelistFilter(this);
 
+        addHadoopProperties();
         addJarsFromProperties();
         markPredeployedJarsFromProperties();
 
@@ -240,6 +243,25 @@ public class PigServer {
 
     }
 
+    private void addHadoopProperties() throws ExecException {
+        // For BZip input on hadoop 0.23/2.X
+        // with PIG_BZIP_USE_HADOOP_INPUTFORMAT turned on,
+        // PigTextInputFormat depends on hadoop's TextInputFormat
+        // for handling bzip2 input. One problem is that it only recognizes 'bz2'
+        // as an extension and not 'bz'.
+        // Adding custom BZip2 codec that returns 'bz' as extension
+        // for backward compatibility.
+        String codecs =
+            pigContext.getProperties().getProperty("io.compression.codecs");
+
+        if( codecs != null
+            && codecs.contains(BZip2Codec.class.getCanonicalName() ) ) {
+            pigContext.getProperties().setProperty("io.compression.codecs",
+                codecs + ","
+                + BZip2CodecWithExtensionBZ.class.getCanonicalName() );
+        }
+    }
+
     private void addJarsFromProperties() throws ExecException {
         //add jars from properties to extraJars
         String jar_str = pigContext.getProperties().getProperty("pig.additional.jars");

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/HDataType.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/HDataType.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/HDataType.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/HDataType.java Fri Mar  4 18:17:39 2016
@@ -86,6 +86,48 @@ public class HDataType {
         }
     }
 
+    public static PigNullableWritable getNewWritableComparable(byte keyType) throws ExecException {
+        switch (keyType) {
+            case DataType.BAG:
+                return new NullableBag();
+            case DataType.BOOLEAN:
+                return new NullableBooleanWritable();
+            case DataType.BYTEARRAY:
+                return new NullableBytesWritable();
+            case DataType.CHARARRAY:
+                return new NullableText();
+            case DataType.DOUBLE:
+                return new NullableDoubleWritable();
+            case DataType.FLOAT:
+                return new NullableFloatWritable();
+            case DataType.INTEGER:
+                return new NullableIntWritable();
+            case DataType.BIGINTEGER:
+                return new NullableBigIntegerWritable();
+            case DataType.BIGDECIMAL:
+                return new NullableBigDecimalWritable();
+            case DataType.LONG:
+                return new NullableLongWritable();
+            case DataType.DATETIME:
+                return new NullableDateTimeWritable();
+            case DataType.TUPLE:
+                return new NullableTuple();
+            case DataType.MAP: {
+                int errCode = 1068;
+                String msg = "Using Map as key not supported.";
+                throw new ExecException(msg, errCode, PigException.INPUT);
+            }
+            default: {
+                if (typeToName == null) typeToName = DataType.genTypeToNameMap();
+                int errCode = 2044;
+                String msg = "The type "
+                    + typeToName.get(keyType) == null ? "" + keyType : typeToName.get(keyType)
+                    + " cannot be collected as a Key type";
+                throw new ExecException(msg, errCode, PigException.BUG);
+            }
+        }
+    }
+
     public static PigNullableWritable getWritableComparableTypes(Object o, byte keyType) throws ExecException{
 
         byte newKeyType = keyType;
@@ -261,6 +303,14 @@ public class HDataType {
         return wcKey;
     }
 
+    public static byte findTypeFromClassName(String className) throws ExecException {
+        if (classToTypeMap.containsKey(className)) {
+            return classToTypeMap.get(className);
+        } else {
+            throw new ExecException("Unable to map " + className + " to known types." + Arrays.toString(classToTypeMap.keySet().toArray()));
+        }
+    }
+
     public static byte findTypeFromNullableWritable(PigNullableWritable o) throws ExecException {
         if (o instanceof NullableBooleanWritable)
             return DataType.BOOLEAN;

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/ConfigurationUtil.java Fri Mar  4 18:17:39 2016
@@ -80,7 +80,7 @@ public class ConfigurationUtil {
             localConf.addResource("core-default.xml");
         } else {
             if (PigMapReduce.sJobContext!=null) {
-                localConf = PigMapReduce.sJobContext.getConfiguration();
+                localConf = new Configuration(PigMapReduce.sJobContext.getConfiguration());
             } else {
                 localConf = new Configuration(true);
             }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/HPath.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/HPath.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/HPath.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/datastorage/HPath.java Fri Mar  4 18:17:39 2016
@@ -124,10 +124,11 @@ public abstract class HPath implements E
 
     public Properties getConfiguration() throws IOException {
         HConfiguration props = new HConfiguration();
+        FileStatus fileStatus = fs.getHFS().getFileStatus(path);
 
-        long blockSize = fs.getHFS().getFileStatus(path).getBlockSize();
+        long blockSize = fileStatus.getBlockSize();
 
-        short replication = fs.getHFS().getFileStatus(path).getReplication();
+        short replication = fileStatus.getReplication();
 
         props.setProperty(BLOCK_SIZE_KEY, (Long.valueOf(blockSize)).toString());
         props.setProperty(BLOCK_REPLICATION_KEY, (Short.valueOf(replication)).toString());

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Fri Mar  4 18:17:39 2016
@@ -42,6 +42,7 @@ import org.apache.pig.backend.hadoop.dat
 import org.apache.pig.backend.hadoop.executionengine.fetch.FetchLauncher;
 import org.apache.pig.backend.hadoop.executionengine.fetch.FetchOptimizer;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PhyPlanSetter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
@@ -115,16 +116,17 @@ public abstract class HExecutionEngine i
     public JobConf getS3Conf() throws ExecException {
         JobConf jc = new JobConf();
         jc.addResource(CORE_SITE);
+        JobConf s3Jc = new JobConf(false);
         Iterator<Entry<String, String>> i = jc.iterator();
         while (i.hasNext()) {
             Entry<String, String> e = i.next();
             String key = e.getKey();
             String value = e.getValue();
             if (key.startsWith("fs.s3") || key.startsWith("fs.s3n")) {
-                jc.set(key, value);
+                s3Jc.set(key, value);
             }
         }
-        return jc;
+        return s3Jc;
     }
 
     public JobConf getLocalConf() {
@@ -187,10 +189,9 @@ public abstract class HExecutionEngine i
         // existing properties All of the above is accomplished in the method
         // call below
 
-        JobConf jc = getS3Conf();
+        JobConf jc;
         if (!this.pigContext.getExecType().isLocal()) {
-            JobConf execConf = getExecConf(properties);
-            ConfigurationUtil.mergeConf(jc, execConf);
+            jc = getExecConf(properties);
 
             // Trick to invoke static initializer of DistributedFileSystem to
             // add hdfs-default.xml into configuration
@@ -204,8 +205,9 @@ public abstract class HExecutionEngine i
             properties.setProperty(FILE_SYSTEM_LOCATION, "file:///");
             properties.setProperty(ALTERNATIVE_FILE_SYSTEM_LOCATION, "file:///");
 
-            JobConf localConf = getLocalConf();
-            ConfigurationUtil.mergeConf(jc, localConf);
+            jc = getLocalConf();
+            JobConf s3Jc = getS3Conf();
+            ConfigurationUtil.mergeConf(jc, s3Jc);
         }
 
         // the method below alters the properties object by overriding the
@@ -296,6 +298,7 @@ public abstract class HExecutionEngine i
             //skipped; a SimpleFetchPigStats will be returned through which the result
             //can be directly fetched from the underlying storage
             if (FetchOptimizer.isPlanFetchable(pc, pp)) {
+                new PhyPlanSetter(pp).visit();
                 return new FetchLauncher(pc).launchPig(pp);
             }
             return launcher.launchPig(pp, grpName, pigContext);



