pig-commits mailing list archives

From: z..@apache.org
Subject: svn commit: r1783988 [12/24] - in /pig/branches/spark: ./ bin/ conf/ contrib/piggybank/java/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachel...
Date: Wed, 22 Feb 2017 09:43:46 GMT
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Wed Feb 22 09:43:41 2017
@@ -70,7 +70,7 @@ public class MapRedUtil {
     private static Log log = LogFactory.getLog(MapRedUtil.class);
     private static final TupleFactory tf = TupleFactory.getInstance();
 
-    public static final String FILE_SYSTEM_NAME = "fs.default.name";
+    public static final String FILE_SYSTEM_NAME = FileSystem.FS_DEFAULT_NAME_KEY;
 
     /**
      * Loads the key distribution sampler file
@@ -301,7 +301,7 @@ public class MapRedUtil {
     /**
      * Returns the total number of bytes for this file, or if a directory all
      * files in the directory.
-     * 
+     *
      * @param fs FileSystem
      * @param status FileStatus
      * @param max Maximum value of total length that will trigger exit. Many
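
The constant swap above moves MapRedUtil off the deprecated Hadoop 1 key "fs.default.name". A minimal standalone sketch, assuming Hadoop 2.x, of what the new constant resolves to:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class FsDefaultNameSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // On Hadoop 2.x FileSystem.FS_DEFAULT_NAME_KEY is "fs.defaultFS",
            // the supported replacement for the deprecated "fs.default.name".
            System.out.println(FileSystem.FS_DEFAULT_NAME_KEY + " = "
                    + conf.get(FileSystem.FS_DEFAULT_NAME_KEY, "file:///"));
        }
    }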

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Wed Feb 22 09:43:41 2017
@@ -18,7 +18,6 @@ package org.apache.pig.backend.hadoop.hb
 
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
@@ -65,6 +64,7 @@ import org.apache.hadoop.hbase.mapreduce
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableSplit;
+import org.apache.hadoop.hbase.security.token.TokenUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
@@ -86,7 +86,6 @@ import org.apache.pig.ResourceSchema.Res
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.StoreResources;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.backend.hadoop.hbase.HBaseTableInputFormat.HBaseTableIFBuilder;
 import org.apache.pig.builtin.FuncUtils;
 import org.apache.pig.builtin.Utf8StorageConverter;
@@ -597,7 +596,9 @@ public class HBaseStorage extends LoadFu
                             new BinaryComparator(colInfo.getColumnName())));
                 }
             }
-            thisColumnGroupFilter.addFilter(columnFilters);
+            if (columnFilters.getFilters().size() != 0) {
+                thisColumnGroupFilter.addFilter(columnFilters);
+            }
             allColumnFilters.addFilter(thisColumnGroupFilter);
         }
         if (allColumnFilters != null) {
@@ -792,46 +793,35 @@ public class HBaseStorage extends LoadFu
     public List<String> getShipFiles() {
         // Depend on HBase to do the right thing when available, as of HBASE-9165
         try {
-            Method addHBaseDependencyJars =
-              TableMapReduceUtil.class.getMethod("addHBaseDependencyJars", Configuration.class);
-            if (addHBaseDependencyJars != null) {
-                Configuration conf = new Configuration();
-                addHBaseDependencyJars.invoke(null, conf);
-                if (conf.get("tmpjars") != null) {
-                    String[] tmpjars = conf.getStrings("tmpjars");
-                    List<String> shipFiles = new ArrayList<String>(tmpjars.length);
-                    for (String tmpjar : tmpjars) {
-                        shipFiles.add(new URL(tmpjar).getPath());
-                    }
-                    return shipFiles;
+            Configuration conf = new Configuration();
+            TableMapReduceUtil.addHBaseDependencyJars(conf);
+            if (conf.get("tmpjars") != null) {
+                String[] tmpjars = conf.getStrings("tmpjars");
+                List<String> shipFiles = new ArrayList<String>(tmpjars.length);
+                for (String tmpjar : tmpjars) {
+                    shipFiles.add(new URL(tmpjar).getPath());
                 }
+                return shipFiles;
+            }
+        } catch (IOException e) {
+            if(e instanceof MalformedURLException){
+                LOG.debug("TableMapReduceUtils#addHBaseDependencyJars tmpjars"
+                        + " had malformed url. Falling back to previous logic.", e);
+            }else {
+                LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation"
+                        + " failed. Falling back to previous logic.", e);
             }
-        } catch (NoSuchMethodException e) {
-            LOG.debug("TableMapReduceUtils#addHBaseDependencyJars not available."
-              + " Falling back to previous logic.", e);
-        } catch (IllegalAccessException e) {
-            LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation"
-              + " not permitted. Falling back to previous logic.", e);
-        } catch (InvocationTargetException e) {
-            LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation"
-              + " failed. Falling back to previous logic.", e);
-        } catch (MalformedURLException e) {
-            LOG.debug("TableMapReduceUtils#addHBaseDependencyJars tmpjars"
-                    + " had malformed url. Falling back to previous logic.", e);
         }
 
         List<Class> classList = new ArrayList<Class>();
         classList.add(org.apache.hadoop.hbase.client.HTable.class); // main hbase jar or hbase-client
         classList.add(org.apache.hadoop.hbase.mapreduce.TableSplit.class); // main hbase jar or hbase-server
-        if (!HadoopShims.isHadoopYARN()) { //Avoid shipping duplicate. Hadoop 0.23/2 itself has guava
-            classList.add(com.google.common.collect.Lists.class); // guava
-        }
         classList.add(org.apache.zookeeper.ZooKeeper.class); // zookeeper
         // Additional jars that are specific to v0.95.0+
         addClassToList("org.cloudera.htrace.Trace", classList); // htrace
         addClassToList("org.apache.hadoop.hbase.protobuf.generated.HBaseProtos", classList); // hbase-protocol
         addClassToList("org.apache.hadoop.hbase.TableName", classList); // hbase-common
-        addClassToList("org.apache.hadoop.hbase.CompatibilityFactory", classList); // hbase-hadoop-compar
+        addClassToList("org.apache.hadoop.hbase.CompatibilityFactory", classList); // hbase-hadoop-compat
         addClassToList("org.jboss.netty.channel.ChannelFactory", classList); // netty
         return FuncUtils.getShipFiles(classList);
     }
@@ -882,27 +872,13 @@ public class HBaseStorage extends LoadFu
         }
 
         if ("kerberos".equalsIgnoreCase(hbaseConf.get(HBASE_SECURITY_CONF_KEY))) {
-            // Will not be entering this block for 0.20.2 as it has no security.
             try {
-                // getCurrentUser method is not public in 0.20.2
-                Method m1 = UserGroupInformation.class.getMethod("getCurrentUser");
-                UserGroupInformation currentUser = (UserGroupInformation) m1.invoke(null,(Object[]) null);
-                // hasKerberosCredentials method not available in 0.20.2
-                Method m2 = UserGroupInformation.class.getMethod("hasKerberosCredentials");
-                boolean hasKerberosCredentials = (Boolean) m2.invoke(currentUser, (Object[]) null);
-                if (hasKerberosCredentials) {
-                    // Class and method are available only from 0.92 security release
-                    Class tokenUtilClass = Class
-                            .forName("org.apache.hadoop.hbase.security.token.TokenUtil");
-                    Method m3 = tokenUtilClass.getMethod("obtainTokenForJob", new Class[] {
-                            Configuration.class, UserGroupInformation.class, Job.class });
-                    m3.invoke(null, new Object[] { hbaseConf, currentUser, job });
+                UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+                if (currentUser.hasKerberosCredentials()) {
+                    TokenUtil.obtainTokenForJob(hbaseConf,currentUser,job);
                 } else {
                     LOG.info("Not fetching hbase delegation token as no Kerberos TGT is available");
                 }
-            } catch (ClassNotFoundException cnfe) {
-                throw new RuntimeException("Failure loading TokenUtil class, "
-                        + "is secure RPC available?", cnfe);
             } catch (RuntimeException re) {
                 throw re;
             } catch (Exception e) {
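
With the reflection removed, getShipFiles calls TableMapReduceUtil.addHBaseDependencyJars directly (available since HBASE-9165); it records the HBase jars a job needs as file: URLs in the "tmpjars" property, which are then turned into local paths for shipping. A self-contained sketch of that flow, assuming the HBase client jars are on the classpath:

    import java.net.URL;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;

    public class HBaseShipJarsSketch {
        public static List<String> hbaseDependencyPaths() throws Exception {
            Configuration conf = new Configuration();
            // Populates "tmpjars" with file: URLs of the HBase jars a job needs.
            TableMapReduceUtil.addHBaseDependencyJars(conf);
            List<String> paths = new ArrayList<String>();
            for (String jar : conf.getStrings("tmpjars", new String[0])) {
                paths.add(new URL(jar).getPath()); // local path with the scheme stripped
            }
            return paths;
        }
    }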

Modified: pig/branches/spark/src/org/apache/pig/builtin/Bloom.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/Bloom.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/Bloom.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/Bloom.java Wed Feb 22 09:43:41 2017
@@ -35,6 +35,7 @@ import org.apache.pig.FilterFunc;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 
 /**
  * Use a Bloom filter build previously by BuildBloom.  You would first
@@ -54,14 +55,36 @@ import org.apache.pig.data.Tuple;
  * C = filter B by bloom(z);
  * D = join C by z, A by x;
  * It uses {@link org.apache.hadoop.util.bloom.BloomFilter}.
+ *
+ * You can also pass the Bloom filter from BuildBloom directly to Bloom UDF
+ * as a scalar instead of storing it to file and loading again. This is simpler
+ * if the Bloom filter will not be reused and needs to be discarded after the
+ * run of the script.
+ *
+ * define bb BuildBloom('jenkins', '100', '0.1');
+ * A = load 'foo' as (x, y);
+ * B = group A all;
+ * C = foreach B generate bb(A.x) as bloomfilter;
+ * D = load 'bar' as (z);
+ * E = filter D by Bloom(C.bloomfilter, z);
+ * F = join E by z, A by x;
  */
 public class Bloom extends FilterFunc {
 
+    private static TupleFactory mTupleFactory = TupleFactory.getInstance();
+
     private String bloomFile;
-    public BloomFilter filter = null;
+    private BloomFilter filter = null;
 
-    /** 
-     * @param filename file containing the serialized Bloom filter
+    public Bloom() {
+    }
+
+    /**
+     * The filename containing the serialized Bloom filter. If filename is null
+     * or the no-arg constructor is used, then the bloomfilter bytearray which
+     * is the output of BuildBloom should be passed as the first argument to the UDF
+     *
+     * @param filename  file containing the serialized Bloom filter
      */
     public Bloom(String filename) {
         bloomFile = filename;
@@ -70,11 +93,25 @@ public class Bloom extends FilterFunc {
     @Override
     public Boolean exec(Tuple input) throws IOException {
         if (filter == null) {
-            init();
+            init(input);
         }
         byte[] b;
-        if (input.size() == 1) b = DataType.toBytes(input.get(0));
-        else b = DataType.toBytes(input, DataType.TUPLE);
+        if (bloomFile == null) {
+            // The first one is the bloom filter. Skip that
+            if (input.size() == 2) {
+                b = DataType.toBytes(input.get(1));
+            } else {
+                List<Object> inputList = input.getAll();
+                Tuple tuple = mTupleFactory.newTupleNoCopy(inputList.subList(1, inputList.size()));
+                b = DataType.toBytes(tuple, DataType.TUPLE);
+            }
+        } else {
+            if (input.size() == 1) {
+                b = DataType.toBytes(input.get(0));
+            } else {
+                b = DataType.toBytes(input, DataType.TUPLE);
+            }
+        }
 
         Key k = new Key(b);
         return filter.membershipTest(k);
@@ -82,34 +119,46 @@ public class Bloom extends FilterFunc {
 
     @Override
     public List<String> getCacheFiles() {
-        List<String> list = new ArrayList<String>(1);
-        // We were passed the name of the file on HDFS.  Append a
-        // name for the file on the task node.
-        try {
-            list.add(bloomFile + "#" + getFilenameFromPath(bloomFile));
-        } catch (IOException e) {
-            throw new RuntimeException(e);
+        if (bloomFile != null) {
+            List<String> list = new ArrayList<String>(1);
+            // We were passed the name of the file on HDFS.  Append a
+            // name for the file on the task node.
+            try {
+                list.add(bloomFile + "#" + getFilenameFromPath(bloomFile));
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+            return list;
         }
-        return list;
+        return null;
     }
 
-    private void init() throws IOException {
-        filter = new BloomFilter();
-        String dir = "./" + getFilenameFromPath(bloomFile);
-        String[] partFiles = new File(dir)
-                .list(new FilenameFilter() {
-                    @Override
-                    public boolean accept(File current, String name) {
-                        return name.startsWith("part");
-                    }
-                });
-
-        String dcFile = dir + "/" + partFiles[0];
-        DataInputStream dis = new DataInputStream(new FileInputStream(dcFile));
-        try {
-            filter.readFields(dis);
-        } finally {
-            dis.close();
+    private void init(Tuple input) throws IOException {
+        if (bloomFile == null) {
+            if (input.get(0) instanceof DataByteArray) {
+                filter = BuildBloomBase.bloomIn((DataByteArray) input.get(0));
+            } else {
+                throw new IllegalArgumentException("The first argument to the Bloom UDF should be"
+                        + " the bloom filter if a bloom file is not specified in the constructor");
+            }
+        } else {
+            filter = new BloomFilter();
+            String dir = "./" + getFilenameFromPath(bloomFile);
+            String[] partFiles = new File(dir)
+                    .list(new FilenameFilter() {
+                        @Override
+                        public boolean accept(File current, String name) {
+                            return name.startsWith("part");
+                        }
+                    });
+
+            String dcFile = dir + "/" + partFiles[0];
+            DataInputStream dis = new DataInputStream(new FileInputStream(dcFile));
+            try {
+                filter.readFields(dis);
+            } finally {
+                dis.close();
+            }
         }
     }
 

Modified: pig/branches/spark/src/org/apache/pig/builtin/BuildBloomBase.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/BuildBloomBase.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/BuildBloomBase.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/BuildBloomBase.java Wed Feb 22 09:43:41 2017
@@ -18,16 +18,15 @@
 
 package org.apache.pig.builtin;
 
-import java.io.IOException;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.IOException;
 import java.util.Iterator;
 
 import org.apache.hadoop.util.bloom.BloomFilter;
 import org.apache.hadoop.util.hash.Hash;
-
 import org.apache.pig.EvalFunc;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataBag;
@@ -47,7 +46,7 @@ public abstract class BuildBloomBase<T>
     protected BuildBloomBase() {
     }
 
-    /** 
+    /**
      * @param hashType type of the hashing function (see
      * {@link org.apache.hadoop.util.hash.Hash}).
      * @param mode Will be ignored, though by convention it should be
@@ -64,7 +63,7 @@ public abstract class BuildBloomBase<T>
         hType = convertHashType(hashType);
     }
 
-    /** 
+    /**
      * @param hashType type of the hashing function (see
      * {@link org.apache.hadoop.util.hash.Hash}).
      * @param numElements The number of distinct elements expected to be
@@ -104,7 +103,7 @@ public abstract class BuildBloomBase<T>
         return new DataByteArray(baos.toByteArray());
     }
 
-    protected BloomFilter bloomIn(DataByteArray b) throws IOException {
+    public static BloomFilter bloomIn(DataByteArray b) throws IOException {
         DataInputStream dis = new DataInputStream(new
             ByteArrayInputStream(b.get()));
         BloomFilter f = new BloomFilter();

Modified: pig/branches/spark/src/org/apache/pig/builtin/HiveUDFBase.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/HiveUDFBase.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/HiveUDFBase.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/HiveUDFBase.java Wed Feb 22 09:43:41 2017
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.shims.Hadoop23Shims;
 import org.apache.hadoop.hive.shims.HadoopShimsSecure;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.mapred.Counters;
@@ -180,20 +181,9 @@ abstract class HiveUDFBase extends EvalF
 
     @Override
     public List<String> getShipFiles() {
-        String hadoopVersion = "20S";
-        if (Utils.isHadoop23() || Utils.isHadoop2()) {
-            hadoopVersion = "23";
-        }
-        Class hadoopVersionShimsClass;
-        try {
-            hadoopVersionShimsClass = Class.forName("org.apache.hadoop.hive.shims.Hadoop" +
-                    hadoopVersion + "Shims");
-        } catch (ClassNotFoundException e) {
-            throw new RuntimeException("Cannot find Hadoop" + hadoopVersion + "ShimsClass in classpath");
-        }
         List<String> files = FuncUtils.getShipFiles(new Class[] {GenericUDF.class,
-                PrimitiveObjectInspector.class, HiveConf.class, Serializer.class, ShimLoader.class, 
-                hadoopVersionShimsClass, HadoopShimsSecure.class, Collector.class});
+                PrimitiveObjectInspector.class, HiveConf.class, Serializer.class, ShimLoader.class,
+                Hadoop23Shims.class, HadoopShimsSecure.class, Collector.class});
         return files;
     }
 

Modified: pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java Wed Feb 22 09:43:41 2017
@@ -56,6 +56,7 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.shims.Hadoop23Shims;
 import org.apache.hadoop.hive.shims.HadoopShimsSecure;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
@@ -389,20 +390,8 @@ public class OrcStorage extends LoadFunc
 
     @Override
     public List<String> getShipFiles() {
-        List<String> cacheFiles = new ArrayList<String>();
-        String hadoopVersion = "20S";
-        if (Utils.isHadoop23() || Utils.isHadoop2()) {
-            hadoopVersion = "23";
-        }
-        Class hadoopVersionShimsClass;
-        try {
-            hadoopVersionShimsClass = Class.forName("org.apache.hadoop.hive.shims.Hadoop" +
-                    hadoopVersion + "Shims");
-        } catch (ClassNotFoundException e) {
-            throw new RuntimeException("Cannot find Hadoop" + hadoopVersion + "ShimsClass in classpath");
-        }
         Class[] classList = new Class[] {OrcFile.class, HiveConf.class, AbstractSerDe.class,
-                org.apache.hadoop.hive.shims.HadoopShims.class, HadoopShimsSecure.class, hadoopVersionShimsClass,
+                org.apache.hadoop.hive.shims.HadoopShims.class, HadoopShimsSecure.class, Hadoop23Shims.class,
                 Input.class};
         return FuncUtils.getShipFiles(classList);
     }
@@ -456,7 +445,7 @@ public class OrcStorage extends LoadFunc
     }
 
     private TypeInfo getTypeInfoFromLocation(String location, Job job) throws IOException {
-        FileSystem fs = FileSystem.get(job.getConfiguration());
+        FileSystem fs = FileSystem.get(new Path(location).toUri(), job.getConfiguration());
         Path path = getFirstFile(location, fs, new NonEmptyOrcFileFilter(fs));
         if (path == null) {
             log.info("Cannot find any ORC files from " + location +

Modified: pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java Wed Feb 22 09:43:41 2017
@@ -68,7 +68,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextOutputFormat;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.bzip2r.Bzip2TextInputFormat;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
@@ -171,7 +170,7 @@ LoadPushDown, LoadMetadata, StoreMetadat
         validOptions.addOption(TAG_SOURCE_FILE, false, "Appends input source file name to beginning of each tuple.");
         validOptions.addOption(TAG_SOURCE_PATH, false, "Appends input source file path to beginning of each tuple.");
         validOptions.addOption("tagsource", false, "Appends input source file name to beginning of each tuple.");
-        Option overwrite = new Option(" ", "Overwrites the destination.");
+        Option overwrite = new Option("overwrite", "Overwrites the destination.");
         overwrite.setLongOpt("overwrite");
         overwrite.setOptionalArg(true);
         overwrite.setArgs(1);
@@ -412,7 +411,7 @@ LoadPushDown, LoadMetadata, StoreMetadat
     @Override
     public InputFormat getInputFormat() {
         if((loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz"))
-           && (!bzipinput_usehadoops || !HadoopShims.isHadoopYARN()) ) {
+           && (!bzipinput_usehadoops) ) {
             mLog.info("Using Bzip2TextInputFormat");
             return new Bzip2TextInputFormat();
         } else {

Modified: pig/branches/spark/src/org/apache/pig/builtin/RoundRobinPartitioner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/RoundRobinPartitioner.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/RoundRobinPartitioner.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/RoundRobinPartitioner.java Wed Feb 22 09:43:41 2017
@@ -17,15 +17,63 @@
  */
 package org.apache.pig.builtin;
 
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Partitioner;
 
-public class RoundRobinPartitioner extends Partitioner<Writable, Writable> {
-    private int num = 0;
+/**
+ * This partitioner should be used with extreme caution and only in cases
+ * where the order of output records is guaranteed to be same. If the order of
+ * output records can vary on retries which is mostly the case, map reruns
+ * due to shuffle fetch failures can lead to data being partitioned differently
+ * and result in incorrect output due to loss or duplication of data.
+ * Refer PIG-5041 for more details.
+ *
+ * This will be removed in the next release as it is risky to use in most cases.
+ */
+@Deprecated
+public class RoundRobinPartitioner extends Partitioner<Writable, Writable>
+        implements Configurable {
+
+    /**
+     * Batch size for round robin partitioning. Batch size number of records
+     * will be distributed to each partition in a round robin fashion. Default
+     * value is 0 which distributes each record in a circular fashion. Higher
+     * number for batch size can be used to increase probability of keeping
+     * similar records in the same partition if output is already sorted and get
+     * better compression.
+     */
+    public static String PIG_ROUND_ROBIN_PARTITIONER_BATCH_SIZE = "pig.round.robin.partitioner.batch.size";
+    private int num = -1;
+    private int batchSize = 0;
+    private int currentBatchCount = 0;
+    private Configuration conf;
 
     @Override
     public int getPartition(Writable key, Writable value, int numPartitions) {
-        num = ++num % numPartitions;
+        if (batchSize > 0) {
+            if (currentBatchCount == 0) {
+                num = ++num % numPartitions;
+            }
+            if (++currentBatchCount == batchSize) {
+                currentBatchCount = 0;
+            }
+        } else {
+            num = ++num % numPartitions;
+        }
         return num;
     }
+
+    @Override
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+        batchSize = conf.getInt(PIG_ROUND_ROBIN_PARTITIONER_BATCH_SIZE, 0);
+    }
+
+    @Override
+    public Configuration getConf() {
+        return conf;
+    }
+
 }
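
The new batch-size setting keeps runs of adjacent records in the same partition, which helps compression when the output is already sorted. A small sketch, using a hypothetical batch size of 3 and 4 partitions, of the sequence the partitioner above produces:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.pig.builtin.RoundRobinPartitioner;

    public class RoundRobinBatchSketch {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            conf.setInt(RoundRobinPartitioner.PIG_ROUND_ROBIN_PARTITIONER_BATCH_SIZE, 3);
            RoundRobinPartitioner p = new RoundRobinPartitioner();
            p.setConf(conf);
            // Prints 0 0 0 1 1 1 2 2 2 3 3 3 for 12 records over 4 partitions.
            for (int i = 0; i < 12; i++) {
                System.out.print(p.getPartition(NullWritable.get(), NullWritable.get(), 4) + " ");
            }
        }
    }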

Modified: pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java Wed Feb 22 09:43:41 2017
@@ -37,7 +37,6 @@ import org.apache.pig.ResourceSchema.Res
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.bzip2r.Bzip2TextInputFormat;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
@@ -259,8 +258,7 @@ public class TextLoader extends LoadFunc
     @Override
     public InputFormat getInputFormat() {
         if((loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz"))
-           && !HadoopShims.isHadoopYARN()
-           && !bzipinput_usehadoops ) {
+                && !bzipinput_usehadoops ) {
             mLog.info("Using Bzip2TextInputFormat");
             return new Bzip2TextInputFormat();
         } else {

Modified: pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java Wed Feb 22 09:43:41 2017
@@ -423,7 +423,7 @@ public abstract class DefaultAbstractBag
     }
 
     @SuppressWarnings("rawtypes")
-    protected void warn(String msg, Enum warningEnum, Exception e) {
+    protected void warn(String msg, Enum warningEnum, Throwable e) {
         pigLogger = PhysicalOperator.getPigLogger();
         if(pigLogger != null) {
             pigLogger.warn(this, msg, warningEnum);

Modified: pig/branches/spark/src/org/apache/pig/data/DefaultDataBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/DefaultDataBag.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/DefaultDataBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/DefaultDataBag.java Wed Feb 22 09:43:41 2017
@@ -22,11 +22,11 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.io.FileNotFoundException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -42,12 +42,12 @@ import org.apache.pig.PigWarning;
 public class DefaultDataBag extends DefaultAbstractBag {
 
     /**
-     * 
+     *
      */
     private static final long serialVersionUID = 2L;
 
     private static final Log log = LogFactory.getLog(DefaultDataBag.class);
-    
+
     private static final InterSedes SEDES = InterSedesFactory.getInterSedesInstance();
 
     public DefaultDataBag() {
@@ -70,12 +70,12 @@ public class DefaultDataBag extends Defa
     public boolean isSorted() {
         return false;
     }
-    
+
     @Override
     public boolean isDistinct() {
         return false;
     }
-    
+
     @Override
     public Iterator<Tuple> iterator() {
         return new DefaultDataBagIterator();
@@ -110,12 +110,15 @@ public class DefaultDataBag extends Defa
                     if ((spilled & 0x3fff) == 0) reportProgress();
                 }
                 out.flush();
-            } catch (IOException ioe) {
+                out.close();
+                out = null;
+                mContents.clear();
+            } catch (Throwable e) {
                 // Remove the last file from the spilled array, since we failed to
                 // write to it.
                 mSpillFiles.remove(mSpillFiles.size() - 1);
                 warn(
-                    "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe);
+                    "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, e);
                 return 0;
             } finally {
                 if (out != null) {
@@ -126,7 +129,6 @@ public class DefaultDataBag extends Defa
                     }
                 }
             }
-            mContents.clear();
         }
         // Increment the spill count
         incSpillCount(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT);
@@ -156,7 +158,7 @@ public class DefaultDataBag extends Defa
         }
 
         @Override
-        public boolean hasNext() { 
+        public boolean hasNext() {
             // Once we call hasNext(), set the flag, so we can call hasNext() repeated without fetching next tuple
             if (hasCachedTuple)
                 return (mBuf != null);
@@ -209,7 +211,7 @@ public class DefaultDataBag extends Defa
                 } catch (FileNotFoundException fnfe) {
                     // We can't find our own spill file?  That should never
                     // happen.
-                    String msg = "Unable to find our spill file."; 
+                    String msg = "Unable to find our spill file.";
                     log.fatal(msg, fnfe);
                     throw new RuntimeException(msg, fnfe);
                 }
@@ -223,7 +225,7 @@ public class DefaultDataBag extends Defa
                         log.fatal(msg, eof);
                         throw new RuntimeException(msg, eof);
                     } catch (IOException ioe) {
-                        String msg = "Unable to read our spill file."; 
+                        String msg = "Unable to read our spill file.";
                         log.fatal(msg, ioe);
                         throw new RuntimeException(msg, ioe);
                     }
@@ -259,7 +261,7 @@ public class DefaultDataBag extends Defa
                         log.warn("Failed to close spill file.", e);
                     }
                 } catch (IOException ioe) {
-                    String msg = "Unable to read our spill file."; 
+                    String msg = "Unable to read our spill file.";
                     log.fatal(msg, ioe);
                     throw new RuntimeException(msg, ioe);
                 }
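
The reworked spill (the same change appears in DistinctDataBag, SortedDataBag and SortedSpillBag below) closes the spill stream and only then clears the in-memory contents, so a failure in close() no longer discards records that never reached disk. A stripped-down sketch of the pattern with simplified names:

    import java.io.DataOutputStream;
    import java.io.FileOutputStream;
    import java.util.ArrayList;
    import java.util.List;

    public class SpillSketch {
        private final List<byte[]> mContents = new ArrayList<byte[]>();

        long spill(String spillFile) {
            DataOutputStream out = null;
            try {
                out = new DataOutputStream(new FileOutputStream(spillFile));
                long spilled = 0;
                for (byte[] record : mContents) {
                    out.write(record);
                    spilled++;
                }
                out.flush();
                out.close();        // surface close() failures inside the try block
                out = null;
                mContents.clear();  // forget records only once they are safely on disk
                return spilled;
            } catch (Throwable t) {
                return 0;           // spill failed; records stay in memory
            } finally {
                if (out != null) {
                    try { out.close(); } catch (Exception ignore) { }
                }
            }
        }
    }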

Modified: pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java Wed Feb 22 09:43:41 2017
@@ -67,17 +67,17 @@ public class DistinctDataBag extends Def
     public boolean isSorted() {
         return false;
     }
-    
+
     @Override
     public boolean isDistinct() {
         return true;
     }
-    
-    
+
+
     @Override
     public long size() {
         if (mSpillFiles != null && mSpillFiles.size() > 0){
-            //We need to racalculate size to guarantee a count of unique 
+            //We need to racalculate size to guarantee a count of unique
             //entries including those on disk
             Iterator<Tuple> iter = iterator();
             int newSize = 0;
@@ -85,7 +85,7 @@ public class DistinctDataBag extends Def
                 newSize++;
                 iter.next();
             }
-            
+
             synchronized(mContents) {
                 //we don't want adds to change our numbers
                 //the lock may need to cover more of the method
@@ -94,8 +94,8 @@ public class DistinctDataBag extends Def
         }
         return mSize;
     }
-    
-    
+
+
     @Override
     public Iterator<Tuple> iterator() {
         return new DistinctDataBagIterator();
@@ -155,12 +155,15 @@ public class DistinctDataBag extends Def
                     }
                 }
                 out.flush();
-            } catch (IOException ioe) {
+                out.close();
+                out = null;
+                mContents.clear();
+            } catch (Throwable e) {
                 // Remove the last file from the spilled array, since we failed to
                 // write to it.
                 mSpillFiles.remove(mSpillFiles.size() - 1);
                 warn(
-                    "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe);
+                    "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, e);
                 return 0;
             } finally {
                 if (out != null) {
@@ -171,7 +174,6 @@ public class DistinctDataBag extends Def
                     }
                 }
             }
-            mContents.clear();
         }
         // Increment the spill count
         incSpillCount(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT);
@@ -208,7 +210,7 @@ public class DistinctDataBag extends Def
 
             @Override
             public int hashCode() {
-                return tuple.hashCode(); 
+                return tuple.hashCode();
             }
         }
 
@@ -237,7 +239,7 @@ public class DistinctDataBag extends Def
         }
 
         @Override
-        public boolean hasNext() { 
+        public boolean hasNext() {
             // See if we can find a tuple.  If so, buffer it.
             mBuf = next();
             return mBuf != null;
@@ -295,7 +297,7 @@ public class DistinctDataBag extends Def
                 } catch (FileNotFoundException fnfe) {
                     // We can't find our own spill file?  That should never
                     // happen.
-                    String msg = "Unable to find our spill file."; 
+                    String msg = "Unable to find our spill file.";
                     log.fatal(msg, fnfe);
                     throw new RuntimeException(msg, fnfe);
                 }
@@ -346,7 +348,7 @@ public class DistinctDataBag extends Def
                 Iterator<File> i = mSpillFiles.iterator();
                 while (i.hasNext()) {
                     try {
-                        DataInputStream in = 
+                        DataInputStream in =
                             new DataInputStream(new BufferedInputStream(
                                 new FileInputStream(i.next())));
                         mStreams.add(in);
@@ -502,7 +504,7 @@ public class DistinctDataBag extends Def
                             addToQueue(null, mStreams.size() - 1);
                             i.remove();
                             filesToDelete.add(f);
-                            
+
                         } catch (FileNotFoundException fnfe) {
                             // We can't find our own spill file?  That should
                             // neer happen.
@@ -545,7 +547,7 @@ public class DistinctDataBag extends Def
                         log.warn("Failed to delete spill file: " + f.getPath());
                     }
                 }
-                
+
                 // clear the list, so that finalize does not delete any files,
                 // when mSpillFiles is assigned a new value
                 mSpillFiles.clear();
@@ -560,6 +562,6 @@ public class DistinctDataBag extends Def
             }
         }
     }
-    
+
 }
 

Modified: pig/branches/spark/src/org/apache/pig/data/ReadOnceBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/ReadOnceBag.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/ReadOnceBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/ReadOnceBag.java Wed Feb 22 09:43:41 2017
@@ -50,6 +50,9 @@ public class ReadOnceBag implements Data
      */
     private static final long serialVersionUID = 2L;
 
+    public ReadOnceBag() {
+    }
+
     /**
      * This constructor creates a bag out of an existing iterator
      * of tuples by taking ownership of the iterator and NOT

Modified: pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java Wed Feb 22 09:43:41 2017
@@ -39,6 +39,7 @@ import org.apache.pig.data.utils.Structu
 import org.apache.pig.data.utils.StructuresHelper.Triple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.Utils;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -272,14 +273,20 @@ public class SchemaTupleBackend {
     private static SchemaTupleBackend stb;
 
     public static void initialize(Configuration jConf, PigContext pigContext) throws IOException {
-        initialize(jConf, pigContext, pigContext.getExecType().isLocal());
+        if (stb != null) {
+            SchemaTupleFrontend.lazyReset(pigContext);
+        }
+        initialize(jConf, pigContext.getExecType().isLocal());
     }
 
-    public static void initialize(Configuration jConf, PigContext pigContext, boolean isLocal) throws IOException {
+    public static void initialize(Configuration jConf) throws IOException {
+        initialize(jConf, Utils.isLocal(jConf));
+    }
+
+    public static void initialize(Configuration jConf, boolean isLocal) throws IOException {
         if (stb != null) {
             LOG.warn("SchemaTupleBackend has already been initialized");
         } else {
-            SchemaTupleFrontend.lazyReset(pigContext);
             SchemaTupleFrontend.reset();
             SchemaTupleBackend stbInstance = new SchemaTupleBackend(jConf, isLocal);
             stbInstance.copyAndResolve();

Modified: pig/branches/spark/src/org/apache/pig/data/SortedDataBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SortedDataBag.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/SortedDataBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/SortedDataBag.java Wed Feb 22 09:43:41 2017
@@ -32,7 +32,7 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.ListIterator;
 import java.util.PriorityQueue;
-  
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.PigCounters;
@@ -44,14 +44,14 @@ import org.apache.pig.PigWarning;
  * stored unsorted as it comes in, and only sorted when it is time to dump
  * it to a file or when the first iterator is requested.  Experementation
  * found this to be the faster than storing it sorted to begin with.
- * 
+ *
  * We allow a user defined comparator, but provide a default comparator in
  * cases where the user doesn't specify one.
  */
 public class SortedDataBag extends DefaultAbstractBag{
 
     /**
-     * 
+     *
      */
     private static final long serialVersionUID = 2L;
 
@@ -76,7 +76,7 @@ public class SortedDataBag extends Defau
 
         @Override
         public int hashCode() {
-            return 42; 
+            return 42;
         }
 
     }
@@ -95,12 +95,12 @@ public class SortedDataBag extends Defau
     public boolean isSorted() {
         return true;
     }
-    
+
     @Override
     public boolean isDistinct() {
         return false;
     }
-    
+
     @Override
     public Iterator<Tuple> iterator() {
         return new SortedDataBagIterator();
@@ -145,12 +145,15 @@ public class SortedDataBag extends Defau
                     if ((spilled & 0x3fff) == 0) reportProgress();
                 }
                 out.flush();
-            } catch (IOException ioe) {
+                out.close();
+                out = null;
+                mContents.clear();
+            } catch (Throwable e) {
                 // Remove the last file from the spilled array, since we failed to
                 // write to it.
                 mSpillFiles.remove(mSpillFiles.size() - 1);
                 warn(
-                    "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe);
+                    "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, e);
                 return 0;
             } finally {
                 if (out != null) {
@@ -161,7 +164,6 @@ public class SortedDataBag extends Defau
                     }
                 }
             }
-            mContents.clear();
         }
         // Increment the spill count
         incSpillCount(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT);
@@ -203,7 +205,7 @@ public class SortedDataBag extends Defau
 
             @Override
             public int hashCode() {
-                return tuple.hashCode(); 
+                return tuple.hashCode();
             }
         }
 
@@ -228,7 +230,7 @@ public class SortedDataBag extends Defau
         }
 
         @Override
-        public boolean hasNext() { 
+        public boolean hasNext() {
             // See if we can find a tuple.  If so, buffer it.
             mBuf = next();
             return mBuf != null;
@@ -341,7 +343,7 @@ public class SortedDataBag extends Defau
                 Iterator<File> i = mSpillFiles.iterator();
                 while (i.hasNext()) {
                     try {
-                        DataInputStream in = 
+                        DataInputStream in =
                             new DataInputStream(new BufferedInputStream(
                                 new FileInputStream(i.next())));
                         mStreams.add(in);
@@ -351,7 +353,7 @@ public class SortedDataBag extends Defau
                     } catch (FileNotFoundException fnfe) {
                         // We can't find our own spill file?  That should
                         // never happen.
-                        String msg = "Unable to find our spill file."; 
+                        String msg = "Unable to find our spill file.";
                         log.fatal(msg, fnfe);
                         throw new RuntimeException(msg, fnfe);
                     }
@@ -411,7 +413,7 @@ public class SortedDataBag extends Defau
                         in.close();
                     }catch(IOException e) {
                         log.warn("Failed to close spill file.", e);
-                    }                	
+                    }
                     mStreams.set(fileNum, null);
                 } catch (IOException ioe) {
                     String msg = "Unable to find our spill file.";
@@ -518,7 +520,7 @@ public class SortedDataBag extends Defau
                         log.warn("Failed to delete spill file: " + f.getPath());
                     }
                 }
-                
+
                 // clear the list, so that finalize does not delete any files,
                 // when mSpillFiles is assigned a new value
                 mSpillFiles.clear();

Modified: pig/branches/spark/src/org/apache/pig/data/SortedSpillBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SortedSpillBag.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/SortedSpillBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/SortedSpillBag.java Wed Feb 22 09:43:41 2017
@@ -29,7 +29,7 @@ import org.apache.pig.classification.Int
 
 /**
  * Common functionality for proactively spilling bags that need to keep the data
- * sorted. 
+ * sorted.
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
@@ -54,9 +54,9 @@ public abstract class SortedSpillBag ext
         //count for number of objects that have spilled
         if(mSpillFiles == null)
             incSpillCount(PigCounters.PROACTIVE_SPILL_COUNT_BAGS);
-        
+
         long spilled = 0;
-        
+
         DataOutputStream out = null;
         try {
             out = getSpillFile();
@@ -71,13 +71,13 @@ public abstract class SortedSpillBag ext
             //sort the tuples
             // as per documentation of collection.sort(), it copies to an array,
             // sorts and copies back to collection
-            // Avoiding that extra copy back to collection (mContents) by 
+            // Avoiding that extra copy back to collection (mContents) by
             // copying to an array and using Arrays.sort
             Tuple[] array = new Tuple[mContents.size()];
             mContents.toArray(array);
             if(comp == null)
                 Arrays.sort(array);
-            else 
+            else
                 Arrays.sort(array,comp);
 
             //dump the array
@@ -89,12 +89,15 @@ public abstract class SortedSpillBag ext
             }
 
             out.flush();
-        } catch (IOException ioe) {
+            out.close();
+            out = null;
+            mContents.clear();
+        } catch (Throwable e) {
             // Remove the last file from the spilled array, since we failed to
             // write to it.
             mSpillFiles.remove(mSpillFiles.size() - 1);
             warn(
-                "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe);
+                "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, e);
             return 0;
         } finally {
             if (out != null) {
@@ -105,11 +108,9 @@ public abstract class SortedSpillBag ext
                 }
             }
         }
-        mContents.clear();
-        
         incSpillCount(PigCounters.PROACTIVE_SPILL_COUNT_RECS, spilled);
-        
+
         return spilled;
     }
-    
+
 }

Modified: pig/branches/spark/src/org/apache/pig/data/UnlimitedNullTuple.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/UnlimitedNullTuple.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/UnlimitedNullTuple.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/UnlimitedNullTuple.java Wed Feb 22 09:43:41 2017
@@ -28,7 +28,7 @@ public class UnlimitedNullTuple extends
 
     @Override
     public int size() {
-        throw new RuntimeException("Unimplemented");
+        return Integer.MAX_VALUE;
     }
 
     @Override

Modified: pig/branches/spark/src/org/apache/pig/data/utils/SedesHelper.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/utils/SedesHelper.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/utils/SedesHelper.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/utils/SedesHelper.java Wed Feb 22 09:43:41 2017
@@ -61,25 +61,25 @@ public class SedesHelper {
     public static void writeChararray(DataOutput out, String s) throws IOException {
         // a char can take up to 3 bytes in the modified utf8 encoding
         // used by DataOutput.writeUTF, so use UNSIGNED_SHORT_MAX/3
-        if (s.length() < BinInterSedes.UNSIGNED_SHORT_MAX / 3) {
+        byte[] utfBytes = s.getBytes(BinInterSedes.UTF8);
+        int length = utfBytes.length;
+        if (length < BinInterSedes.UNSIGNED_SHORT_MAX) {
             out.writeByte(BinInterSedes.SMALLCHARARRAY);
-            out.writeUTF(s);
+            out.writeShort(length);
         } else {
-            byte[] utfBytes = s.getBytes(BinInterSedes.UTF8);
-            int length = utfBytes.length;
-
             out.writeByte(BinInterSedes.CHARARRAY);
             out.writeInt(length);
-            out.write(utfBytes);
         }
+        out.write(utfBytes);
     }
 
     public static String readChararray(DataInput in, byte type) throws IOException {
+        int size;
         if (type == BinInterSedes.SMALLCHARARRAY) {
-            return in.readUTF();
+            size = in.readUnsignedShort();
+        } else {
+            size = in.readInt();
         }
-
-        int size = in.readInt();
         byte[] buf = new byte[size];
         in.readFully(buf);
         return new String(buf, BinInterSedes.UTF8);
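
The rewritten writeChararray drops DataOutput.writeUTF for the small case and always writes raw UTF-8 bytes, prefixed by either an unsigned-short or an int byte length, so the limit is now based on encoded byte length rather than character count. A standalone sketch of the resulting format; the tag values here are stand-ins, not the real BinInterSedes constants:

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    public class ChararraySketch {
        // Stand-ins for BinInterSedes.SMALLCHARARRAY / CHARARRAY / UNSIGNED_SHORT_MAX;
        // the real tag values live in BinInterSedes.
        private static final byte SMALLCHARARRAY = 1;
        private static final byte CHARARRAY = 2;
        private static final int UNSIGNED_SHORT_MAX = 65535;

        static byte[] writeChararray(String s) throws IOException {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            byte[] utf = s.getBytes(StandardCharsets.UTF_8);
            if (utf.length < UNSIGNED_SHORT_MAX) {
                out.writeByte(SMALLCHARARRAY);
                out.writeShort(utf.length);   // length in UTF-8 bytes, not chars
            } else {
                out.writeByte(CHARARRAY);
                out.writeInt(utf.length);
            }
            out.write(utf);                   // raw UTF-8 payload in both cases
            return bos.toByteArray();
        }
    }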

Modified: pig/branches/spark/src/org/apache/pig/impl/PigImplConstants.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/PigImplConstants.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/PigImplConstants.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/PigImplConstants.java Wed Feb 22 09:43:41 2017
@@ -38,6 +38,12 @@ public class PigImplConstants {
     public static final String PIG_OPTIMIZER_RULES_KEY = "pig.optimizer.rules";
 
     /**
+     * Used by pig to indicate that current job is running in local mode (local/tez_local)
+     * ie. ExecType.isLocal() is true
+     */
+    public static final String PIG_EXECTYPE_MODE_LOCAL = "pig.exectype.mode.local";
+
+    /**
      * Used by pig to indicate that current job has been converted to run in local mode
      */
     public static final String CONVERTED_TO_LOCAL = "pig.job.converted.local";
@@ -63,4 +69,24 @@ public class PigImplConstants {
      * Parallelism to be used for CROSS operation by GFCross UDF
      */
     public static final String PIG_CROSS_PARALLELISM = "pig.cross.parallelism";
+
+    /**
+     * Pig context
+     */
+    public static final String PIG_CONTEXT = "pig.pigContext";
+
+    /**
+     * Pig log4j properties
+     */
+    public static final String PIG_LOG4J_PROPERTIES = "pig.log4j.properties";
+
+    /**
+     * A unique id for a Pig session used as callerId for underlining component
+     */
+    public static final String PIG_AUDIT_ID = "pig.script.id";
+
+    // Kill the jobs before cleaning up tmp files
+    public static int SHUTDOWN_HOOK_JOB_KILL_PRIORITY = 3;
+    public static int SHUTDOWN_HOOK_TMP_FILES_CLEANUP_PRIORITY = 2;
+    public static int SHUTDOWN_HOOK_ATS_CLIENT_PRIORITY = 1;
 }
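
The three shutdown-hook priorities are presumably intended for Hadoop's ShutdownHookManager, where higher-priority hooks run first, so running jobs get killed before temp files are cleaned up and the ATS client is closed last. A hedged sketch of how such a priority would be registered (the runnable is a placeholder):

    import org.apache.hadoop.util.ShutdownHookManager;
    import org.apache.pig.impl.PigImplConstants;

    public class ShutdownHookSketch {
        public static void register() {
            // Hooks with a higher priority run earlier, so the job-kill hook (3)
            // would fire before tmp-file cleanup (2) and the ATS client hook (1).
            ShutdownHookManager.get().addShutdownHook(new Runnable() {
                @Override
                public void run() {
                    System.out.println("kill running jobs"); // placeholder action
                }
            }, PigImplConstants.SHUTDOWN_HOOK_JOB_KILL_PRIORITY);
        }
    }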

Modified: pig/branches/spark/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/builtin/DefaultIndexableLoader.java Wed Feb 22 09:43:41 2017
@@ -64,13 +64,13 @@ import org.apache.pig.impl.util.ObjectSe
 public class DefaultIndexableLoader extends LoadFunc implements IndexableLoadFunc{
 
     private static final Log LOG = LogFactory.getLog(DefaultIndexableLoader.class);
-    
+
     // FileSpec of index file which will be read from HDFS.
     private String indexFile;
     private String indexFileLoadFuncSpec;
-    
+
     private LoadFunc loader;
-    // Index is modeled as FIFO queue and LinkedList implements java Queue interface.  
+    // Index is modeled as FIFO queue and LinkedList implements java Queue interface.
     private LinkedList<Tuple> index;
     private FuncSpec rightLoaderFuncSpec;
 
@@ -79,9 +79,9 @@ public class DefaultIndexableLoader exte
     private transient TupleFactory mTupleFactory;
 
     private String inpLocation;
-    
+
     public DefaultIndexableLoader(
-            String loaderFuncSpec, 
+            String loaderFuncSpec,
             String indexFile,
             String indexFileLoadFuncSpec,
             String scope,
@@ -93,39 +93,39 @@ public class DefaultIndexableLoader exte
         this.scope = scope;
         this.inpLocation = inputLocation;
     }
-    
+
     @SuppressWarnings("unchecked")
     @Override
     public void seekNear(Tuple keys) throws IOException{
         // some setup
         mTupleFactory = TupleFactory.getInstance();
 
-        /* Currently whole of index is read into memory. Typically, index is small. Usually 
+        /* Currently whole of index is read into memory. Typically, index is small. Usually
            few KBs in size. So, this should not be an issue.
            However, reading whole index at startup time is not required. So, this can be improved upon.
            Assumption: Index being read is sorted on keys followed by filename, followed by offset.
          */
 
         // Index is modeled as FIFO Queue, that frees us from keeping track of which index entry should be read next.
-        
+
         // the keys are sent in a tuple. If there is really only
         // 1 join key, it would be the first field of the tuple. If
         // there are multiple Join keys, the tuple itself represents
         // the join key
         Object firstLeftKey = (keys.size() == 1 ? keys.get(0): keys);
         POLoad ld = new POLoad(genKey(), new FileSpec(indexFile, new FuncSpec(indexFileLoadFuncSpec)));
-                
+
         Properties props = ConfigurationUtil.getLocalFSProperties();
         PigContext pc = new PigContext(ExecType.LOCAL, props);
         ld.setPc(pc);
         index = new LinkedList<Tuple>();
         for(Result res=ld.getNextTuple();res.returnStatus!=POStatus.STATUS_EOP;res=ld.getNextTuple())
-            index.offer((Tuple) res.result);   
+            index.offer((Tuple) res.result);
+
 
-        
         Tuple prevIdxEntry = null;
         Tuple matchedEntry;
-     
+
         // When the first call is made, we need to seek into right input at correct offset.
         while(true){
             // Keep looping till we find first entry in index >= left key
@@ -148,15 +148,15 @@ public class DefaultIndexableLoader exte
                 prevIdxEntry = curIdxEntry;
                 continue;
             }
-            
+
             if(((Comparable)extractedKey).compareTo(firstLeftKey) >= 0){
                 index.addFirst(curIdxEntry);  // We need to add back the current index Entry because we are reading ahead.
                 if(null == prevIdxEntry)   // very first entry in index.
                     matchedEntry = curIdxEntry;
                 else{
-                    matchedEntry = prevIdxEntry; 
+                    matchedEntry = prevIdxEntry;
                     // start join from previous idx entry, it might have tuples
-                    // with this key                    
+                    // with this key
                     index.addFirst(prevIdxEntry);
                 }
                 break;
@@ -168,43 +168,43 @@ public class DefaultIndexableLoader exte
         if (matchedEntry == null) {
             LOG.warn("Empty index file: input directory is empty");
         } else {
-        
+
             Object extractedKey = extractKeysFromIdxTuple(matchedEntry);
-            
+
             if (extractedKey != null) {
                 Class idxKeyClass = extractedKey.getClass();
                 if( ! firstLeftKey.getClass().equals(idxKeyClass)){
-    
+
                     // This check should indeed be done on compile time. But to be on safe side, we do it on runtime also.
                     int errCode = 2166;
                     String errMsg = "Key type mismatch. Found key of type "+firstLeftKey.getClass().getCanonicalName()+" on left side. But, found key of type "+ idxKeyClass.getCanonicalName()+" in index built for right side.";
                     throw new ExecException(errMsg,errCode,PigException.BUG);
                 }
-            } 
+            }
         }
-        
+
         //add remaining split indexes to splitsAhead array
         int [] splitsAhead = new int[index.size()];
         int splitsAheadIdx = 0;
         for(Tuple t : index){
             splitsAhead[splitsAheadIdx++] = (Integer) t.get( t.size()-1 );
         }
-        
+
         initRightLoader(splitsAhead);
     }
-    
+
     private void initRightLoader(int [] splitsToBeRead) throws IOException{
-        PigContext pc = (PigContext) ObjectSerializer
-                .deserialize(PigMapReduce.sJobConfInternal.get().get("pig.pigContext"));
-        
-        Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
-        
+        Properties properties = (Properties) ObjectSerializer
+                .deserialize(PigMapReduce.sJobConfInternal.get().get("pig.client.sys.props"));
+
+        Configuration conf = ConfigurationUtil.toConfiguration(properties);
+
         // Hadoop security need this property to be set
         if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
-            conf.set(MRConfiguration.JOB_CREDENTIALS_BINARY, 
+            conf.set(MRConfiguration.JOB_CREDENTIALS_BINARY,
                     System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
         }
-        
+
         //create ReadToEndLoader that will read the given splits in order
         loader = new ReadToEndLoader((LoadFunc)PigContext.instantiateFuncFromSpec(rightLoaderFuncSpec),
                 conf, inpLocation, splitsToBeRead);
@@ -216,7 +216,7 @@ public class DefaultIndexableLoader exte
 
         if(idxTupSize == 3)
             return idxTuple.get(0);
-        
+
         int numColsInKey = (idxTupSize - 2);
         List<Object> list = new ArrayList<Object>(numColsInKey);
         for(int i=0; i < numColsInKey; i++)
@@ -228,13 +228,13 @@ public class DefaultIndexableLoader exte
     private OperatorKey genKey(){
         return new OperatorKey(scope,NodeIdGenerator.getGenerator().getNextNodeId(scope));
     }
-    
+
     @Override
     public Tuple getNext() throws IOException {
         Tuple t = loader.getNext();
         return t;
     }
-    
+
     @Override
     public void close() throws IOException {
     }
@@ -242,14 +242,14 @@ public class DefaultIndexableLoader exte
     @Override
     public void initialize(Configuration conf) throws IOException {
         // nothing to do
-        
+
     }
 
     @Override
     public InputFormat getInputFormat() throws IOException {
         throw new UnsupportedOperationException();
     }
-    
+
     @Override
     public LoadCaster getLoadCaster() throws IOException {
         throw new UnsupportedOperationException();
@@ -264,7 +264,7 @@ public class DefaultIndexableLoader exte
     public void setLocation(String location, Job job) throws IOException {
         // nothing to do
     }
-    
+
     public void setIndexFile(String indexFile) {
         this.indexFile = indexFile;
     }

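For context, a minimal sketch (not part of the commit) of the Properties round trip the patched initRightLoader() relies on: the client's system properties are serialized into the job configuration under "pig.client.sys.props" and rebuilt into a Configuration on the task side. The writer-side method is illustrative only, not the exact code that populates the conf.

import java.io.IOException;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.impl.util.ObjectSerializer;

public class ClientPropsSketch {
    // Writer side (illustrative): stash the client properties in the job conf.
    public static void store(Configuration jobConf, Properties props) throws IOException {
        jobConf.set("pig.client.sys.props", ObjectSerializer.serialize(props));
    }

    // Reader side, mirroring the patched initRightLoader().
    public static Configuration rebuild(Configuration jobConf) throws IOException {
        Properties props = (Properties) ObjectSerializer.deserialize(
                jobConf.get("pig.client.sys.props"));
        return ConfigurationUtil.toConfiguration(props);
    }
}
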
Modified: pig/branches/spark/src/org/apache/pig/impl/builtin/GFCross.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/builtin/GFCross.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/builtin/GFCross.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/builtin/GFCross.java Wed Feb 22 09:43:41 2017
@@ -26,6 +26,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
@@ -42,7 +43,7 @@ public class GFCross extends EvalFunc<Da
     private BagFactory mBagFactory = BagFactory.getInstance();
     private TupleFactory mTupleFactory = TupleFactory.getInstance();
     private int parallelism = 0;
-    private Random r = new Random();
+    private Random r;
     private String crossKey;
 
     static private final int DEFAULT_PARALLELISM = 96;
@@ -69,6 +70,14 @@ public class GFCross extends EvalFunc<Da
                 if (parallelism < 0) {
                     throw new IOException(PigImplConstants.PIG_CROSS_PARALLELISM + "." + crossKey  + " was " + parallelism);
                 }
+                long taskIdHashCode = cfg.get(MRConfiguration.TASK_ID).hashCode();
+                long seed = ((long)taskIdHashCode << 32) | (taskIdHashCode & 0xffffffffL);
+                r = new Random(seed);
+            } else {
+                // We don't expect cfg to be null here, but the existing
+                // testcase TestGFCross.testDefault exercises this path.
+                // Use a constant seed generated from the hashcode of
+                // task_14738102975522_0001_r_000000.
+                r = new Random(-4235927512599300514L);
             }
 
             numInputs = (Integer)input.get(0);

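For context, a minimal sketch (not part of the commit) of the seed construction used in the GFCross hunk above: the 32-bit hash of the task attempt id is copied into both halves of a 64-bit value, so every task gets a stable but distinct Random. The task id string below is a made-up example.

import java.util.Random;

public class SeedSketch {
    public static void main(String[] args) {
        String taskId = "task_14738102975522_0001_r_000000"; // example value only
        long h = taskId.hashCode();
        // Duplicate the 32-bit hash into the upper and lower halves of the seed.
        long seed = (h << 32) | (h & 0xffffffffL);
        Random r = new Random(seed);
        System.out.println("seed=" + seed + " first draw=" + r.nextInt());
    }
}
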
Modified: pig/branches/spark/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/builtin/PoissonSampleLoader.java Wed Feb 22 09:43:41 2017
@@ -90,7 +90,9 @@ public class PoissonSampleLoader extends
             // number of tuples to be skipped
             Tuple t = loader.getNext();
             if(t == null) {
-                return createNumRowTuple(null);
+                // since skipInterval is -1, no previous sample,
+                // and next sample is null -> the data set is empty
+                return null;
             }
             long availRedMem = (long) ( totalMemory * heapPerc);
             // availRedMem = 155084396;

Modified: pig/branches/spark/src/org/apache/pig/impl/io/NullableTuple.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/io/NullableTuple.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/io/NullableTuple.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/io/NullableTuple.java Wed Feb 22 09:43:41 2017
@@ -57,6 +57,8 @@ public class NullableTuple extends PigNu
     public void readFields(DataInput in) throws IOException {
         boolean nullness = in.readBoolean();
         setNull(nullness);
+        // Free up the previous value for GC
+        mValue = null;
         if (!nullness) {
             mValue = bis.readTuple(in);
         }

Modified: pig/branches/spark/src/org/apache/pig/impl/io/PigFile.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/io/PigFile.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/io/PigFile.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/io/PigFile.java Wed Feb 22 09:43:41 2017
@@ -102,7 +102,7 @@ public class PigFile {
         if(oc.needsTaskCommit(tac)) {
             oc.commitTask(tac);
         }
-        HadoopShims.commitOrCleanup(oc, jc);
+        oc.commitJob(jc);
     }
 
     @Override

Modified: pig/branches/spark/src/org/apache/pig/impl/io/ReadToEndLoader.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/io/ReadToEndLoader.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/io/ReadToEndLoader.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/io/ReadToEndLoader.java Wed Feb 22 09:43:41 2017
@@ -40,17 +40,16 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.data.SchemaTupleBackend;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.OperatorKey;
 
 /**
  * This is wrapper Loader which wraps a real LoadFunc underneath and allows
- * to read a file completely starting a given split (indicated by a split index 
+ * to read a file completely starting a given split (indicated by a split index
  * which is used to look in the List<InputSplit> returned by the underlying
  * InputFormat's getSplits() method). So if the supplied split index is 0, this
  * loader will read the entire file. If it is non zero it will read the partial
  * file beginning from that split to the last split.
- * 
+ *
  * The call sequence to use this is:
  * 1) construct an object using the constructor
  * 2) Call getNext() in a loop till it returns null
@@ -61,52 +60,50 @@ public class ReadToEndLoader extends Loa
      * the wrapped LoadFunc which will do the actual reading
      */
     private LoadFunc wrappedLoadFunc;
-    
+
     /**
      * the Configuration object used to locate the input location - this will
      * be used to call {@link LoadFunc#setLocation(String, Job)} on
      * the wrappedLoadFunc
      */
     private Configuration conf;
-    
+
     /**
      * the input location string (typically input file/dir name )
      */
     private String inputLocation;
-      
+
     /**
      * If the splits to be read are not in increasing sequence of integers
      * this array can be used
      */
     private int[] toReadSplits = null;
-    
+
     /**
      * index into toReadSplits
      */
     private int toReadSplitsIdx = 0;
-    
+
     /**
      * the index of the split the loader is currently reading from
      */
     private int curSplitIndex;
-    
+
     /**
      * the input splits returned by underlying {@link InputFormat#getSplits(JobContext)}
      */
     private List<InputSplit> inpSplits = null;
-    
+
     /**
      * underlying RecordReader
      */
     private RecordReader reader = null;
-    
+
     /**
      * underlying InputFormat
      */
     private InputFormat inputFormat = null;
-    
-    private PigContext pigContext;
-    
+
     private String udfContextSignature = null;
 
     /**
@@ -114,8 +111,8 @@ public class ReadToEndLoader extends Loa
      * @param conf
      * @param inputLocation
      * @param splitIndex
-     * @throws IOException 
-     * @throws InterruptedException 
+     * @throws IOException
+     * @throws InterruptedException
      */
     public ReadToEndLoader(LoadFunc wrappedLoadFunc, Configuration conf,
             String inputLocation, int splitIndex) throws IOException {
@@ -125,17 +122,7 @@ public class ReadToEndLoader extends Loa
         this.curSplitIndex = splitIndex;
         init();
     }
-    
-    public ReadToEndLoader(LoadFunc wrappedLoadFunc, Configuration conf,
-            String inputLocation, int splitIndex, PigContext pigContext) throws IOException {
-        this.wrappedLoadFunc = wrappedLoadFunc;
-        this.inputLocation = inputLocation;
-        this.conf = conf;
-        this.curSplitIndex = splitIndex;
-        this.pigContext = pigContext;
-        init();
-    }
-    
+
     public ReadToEndLoader(LoadFunc wrappedLoadFunc, Configuration conf,
             String inputLocation, int splitIndex, String signature) throws IOException {
         this.udfContextSignature = signature;
@@ -147,14 +134,14 @@ public class ReadToEndLoader extends Loa
     }
 
     /**
-     * This constructor takes an array of split indexes (toReadSplitIdxs) of the 
+     * This constructor takes an array of split indexes (toReadSplitIdxs) of the
      * splits to be read.
      * @param wrappedLoadFunc
      * @param conf
      * @param inputLocation
      * @param toReadSplitIdxs
-     * @throws IOException 
-     * @throws InterruptedException 
+     * @throws IOException
+     * @throws InterruptedException
      */
     public ReadToEndLoader(LoadFunc wrappedLoadFunc, Configuration conf,
             String inputLocation, int[] toReadSplitIdxs) throws IOException {
@@ -166,21 +153,21 @@ public class ReadToEndLoader extends Loa
             toReadSplitIdxs.length > 0 ? toReadSplitIdxs[0] : Integer.MAX_VALUE;
         init();
     }
-    
+
     @SuppressWarnings("unchecked")
     private void init() throws IOException {
-        if (conf != null && pigContext != null) {
-            SchemaTupleBackend.initialize(conf, pigContext, true);
+        if (conf != null) {
+            SchemaTupleBackend.initialize(conf, true);
         }
 
         // make a copy so that if the underlying InputFormat writes to the
         // conf, we don't affect the caller's copy
         conf = new Configuration(conf);
 
-        // let's initialize the wrappedLoadFunc 
+        // let's initialize the wrappedLoadFunc
         Job job = new Job(conf);
         wrappedLoadFunc.setUDFContextSignature(this.udfContextSignature);
-        wrappedLoadFunc.setLocation(inputLocation, 
+        wrappedLoadFunc.setLocation(inputLocation,
                 job);
         // The above setLocation call could write to the conf within
         // the job - get a hold of the modified conf
@@ -191,10 +178,10 @@ public class ReadToEndLoader extends Loa
                     new JobID()));
         } catch (InterruptedException e) {
             throw new IOException(e);
-        }        
+        }
     }
 
-    private boolean initializeReader() throws IOException, 
+    private boolean initializeReader() throws IOException,
     InterruptedException {
         // Close the previous reader first
         if(reader != null) {
@@ -206,14 +193,14 @@ public class ReadToEndLoader extends Loa
             return false;
         }
         InputSplit curSplit = inpSplits.get(curSplitIndex);
-        TaskAttemptContext tAContext = HadoopShims.createTaskAttemptContext(conf, 
+        TaskAttemptContext tAContext = HadoopShims.createTaskAttemptContext(conf,
                 new TaskAttemptID());
         reader = inputFormat.createRecordReader(curSplit, tAContext);
         reader.initialize(curSplit, tAContext);
         // create a dummy pigsplit - other than the actual split, the other
         // params are really not needed here where we are just reading the
         // input completely
-        PigSplit pigSplit = new PigSplit(new InputSplit[] {curSplit}, -1, 
+        PigSplit pigSplit = new PigSplit(new InputSplit[] {curSplit}, -1,
                 new ArrayList<OperatorKey>(), -1);
         // Set the conf object so that if the wrappedLoadFunc uses it,
         // it won't be null
@@ -244,7 +231,7 @@ public class ReadToEndLoader extends Loa
             throw new IOException(e);
         }
     }
-    
+
     private Tuple getNextHelper() throws IOException, InterruptedException {
         Tuple t = null;
         while(initializeReader()) {
@@ -258,8 +245,8 @@ public class ReadToEndLoader extends Loa
         }
         return null;
     }
-    
-    
+
+
     /**
      * Updates curSplitIndex , just increment if splitIndexes is null,
      * else get next split in splitIndexes
@@ -331,7 +318,7 @@ public class ReadToEndLoader extends Loa
              ((LoadMetadata) wrappedLoadFunc).setPartitionFilter(partitionFilter);
         }
     }
-    
+
     @Override
     public void setUDFContextSignature(String signature) {
         this.udfContextSignature = signature;

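For context, a minimal sketch (not part of the commit) of the call sequence documented in the class javadoc above: construct the loader, then call getNext() in a loop until it returns null. PigStorage and the input path are example choices only.

import org.apache.hadoop.conf.Configuration;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.ReadToEndLoader;

public class ReadToEndSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // splitIndex 0 means the whole input is read, split by split.
        ReadToEndLoader loader = new ReadToEndLoader(
                new PigStorage(), conf, "/tmp/example-input", 0);
        Tuple t;
        while ((t = loader.getNext()) != null) {
            System.out.println(t);
        }
    }
}
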
Modified: pig/branches/spark/src/org/apache/pig/impl/plan/NodeIdGenerator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/plan/NodeIdGenerator.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/plan/NodeIdGenerator.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/plan/NodeIdGenerator.java Wed Feb 22 09:43:41 2017
@@ -20,43 +20,78 @@ package org.apache.pig.impl.plan;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 import com.google.common.annotations.VisibleForTesting;
 
+/**
+ * Generates IDs as long values in a thread-safe manner. Each thread has its own generated IDs.
+ */
 public class NodeIdGenerator {
 
-    private Map<String, Long> scopeToIdMap;
-    private static NodeIdGenerator theGenerator = new NodeIdGenerator();
-
-    private NodeIdGenerator() {
-        scopeToIdMap = new HashMap<String, Long>();
-    }
-
+    /**
+     * Holds a map of generated scoped-IDs per thread. Each map holds generated IDs per scope.
+     */
+    private ThreadLocal<Map<String, AtomicLong>> scopeToIdMap
+        = new ThreadLocal<Map<String, AtomicLong>>() {
+            protected Map<String, AtomicLong> initialValue() {
+                return new HashMap<String,AtomicLong>();
+            }
+        };
+
+    /**
+     * Singleton instance.
+     */
+    private static final NodeIdGenerator theGenerator = new NodeIdGenerator();
+
+    /**
+     * Private default constructor to force singleton use-case of this class.
+     */
+    private NodeIdGenerator() {}
+
+    /**
+     * Returns the NodeIdGenerator singleton.
+     * @return the NodeIdGenerator singleton instance
+     */
     public static NodeIdGenerator getGenerator() {
         return theGenerator;
     }
 
-    public long getNextNodeId(String scope) {
-        Long val = scopeToIdMap.get(scope);
-
-        long nextId = 0;
-
-        if (val != null) {
-            nextId = val.longValue();
-        }
-
-        scopeToIdMap.put(scope, nextId + 1);
-
-        return nextId;
+    /**
+     * Returns the next ID to be used for the current Thread.
+     * 
+     * @param scope
+     * @return the next id for the given scope for the current thread
+     */
+    public long getNextNodeId(final String scope) {
+        // ThreadLocal usage protects us from having the same HashMap instance
+        // being used by several threads, so we can use it without synchronized
+        // blocks and still be thread-safe.
+        Map<String, AtomicLong> map = scopeToIdMap.get();
+
+        // The concurrent properties of the AtomicLong are not needed here,
+        // but it costs less to reuse such an object than to create a new
+        // Long instance each time we increment a counter.
+        AtomicLong l = map.get(scope);
+        if ( l == null )
+            map.put( scope, l = new AtomicLong() );
+        return l.getAndIncrement();
     }
 
+    /**
+     * Reset the given scope IDs to 0 for the current Thread.
+     * @param scope
+     */
     @VisibleForTesting
-    public static void reset(String scope) {
-        theGenerator.scopeToIdMap.put(scope, 0L) ;
+    public static void reset(final String scope) {
+        theGenerator.scopeToIdMap.get().remove(scope);
     }
 
+    /**
+     * Reset all scope IDs to 0 for the current Thread.
+     */
     @VisibleForTesting
     public static void reset() {
-        theGenerator.scopeToIdMap.clear();
+        theGenerator.scopeToIdMap.remove();
     }
 }

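For context, a minimal sketch (not part of the commit) showing the per-thread, per-scope behaviour of the rewritten generator: counters start at 0 independently for each scope, and a different thread gets its own counters.

import org.apache.pig.impl.plan.NodeIdGenerator;

public class NodeIdSketch {
    public static void main(String[] args) throws InterruptedException {
        NodeIdGenerator gen = NodeIdGenerator.getGenerator();
        System.out.println(gen.getNextNodeId("scope-1")); // 0
        System.out.println(gen.getNextNodeId("scope-1")); // 1
        System.out.println(gen.getNextNodeId("scope-2")); // 0, independent scope

        Thread other = new Thread(new Runnable() {
            public void run() {
                // A fresh thread sees its own map, so the count restarts at 0.
                System.out.println(NodeIdGenerator.getGenerator().getNextNodeId("scope-1"));
            }
        });
        other.start();
        other.join();
    }
}
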
Modified: pig/branches/spark/src/org/apache/pig/impl/streaming/ExecutableManager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/streaming/ExecutableManager.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/streaming/ExecutableManager.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/streaming/ExecutableManager.java Wed Feb 22 09:43:41 2017
@@ -150,12 +150,13 @@ public class ExecutableManager {
 
         LOG.debug("Process exited with: " + exitCode);
         if (exitCode != SUCCESS) {
-            LOG.error(command + " failed with exit status: "
-                    + exitCode);
+            String errMsg = "'" + command.toString() + "'" + " failed with exit status: " + exitCode;
+            LOG.error(errMsg);
+            Result res = new Result(POStatus.STATUS_ERR, errMsg);
+            sendOutput(poStream.getBinaryOutputQueue(), res);
         }
 
-        if (outputHandler.getOutputType() == OutputType.ASYNCHRONOUS) {
-
+        if (exitCode == SUCCESS && outputHandler.getOutputType() == OutputType.ASYNCHRONOUS) {
             // Trigger the outputHandler
             outputHandler.bindTo("", null, 0, -1);
 
@@ -178,10 +179,18 @@ public class ExecutableManager {
      * @param process the process to be killed
      * @throws IOException
      */
-    private void killProcess(Process process) throws IOException {
+    private void killProcess(Process process) {
         if (process != null) {
-            inputHandler.close(process);
-            outputHandler.close();
+            try {
+                inputHandler.close(process);
+            } catch (Exception e) {
+                LOG.info("Exception in killProcess while closing inputHandler. Ignoring:" + e.getMessage());
+            }
+            try {
+                outputHandler.close();
+            } catch (Exception e) {
+                LOG.info("Exception in killProcess while closing outputHandler. Ignoring:" + e.getMessage());
+            }
             process.destroy();
         }
     }
@@ -334,7 +343,7 @@ public class ExecutableManager {
                                 // we will only call close() here and not
                                 // worry about deducing whether the process died
                                 // normally or abnormally - if there was any real
-                                // issue the ProcessOutputThread should see
+                                // issue we should see
                                 // a non zero exit code from the process and send
                                 // a POStatus.STATUS_ERR back - what if we got
                                 // an IOException because there was only an issue with
@@ -344,14 +353,6 @@ public class ExecutableManager {
                                 return;
                             } else {
                                 // asynchronous case - then this is a real exception
-                                LOG.error("Exception while trying to write to stream binary's input", e);
-                                // send POStatus.STATUS_ERR to POStream to signal the error
-                                // Generally the ProcessOutputThread would do this but now
-                                // we should do it here since neither the process nor the
-                                // ProcessOutputThread will ever be spawned
-                                Result res = new Result(POStatus.STATUS_ERR,
-                                        "Exception while trying to write to stream binary's input" + e.getMessage());
-                                sendOutput(poStream.getBinaryOutputQueue(), res);
                                 throw e;
                             }
                         }
@@ -362,13 +363,13 @@ public class ExecutableManager {
             } catch (Throwable t) {
                 // Note that an error occurred
                 outerrThreadsError = t;
-                LOG.error( "Error while reading from POStream and " +
-                           "passing it to the streaming process", t);
-                try {
-                    killProcess(process);
-                } catch (IOException ioe) {
-                    LOG.warn(ioe);
-                }
+                Result res = new Result(POStatus.STATUS_ERR,
+                                        "Error while reading from POStream and " +
+                                        "passing it to the streaming process:" + t.getMessage());
+                LOG.error("Error while reading from POStream and " +
+                          "passing it to the streaming process:", t);
+                sendOutput(poStream.getBinaryOutputQueue(), res);
+                killProcess(process);
             }
         }
     }
@@ -452,13 +453,7 @@ public class ExecutableManager {
                 try {
                     exitCode = process.waitFor();
                 } catch (InterruptedException ie) {
-                    try {
-                        killProcess(process);
-                    } catch (IOException e) {
-                        LOG.warn("Exception trying to kill process while processing null output " +
-                                "from binary", e);
-
-                    }
+                    killProcess(process);
                     // signal error
                     String errMsg = "Failure while waiting for process (" + command.toString() + ")" +
                             ie.getMessage();

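For context, a minimal sketch (not part of the commit) of the error-propagation idiom added above: on a non-zero exit status a Result carrying POStatus.STATUS_ERR is handed to the binary output queue so the consumer sees the failure. A plain BlockingQueue stands in for POStream's queue here; sendOutput() in ExecutableManager does essentially this put(), plus interrupt handling.

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;

public class StreamErrorSketch {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Result> binaryOutputQueue = new ArrayBlockingQueue<Result>(1);
        int exitCode = 1; // pretend the streaming binary failed
        if (exitCode != 0) {
            String errMsg = "'my-streaming-cmd' failed with exit status: " + exitCode;
            Result res = new Result(POStatus.STATUS_ERR, errMsg);
            binaryOutputQueue.put(res); // hand the error to the consumer side
        }
        System.out.println(binaryOutputQueue.take().result);
    }
}
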
Modified: pig/branches/spark/src/org/apache/pig/impl/streaming/OutputHandler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/streaming/OutputHandler.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/streaming/OutputHandler.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/streaming/OutputHandler.java Wed Feb 22 09:43:41 2017
@@ -175,8 +175,10 @@ public abstract class OutputHandler {
      */
     public synchronized void close() throws IOException {
         if(!alreadyClosed) {
-            istream.close();
-            istream = null;
+            if( istream != null ) {
+                istream.close();
+                istream = null;
+            }
             alreadyClosed = true;
         }
     }

Modified: pig/branches/spark/src/org/apache/pig/impl/util/JarManager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/util/JarManager.java?rev=1783988&r1=1783987&r2=1783988&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/util/JarManager.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/util/JarManager.java Wed Feb 22 09:43:41 2017
@@ -47,7 +47,6 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.impl.PigContext;
 import org.apache.tools.bzip2r.BZip2Constants;
 import org.joda.time.DateTime;
@@ -66,7 +65,6 @@ public class JarManager {
         BZIP2R(BZip2Constants.class),
         AUTOMATON(Automaton.class),
         ANTLR(CommonTokenStream.class),
-        GUAVA(Multimaps.class),
         JODATIME(DateTime.class);
 
         private final Class pkgClass;
@@ -208,11 +206,8 @@ public class JarManager {
     public static List<String> getDefaultJars() {
         List<String> defaultJars = new ArrayList<String>();
         for (DefaultPigPackages pkgToSend : DefaultPigPackages.values()) {
-            if(pkgToSend.equals(DefaultPigPackages.GUAVA) && HadoopShims.isHadoopYARN()) {
-                continue; //Skip
-            }
             String jar = findContainingJar(pkgToSend.getPkgClass());
-            if (!defaultJars.contains(jar)) {
+            if (jar != null && !defaultJars.contains(jar)) {
                 defaultJars.add(jar);
             }
         }


