pig-commits mailing list archives

From: z..@apache.org
Subject: svn commit: r1784237 [10/22] - in /pig/branches/spark: ./ bin/ conf/ contrib/piggybank/java/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/apachel...
Date: Fri, 24 Feb 2017 08:19:46 GMT
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java Fri Feb 24 08:19:42 2017
@@ -62,7 +62,6 @@ import org.apache.tez.dag.api.EdgeProper
  */
 public class TezOperDependencyParallelismEstimator implements TezParallelismEstimator {
 
-    static private int maxTaskCount;
     static final double DEFAULT_FLATTEN_FACTOR = 10;
     static final double DEFAULT_FILTER_FACTOR = 0.7;
     static final double DEFAULT_LIMIT_FACTOR = 0.1;
@@ -76,6 +75,8 @@ public class TezOperDependencyParallelis
     static final double DEFAULT_AGGREGATION_FACTOR = 0.7;
 
     private PigContext pc;
+    private int maxTaskCount;
+    private long bytesPerReducer;
 
     @Override
     public void setPigContext(PigContext pc) {
@@ -94,16 +95,18 @@ public class TezOperDependencyParallelis
         maxTaskCount = conf.getInt(PigReducerEstimator.MAX_REDUCER_COUNT_PARAM,
                 PigReducerEstimator.DEFAULT_MAX_REDUCER_COUNT_PARAM);
 
-        // If parallelism is set explicitly, respect it
-        if (!tezOper.isIntermediateReducer() && tezOper.getRequestedParallelism()!=-1) {
-            return tezOper.getRequestedParallelism();
-        }
+        bytesPerReducer = conf.getLong(PigReducerEstimator.BYTES_PER_REDUCER_PARAM, PigReducerEstimator.DEFAULT_BYTES_PER_REDUCER);
 
         // If we have already estimated parallelism, use that one
-        if (tezOper.getEstimatedParallelism()!=-1) {
+        if (tezOper.getEstimatedParallelism() != -1) {
             return tezOper.getEstimatedParallelism();
         }
 
+        // If parallelism is set explicitly, respect it
+        if (!tezOper.isIntermediateReducer() && tezOper.getRequestedParallelism()!=-1) {
+            return tezOper.getRequestedParallelism();
+        }
+
         List<TezOperator> preds = plan.getPredecessors(tezOper);
         if (preds==null) {
             throw new IOException("Cannot estimate parallelism for source vertex");
@@ -130,6 +133,12 @@ public class TezOperDependencyParallelis
                 boolean applyFactor = !tezOper.isUnion();
                 if (!pred.isVertexGroup() && applyFactor) {
                     predParallelism = predParallelism * pred.getParallelismFactor(tezOper);
+                    if (pred.getTotalInputFilesSize() > 0) {
+                        // Estimate similar to mapreduce and use the maximum of two
+                        int parallelismBySize = (int) Math.ceil((double) pred
+                                .getTotalInputFilesSize() / bytesPerReducer);
+                        predParallelism = Math.max(predParallelism, parallelismBySize);
+                    }
                 }
                 estimatedParallelism += predParallelism;
             }
@@ -157,9 +166,7 @@ public class TezOperDependencyParallelis
         }
 
         if (roundedEstimatedParallelism == 0) {
-            throw new IOException("Estimated parallelism for "
-                    + tezOper.getOperatorKey().toString()
-                    + " is 0 which is unexpected");
+            roundedEstimatedParallelism = 1; // We need to produce empty output file
         }
 
         return roundedEstimatedParallelism;
@@ -196,7 +203,7 @@ public class TezOperDependencyParallelis
             if (successor != null) {
                 // Map side combiner
                 TezEdgeDescriptor edge = tezOp.outEdges.get(successor.getOperatorKey());
-                if (!edge.combinePlan.isEmpty()) {
+                if (!edge.combinePlan.isEmpty() || edge.needsDistinctCombiner()) {
                     if (successor.isDistinct()) {
                         factor = DEFAULT_DISTINCT_FACTOR;
                     } else {
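
The hunk above adds a size-based estimate alongside the existing factor-based one, mirroring the MapReduce reducer estimator: each predecessor with a known input size contributes at least ceil(totalInputFilesSize / bytesPerReducer) tasks. A minimal, self-contained sketch of that per-predecessor calculation (class and method names below are illustrative, not part of the patch; the final max(1, ...) reflects the later hunk that replaces the "parallelism is 0" exception with a single task):

    public class ParallelismEstimateSketch {
        static int estimateForPredecessor(double predParallelism, double factor,
                long totalInputFilesSize, long bytesPerReducer) {
            double scaled = predParallelism * factor;
            if (totalInputFilesSize > 0) {
                // Estimate similar to mapreduce and take the maximum of the two
                int bySize = (int) Math.ceil((double) totalInputFilesSize / bytesPerReducer);
                scaled = Math.max(scaled, bySize);
            }
            // Never return 0 so the vertex can still produce an empty output file
            return Math.max(1, (int) Math.ceil(scaled));
        }

        public static void main(String[] args) {
            // 10 predecessor tasks, filter factor 0.7, 5 GB of input, 1 GB per reducer
            System.out.println(estimateForPredecessor(10, 0.7, 5L << 30, 1L << 30)); // prints 7
        }
    }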

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java Fri Feb 24 08:19:42 2017
@@ -29,6 +29,7 @@ import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.PigConfiguration;
+import org.apache.pig.StoreFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -44,6 +45,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.HashValuePartitioner;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
@@ -52,7 +54,6 @@ import org.apache.pig.builtin.AvroStorag
 import org.apache.pig.builtin.JsonStorage;
 import org.apache.pig.builtin.OrcStorage;
 import org.apache.pig.builtin.PigStorage;
-import org.apache.pig.builtin.RoundRobinPartitioner;
 import org.apache.pig.builtin.mock.Storage;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
@@ -108,6 +109,12 @@ public class UnionOptimizer extends TezO
         if((tezOp.isLimit() || tezOp.isLimitAfterSort()) && tezOp.getRequestedParallelism() == 1) {
             return false;
         }
+
+        // If user has specified a PARALLEL clause with the union operator
+        // turn off union optimization
+        if (tezOp.getRequestedParallelism() != -1) {
+            return false;
+        }
         // Two vertices separately ranking with 1 to n and writing to output directly
         // will make each rank repeate twice which is wrong. Rank always needs to be
         // done from single vertex to have the counting correct.
@@ -120,10 +127,25 @@ public class UnionOptimizer extends TezO
     public static boolean isOptimizableStoreFunc(TezOperator tezOp,
             List<String> supportedStoreFuncs, List<String> unsupportedStoreFuncs)
             throws VisitorException {
-        if (supportedStoreFuncs != null || unsupportedStoreFuncs != null) {
-            List<POStoreTez> stores = PlanHelper.getPhysicalOperators(tezOp.plan, POStoreTez.class);
-            for (POStoreTez store : stores) {
-                String name = store.getStoreFunc().getClass().getName();
+        List<POStoreTez> stores = PlanHelper.getPhysicalOperators(tezOp.plan, POStoreTez.class);
+
+        for (POStoreTez store : stores) {
+            String name = store.getStoreFunc().getClass().getName();
+            if (store.getStoreFunc() instanceof StoreFunc) {
+                StoreFunc func = (StoreFunc) store.getStoreFunc();
+                if (func.supportsParallelWriteToStoreLocation() != null) {
+                    if (func.supportsParallelWriteToStoreLocation()) {
+                        continue;
+                    } else {
+                        LOG.warn(name + " does not support union optimization."
+                                + " Disabling it. There will be some performance degradation.");
+                        return false;
+                    }
+                }
+            }
+            // If StoreFunc does not explicitly state support, then check supported and
+            // unsupported config settings.
+            if (supportedStoreFuncs != null || unsupportedStoreFuncs != null) {
                 if (unsupportedStoreFuncs != null
                         && unsupportedStoreFuncs.contains(name)) {
                     return false;
@@ -237,8 +259,23 @@ public class UnionOptimizer extends TezO
                 for (TezOperator succ : successors) {
                     if (succ.isVertexGroup() && unionStoreOutputs.get(i).getSFile().equals(succ.getVertexGroupInfo().getSFile())) {
                         existingVertexGroup = succ;
+                        break;
+                    }
+                }
+            }
+            if (existingVertexGroup == null) {
+                // In the case of union + split + union + store, the different stores in the Split
+                // will be writing to same location after second union operator is optimized.
+                // So while optimizing the first union, we should just make it write to one vertex group
+                for (int j = 0; j < i; j++) {
+                    if (unionStoreOutputs.get(i).getSFile().equals(storeVertexGroupOps[j].getVertexGroupInfo().getSFile())) {
+                        storeVertexGroupOps[i] = storeVertexGroupOps[j];
+                        break;
                     }
                 }
+                if (storeVertexGroupOps[i] != null) {
+                    continue;
+                }
             }
             if (existingVertexGroup != null) {
                 storeVertexGroupOps[i] = existingVertexGroup;
@@ -270,6 +307,15 @@ public class UnionOptimizer extends TezO
         TezOperator[] outputVertexGroupOps = new TezOperator[unionOutputKeys.size()];
         String[] newOutputKeys = new String[unionOutputKeys.size()];
         for (int i=0; i < outputVertexGroupOps.length; i++) {
+            for (int j = 0; j < i; j++) {
+                if (unionOutputKeys.get(i).equals(unionOutputKeys.get(j))) {
+                    outputVertexGroupOps[i] = outputVertexGroupOps[j];
+                    break;
+                }
+            }
+            if (outputVertexGroupOps[i] != null) {
+                continue;
+            }
             outputVertexGroupOps[i] = new TezOperator(OperatorKey.genOpKey(scope));
             outputVertexGroupOps[i].setVertexGroupInfo(new VertexGroupInfo());
             outputVertexGroupOps[i].getVertexGroupInfo().setOutput(unionOutputKeys.get(i));
@@ -515,15 +561,24 @@ public class UnionOptimizer extends TezO
         // Connect predecessor to the storeVertexGroups
         int i = 0;
         for (TezOperator storeVertexGroup : storeVertexGroupOps) {
+            // Skip connecting if they are already connected. Can happen in case of
+            // union + split + union + store. Because of the split all the stores
+            // will be writing to same location
+            List<OperatorKey> inputs = storeVertexGroup.getVertexGroupInfo().getInputs();
+            if (inputs == null || !inputs.contains(pred.getOperatorKey())) {
+                tezPlan.connect(pred, storeVertexGroup);
+            }
             storeVertexGroup.getVertexGroupInfo().addInput(pred.getOperatorKey());
             pred.addVertexGroupStore(clonedUnionStoreOutputs.get(i++).getOperatorKey(),
                     storeVertexGroup.getOperatorKey());
-            tezPlan.connect(pred, storeVertexGroup);
         }
 
         for (TezOperator outputVertexGroup : outputVertexGroupOps) {
+            List<OperatorKey> inputs = outputVertexGroup.getVertexGroupInfo().getInputs();
+            if (inputs == null || !inputs.contains(pred.getOperatorKey())) {
+                tezPlan.connect(pred, outputVertexGroup);
+            }
             outputVertexGroup.getVertexGroupInfo().addInput(pred.getOperatorKey());
-            tezPlan.connect(pred, outputVertexGroup);
         }
 
         copyOperatorProperties(pred, unionOp);
@@ -568,7 +623,7 @@ public class UnionOptimizer extends TezO
             // more union predecessors. Change it to SCATTER_GATHER
             if (edge.dataMovementType == DataMovementType.ONE_TO_ONE) {
                 edge.dataMovementType = DataMovementType.SCATTER_GATHER;
-                edge.partitionerClass = RoundRobinPartitioner.class;
+                edge.partitionerClass = HashValuePartitioner.class;
                 edge.outputClassName = UnorderedPartitionedKVOutput.class.getName();
                 edge.inputClassName = UnorderedKVInput.class.getName();
             }

Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/HashValuePartitioner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/HashValuePartitioner.java?rev=1784237&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/HashValuePartitioner.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/HashValuePartitioner.java Fri Feb 24 08:19:42 2017
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.runtime;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.Partitioner;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+
+public class HashValuePartitioner extends Partitioner<Writable, Writable> {
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public int getPartition(Writable key, Writable value, int numPartitions) {
+        int hash = 17;
+        Tuple tuple;
+        if (value instanceof Tuple) {
+            // union optimizer turned off
+            tuple = (Tuple) value;
+        } else {
+            // union followed by order by or skewed join
+            tuple = (Tuple)((NullableTuple) value).getValueAsPigType();
+        }
+        if (tuple != null) {
+            for (Object o : tuple.getAll()) {
+                if (o != null) {
+                    // Skip computing hashcode for bags.
+                    // Order of elements in the map/bag may be different on each run
+                    // Can't even include size as some DataBag implementations
+                    // iterate through all elements in the bag to get the size.
+                    if (o instanceof DataBag) {
+                        hash = 31 * hash;
+                    } else {
+                        hash = 31 * hash + o.hashCode();
+                    }
+                }
+            }
+        }
+        return (hash & Integer.MAX_VALUE) % numPartitions;
+    }
+
+}
\ No newline at end of file
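
HashValuePartitioner replaces RoundRobinPartitioner on these union edges (see the UnionOptimizer and TezCompilerUtil hunks in this commit). It hashes the whole tuple value, skipping bags whose iteration order may differ between runs, so the partition assignment is a deterministic function of the tuple contents rather than of arrival order. A rough usage sketch, assuming a plain Pig Tuple value (the "union optimizer turned off" branch of getPartition); the class below is illustrative only:

    import org.apache.pig.backend.hadoop.executionengine.tez.runtime.HashValuePartitioner;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.data.TupleFactory;

    public class HashValuePartitionerSketch {
        public static void main(String[] args) {
            TupleFactory tf = TupleFactory.getInstance();
            HashValuePartitioner partitioner = new HashValuePartitioner();
            Tuple t1 = tf.newTuple(java.util.Arrays.<Object>asList("alice", 1));
            Tuple t2 = tf.newTuple(java.util.Arrays.<Object>asList("bob", 2));
            // Identical tuple contents always land in the same of the 4 partitions;
            // different contents are spread by their hash.
            System.out.println(partitioner.getPartition(null, t1, 4));
            System.out.println(partitioner.getPartition(null, t2, 4));
        }
    }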

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PartitionerDefinedVertexManager.java Fri Feb 24 08:19:42 2017
@@ -17,23 +17,25 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.tez.runtime;
 
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.tez.dag.api.EdgeManagerPluginDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.VertexManagerPlugin;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
 import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
-import org.apache.tez.dag.app.dag.impl.ScatterGatherEdgeManager;
+import org.apache.tez.dag.api.event.VertexState;
+import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.runtime.api.Event;
 import org.apache.tez.runtime.api.events.VertexManagerEvent;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 /**
@@ -46,8 +48,13 @@ import com.google.common.collect.Lists;
 public class PartitionerDefinedVertexManager extends VertexManagerPlugin {
     private static final Log LOG = LogFactory.getLog(PartitionerDefinedVertexManager.class);
 
-    private boolean isParallelismSet = false;
+    private volatile boolean parallelismSet;
     private int dynamicParallelism = -1;
+    private int numConfiguredSources;
+    private int numSources = -1;
+    private volatile boolean configured;
+    private volatile boolean started;
+    private volatile boolean scheduled;
 
     public PartitionerDefinedVertexManager(VertexManagerPluginContext context) {
         super(context);
@@ -55,7 +62,31 @@ public class PartitionerDefinedVertexMan
 
     @Override
     public void initialize() {
-        // Nothing to do
+        // this will prevent vertex from starting until we notify we are done
+        getContext().vertexReconfigurationPlanned();
+        parallelismSet = false;
+        numConfiguredSources = 0;
+        configured = false;
+        started = false;
+        numSources = getContext().getInputVertexEdgeProperties().size();
+        // wait for sources and self to start
+        Map<String, EdgeProperty> edges = getContext().getInputVertexEdgeProperties();
+        for (String entry : edges.keySet()) {
+            getContext().registerForVertexStateUpdates(entry, EnumSet.of(VertexState.CONFIGURED));
+        }
+    }
+
+    @Override
+    public synchronized void onVertexStateUpdated(VertexStateUpdate stateUpdate)
+            throws Exception {
+        numConfiguredSources++;
+        LOG.info("For vertex: " + getContext().getVertexName() + " Received configured signal from: "
+            + stateUpdate.getVertexName() + " numConfiguredSources: " + numConfiguredSources
+            + " needed: " + numSources);
+        Preconditions.checkState(numConfiguredSources <= numSources, "Vertex: " + getContext().getVertexName());
+        if (numConfiguredSources == numSources) {
+            configure();
+        }
     }
 
     @Override
@@ -73,10 +104,9 @@ public class PartitionerDefinedVertexMan
     public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) throws Exception {
         // There could be multiple partition vertex sending VertexManagerEvent
         // Only need to setVertexParallelism once
-        if (isParallelismSet) {
+        if (parallelismSet) {
             return;
         }
-        isParallelismSet = true;
         // Need to distinguish from VertexManagerEventPayloadProto emitted by OrderedPartitionedKVOutput
         if (vmEvent.getUserPayload().limit()==4) {
             dynamicParallelism = vmEvent.getUserPayload().getInt();
@@ -96,18 +126,50 @@ public class PartitionerDefinedVertexMan
                     edgeManagers.put(entry.getKey(), edge);
                 }
                 getContext().reconfigureVertex(dynamicParallelism, null, edgeManagers);
+                parallelismSet = true;
+                configure();
             }
         }
     }
 
-    @Override
-    public void onVertexStarted(Map<String, List<Integer>> completions) {
-        if (dynamicParallelism != -1) {
+    private void configure() {
+        if(parallelismSet && (numSources == numConfiguredSources)) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Done reconfiguring vertex " + getContext().getVertexName());
+            }
+            getContext().doneReconfiguringVertex();
+            configured = true;
+            trySchedulingTasks();
+        }
+    }
+
+    private synchronized void trySchedulingTasks() {
+        if (configured && started && !scheduled) {
+            LOG.info("Scheduling " + dynamicParallelism + " tasks for vertex " + getContext().getVertexName());
             List<TaskWithLocationHint> tasksToStart = Lists.newArrayListWithCapacity(dynamicParallelism);
-            for (int i=0; i<dynamicParallelism; ++i) {
+            for (int i = 0; i < dynamicParallelism; ++i) {
                 tasksToStart.add(new TaskWithLocationHint(new Integer(i), null));
             }
             getContext().scheduleVertexTasks(tasksToStart);
+            scheduled = true;
         }
     }
+
+    @Override
+    public void onVertexStarted(Map<String, List<Integer>> completions) {
+        // This vertex manager will be getting the following calls
+        //   1) onVertexManagerEventReceived - Parallelism vertex manager event sent by sample aggregator vertex
+        //   2) onVertexStateUpdated - Vertex CONFIGURED status updates from
+        //       - Order by Partitioner vertex (1-1) in case of Order by
+        //       - Skewed Join Left Partitioner (1-1) and Right Input Vertices in case of SkewedJoin
+        //   3) onVertexStarted
+        // Calls 2) and 3) can happen in any order. So we should schedule tasks
+        // only after start is called and configuration is also complete
+        started = true;
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Vertex start received for " + getContext().getVertexName());
+        }
+        trySchedulingTasks();
+    }
+
 }
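
The vertex manager now schedules tasks only after two independent events have both occurred: onVertexStarted, and completion of the reconfiguration driven by the VertexManagerEvent and the CONFIGURED updates from the source vertices. Because those events can arrive in either order, scheduling is funneled through one guarded method. A stripped-down sketch of that guard (an illustrative class, not the plugin itself):

    public class ScheduleOnceWhenReadySketch {
        private volatile boolean configured;
        private volatile boolean started;
        private boolean scheduled;

        public void onConfigured() { configured = true; trySchedule(); }
        public void onStarted()    { started = true;    trySchedule(); }

        private synchronized void trySchedule() {
            // Runs exactly once, only when both preconditions hold
            if (configured && started && !scheduled) {
                scheduled = true;
                System.out.println("scheduling tasks");
            }
        }

        public static void main(String[] args) {
            ScheduleOnceWhenReadySketch m = new ScheduleOnceWhenReadySketch();
            m.onStarted();     // arrives first: nothing happens yet
            m.onConfigured();  // both preconditions now hold: schedule once
        }
    }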

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigGraceShuffleVertexManager.java Fri Feb 24 08:19:42 2017
@@ -33,15 +33,15 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.ParallelismSetter;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.TezEstimatedParallelismClearer;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.EdgeProperty;
-import org.apache.tez.dag.api.TezException;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.TezUncheckedException;
 import org.apache.tez.dag.api.VertexManagerPluginContext;
-import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.event.VertexState;
 import org.apache.tez.dag.api.event.VertexStateUpdate;
 import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
@@ -72,7 +72,7 @@ public class PigGraceShuffleVertexManage
             conf = TezUtils.createConfFromUserPayload(getContext().getUserPayload());
             bytesPerTask = conf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
                     InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER);
-            pc = (PigContext)ObjectSerializer.deserialize(conf.get("pig.pigContext"));
+            pc = (PigContext)ObjectSerializer.deserialize(conf.get(PigImplConstants.PIG_CONTEXT));
             tezPlan = (TezOperPlan)ObjectSerializer.deserialize(conf.get("pig.tez.plan"));
             TezEstimatedParallelismClearer clearer = new TezEstimatedParallelismClearer(tezPlan);
             try {
@@ -81,9 +81,10 @@ public class PigGraceShuffleVertexManage
                 throw new TezUncheckedException(e);
             }
             TezOperator op = tezPlan.getOperator(OperatorKey.fromString(getContext().getVertexName()));
-    
+
             // Collect grandparents of the vertex
-            Function<TezOperator, String> tezOpToString = new Function<TezOperator, String>() { 
+            Function<TezOperator, String> tezOpToString = new Function<TezOperator, String>() {
+                @Override
                 public String apply(TezOperator op) { return op.getOperatorKey().toString(); }
             };
             grandParents = Lists.transform(TezOperPlan.getGrandParentsForGraceParallelism(tezPlan, op), tezOpToString);
@@ -135,7 +136,7 @@ public class PigGraceShuffleVertexManage
         // Now one of the predecessor is about to start, we need to make a decision now
         if (anyPredAboutToStart) {
             // All grandparents finished, start parents with right parallelism
-            
+
             for (TezOperator pred : preds) {
                 if (pred.getRequestedParallelism()==-1) {
                     List<TezOperator> predPreds = tezPlan.getPredecessors(pred);

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/PigProcessor.java Fri Feb 24 08:19:42 2017
@@ -25,6 +25,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Properties;
 import java.util.Set;
 
 import org.apache.commons.logging.Log;
@@ -32,6 +33,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.TaskAttemptID;
 import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.log4j.PropertyConfigurator;
 import org.apache.pig.JVMReuseImpl;
 import org.apache.pig.PigConstants;
 import org.apache.pig.PigException;
@@ -39,6 +41,7 @@ import org.apache.pig.backend.executione
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigHadoopLogger;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.ProgressableReporter;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.UDFFinishVisitor;
@@ -53,6 +56,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.data.SchemaTupleBackend;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.ObjectSerializer;
@@ -132,7 +136,11 @@ public class PigProcessor extends Abstra
         SpillableMemoryManager.getInstance().configure(conf);
         PigContext.setPackageImportList((ArrayList<String>) ObjectSerializer
                 .deserialize(conf.get("udf.import.list")));
-        PigContext pc = (PigContext) ObjectSerializer.deserialize(conf.get("pig.pigContext"));
+        Properties log4jProperties = (Properties) ObjectSerializer
+                .deserialize(conf.get(PigImplConstants.PIG_LOG4J_PROPERTIES));
+        if (log4jProperties != null) {
+            PropertyConfigurator.configure(log4jProperties);
+        }
 
         // To determine front-end in UDFContext
         conf.set(MRConfiguration.JOB_APPLICATION_ATTEMPT_ID, getContext().getUniqueIdentifier());
@@ -151,6 +159,12 @@ public class PigProcessor extends Abstra
         conf.setInt(JobContext.TASK_PARTITION,
               taskAttemptId.getTaskID().getId());
         conf.set(JobContext.ID, taskAttemptId.getJobID().toString());
+        if (conf.get(PigInputFormat.PIG_INPUT_LIMITS) != null) {
+            // Has Load and is a root vertex
+            conf.setInt(JobContext.NUM_MAPS, getContext().getVertexParallelism());
+        } else {
+            conf.setInt(JobContext.NUM_REDUCES, getContext().getVertexParallelism());
+        }
 
         conf.set(PigConstants.TASK_INDEX, Integer.toString(getContext().getTaskIndex()));
         UDFContext.getUDFContext().addJobConf(conf);
@@ -158,7 +172,7 @@ public class PigProcessor extends Abstra
 
         String execPlanString = conf.get(PLAN);
         execPlan = (PhysicalPlan) ObjectSerializer.deserialize(execPlanString);
-        SchemaTupleBackend.initialize(conf, pc);
+        SchemaTupleBackend.initialize(conf);
         PigMapReduce.sJobContext = HadoopShims.createJobContext(conf, new org.apache.hadoop.mapreduce.JobID());
 
         // Set the job conf as a thread-local member of PigMapReduce
@@ -167,7 +181,7 @@ public class PigProcessor extends Abstra
 
         Utils.setDefaultTimeZone(conf);
 
-        boolean aggregateWarning = "true".equalsIgnoreCase(pc.getProperties().getProperty("aggregate.warning"));
+        boolean aggregateWarning = "true".equalsIgnoreCase(conf.get("aggregate.warning"));
         PigStatusReporter pigStatusReporter = PigStatusReporter.getInstance();
         pigStatusReporter.setContext(new TezTaskContext(getContext()));
         pigHadoopLogger = PigHadoopLogger.getInstance();

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/TezInput.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/TezInput.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/TezInput.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/TezInput.java Fri Feb 24 08:19:42 2017
@@ -43,6 +43,15 @@ public interface TezInput {
      */
     public void addInputsToSkip(Set<String> inputsToSkip);
 
+    /**
+     * Attach the inputs to the operator. Also ensure reader.next() is called to force fetch
+     * the input so that all inputs are fetched and memory released before memory is allocated
+     * for outputs
+     *
+     * @param inputs available inputs
+     * @param conf configuration
+     * @throws ExecException
+     */
     public void attachInputs(Map<String, LogicalInput> inputs,
             Configuration conf) throws ExecException;
 

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/WeightedRangePartitionerTez.java Fri Feb 24 08:19:42 2017
@@ -23,6 +23,7 @@ import java.util.Map.Entry;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.DiscreteProbabilitySampleGenerator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner;
 import org.apache.pig.data.DataBag;
@@ -30,6 +31,7 @@ import org.apache.pig.data.InternalMap;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.builtin.FindQuantiles;
 import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.util.UDFContext;
 import org.apache.tez.runtime.library.common.ConfigUtils;
 
 public class WeightedRangePartitionerTez extends WeightedRangePartitioner {
@@ -64,11 +66,13 @@ public class WeightedRangePartitionerTez
             InternalMap weightedPartsData = (InternalMap) quantileMap.get(FindQuantiles.WEIGHTED_PARTS);
             estimatedNumPartitions = (Integer)quantileMap.get(PigProcessor.ESTIMATED_NUM_PARALLELISM);
             convertToArray(quantilesList);
+            long taskIdHashCode = UDFContext.getUDFContext().getJobConf().get(JobContext.TASK_ID).hashCode();
+            long randomSeed = ((long)taskIdHashCode << 32) | (taskIdHashCode & 0xffffffffL);
             for (Entry<Object, Object> ent : weightedPartsData.entrySet()) {
                 Tuple key = (Tuple) ent.getKey(); // sample item which repeats
                 float[] probVec = getProbVec((Tuple) ent.getValue());
                 weightedParts.put(getPigNullableWritable(key),
-                        new DiscreteProbabilitySampleGenerator(probVec));
+                        new DiscreteProbabilitySampleGenerator(randomSeed, probVec));
             }
         } catch (Exception e) {
             throw new RuntimeException(e);
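
The DiscreteProbabilitySampleGenerator is now seeded explicitly: the 32-bit hash of the task id (JobContext.TASK_ID) is duplicated into both halves of a 64-bit seed, so the sampling sequence is a deterministic function of the task id and stays the same across retries of that task. A small standalone sketch of the seed construction (the task id string is hypothetical):

    public class TaskSeedSketch {
        static long seedFromTaskId(String taskId) {
            long h = taskId.hashCode();
            // Duplicate the 32-bit hash into the upper and lower halves of the seed
            return (h << 32) | (h & 0xffffffffL);
        }

        public static void main(String[] args) {
            System.out.println(Long.toHexString(seedFromTaskId("task_1488000000000_0001_r_000003")));
        }
    }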

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java Fri Feb 24 08:19:42 2017
@@ -50,6 +50,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.classification.InterfaceAudience;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.Vertex;
+import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
 import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfo;
 import org.apache.tez.mapreduce.hadoop.InputSplitInfoDisk;
@@ -102,7 +103,6 @@ public class MRToTezHelper {
         mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_SPECULATIVE, TezConfiguration.TEZ_AM_SPECULATION_ENABLED);
         mrReduceParamToTezVertexParamMap.put(MRJobConfig.REDUCE_LOG_LEVEL, TezConfiguration.TEZ_TASK_LOG_LEVEL);
         mrReduceParamToTezVertexParamMap.put("mapreduce.job.running.reduce.limit", "tez.am.vertex.max-task-concurrency");
-        mrReduceParamToTezVertexParamMap.put("mapreduce.job.running.map.limit", "tez.am.vertex.max-task-concurrency");
         mrReduceParamToTezVertexParamMap.put(MRJobConfig.TASK_TIMEOUT, "tez.am.progress.stuck.interval-ms");
     }
 
@@ -165,11 +165,7 @@ public class MRToTezHelper {
                     continue;
                 }
             }
-            if (key.startsWith("dfs.datanode")) {
-                tezConf.unset(key);
-            } else if (key.startsWith("dfs.namenode")) {
-                tezConf.unset(key);
-            } else if (key.startsWith("yarn.nodemanager")) {
+            if (key.startsWith("yarn.nodemanager")) {
                 tezConf.unset(key);
             } else if (key.startsWith("mapreduce.jobhistory")) {
                 tezConf.unset(key);
@@ -181,20 +177,15 @@ public class MRToTezHelper {
         }
     }
 
-    public static TezConfiguration getDAGAMConfFromMRConf(
-            Configuration tezConf) {
-
-        // Set Tez parameters based on MR parameters.
-        TezConfiguration dagAMConf = new TezConfiguration(tezConf);
-
+    public static void translateMRSettingsForTezAM(TezConfiguration dagAMConf) {
 
         convertMRToTezConf(dagAMConf, dagAMConf, DeprecatedKeys.getMRToDAGParamMap());
         convertMRToTezConf(dagAMConf, dagAMConf, mrAMParamToTezAMParamMap);
 
-        String env = tezConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV);
-        if (tezConf.get(MRJobConfig.MR_AM_ENV) != null) {
-            env = (env == null) ? tezConf.get(MRJobConfig.MR_AM_ENV)
-                                : env + "," + tezConf.get(MRJobConfig.MR_AM_ENV);
+        String env = dagAMConf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV);
+        if (dagAMConf.get(MRJobConfig.MR_AM_ENV) != null) {
+            env = (env == null) ? dagAMConf.get(MRJobConfig.MR_AM_ENV)
+                                : env + "," + dagAMConf.get(MRJobConfig.MR_AM_ENV);
         }
 
         if (env != null) {
@@ -203,24 +194,23 @@ public class MRToTezHelper {
 
         dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
                 org.apache.tez.mapreduce.hadoop.MRHelpers
-                        .getJavaOptsForMRAM(tezConf));
+                        .getJavaOptsForMRAM(dagAMConf));
 
-        String queueName = tezConf.get(JobContext.QUEUE_NAME,
+        String queueName = dagAMConf.get(JobContext.QUEUE_NAME,
                 YarnConfiguration.DEFAULT_QUEUE_NAME);
         dagAMConf.setIfUnset(TezConfiguration.TEZ_QUEUE_NAME, queueName);
 
         dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_VIEW_ACLS,
-                tezConf.get(MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB));
+                dagAMConf.get(MRJobConfig.JOB_ACL_VIEW_JOB, MRJobConfig.DEFAULT_JOB_ACL_VIEW_JOB));
 
         dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_MODIFY_ACLS,
-                tezConf.get(MRJobConfig.JOB_ACL_MODIFY_JOB, MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));
+                dagAMConf.get(MRJobConfig.JOB_ACL_MODIFY_JOB, MRJobConfig.DEFAULT_JOB_ACL_MODIFY_JOB));
 
         // Hardcoding at AM level instead of setting per vertex till TEZ-2710 is available
         dagAMConf.setIfUnset(TezConfiguration.TEZ_TASK_SCALE_MEMORY_RESERVE_FRACTION, "0.5");
 
         removeUnwantedSettings(dagAMConf, true);
 
-        return dagAMConf;
     }
 
     /**
@@ -263,6 +253,14 @@ public class MRToTezHelper {
         JobControlCompiler.configureCompression(tezConf);
         convertMRToTezConf(tezConf, mrConf, DeprecatedKeys.getMRToTezRuntimeParamMap());
         removeUnwantedSettings(tezConf, false);
+
+        // ShuffleVertexManager Plugin settings
+        // DeprecatedKeys.getMRToTezRuntimeParamMap() only translates min and not max
+        String slowStartFraction = mrConf.get(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART);
+        if (slowStartFraction != null) {
+            tezConf.setIfUnset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, slowStartFraction);
+            tezConf.setIfUnset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, slowStartFraction);
+        }
     }
 
     /**

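As the new comment notes, DeprecatedKeys.getMRToTezRuntimeParamMap() only carries the MR slow-start fraction over to the ShuffleVertexManager min-src-fraction, so the patch mirrors the same value into max-src-fraction as well. A self-contained sketch of the resulting translation (the value is illustrative; setIfUnset preserves any explicit Tez setting):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.MRJobConfig;
    import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;

    public class SlowStartTranslationSketch {
        public static void main(String[] args) {
            Configuration mrConf = new Configuration(false);
            mrConf.set(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, "0.95");

            Configuration tezConf = new Configuration(false);
            String slowStart = mrConf.get(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART);
            if (slowStart != null) {
                // Mirror the single MR setting into both Tez shuffle fractions
                tezConf.setIfUnset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MIN_SRC_FRACTION, slowStart);
                tezConf.setIfUnset(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION, slowStart);
            }
            System.out.println(tezConf.get(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_MAX_SRC_FRACTION)); // 0.95
        }
    }
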
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java Fri Feb 24 08:19:42 2017
@@ -36,13 +36,14 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POFRJoinTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POStoreTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.runtime.HashValuePartitioner;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
-import org.apache.pig.builtin.RoundRobinPartitioner;
 import org.apache.pig.builtin.TOBAG;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.TupleFactory;
@@ -198,8 +199,8 @@ public class TezCompilerUtil {
 
     public static boolean isNonPackageInput(String inputKey, TezOperator tezOp) throws PlanException {
         try {
-            List<TezInput> inputs = PlanHelper.getPhysicalOperators(tezOp.plan, TezInput.class);
-            for (TezInput input : inputs) {
+            List<POFRJoinTez> inputs = PlanHelper.getPhysicalOperators(tezOp.plan, POFRJoinTez.class);
+            for (POFRJoinTez input : inputs) {
                 if (ArrayUtils.contains(input.getTezInputs(), inputKey)) {
                     return true;
                 }
@@ -269,7 +270,7 @@ public class TezCompilerUtil {
         } else if (dataMovementType == DataMovementType.SCATTER_GATHER) {
             edge.outputClassName = UnorderedPartitionedKVOutput.class.getName();
             edge.inputClassName = UnorderedKVInput.class.getName();
-            edge.partitionerClass = RoundRobinPartitioner.class;
+            edge.partitionerClass = HashValuePartitioner.class;
         }
         edge.setIntermediateOutputKeyClass(POValueOutputTez.EmptyWritable.class.getName());
         edge.setIntermediateOutputValueClass(TUPLE_CLASS);

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/MapRedUtil.java Fri Feb 24 08:19:42 2017
@@ -70,7 +70,7 @@ public class MapRedUtil {
     private static Log log = LogFactory.getLog(MapRedUtil.class);
     private static final TupleFactory tf = TupleFactory.getInstance();
 
-    public static final String FILE_SYSTEM_NAME = "fs.default.name";
+    public static final String FILE_SYSTEM_NAME = FileSystem.FS_DEFAULT_NAME_KEY;
 
     /**
      * Loads the key distribution sampler file
@@ -301,7 +301,7 @@ public class MapRedUtil {
     /**
      * Returns the total number of bytes for this file, or if a directory all
      * files in the directory.
-     * 
+     *
      * @param fs FileSystem
      * @param status FileStatus
      * @param max Maximum value of total length that will trigger exit. Many

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/hbase/HBaseStorage.java Fri Feb 24 08:19:42 2017
@@ -18,7 +18,6 @@ package org.apache.pig.backend.hadoop.hb
 
 import java.io.IOException;
 import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
 import java.lang.reflect.UndeclaredThrowableException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
@@ -65,6 +64,7 @@ import org.apache.hadoop.hbase.mapreduce
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableSplit;
+import org.apache.hadoop.hbase.security.token.TokenUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
@@ -86,7 +86,6 @@ import org.apache.pig.ResourceSchema.Res
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.StoreResources;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.backend.hadoop.hbase.HBaseTableInputFormat.HBaseTableIFBuilder;
 import org.apache.pig.builtin.FuncUtils;
 import org.apache.pig.builtin.Utf8StorageConverter;
@@ -597,7 +596,9 @@ public class HBaseStorage extends LoadFu
                             new BinaryComparator(colInfo.getColumnName())));
                 }
             }
-            thisColumnGroupFilter.addFilter(columnFilters);
+            if (columnFilters.getFilters().size() != 0) {
+                thisColumnGroupFilter.addFilter(columnFilters);
+            }
             allColumnFilters.addFilter(thisColumnGroupFilter);
         }
         if (allColumnFilters != null) {
@@ -792,46 +793,35 @@ public class HBaseStorage extends LoadFu
     public List<String> getShipFiles() {
         // Depend on HBase to do the right thing when available, as of HBASE-9165
         try {
-            Method addHBaseDependencyJars =
-              TableMapReduceUtil.class.getMethod("addHBaseDependencyJars", Configuration.class);
-            if (addHBaseDependencyJars != null) {
-                Configuration conf = new Configuration();
-                addHBaseDependencyJars.invoke(null, conf);
-                if (conf.get("tmpjars") != null) {
-                    String[] tmpjars = conf.getStrings("tmpjars");
-                    List<String> shipFiles = new ArrayList<String>(tmpjars.length);
-                    for (String tmpjar : tmpjars) {
-                        shipFiles.add(new URL(tmpjar).getPath());
-                    }
-                    return shipFiles;
+            Configuration conf = new Configuration();
+            TableMapReduceUtil.addHBaseDependencyJars(conf);
+            if (conf.get("tmpjars") != null) {
+                String[] tmpjars = conf.getStrings("tmpjars");
+                List<String> shipFiles = new ArrayList<String>(tmpjars.length);
+                for (String tmpjar : tmpjars) {
+                    shipFiles.add(new URL(tmpjar).getPath());
                 }
+                return shipFiles;
+            }
+        } catch (IOException e) {
+            if(e instanceof MalformedURLException){
+                LOG.debug("TableMapReduceUtils#addHBaseDependencyJars tmpjars"
+                        + " had malformed url. Falling back to previous logic.", e);
+            }else {
+                LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation"
+                        + " failed. Falling back to previous logic.", e);
             }
-        } catch (NoSuchMethodException e) {
-            LOG.debug("TableMapReduceUtils#addHBaseDependencyJars not available."
-              + " Falling back to previous logic.", e);
-        } catch (IllegalAccessException e) {
-            LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation"
-              + " not permitted. Falling back to previous logic.", e);
-        } catch (InvocationTargetException e) {
-            LOG.debug("TableMapReduceUtils#addHBaseDependencyJars invocation"
-              + " failed. Falling back to previous logic.", e);
-        } catch (MalformedURLException e) {
-            LOG.debug("TableMapReduceUtils#addHBaseDependencyJars tmpjars"
-                    + " had malformed url. Falling back to previous logic.", e);
         }
 
         List<Class> classList = new ArrayList<Class>();
         classList.add(org.apache.hadoop.hbase.client.HTable.class); // main hbase jar or hbase-client
         classList.add(org.apache.hadoop.hbase.mapreduce.TableSplit.class); // main hbase jar or hbase-server
-        if (!HadoopShims.isHadoopYARN()) { //Avoid shipping duplicate. Hadoop 0.23/2 itself has guava
-            classList.add(com.google.common.collect.Lists.class); // guava
-        }
         classList.add(org.apache.zookeeper.ZooKeeper.class); // zookeeper
         // Additional jars that are specific to v0.95.0+
         addClassToList("org.cloudera.htrace.Trace", classList); // htrace
         addClassToList("org.apache.hadoop.hbase.protobuf.generated.HBaseProtos", classList); // hbase-protocol
         addClassToList("org.apache.hadoop.hbase.TableName", classList); // hbase-common
-        addClassToList("org.apache.hadoop.hbase.CompatibilityFactory", classList); // hbase-hadoop-compar
+        addClassToList("org.apache.hadoop.hbase.CompatibilityFactory", classList); // hbase-hadoop-compat
         addClassToList("org.jboss.netty.channel.ChannelFactory", classList); // netty
         return FuncUtils.getShipFiles(classList);
     }
@@ -882,27 +872,13 @@ public class HBaseStorage extends LoadFu
         }
 
         if ("kerberos".equalsIgnoreCase(hbaseConf.get(HBASE_SECURITY_CONF_KEY))) {
-            // Will not be entering this block for 0.20.2 as it has no security.
             try {
-                // getCurrentUser method is not public in 0.20.2
-                Method m1 = UserGroupInformation.class.getMethod("getCurrentUser");
-                UserGroupInformation currentUser = (UserGroupInformation) m1.invoke(null,(Object[]) null);
-                // hasKerberosCredentials method not available in 0.20.2
-                Method m2 = UserGroupInformation.class.getMethod("hasKerberosCredentials");
-                boolean hasKerberosCredentials = (Boolean) m2.invoke(currentUser, (Object[]) null);
-                if (hasKerberosCredentials) {
-                    // Class and method are available only from 0.92 security release
-                    Class tokenUtilClass = Class
-                            .forName("org.apache.hadoop.hbase.security.token.TokenUtil");
-                    Method m3 = tokenUtilClass.getMethod("obtainTokenForJob", new Class[] {
-                            Configuration.class, UserGroupInformation.class, Job.class });
-                    m3.invoke(null, new Object[] { hbaseConf, currentUser, job });
+                UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
+                if (currentUser.hasKerberosCredentials()) {
+                    TokenUtil.obtainTokenForJob(hbaseConf,currentUser,job);
                 } else {
                     LOG.info("Not fetching hbase delegation token as no Kerberos TGT is available");
                 }
-            } catch (ClassNotFoundException cnfe) {
-                throw new RuntimeException("Failure loading TokenUtil class, "
-                        + "is secure RPC available?", cnfe);
             } catch (RuntimeException re) {
                 throw re;
             } catch (Exception e) {

Modified: pig/branches/spark/src/org/apache/pig/builtin/Bloom.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/Bloom.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/Bloom.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/Bloom.java Fri Feb 24 08:19:42 2017
@@ -35,6 +35,7 @@ import org.apache.pig.FilterFunc;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 
 /**
  * Use a Bloom filter build previously by BuildBloom.  You would first
@@ -54,14 +55,36 @@ import org.apache.pig.data.Tuple;
  * C = filter B by bloom(z);
  * D = join C by z, A by x;
  * It uses {@link org.apache.hadoop.util.bloom.BloomFilter}.
+ *
+ * You can also pass the Bloom filter from BuildBloom directly to Bloom UDF
+ * as a scalar instead of storing it to file and loading again. This is simpler
+ * if the Bloom filter will not be reused and needs to be discarded after the
+ * run of the script.
+ *
+ * define bb BuildBloom('jenkins', '100', '0.1');
+ * A = load 'foo' as (x, y);
+ * B = group A all;
+ * C = foreach B generate bb(A.x) as bloomfilter;
+ * D = load 'bar' as (z);
+ * E = filter D by Bloom(C.bloomfilter, z);
+ * F = join E by z, A by x;
  */
 public class Bloom extends FilterFunc {
 
+    private static TupleFactory mTupleFactory = TupleFactory.getInstance();
+
     private String bloomFile;
-    public BloomFilter filter = null;
+    private BloomFilter filter = null;
 
-    /** 
-     * @param filename file containing the serialized Bloom filter
+    public Bloom() {
+    }
+
+    /**
+     * The filename containing the serialized Bloom filter. If filename is null
+     * or the no-arg constructor is used, then the bloomfilter bytearray which
+     * is the output of BuildBloom should be passed as the first argument to the UDF
+     *
+     * @param filename  file containing the serialized Bloom filter
      */
     public Bloom(String filename) {
         bloomFile = filename;
@@ -70,11 +93,25 @@ public class Bloom extends FilterFunc {
     @Override
     public Boolean exec(Tuple input) throws IOException {
         if (filter == null) {
-            init();
+            init(input);
         }
         byte[] b;
-        if (input.size() == 1) b = DataType.toBytes(input.get(0));
-        else b = DataType.toBytes(input, DataType.TUPLE);
+        if (bloomFile == null) {
+            // The first one is the bloom filter. Skip that
+            if (input.size() == 2) {
+                b = DataType.toBytes(input.get(1));
+            } else {
+                List<Object> inputList = input.getAll();
+                Tuple tuple = mTupleFactory.newTupleNoCopy(inputList.subList(1, inputList.size()));
+                b = DataType.toBytes(tuple, DataType.TUPLE);
+            }
+        } else {
+            if (input.size() == 1) {
+                b = DataType.toBytes(input.get(0));
+            } else {
+                b = DataType.toBytes(input, DataType.TUPLE);
+            }
+        }
 
         Key k = new Key(b);
         return filter.membershipTest(k);
@@ -82,34 +119,46 @@ public class Bloom extends FilterFunc {
 
     @Override
     public List<String> getCacheFiles() {
-        List<String> list = new ArrayList<String>(1);
-        // We were passed the name of the file on HDFS.  Append a
-        // name for the file on the task node.
-        try {
-            list.add(bloomFile + "#" + getFilenameFromPath(bloomFile));
-        } catch (IOException e) {
-            throw new RuntimeException(e);
+        if (bloomFile != null) {
+            List<String> list = new ArrayList<String>(1);
+            // We were passed the name of the file on HDFS.  Append a
+            // name for the file on the task node.
+            try {
+                list.add(bloomFile + "#" + getFilenameFromPath(bloomFile));
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+            return list;
         }
-        return list;
+        return null;
     }
 
-    private void init() throws IOException {
-        filter = new BloomFilter();
-        String dir = "./" + getFilenameFromPath(bloomFile);
-        String[] partFiles = new File(dir)
-                .list(new FilenameFilter() {
-                    @Override
-                    public boolean accept(File current, String name) {
-                        return name.startsWith("part");
-                    }
-                });
-
-        String dcFile = dir + "/" + partFiles[0];
-        DataInputStream dis = new DataInputStream(new FileInputStream(dcFile));
-        try {
-            filter.readFields(dis);
-        } finally {
-            dis.close();
+    private void init(Tuple input) throws IOException {
+        if (bloomFile == null) {
+            if (input.get(0) instanceof DataByteArray) {
+                filter = BuildBloomBase.bloomIn((DataByteArray) input.get(0));
+            } else {
+                throw new IllegalArgumentException("The first argument to the Bloom UDF should be"
+                        + " the bloom filter if a bloom file is not specified in the constructor");
+            }
+        } else {
+            filter = new BloomFilter();
+            String dir = "./" + getFilenameFromPath(bloomFile);
+            String[] partFiles = new File(dir)
+                    .list(new FilenameFilter() {
+                        @Override
+                        public boolean accept(File current, String name) {
+                            return name.startsWith("part");
+                        }
+                    });
+
+            String dcFile = dir + "/" + partFiles[0];
+            DataInputStream dis = new DataInputStream(new FileInputStream(dcFile));
+            try {
+                filter.readFields(dis);
+            } finally {
+                dis.close();
+            }
         }
     }
 

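The scalar path above depends on Bloom receiving the serialized filter as a bytearray and rebuilding it with BuildBloomBase.bloomIn(), which the BuildBloomBase change below makes public static. A minimal standalone sketch of that round trip (not part of the patch; the 1024-bit / 3-hash sizing is illustrative only):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.util.bloom.BloomFilter;
    import org.apache.hadoop.util.bloom.Key;
    import org.apache.hadoop.util.hash.Hash;
    import org.apache.pig.data.DataByteArray;

    public class BloomScalarRoundTrip {
        public static void main(String[] args) throws IOException {
            // Front end: build a filter roughly the way BuildBloom does (sizes are illustrative).
            BloomFilter built = new BloomFilter(1024, 3, Hash.JENKINS_HASH);
            built.add(new Key("x1".getBytes()));

            // Serialize to a DataByteArray, the form in which the scalar reaches Bloom.exec().
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            built.write(new DataOutputStream(baos));
            DataByteArray scalar = new DataByteArray(baos.toByteArray());

            // Back end: rebuild the filter from the first UDF argument, which is what
            // init(input) now does via the public static BuildBloomBase.bloomIn().
            BloomFilter restored = new BloomFilter();
            restored.readFields(new DataInputStream(new ByteArrayInputStream(scalar.get())));

            System.out.println(restored.membershipTest(new Key("x1".getBytes())));   // true
            System.out.println(restored.membershipTest(new Key("zz".getBytes())));   // false with high probability
        }
    }
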
Modified: pig/branches/spark/src/org/apache/pig/builtin/BuildBloomBase.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/BuildBloomBase.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/BuildBloomBase.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/BuildBloomBase.java Fri Feb 24 08:19:42 2017
@@ -18,16 +18,15 @@
 
 package org.apache.pig.builtin;
 
-import java.io.IOException;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.IOException;
 import java.util.Iterator;
 
 import org.apache.hadoop.util.bloom.BloomFilter;
 import org.apache.hadoop.util.hash.Hash;
-
 import org.apache.pig.EvalFunc;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataBag;
@@ -47,7 +46,7 @@ public abstract class BuildBloomBase<T>
     protected BuildBloomBase() {
     }
 
-    /** 
+    /**
      * @param hashType type of the hashing function (see
      * {@link org.apache.hadoop.util.hash.Hash}).
      * @param mode Will be ignored, though by convention it should be
@@ -64,7 +63,7 @@ public abstract class BuildBloomBase<T>
         hType = convertHashType(hashType);
     }
 
-    /** 
+    /**
      * @param hashType type of the hashing function (see
      * {@link org.apache.hadoop.util.hash.Hash}).
      * @param numElements The number of distinct elements expected to be
@@ -104,7 +103,7 @@ public abstract class BuildBloomBase<T>
         return new DataByteArray(baos.toByteArray());
     }
 
-    protected BloomFilter bloomIn(DataByteArray b) throws IOException {
+    public static BloomFilter bloomIn(DataByteArray b) throws IOException {
         DataInputStream dis = new DataInputStream(new
             ByteArrayInputStream(b.get()));
         BloomFilter f = new BloomFilter();

Modified: pig/branches/spark/src/org/apache/pig/builtin/HiveUDFBase.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/HiveUDFBase.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/HiveUDFBase.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/HiveUDFBase.java Fri Feb 24 08:19:42 2017
@@ -37,6 +37,7 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.shims.Hadoop23Shims;
 import org.apache.hadoop.hive.shims.HadoopShimsSecure;
 import org.apache.hadoop.hive.shims.ShimLoader;
 import org.apache.hadoop.mapred.Counters;
@@ -180,20 +181,9 @@ abstract class HiveUDFBase extends EvalF
 
     @Override
     public List<String> getShipFiles() {
-        String hadoopVersion = "20S";
-        if (Utils.isHadoop23() || Utils.isHadoop2()) {
-            hadoopVersion = "23";
-        }
-        Class hadoopVersionShimsClass;
-        try {
-            hadoopVersionShimsClass = Class.forName("org.apache.hadoop.hive.shims.Hadoop" +
-                    hadoopVersion + "Shims");
-        } catch (ClassNotFoundException e) {
-            throw new RuntimeException("Cannot find Hadoop" + hadoopVersion + "ShimsClass in classpath");
-        }
         List<String> files = FuncUtils.getShipFiles(new Class[] {GenericUDF.class,
-                PrimitiveObjectInspector.class, HiveConf.class, Serializer.class, ShimLoader.class, 
-                hadoopVersionShimsClass, HadoopShimsSecure.class, Collector.class});
+                PrimitiveObjectInspector.class, HiveConf.class, Serializer.class, ShimLoader.class,
+                Hadoop23Shims.class, HadoopShimsSecure.class, Collector.class});
         return files;
     }
 

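With only Hadoop 2 supported, the reflective lookup of the versioned shims class is replaced by a direct compile-time reference to Hadoop23Shims. Each class in the array only serves to locate a jar that must be shipped to the cluster; a rough, illustrative sketch of that idea using the standard code-source lookup (FuncUtils.getShipFiles()'s exact mechanics may differ):

    import java.net.URL;

    import org.apache.hadoop.hive.shims.Hadoop23Shims;
    import org.apache.hadoop.hive.shims.HadoopShimsSecure;

    public class ShipJarLookup {
        // Illustrative only: find the jar (or class directory) that provides a class,
        // which is what each entry in the Class[] passed to getShipFiles() stands for.
        static String locationOf(Class<?> c) {
            URL src = c.getProtectionDomain().getCodeSource().getLocation();
            return src.toString();
        }

        public static void main(String[] args) {
            System.out.println(locationOf(Hadoop23Shims.class));      // hive-shims-0.23 jar
            System.out.println(locationOf(HadoopShimsSecure.class));  // hive-shims-common jar
        }
    }
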
Modified: pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/OrcStorage.java Fri Feb 24 08:19:42 2017
@@ -56,6 +56,7 @@ import org.apache.hadoop.hive.serde2.obj
 import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.hive.shims.Hadoop23Shims;
 import org.apache.hadoop.hive.shims.HadoopShimsSecure;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
@@ -389,20 +390,8 @@ public class OrcStorage extends LoadFunc
 
     @Override
     public List<String> getShipFiles() {
-        List<String> cacheFiles = new ArrayList<String>();
-        String hadoopVersion = "20S";
-        if (Utils.isHadoop23() || Utils.isHadoop2()) {
-            hadoopVersion = "23";
-        }
-        Class hadoopVersionShimsClass;
-        try {
-            hadoopVersionShimsClass = Class.forName("org.apache.hadoop.hive.shims.Hadoop" +
-                    hadoopVersion + "Shims");
-        } catch (ClassNotFoundException e) {
-            throw new RuntimeException("Cannot find Hadoop" + hadoopVersion + "ShimsClass in classpath");
-        }
         Class[] classList = new Class[] {OrcFile.class, HiveConf.class, AbstractSerDe.class,
-                org.apache.hadoop.hive.shims.HadoopShims.class, HadoopShimsSecure.class, hadoopVersionShimsClass,
+                org.apache.hadoop.hive.shims.HadoopShims.class, HadoopShimsSecure.class, Hadoop23Shims.class,
                 Input.class};
         return FuncUtils.getShipFiles(classList);
     }
@@ -456,7 +445,7 @@ public class OrcStorage extends LoadFunc
     }
 
     private TypeInfo getTypeInfoFromLocation(String location, Job job) throws IOException {
-        FileSystem fs = FileSystem.get(job.getConfiguration());
+        FileSystem fs = FileSystem.get(new Path(location).toUri(), job.getConfiguration());
         Path path = getFirstFile(location, fs, new NonEmptyOrcFileFilter(fs));
         if (path == null) {
             log.info("Cannot find any ORC files from " + location +

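The getTypeInfoFromLocation() change matters when the load location lives on a filesystem other than fs.defaultFS (for example a file:// path or a different HDFS namespace): FileSystem.get(Configuration) always returns the default filesystem, while the URI-qualified overload resolves the one that actually owns the path. A small sketch of the difference, using a hypothetical local path:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class FsForLocation {
        public static void main(String[] args) throws IOException {
            Configuration conf = new Configuration();
            String location = "file:///tmp/orc-data";   // hypothetical load location

            // Old behaviour: whatever fs.defaultFS points at, regardless of the location's scheme.
            FileSystem defaultFs = FileSystem.get(conf);

            // New behaviour: the filesystem derived from the location itself.
            FileSystem locationFs = FileSystem.get(new Path(location).toUri(), conf);

            System.out.println(defaultFs.getUri());
            System.out.println(locationFs.getUri());   // file:///
        }
    }
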
Modified: pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/PigStorage.java Fri Feb 24 08:19:42 2017
@@ -68,7 +68,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextOutputFormat;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.bzip2r.Bzip2TextInputFormat;
 import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
@@ -171,7 +170,7 @@ LoadPushDown, LoadMetadata, StoreMetadat
         validOptions.addOption(TAG_SOURCE_FILE, false, "Appends input source file name to beginning of each tuple.");
         validOptions.addOption(TAG_SOURCE_PATH, false, "Appends input source file path to beginning of each tuple.");
         validOptions.addOption("tagsource", false, "Appends input source file name to beginning of each tuple.");
-        Option overwrite = new Option(" ", "Overwrites the destination.");
+        Option overwrite = new Option("overwrite", "Overwrites the destination.");
         overwrite.setLongOpt("overwrite");
         overwrite.setOptionalArg(true);
         overwrite.setArgs(1);
@@ -412,7 +411,7 @@ LoadPushDown, LoadMetadata, StoreMetadat
     @Override
     public InputFormat getInputFormat() {
         if((loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz"))
-           && (!bzipinput_usehadoops || !HadoopShims.isHadoopYARN()) ) {
+           && (!bzipinput_usehadoops) ) {
             mLog.info("Using Bzip2TextInputFormat");
             return new Bzip2TextInputFormat();
         } else {

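The Option fix gives -overwrite a real short name instead of a blank one; in commons-cli the first constructor argument is the name the option is registered and looked up under. A minimal sketch of the corrected definition, parsed against a hypothetical argument list (PigStorage's surrounding parsing code is not reproduced here):

    import org.apache.commons.cli.CommandLine;
    import org.apache.commons.cli.GnuParser;
    import org.apache.commons.cli.Option;
    import org.apache.commons.cli.Options;
    import org.apache.commons.cli.ParseException;

    public class OverwriteOptionDemo {
        public static void main(String[] args) throws ParseException {
            Options validOptions = new Options();
            Option overwrite = new Option("overwrite", "Overwrites the destination.");
            overwrite.setLongOpt("overwrite");
            overwrite.setOptionalArg(true);
            overwrite.setArgs(1);
            validOptions.addOption(overwrite);

            // Hypothetical constructor arguments as they would appear in a Pig script.
            CommandLine cmd = new GnuParser().parse(validOptions, new String[] {"-overwrite", "true"});
            System.out.println(cmd.hasOption("overwrite"));       // true
            System.out.println(cmd.getOptionValue("overwrite"));  // true
        }
    }
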
Modified: pig/branches/spark/src/org/apache/pig/builtin/RoundRobinPartitioner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/RoundRobinPartitioner.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/RoundRobinPartitioner.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/RoundRobinPartitioner.java Fri Feb 24 08:19:42 2017
@@ -17,15 +17,63 @@
  */
 package org.apache.pig.builtin;
 
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapreduce.Partitioner;
 
-public class RoundRobinPartitioner extends Partitioner<Writable, Writable> {
-    private int num = 0;
+/**
+ * This partitioner should be used with extreme caution and only in cases
+ * where the order of output records is guaranteed to be the same. If the order
+ * of output records can vary on retries, which is usually the case, map reruns
+ * due to shuffle fetch failures can lead to data being partitioned differently
+ * and result in incorrect output through loss or duplication of data.
+ * Refer to PIG-5041 for more details.
+ *
+ * This will be removed in the next release as it is risky to use in most cases.
+ */
+@Deprecated
+public class RoundRobinPartitioner extends Partitioner<Writable, Writable>
+        implements Configurable {
+
+    /**
+     * Batch size for round robin partitioning. Batch-size records at a time
+     * will be distributed to each partition in a round robin fashion. The default
+     * value is 0, which distributes each record in a circular fashion. A higher
+     * batch size can be used to increase the probability of keeping similar
+     * records in the same partition when the output is already sorted, and can
+     * give better compression.
+     */
+    public static String PIG_ROUND_ROBIN_PARTITIONER_BATCH_SIZE = "pig.round.robin.partitioner.batch.size";
+    private int num = -1;
+    private int batchSize = 0;
+    private int currentBatchCount = 0;
+    private Configuration conf;
 
     @Override
     public int getPartition(Writable key, Writable value, int numPartitions) {
-        num = ++num % numPartitions;
+        if (batchSize > 0) {
+            if (currentBatchCount == 0) {
+                num = ++num % numPartitions;
+            }
+            if (++currentBatchCount == batchSize) {
+                currentBatchCount = 0;
+            }
+        } else {
+            num = ++num % numPartitions;
+        }
         return num;
     }
+
+    @Override
+    public void setConf(Configuration conf) {
+        this.conf = conf;
+        batchSize = conf.getInt(PIG_ROUND_ROBIN_PARTITIONER_BATCH_SIZE, 0);
+    }
+
+    @Override
+    public Configuration getConf() {
+        return conf;
+    }
+
 }

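To make the new knob concrete, here is a small standalone driver (not part of the patch) that sets the batch size on a Configuration and prints the partition each record would be assigned to; with a batch size of 3 and 4 partitions, records 0-2 land on partition 0, records 3-5 on partition 1, and so on:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.pig.builtin.RoundRobinPartitioner;

    public class RoundRobinDemo {
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            // Hand out records to each partition in batches of 3 (the value is illustrative).
            conf.setInt(RoundRobinPartitioner.PIG_ROUND_ROBIN_PARTITIONER_BATCH_SIZE, 3);

            RoundRobinPartitioner partitioner = new RoundRobinPartitioner();
            partitioner.setConf(conf);

            for (int i = 0; i < 12; i++) {
                int part = partitioner.getPartition(NullWritable.get(), NullWritable.get(), 4);
                System.out.println("record " + i + " -> partition " + part);
            }
        }
    }
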
Modified: pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java (original)
+++ pig/branches/spark/src/org/apache/pig/builtin/TextLoader.java Fri Feb 24 08:19:42 2017
@@ -37,7 +37,6 @@ import org.apache.pig.ResourceSchema.Res
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
-import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.bzip2r.Bzip2TextInputFormat;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
@@ -259,8 +258,7 @@ public class TextLoader extends LoadFunc
     @Override
     public InputFormat getInputFormat() {
         if((loadLocation.endsWith(".bz2") || loadLocation.endsWith(".bz"))
-           && !HadoopShims.isHadoopYARN()
-           && !bzipinput_usehadoops ) {
+                && !bzipinput_usehadoops ) {
             mLog.info("Using Bzip2TextInputFormat");
             return new Bzip2TextInputFormat();
         } else {

Modified: pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/DefaultAbstractBag.java Fri Feb 24 08:19:42 2017
@@ -423,7 +423,7 @@ public abstract class DefaultAbstractBag
     }
 
     @SuppressWarnings("rawtypes")
-    protected void warn(String msg, Enum warningEnum, Exception e) {
+    protected void warn(String msg, Enum warningEnum, Throwable e) {
         pigLogger = PhysicalOperator.getPigLogger();
         if(pigLogger != null) {
             pigLogger.warn(this, msg, warningEnum);

Modified: pig/branches/spark/src/org/apache/pig/data/DefaultDataBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/DefaultDataBag.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/DefaultDataBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/DefaultDataBag.java Fri Feb 24 08:19:42 2017
@@ -22,11 +22,11 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
-import java.io.FileNotFoundException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -42,12 +42,12 @@ import org.apache.pig.PigWarning;
 public class DefaultDataBag extends DefaultAbstractBag {
 
     /**
-     * 
+     *
      */
     private static final long serialVersionUID = 2L;
 
     private static final Log log = LogFactory.getLog(DefaultDataBag.class);
-    
+
     private static final InterSedes SEDES = InterSedesFactory.getInterSedesInstance();
 
     public DefaultDataBag() {
@@ -70,12 +70,12 @@ public class DefaultDataBag extends Defa
     public boolean isSorted() {
         return false;
     }
-    
+
     @Override
     public boolean isDistinct() {
         return false;
     }
-    
+
     @Override
     public Iterator<Tuple> iterator() {
         return new DefaultDataBagIterator();
@@ -110,12 +110,15 @@ public class DefaultDataBag extends Defa
                     if ((spilled & 0x3fff) == 0) reportProgress();
                 }
                 out.flush();
-            } catch (IOException ioe) {
+                out.close();
+                out = null;
+                mContents.clear();
+            } catch (Throwable e) {
                 // Remove the last file from the spilled array, since we failed to
                 // write to it.
                 mSpillFiles.remove(mSpillFiles.size() - 1);
                 warn(
-                    "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe);
+                    "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, e);
                 return 0;
             } finally {
                 if (out != null) {
@@ -126,7 +129,6 @@ public class DefaultDataBag extends Defa
                     }
                 }
             }
-            mContents.clear();
         }
         // Increment the spill count
         incSpillCount(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT);
@@ -156,7 +158,7 @@ public class DefaultDataBag extends Defa
         }
 
         @Override
-        public boolean hasNext() { 
+        public boolean hasNext() {
             // Once we call hasNext(), set the flag, so we can call hasNext() repeated without fetching next tuple
             if (hasCachedTuple)
                 return (mBuf != null);
@@ -209,7 +211,7 @@ public class DefaultDataBag extends Defa
                 } catch (FileNotFoundException fnfe) {
                     // We can't find our own spill file?  That should never
                     // happen.
-                    String msg = "Unable to find our spill file."; 
+                    String msg = "Unable to find our spill file.";
                     log.fatal(msg, fnfe);
                     throw new RuntimeException(msg, fnfe);
                 }
@@ -223,7 +225,7 @@ public class DefaultDataBag extends Defa
                         log.fatal(msg, eof);
                         throw new RuntimeException(msg, eof);
                     } catch (IOException ioe) {
-                        String msg = "Unable to read our spill file."; 
+                        String msg = "Unable to read our spill file.";
                         log.fatal(msg, ioe);
                         throw new RuntimeException(msg, ioe);
                     }
@@ -259,7 +261,7 @@ public class DefaultDataBag extends Defa
                         log.warn("Failed to close spill file.", e);
                     }
                 } catch (IOException ioe) {
-                    String msg = "Unable to read our spill file."; 
+                    String msg = "Unable to read our spill file.";
                     log.fatal(msg, ioe);
                     throw new RuntimeException(msg, ioe);
                 }

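The point of reordering the spill path is that the in-memory contents are cleared only after the spill stream has been flushed and closed successfully, and any failure (including an Error such as OutOfMemoryError, hence catching Throwable) leaves the bag's memory copy intact and discards the half-written file. A condensed sketch of the pattern, with a List<String> standing in for the bag contents:

    import java.io.BufferedOutputStream;
    import java.io.DataOutputStream;
    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    public class SpillSketch {
        private final List<String> contents = new ArrayList<String>();
        private final List<File> spillFiles = new ArrayList<File>();

        long spill() throws IOException {
            File f = File.createTempFile("pigbag", ".spill");
            spillFiles.add(f);
            DataOutputStream out = null;
            long spilled = 0;
            try {
                out = new DataOutputStream(new BufferedOutputStream(new FileOutputStream(f)));
                for (String s : contents) {
                    out.writeUTF(s);
                    spilled++;
                }
                out.flush();
                out.close();
                out = null;
                // Only forget the in-memory copy once everything is safely on disk.
                contents.clear();
            } catch (Throwable e) {
                // Failed spill: drop the unusable file and keep the in-memory data.
                spillFiles.remove(spillFiles.size() - 1);
                return 0;
            } finally {
                if (out != null) {
                    try {
                        out.close();
                    } catch (IOException ignored) {
                    }
                }
            }
            return spilled;
        }
    }
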
Modified: pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/DistinctDataBag.java Fri Feb 24 08:19:42 2017
@@ -67,17 +67,17 @@ public class DistinctDataBag extends Def
     public boolean isSorted() {
         return false;
     }
-    
+
     @Override
     public boolean isDistinct() {
         return true;
     }
-    
-    
+
+
     @Override
     public long size() {
         if (mSpillFiles != null && mSpillFiles.size() > 0){
-            //We need to racalculate size to guarantee a count of unique 
+            //We need to recalculate size to guarantee a count of unique
             //entries including those on disk
             Iterator<Tuple> iter = iterator();
             int newSize = 0;
@@ -85,7 +85,7 @@ public class DistinctDataBag extends Def
                 newSize++;
                 iter.next();
             }
-            
+
             synchronized(mContents) {
                 //we don't want adds to change our numbers
                 //the lock may need to cover more of the method
@@ -94,8 +94,8 @@ public class DistinctDataBag extends Def
         }
         return mSize;
     }
-    
-    
+
+
     @Override
     public Iterator<Tuple> iterator() {
         return new DistinctDataBagIterator();
@@ -155,12 +155,15 @@ public class DistinctDataBag extends Def
                     }
                 }
                 out.flush();
-            } catch (IOException ioe) {
+                out.close();
+                out = null;
+                mContents.clear();
+            } catch (Throwable e) {
                 // Remove the last file from the spilled array, since we failed to
                 // write to it.
                 mSpillFiles.remove(mSpillFiles.size() - 1);
                 warn(
-                    "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, ioe);
+                    "Unable to spill contents to disk", PigWarning.UNABLE_TO_SPILL, e);
                 return 0;
             } finally {
                 if (out != null) {
@@ -171,7 +174,6 @@ public class DistinctDataBag extends Def
                     }
                 }
             }
-            mContents.clear();
         }
         // Increment the spill count
         incSpillCount(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT);
@@ -208,7 +210,7 @@ public class DistinctDataBag extends Def
 
             @Override
             public int hashCode() {
-                return tuple.hashCode(); 
+                return tuple.hashCode();
             }
         }
 
@@ -237,7 +239,7 @@ public class DistinctDataBag extends Def
         }
 
         @Override
-        public boolean hasNext() { 
+        public boolean hasNext() {
             // See if we can find a tuple.  If so, buffer it.
             mBuf = next();
             return mBuf != null;
@@ -295,7 +297,7 @@ public class DistinctDataBag extends Def
                 } catch (FileNotFoundException fnfe) {
                     // We can't find our own spill file?  That should never
                     // happen.
-                    String msg = "Unable to find our spill file."; 
+                    String msg = "Unable to find our spill file.";
                     log.fatal(msg, fnfe);
                     throw new RuntimeException(msg, fnfe);
                 }
@@ -346,7 +348,7 @@ public class DistinctDataBag extends Def
                 Iterator<File> i = mSpillFiles.iterator();
                 while (i.hasNext()) {
                     try {
-                        DataInputStream in = 
+                        DataInputStream in =
                             new DataInputStream(new BufferedInputStream(
                                 new FileInputStream(i.next())));
                         mStreams.add(in);
@@ -502,7 +504,7 @@ public class DistinctDataBag extends Def
                             addToQueue(null, mStreams.size() - 1);
                             i.remove();
                             filesToDelete.add(f);
-                            
+
                         } catch (FileNotFoundException fnfe) {
                             // We can't find our own spill file?  That should
                            // never happen.
@@ -545,7 +547,7 @@ public class DistinctDataBag extends Def
                         log.warn("Failed to delete spill file: " + f.getPath());
                     }
                 }
-                
+
                 // clear the list, so that finalize does not delete any files,
                 // when mSpillFiles is assigned a new value
                 mSpillFiles.clear();
@@ -560,6 +562,6 @@ public class DistinctDataBag extends Def
             }
         }
     }
-    
+
 }
 

Modified: pig/branches/spark/src/org/apache/pig/data/ReadOnceBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/ReadOnceBag.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/ReadOnceBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/ReadOnceBag.java Fri Feb 24 08:19:42 2017
@@ -50,6 +50,9 @@ public class ReadOnceBag implements Data
      */
     private static final long serialVersionUID = 2L;
 
+    public ReadOnceBag() {
+    }
+
     /**
      * This constructor creates a bag out of an existing iterator
      * of tuples by taking ownership of the iterator and NOT

Modified: pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java?rev=1784237&r1=1784236&r2=1784237&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/SchemaTupleBackend.java Fri Feb 24 08:19:42 2017
@@ -39,6 +39,7 @@ import org.apache.pig.data.utils.Structu
 import org.apache.pig.data.utils.StructuresHelper.Triple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.Utils;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -272,14 +273,20 @@ public class SchemaTupleBackend {
     private static SchemaTupleBackend stb;
 
     public static void initialize(Configuration jConf, PigContext pigContext) throws IOException {
-        initialize(jConf, pigContext, pigContext.getExecType().isLocal());
+        if (stb != null) {
+            SchemaTupleFrontend.lazyReset(pigContext);
+        }
+        initialize(jConf, pigContext.getExecType().isLocal());
     }
 
-    public static void initialize(Configuration jConf, PigContext pigContext, boolean isLocal) throws IOException {
+    public static void initialize(Configuration jConf) throws IOException {
+        initialize(jConf, Utils.isLocal(jConf));
+    }
+
+    public static void initialize(Configuration jConf, boolean isLocal) throws IOException {
         if (stb != null) {
             LOG.warn("SchemaTupleBackend has already been initialized");
         } else {
-            SchemaTupleFrontend.lazyReset(pigContext);
             SchemaTupleFrontend.reset();
             SchemaTupleBackend stbInstance = new SchemaTupleBackend(jConf, isLocal);
             stbInstance.copyAndResolve();


