pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From xu...@apache.org
Subject svn commit: r1733627 [7/18] - in /pig/branches/spark: ./ bin/ conf/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/math/ contrib/piggybank/java/src/main...
Date Fri, 04 Mar 2016 18:17:47 GMT
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java Fri Mar  4 18:17:39 2016
@@ -92,40 +92,55 @@ public class TezSessionManager {
         adjustAMConfig(amConf, tezJobConf);
         String jobName = conf.get(PigContext.JOB_NAME, "pig");
         TezClient tezClient = TezClient.create(jobName, amConf, true, requestedAMResources, creds);
-        tezClient.start();
-        TezAppMasterStatus appMasterStatus = tezClient.getAppMasterStatus();
-        if (appMasterStatus.equals(TezAppMasterStatus.SHUTDOWN)) {
-            throw new RuntimeException("TezSession has already shutdown");
+        try {
+            tezClient.start();
+            TezAppMasterStatus appMasterStatus = tezClient.getAppMasterStatus();
+            if (appMasterStatus.equals(TezAppMasterStatus.SHUTDOWN)) {
+                throw new RuntimeException("TezSession has already shutdown");
+            }
+            tezClient.waitTillReady();
+        } catch (Throwable e) {
+            log.error("Exception while waiting for Tez client to be ready", e);
+            tezClient.stop();
+            throw new RuntimeException(e);
         }
-        tezClient.waitTillReady();
         return new SessionInfo(tezClient, requestedAMResources);
     }
 
     private static void adjustAMConfig(TezConfiguration amConf, TezJobConfig tezJobConf) {
         int requiredAMMaxHeap = -1;
         int requiredAMResourceMB = -1;
-        int configuredAMMaxHeap = Utils.extractHeapSizeInMB(amConf.get(
+        String amLaunchOpts = amConf.get(
                 TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
-                TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS_DEFAULT));
+                TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS_DEFAULT);
+        int configuredAMMaxHeap = Utils.extractHeapSizeInMB(amLaunchOpts);
         int configuredAMResourceMB = amConf.getInt(
                 TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB,
                 TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB_DEFAULT);
 
         if (tezJobConf.getEstimatedTotalParallelism() > 0) {
 
-            int minAMMaxHeap = 3584;
+            // Need more room for native memory/virtual address space
+            // when close to 4G due to 32-bit jvm 4G limit
+            int minAMMaxHeap = 3200;
             int minAMResourceMB = 4096;
 
             // Rough estimation. For 5K tasks 1G Xmx and 1.5G resource.mb
-            // Increment by 512 mb for every additional 5K tasks.
+            // Increment container size by 512 mb for every additional 5K tasks.
+            //     30000 and above - 3200Xmx, 4096 (896 native memory)
+            //     25000 and above - 3072Xmx, 3584
+            //     20000 and above - 2560Xmx, 3072
+            //     15000 and above - 2048Xmx, 2560
+            //     10000 and above - 1536Xmx, 2048
+            //     5000 and above  - 1024Xmx, 1536 (512 native memory)
             for (int taskCount = 30000; taskCount >= 5000; taskCount-=5000) {
-                if (tezJobConf.getEstimatedTotalParallelism() > taskCount) {
+                if (tezJobConf.getEstimatedTotalParallelism() >= taskCount) {
                     requiredAMMaxHeap = minAMMaxHeap;
                     requiredAMResourceMB = minAMResourceMB;
                     break;
                 }
-                minAMMaxHeap = minAMMaxHeap - 512;
                 minAMResourceMB = minAMResourceMB - 512;
+                minAMMaxHeap = minAMResourceMB - 512;
             }
 
             if (requiredAMResourceMB > -1 && configuredAMResourceMB < requiredAMResourceMB) {
@@ -139,13 +154,14 @@ public class TezSessionManager {
 
                 if (requiredAMMaxHeap > -1 && configuredAMMaxHeap < requiredAMMaxHeap) {
                     amConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
-                            amConf.get(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS)
-                                    + " -Xmx" + requiredAMMaxHeap + "M");
+                            amLaunchOpts + " -Xmx" + requiredAMMaxHeap + "M");
                     log.info("Increasing Tez AM Heap Size from "
                             + configuredAMMaxHeap + "M to "
                             + requiredAMMaxHeap
                             + "M as the number of total estimated tasks is "
                             + tezJobConf.getEstimatedTotalParallelism());
+                    log.info("Value of " + TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS + " is now "
+                            + amConf.get(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS));
                 }
             }
         }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java Fri Mar  4 18:17:39 2016
@@ -20,6 +20,7 @@ package org.apache.pig.backend.hadoop.ex
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -34,6 +35,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.pig.CollectableLoadFunc;
 import org.apache.pig.FuncSpec;
 import org.apache.pig.IndexableLoadFunc;
@@ -95,6 +97,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueInputTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.FindQuantilesTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.IsFirstReduceOfKeyTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.PartitionSkewedKeysTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.udf.ReadScalarsTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.SkewedPartitionerTez;
@@ -170,6 +173,7 @@ public class TezCompiler extends PhyPlan
 
     private int fileConcatenationThreshold = 100;
     private boolean optimisticFileConcatenation = false;
+    private List<String> readOnceLoadFuncs = null;
 
     private POLocalRearrangeTezFactory localRearrangeFactory;
 
@@ -200,6 +204,12 @@ public class TezCompiler extends PhyPlan
                 OPTIMISTIC_FILE_CONCATENATION, "false").equals("true");
         LOG.info("File concatenation threshold: " + fileConcatenationThreshold
                 + " optimistic? " + optimisticFileConcatenation);
+
+        String loadFuncs = pigContext.getProperties().getProperty(
+                PigConfiguration.PIG_SORT_READONCE_LOADFUNCS);
+        if (loadFuncs != null && loadFuncs.trim().length() > 0) {
+            readOnceLoadFuncs = Arrays.asList(StringUtils.split(loadFuncs.trim()));
+        }
     }
 
     public TezOperPlan getTezPlan() {
@@ -284,17 +294,20 @@ public class TezCompiler extends PhyPlan
                     FuncSpec newSpec = new FuncSpec(ReadScalarsTez.class.getName(), from.getOperatorKey().toString());
                     userFunc.setFuncSpec(newSpec);
 
+                    //Remove unused store filename
+                    if (userFunc.getInputs().size() == 2) {
+                        userFunc.getInputs().remove(1);
+                    }
+
                     if (storeSeen.containsKey(store)) {
                         storeSeen.get(store).addOutputKey(tezOp.getOperatorKey().toString());
                     } else {
                         POValueOutputTez output = new POValueOutputTez(OperatorKey.genOpKey(scope));
+                        output.setScalarOutput(true);
                         output.addOutputKey(tezOp.getOperatorKey().toString());
                         from.plan.remove(from.plan.getOperator(store.getOperatorKey()));
                         from.plan.addAsLeaf(output);
                         storeSeen.put(store, output);
-
-                        //Remove unused store filename
-                        userFunc.getInputs().remove(1);
                     }
 
                     if (tezPlan.getPredecessors(tezOp)==null || !tezPlan.getPredecessors(tezOp).contains(from)) {
@@ -350,52 +363,18 @@ public class TezCompiler extends PhyPlan
                     String msg = "Predecessor of load should be a store or native oper. Got " + p.getClass();
                     throw new PlanException(msg, errCode, PigException.BUG);
                 }
-                if (p instanceof POStore) {
-                    PhysicalOperator store = oper.plan.getOperator(p.getOperatorKey());
-                    // replace POStore to POValueOutputTez, convert the tezOperator to splitter
-                    oper.plan.disconnect(oper.plan.getPredecessors(store).get(0), store);
-                    oper.plan.remove(store);
-                    POValueOutputTez valueOutput = new POValueOutputTez(new OperatorKey(scope,nig.getNextNodeId(scope)));
-                    oper.plan.addAsLeaf(valueOutput);
-                    oper.setSplitter(true);
-
-                    // Create a splittee of store only
-                    TezOperator storeOnlyTezOperator = getTezOp();
-                    PhysicalPlan storeOnlyPhyPlan = new PhysicalPlan();
-                    POValueInputTez valueInput = new POValueInputTez(new OperatorKey(scope,nig.getNextNodeId(scope)));
-                    valueInput.setInputKey(oper.getOperatorKey().toString());
-                    storeOnlyPhyPlan.addAsLeaf(valueInput);
-                    storeOnlyPhyPlan.addAsLeaf(store);
-                    storeOnlyTezOperator.plan = storeOnlyPhyPlan;
-                    tezPlan.add(storeOnlyTezOperator);
-                    phyToTezOpMap.put(p, storeOnlyTezOperator);
-
-                    // Create new operator as second splittee
-                    curTezOp = getTezOp();
-                    POValueInputTez valueInput2 = new POValueInputTez(new OperatorKey(scope,nig.getNextNodeId(scope)));
-                    valueInput2.setInputKey(oper.getOperatorKey().toString());
-                    curTezOp.plan.add(valueInput2);
-                    tezPlan.add(curTezOp);
-
-                    // Connect splitter to splittee
-                    TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, oper, storeOnlyTezOperator);
-                    TezCompilerUtil.configureValueOnlyTupleOutput(edge,  DataMovementType.ONE_TO_ONE);
-                    storeOnlyTezOperator.setRequestedParallelismByReference(oper);
-
-                    edge = TezCompilerUtil.connect(tezPlan, oper, curTezOp);
-                    TezCompilerUtil.configureValueOnlyTupleOutput(edge,  DataMovementType.ONE_TO_ONE);
-                    curTezOp.setRequestedParallelismByReference(oper);
-                } else if (p instanceof PONative) {
-                    // Need new operator
-                    curTezOp = getTezOp();
-                    curTezOp.plan.add(op);
-                    tezPlan.add(curTezOp);
-
-                    plan.disconnect(op, p);
-                    TezCompilerUtil.connect(tezPlan, oper, curTezOp);
-                    phyToTezOpMap.put(op, curTezOp);
-                    return;
+                curTezOp = getTezOp();
+                curTezOp.plan.add(op);
+                curTezOp.setUseMRMapSettings(true);
+                if (((POLoad) op).getLFile() != null
+                        && ((POLoad) op).getLFile().getFuncSpec() != null) {
+                        curTezOp.UDFs.add(((POLoad)op).getLFile().getFuncSpec().toString());
                 }
+                tezPlan.add(curTezOp);
+                phyToTezOpMap.put(op, curTezOp);
+                plan.disconnect(op, p);
+                TezCompilerUtil.connect(tezPlan, oper, curTezOp);
+                oper.segmentBelow = true;
                 return;
             }
 
@@ -640,6 +619,7 @@ public class TezCompiler extends PhyPlan
     public void visitCounter(POCounter op) throws VisitorException {
         // Refer visitRank(PORank) for more details
         try{
+            curTezOp.markRankCounter();
             POCounterTez counterTez = new POCounterTez(op);
             nonBlocking(counterTez);
             phyToTezOpMap.put(op, curTezOp);
@@ -685,6 +665,7 @@ public class TezCompiler extends PhyPlan
             clr.setDistinct(true);
             combinePlan.addAsLeaf(clr);
 
+            curTezOp.markDistinct();
             addDistinctPlan(curTezOp.plan, op.getRequestedParallelism());
             curTezOp.setRequestedParallelism(op.getRequestedParallelism());
             phyToTezOpMap.put(op, curTezOp);
@@ -1134,7 +1115,6 @@ public class TezCompiler extends PhyPlan
     public void visitMergeJoin(POMergeJoin joinOp) throws VisitorException {
 
         try{
-            joinOp.setEndOfRecordMark(POStatus.STATUS_NULL);
             if(compiledInputs.length != 2 || joinOp.getInputs().size() != 2){
                 int errCode=1101;
                 throw new TezCompilerException("Merge Join must have exactly two inputs. Found : "+compiledInputs.length, errCode);
@@ -1476,13 +1456,26 @@ public class TezCompiler extends PhyPlan
             if (pigProperties.containsKey(PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEMUSAGE)) {
                 heapPerc = Float.valueOf(pigProperties.getProperty(PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEMUSAGE));
             }
+            long totalMemory = -1;
+            if (pigProperties.containsKey(PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEM)) {
+                totalMemory = Long.valueOf(pigProperties.getProperty(PigConfiguration.PIG_SKEWEDJOIN_REDUCE_MEM));
+            }
             POPoissonSample poSample = new POPoissonSample(new OperatorKey(scope,nig.getNextNodeId(scope)),
-                    -1, sampleRate, heapPerc);
+                    -1, sampleRate, heapPerc, totalMemory);
+
+            TezOperator samplerOper = compiledInputs[0];
+            boolean writeDataForPartitioner = shouldWriteDataForPartitioner(samplerOper);
+
+            PhysicalPlan partitionerPlan = null;
+            if (writeDataForPartitioner) {
+                samplerOper.plan.addAsLeaf(lrTez);
+            } else {
+                partitionerPlan = samplerOper.plan.clone();
+                partitionerPlan.addAsLeaf(lrTez);
+            }
 
-            TezOperator prevOp = compiledInputs[0];
-            prevOp.plan.addAsLeaf(lrTez);
-            prevOp.plan.addAsLeaf(poSample);
-            prevOp.markSampler();
+            samplerOper.plan.addAsLeaf(poSample);
+            samplerOper.markSampler();
 
             MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = op.getJoinPlans();
             List<PhysicalOperator> l = plan.getPredecessors(op);
@@ -1526,9 +1519,9 @@ public class TezCompiler extends PhyPlan
             // This foreach will pick the sort key columns from the POPoissonSample output
             POForEach nfe1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)),
                     -1, eps1, flat1);
-            prevOp.plan.addAsLeaf(nfe1);
-            prevOp.plan.addAsLeaf(lrTezSample);
-            prevOp.setClosed(true);
+            samplerOper.plan.addAsLeaf(nfe1);
+            samplerOper.plan.addAsLeaf(lrTezSample);
+            samplerOper.setClosed(true);
 
             int rp = op.getRequestedParallelism();
             if (rp == -1) {
@@ -1551,10 +1544,9 @@ public class TezCompiler extends PhyPlan
 
             compiledInputs = new TezOperator[] {joinInputs[0]};
 
-            blocking();
-
-            // Add a POIdentityInOutTez to the joinJobs[0] which is a partition vertex.
-            // It just partitions the data from first vertex based on the quantiles from sample vertex.
+            // Add a partitioner vertex that partitions the data based on the quantiles from sample vertex.
+            curTezOp = getTezOp();
+            tezPlan.add(curTezOp);
             joinJobs[0] = curTezOp;
 
             try {
@@ -1572,15 +1564,38 @@ public class TezCompiler extends PhyPlan
             }
             lrTez.setKeyType(type);
             lrTez.setPlans(groups);
-            lrTez.setSkewedJoin(true);
             lrTez.setResultType(DataType.TUPLE);
 
-            POIdentityInOutTez identityInOutTez = new POIdentityInOutTez(
-                    OperatorKey.genOpKey(scope), lrTez);
-            identityInOutTez.setInputKey(prevOp.getOperatorKey().toString());
-            joinJobs[0].plan.addAsLeaf(identityInOutTez);
+            POLocalRearrangeTez partitionerLR = null;
+            if (!writeDataForPartitioner) {
+                // Read input from hdfs again
+                joinJobs[0].plan = partitionerPlan;
+                partitionerLR = lrTez;
+                lrTez.setSkewedJoin(true);
+            } else {
+                // Add a POIdentityInOutTez which just passes data through from sampler vertex
+                partitionerLR = new POIdentityInOutTez(
+                        OperatorKey.genOpKey(scope),
+                        lrTez,
+                        samplerOper.getOperatorKey().toString());
+                partitionerLR.setSkewedJoin(true);
+                joinJobs[0].plan.addAsLeaf(partitionerLR);
+
+                // Connect the sampler vertex to the partitioner vertex
+                lrTez.setOutputKey(joinJobs[0].getOperatorKey().toString());
+                TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, samplerOper, joinJobs[0]);
+                // TODO: PIG-3775 unsorted shuffle
+                edge.dataMovementType = DataMovementType.ONE_TO_ONE;
+                edge.outputClassName = UnorderedKVOutput.class.getName();
+                edge.inputClassName = UnorderedKVInput.class.getName();
+                // If samplerOper.requestedParallelism changes based on no. of input splits
+                // it will reflect for joinJobs[0] so that the 1-1 edge will work.
+                joinJobs[0].setRequestedParallelismByReference(samplerOper);
+            }
             joinJobs[0].setClosed(true);
             joinJobs[0].markSampleBasedPartitioner();
+            joinJobs[0].setUseMRMapSettings(samplerOper.isUseMRMapSettings());
+
             rearrangeOutputs[0] = joinJobs[0];
 
             compiledInputs = new TezOperator[] {joinInputs[1]};
@@ -1633,6 +1648,7 @@ public class TezCompiler extends PhyPlan
             List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
             List<Boolean> flat = new ArrayList<Boolean>();
 
+            boolean containsRightOuter = false;
             // Add corresponding POProjects
             for (int i=0; i < 2; i++) {
                 ep = new PhysicalPlan();
@@ -1643,8 +1659,13 @@ public class TezCompiler extends PhyPlan
                 ep.add(prj);
                 eps.add(ep);
                 if (!inner[i]) {
-                    // Add an empty bag for outer join
-                    CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i));
+                    // Add an empty bag for outer join. For right outer, add IsFirstReduceOfKeyTez UDF as well
+                    if (i == 0) {
+                        containsRightOuter = true;
+                        CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i), true, IsFirstReduceOfKeyTez.class.getName());
+                    } else {
+                        CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i), false, IsFirstReduceOfKeyTez.class.getName());
+                    }
                 }
                 flat.add(true);
             }
@@ -1655,36 +1676,30 @@ public class TezCompiler extends PhyPlan
             fe.visit(this);
 
             // Connect vertices
-            lrTez.setOutputKey(joinJobs[0].getOperatorKey().toString());
             lrTezSample.setOutputKey(sampleJobPair.first.getOperatorKey().toString());
-            identityInOutTez.setOutputKey(joinJobs[2].getOperatorKey().toString());
+            partitionerLR.setOutputKey(joinJobs[2].getOperatorKey().toString());
             pr.setOutputKey(joinJobs[2].getOperatorKey().toString());
 
-            TezEdgeDescriptor edge = joinJobs[0].inEdges.get(prevOp.getOperatorKey());
-            joinJobs[0].setUseMRMapSettings(prevOp.isUseMRMapSettings());
-            // TODO: Convert to unsorted shuffle after TEZ-661
-            // Use 1-1 edge
-            edge.dataMovementType = DataMovementType.ONE_TO_ONE;
-            edge.outputClassName = UnorderedKVOutput.class.getName();
-            edge.inputClassName = UnorderedKVInput.class.getName();
-            // If prevOp.requestedParallelism changes based on no. of input splits
-            // it will reflect for joinJobs[0] so that 1-1 edge will work.
-            joinJobs[0].setRequestedParallelismByReference(prevOp);
-
-            TezCompilerUtil.connect(tezPlan, prevOp, sampleJobPair.first);
+            TezCompilerUtil.connect(tezPlan, samplerOper, sampleJobPair.first);
 
             POValueOutputTez sampleOut = (POValueOutputTez) sampleJobPair.first.plan.getLeaves().get(0);
-            for (int i = 0; i < 2; i++) {
-                joinJobs[i].setSampleOperator(sampleJobPair.first);
-
-                // Configure broadcast edges for distribution map
-                edge = TezCompilerUtil.connect(tezPlan, sampleJobPair.first, joinJobs[i]);
-                TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST);
-                sampleOut.addOutputKey(joinJobs[i].getOperatorKey().toString());
+            for (int i = 0; i <= 2; i++) {
+                if (i != 2 || containsRightOuter) {
+                    // We need to send the sample to the left relation partitioner vertex, the right relation load vertex,
+                    // and the join vertex (IsFirstReduceOfKey in the join vertex needs the sample file as well)
+                    joinJobs[i].setSampleOperator(sampleJobPair.first);
+
+                    // Configure broadcast edges for distribution map
+                    TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, sampleJobPair.first, joinJobs[i]);
+                    TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST);
+                    sampleOut.addOutputKey(joinJobs[i].getOperatorKey().toString());
+                }
 
                 // Configure skewed partitioner for join
-                edge = joinJobs[2].inEdges.get(joinJobs[i].getOperatorKey());
-                edge.partitionerClass = SkewedPartitionerTez.class;
+                if (i != 2) {
+                    TezEdgeDescriptor edge = joinJobs[2].inEdges.get(joinJobs[i].getOperatorKey());
+                    edge.partitionerClass = SkewedPartitionerTez.class;
+                }
             }
 
             joinJobs[2].markSkewedJoin();
@@ -1712,55 +1727,79 @@ public class TezCompiler extends PhyPlan
                 new FuncSpec(Utils.getTmpFileCompressorName(pigContext)));
     }
 
+    private boolean shouldWriteDataForPartitioner(TezOperator samplerOper) {
+        // If there are operators other than load and foreach (like filter,
+        // split, etc.) in the plan, then process and write the data out
+        // to save the cost of reprocessing in the partitioner vertex.
+        // Else read from hdfs again to save the IO cost of the extra write.
+        boolean writeDataForPartitioner = false;
+        if (samplerOper.plan.getRoots().get(0) instanceof POLoad) {
+            for (PhysicalOperator oper : samplerOper.plan) {
+                if (oper instanceof POForEach) {
+                    continue;
+                } else if (oper instanceof POLoad && oper.getInputs() == null) {
+                    // TODO: oper.getInputs() is not null in case of PONative and
+                    // clone needs to be fixed in that case. e2e test - Native_2.
+                    String loadFunc = ((POLoad) oper).getLoadFunc().getClass().getName();
+                    // We do not want to read all data again from hbase/accumulo for sampling.
+                    if (readOnceLoadFuncs == null || !readOnceLoadFuncs.contains(loadFunc)) {
+                        continue;
+                    }
+                }
+                writeDataForPartitioner = true;
+                break;
+            }
+        } else {
+            writeDataForPartitioner = true;
+        }
+        return writeDataForPartitioner;
+    }
+
     /**
-     * Force an end to the current vertex with store and sample
-     * @return Tez operator that now is finished with a store.
-     * @throws PlanException
+     * Get LocalRearrange for POSort input
      */
-    private TezOperator endSingleInputWithStoreAndSample(
-            POSort sort,
-            POLocalRearrangeTez lr,
-            POLocalRearrangeTez lrSample,
-            byte keyType,
-            Pair<POProject, Byte>[] fields) throws PlanException {
-        if(compiledInputs.length>1) {
-            int errCode = 2023;
-            String msg = "Received a multi input plan when expecting only a single input one.";
-            throw new PlanException(msg, errCode, PigException.BUG);
+    private POLocalRearrangeTez getLocalRearrangeForSortInput(POSort sort,
+            byte keyType, Pair<POProject, Byte>[] fields)
+            throws PlanException {
+        POLocalRearrangeTez lr = new POLocalRearrangeTez(OperatorKey.genOpKey(scope));
+        List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
+        if (fields == null) {
+            // This is project *
+            PhysicalPlan ep = new PhysicalPlan();
+            POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+            prj.setStar(true);
+            prj.setOverloaded(false);
+            prj.setResultType(DataType.TUPLE);
+            ep.add(prj);
+            eps.add(ep);
+        } else {
+            // Attach the sort plans to the local rearrange to get the
+            // projection.
+            eps.addAll(sort.getSortPlans());
         }
-        TezOperator oper = compiledInputs[0];
-        if (!oper.isClosed()) {
 
-            List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
-            if (fields == null) {
-                // This is project *
-                PhysicalPlan ep = new PhysicalPlan();
-                POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
-                prj.setStar(true);
-                prj.setOverloaded(false);
-                prj.setResultType(DataType.TUPLE);
-                ep.add(prj);
-                eps.add(ep);
-            } else {
-                // Attach the sort plans to the local rearrange to get the
-                // projection.
-                eps.addAll(sort.getSortPlans());
-            }
+        try {
+            lr.setIndex(0);
+        } catch (ExecException e) {
+            int errCode = 2058;
+            String msg = "Unable to set index on newly created POLocalRearrange.";
+            throw new PlanException(msg, errCode, PigException.BUG, e);
+        }
+        lr.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE : keyType);
+        lr.setPlans(eps);
+        lr.setResultType(DataType.TUPLE);
+        lr.addOriginalLocation(sort.getAlias(), sort.getOriginalLocations());
+        return lr;
+    }
 
-            try {
-                lr.setIndex(0);
-            } catch (ExecException e) {
-                int errCode = 2058;
-                String msg = "Unable to set index on newly created POLocalRearrange.";
-                throw new PlanException(msg, errCode, PigException.BUG, e);
-            }
-            lr.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE : keyType);
-            lr.setPlans(eps);
-            lr.setResultType(DataType.TUPLE);
-            lr.addOriginalLocation(sort.getAlias(), sort.getOriginalLocations());
+    /**
+     * Add a sampler to the sort input
+     */
+    private POLocalRearrangeTez addSamplingToSortInput(POSort sort, TezOperator oper,
+            byte keyType, Pair<POProject, Byte>[] fields) throws PlanException {
 
-            lr.setOutputKey(curTezOp.getOperatorKey().toString());
-            oper.plan.addAsLeaf(lr);
+        POLocalRearrangeTez lrSample = localRearrangeFactory.create(LocalRearrangeType.NULL);
+        if (!oper.isClosed()) {
 
             List<Boolean> flat1 = new ArrayList<Boolean>();
             List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
@@ -1861,7 +1900,8 @@ public class TezCompiler extends PhyPlan
             String msg = "The current operator is closed. This is unexpected while compiling.";
             throw new PlanException(msg, errCode, PigException.BUG);
         }
-        return oper;
+        oper.markSampler();
+        return lrSample;
     }
 
     private Pair<TezOperator,Integer> getOrderbySamplingAggregationJob(
@@ -2098,32 +2138,42 @@ public class TezCompiler extends PhyPlan
 
     private TezOperator[] getSortJobs(
             TezOperator inputOper,
+            PhysicalPlan partitionerPlan,
             POLocalRearrangeTez inputOperRearrange,
             POSort sort,
             byte keyType,
             Pair<POProject, Byte>[] fields) throws PlanException{
+
         TezOperator[] opers = new TezOperator[2];
+
+        // Partitioner Vertex
         TezOperator oper1 = getTezOp();
         tezPlan.add(oper1);
         opers[0] = oper1;
-
-        POIdentityInOutTez identityInOutTez = new POIdentityInOutTez(
-                OperatorKey.genOpKey(scope),
-                inputOperRearrange);
-        identityInOutTez.setInputKey(inputOper.getOperatorKey().toString());
-        oper1.plan.addAsLeaf(identityInOutTez);
+        POLocalRearrangeTez partitionerLR = null;
+        if (partitionerPlan != null) {
+            // Read from hdfs again
+            oper1.plan = partitionerPlan;
+            partitionerLR = inputOperRearrange;
+        } else {
+            partitionerLR = new POIdentityInOutTez(
+                    OperatorKey.genOpKey(scope),
+                    inputOperRearrange,
+                    inputOper.getOperatorKey().toString());
+            oper1.plan.addAsLeaf(partitionerLR);
+        }
         oper1.setClosed(true);
         oper1.markSampleBasedPartitioner();
 
+        // Global Sort Vertex
         TezOperator oper2 = getTezOp();
+        partitionerLR.setOutputKey(oper2.getOperatorKey().toString());
         oper2.markGlobalSort();
         opers[1] = oper2;
         tezPlan.add(oper2);
 
         long limit = sort.getLimit();
-
         boolean[] sortOrder;
-
         List<Boolean> sortOrderList = sort.getMAscCols();
         if(sortOrderList != null) {
             sortOrder = new boolean[sortOrderList.size()];
@@ -2133,8 +2183,6 @@ public class TezCompiler extends PhyPlan
             oper2.setSortOrder(sortOrder);
         }
 
-        identityInOutTez.setOutputKey(oper2.getOperatorKey().toString());
-
         if (limit!=-1) {
             POPackage pkg_c = new POPackage(OperatorKey.genOpKey(scope));
             pkg_c.setPkgr(new LitePackager());
@@ -2214,6 +2262,13 @@ public class TezCompiler extends PhyPlan
     @Override
     public void visitSort(POSort op) throws VisitorException {
         try{
+
+            if (compiledInputs.length > 1) {
+                int errCode = 2023;
+                String msg = "Received a multi input plan when expecting only a single input one.";
+                throw new PlanException(msg, errCode, PigException.BUG);
+            }
+
             Pair<POProject, Byte>[] fields = getSortCols(op.getSortPlans());
             byte keyType = DataType.UNKNOWN;
 
@@ -2228,58 +2283,65 @@ public class TezCompiler extends PhyPlan
                 throw new PlanException(msg, errCode, PigException.BUG, ve);
             }
 
-            POLocalRearrangeTez lr = new POLocalRearrangeTez(OperatorKey.genOpKey(scope));
-            POLocalRearrangeTez lrSample = localRearrangeFactory.create(LocalRearrangeType.NULL);
+            TezOperator samplerOper = compiledInputs[0];
+            boolean writeDataForPartitioner = shouldWriteDataForPartitioner(samplerOper);
 
-            TezOperator prevOper = endSingleInputWithStoreAndSample(op, lr, lrSample, keyType, fields);
-            prevOper.markSampler();
+            POLocalRearrangeTez lr = getLocalRearrangeForSortInput(op, keyType, fields);
+            PhysicalPlan partitionerPlan = null;
+            if (writeDataForPartitioner) {
+                samplerOper.plan.addAsLeaf(lr);
+            } else {
+                partitionerPlan = samplerOper.plan.clone();
+                partitionerPlan.addAsLeaf(lr);
+            }
 
+            // if rp is still -1, let it be, TezParallelismEstimator will set it to an estimated rp
             int rp = op.getRequestedParallelism();
             if (rp == -1) {
                 rp = pigContext.defaultParallel;
             }
 
-            // if rp is still -1, let it be, TezParallelismEstimator will set it to an estimated rp
+            // Add sampling to sort input. Create a sample aggregation operator and connect both
+            POLocalRearrangeTez lrSample = addSamplingToSortInput(op, samplerOper, keyType, fields);
             Pair<TezOperator, Integer> quantJobParallelismPair = getOrderbySamplingAggregationJob(op, rp);
-            TezOperator[] sortOpers = getSortJobs(prevOper, lr, op, keyType, fields);
+            TezCompilerUtil.connect(tezPlan, samplerOper, quantJobParallelismPair.first);
 
-            TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, prevOper, sortOpers[0]);
-            sortOpers[0].setUseMRMapSettings(prevOper.isUseMRMapSettings());
+            // Create the partitioner and the global sort vertices
+            TezOperator[] sortOpers = getSortJobs(samplerOper, partitionerPlan, lr, op, keyType, fields);
+            sortOpers[0].setUseMRMapSettings(samplerOper.isUseMRMapSettings());
+
+            if (writeDataForPartitioner) {
+                // Connect the sampler and partitioner vertex
+                lr.setOutputKey(sortOpers[0].getOperatorKey().toString());
+                TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, samplerOper, sortOpers[0]);
 
-            // Use 1-1 edge
-            edge.dataMovementType = DataMovementType.ONE_TO_ONE;
-            edge.outputClassName = UnorderedKVOutput.class.getName();
-            edge.inputClassName = UnorderedKVInput.class.getName();
-            // If prevOper.requestedParallelism changes based on no. of input splits
-            // it will reflect for sortOpers[0] so that 1-1 edge will work.
-            sortOpers[0].setRequestedParallelismByReference(prevOper);
-            if (rp==-1) {
-                quantJobParallelismPair.first.setNeedEstimatedQuantile(true);
+                // Use 1-1 edge
+                edge.dataMovementType = DataMovementType.ONE_TO_ONE;
+                edge.outputClassName = UnorderedKVOutput.class.getName();
+                edge.inputClassName = UnorderedKVInput.class.getName();
+                // If samplerOper.requestedParallelism changes based on no. of input splits
+                // it will reflect for sortOpers[0] so that 1-1 edge will work.
+                sortOpers[0].setRequestedParallelismByReference(samplerOper);
             }
-            sortOpers[1].setRequestedParallelism(quantJobParallelismPair.second);
 
-            /*
-            // TODO: Convert to unsorted shuffle after TEZ-661
-            // edge.outputClassName = UnorderedKVOutput.class.getName();
-            // edge.inputClassName = UnorderedKVInput.class.getName();
-            edge.partitionerClass = RoundRobinPartitioner.class;
-            sortOpers[0].setRequestedParallelism(quantJobParallelismPair.second);
+            if (rp == -1) {
+                quantJobParallelismPair.first.setNeedEstimatedQuantile(true);
+            }
+            quantJobParallelismPair.first.setSortOperator(sortOpers[1]);
             sortOpers[1].setRequestedParallelism(quantJobParallelismPair.second);
-            */
 
-            TezCompilerUtil.connect(tezPlan, prevOper, quantJobParallelismPair.first);
-            lr.setOutputKey(sortOpers[0].getOperatorKey().toString());
-            lrSample.setOutputKey(quantJobParallelismPair.first.getOperatorKey().toString());
-
-            edge = TezCompilerUtil.connect(tezPlan, quantJobParallelismPair.first, sortOpers[0]);
+            // Broadcast the sample to Partitioner vertex
+            TezEdgeDescriptor edge = TezCompilerUtil.connect(tezPlan, quantJobParallelismPair.first, sortOpers[0]);
             TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.BROADCAST);
             POValueOutputTez sampleOut = (POValueOutputTez)quantJobParallelismPair.first.plan.getLeaves().get(0);
             sampleOut.addOutputKey(sortOpers[0].getOperatorKey().toString());
             sortOpers[0].setSampleOperator(quantJobParallelismPair.first);
 
+            lrSample.setOutputKey(quantJobParallelismPair.first.getOperatorKey().toString());
+
+            // Connect the Partitioner and Global Sort vertex
             edge = TezCompilerUtil.connect(tezPlan, sortOpers[0], sortOpers[1]);
             edge.partitionerClass = WeightedRangePartitionerTez.class;
-
             curTezOp = sortOpers[1];
 
             // TODO: Review sort udf
@@ -2287,7 +2349,6 @@ public class TezCompiler extends PhyPlan
 //                curTezOp.UDFs.add(op.getMSortFunc().getFuncSpec().toString());
 //                curTezOp.isUDFComparatorUsed = true;
 //            }
-            quantJobParallelismPair.first.setSortOperator(sortOpers[1]);
 
             // If Order by followed by Limit and parallelism of order by is not 1
             // add a new vertex for Limit with parallelism 1.

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezEdgeDescriptor.java Fri Mar  4 18:17:39 2016
@@ -17,6 +17,8 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.tez.plan;
 
+import java.io.Serializable;
+
 import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
@@ -28,9 +30,9 @@ import org.apache.tez.runtime.library.ou
 /**
  * Descriptor for Tez edge. It holds combine plan as well as edge properties.
  */
-public class TezEdgeDescriptor {
+public class TezEdgeDescriptor implements Serializable {
     // Combiner runs on both input and output of Tez edge.
-    public PhysicalPlan combinePlan;
+    transient public PhysicalPlan combinePlan;
 
     public String inputClassName;
     public String outputClassName;

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java Fri Mar  4 18:17:39 2016
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezResourceManager;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
@@ -50,11 +51,18 @@ public class TezOperPlan extends Operato
 
     private static final long serialVersionUID = 1L;
 
-    private Map<String, Path> extraResources = new HashMap<String, Path>();
+    private transient Map<String, Path> extraResources = new HashMap<String, Path>();
 
     private int estimatedTotalParallelism = -1;
 
+    private transient Credentials creds;
+
     public TezOperPlan() {
+        creds = new Credentials();
+    }
+
+    public Credentials getCredentials() {
+        return creds;
     }
 
     public int getEstimatedTotalParallelism() {
@@ -146,7 +154,7 @@ public class TezOperPlan extends Operato
                 URI resourceURI = new URI(fileName);
                 String fragment = resourceURI.getFragment();
 
-                Path remoteFsPath = new Path(resourceURI.getPath());
+                Path remoteFsPath = new Path(resourceURI);
                 String resourceName = (fragment != null && fragment.length() > 0) ? fragment : remoteFsPath.getName();
 
                 addExtraResource(resourceName, remoteFsPath);
@@ -185,10 +193,8 @@ public class TezOperPlan extends Operato
     public void moveTree(TezOperator root, TezOperPlan newPlan) throws PlanException {
         List<TezOperator> list = new ArrayList<TezOperator>();
         list.add(root);
-        int prevSize = 0;
         int pos = 0;
-        while (list.size() > prevSize) {
-            prevSize = list.size();
+        while (list.size() > pos) {
             TezOperator node = list.get(pos);
             if (getSuccessors(node)!=null) {
                 for (TezOperator succ : getSuccessors(node)) {
@@ -234,5 +240,47 @@ public class TezOperPlan extends Operato
             super.remove(node);
         }
     }
+
+    // This method is used in PigGraceShuffleVertexManager to get a list of grandparents.
+    // Also need to exclude grandparents which are also a parent (a is both parent and grandparent in the diagram below)
+    //    a   ->    c
+    //      \  b  /
+    //
+    public static List<TezOperator> getGrandParentsForGraceParallelism(TezOperPlan tezPlan, TezOperator op) {
+        List<TezOperator> grandParents = new ArrayList<TezOperator>();
+        List<TezOperator> preds = tezPlan.getPredecessors(op);
+        if (preds != null) {
+            for (TezOperator pred : preds) {
+                if (pred.isVertexGroup()) {
+                    grandParents.clear();
+                    return grandParents;
+                }
+                List<TezOperator> predPreds = tezPlan.getPredecessors(pred);
+                if (predPreds!=null) {
+                    for (TezOperator predPred : predPreds) {
+                        if (predPred.isVertexGroup()) {
+                            grandParents.clear();
+                            return grandParents;
+                        }
+                        if (!grandParents.contains(predPred)) {
+                            grandParents.add(predPred);
+                        }
+                    }
+                } else {
+                    grandParents.clear();
+                    break;
+                }
+            }
+
+            if (!grandParents.isEmpty()) {
+                for (TezOperator pred : preds) {
+                    if (grandParents.contains(pred)) {
+                        grandParents.remove(pred);
+                    }
+                }
+            }
+        }
+        return grandParents;
+    }
 }
 

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java Fri Mar  4 18:17:39 2016
@@ -18,11 +18,16 @@
 package org.apache.pig.backend.hadoop.executionengine.tez.plan;
 
 import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -30,6 +35,8 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.TezOperDependencyParallelismEstimator.TezParallelismFactorVisitor;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigProcessor;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.Operator;
@@ -50,7 +57,11 @@ public class TezOperator extends Operato
     private static final long serialVersionUID = 1L;
 
     // Processor pipeline
-    public PhysicalPlan plan;
+    // Note: TezOperator needs to be serialized and de-serialized to
+    // be used in PigGraceShuffleVertexManager. Some fields are either
+    // big or not serializable, and are not used in PigGraceShuffleVertexManager,
+    // so they are marked as transient: plan, vertexGroupInfo, inputSplitInfo
+    public transient PhysicalPlan plan;
 
     // Descriptors for out-bound edges.
     public Map<OperatorKey, TezEdgeDescriptor> outEdges;
@@ -133,6 +144,12 @@ public class TezOperator extends Operato
 
     private boolean useMRMapSettings = false;
 
+    private boolean useGraceParallelism = false;
+
+    private Map<OperatorKey, Double> parallelismFactorPerSuccessor;
+
+    private Boolean intermediateReducer = null;
+
     // Types of blocking operators. For now, we only support the following ones.
     public static enum OPER_FEATURE {
         // Indicate if this job is a merge indexer
@@ -159,8 +176,12 @@ public class TezOperator extends Operato
         LIMIT_AFTER_SORT,
         // Indicate if this job is a union job
         UNION,
+        // Indicate if this job is a distinct job
+        DISTINCT,
         // Indicate if this job is a native job
-        NATIVE;
+        NATIVE,
+        // Indicate if this job does rank counter
+        RANK_COUNTER;
     };
 
     // Features in the job/vertex. Mostly will be only one feature.
@@ -170,16 +191,17 @@ public class TezOperator extends Operato
 
     private List<OperatorKey> vertexGroupMembers;
     // For union
-    private VertexGroupInfo vertexGroupInfo;
+    private transient VertexGroupInfo vertexGroupInfo;
     // Mapping of OperatorKey of POStore OperatorKey to vertexGroup TezOperator
     private Map<OperatorKey, OperatorKey> vertexGroupStores = null;
+    private boolean isVertexGroup = false;
 
-    public static class LoaderInfo {
+    public static class LoaderInfo implements Serializable {
         private List<POLoad> loads = null;
         private ArrayList<FileSpec> inp = new ArrayList<FileSpec>();
         private ArrayList<String> inpSignatureLists = new ArrayList<String>();
         private ArrayList<Long> inpLimits = new ArrayList<Long>();
-        private InputSplitInfo inputSplitInfo = null;
+        private transient InputSplitInfo inputSplitInfo = null;
         public List<POLoad> getLoads() {
             return loads;
         }
@@ -262,9 +284,10 @@ public class TezOperator extends Operato
         this.estimatedParallelism = estimatedParallelism;
     }
 
-    public int getEffectiveParallelism() {
+    public int getEffectiveParallelism(int defaultParallelism) {
         // PIG-4162: For intermediate reducers, use estimated parallelism over user set parallelism.
-        return getEstimatedParallelism() == -1 ? getRequestedParallelism()
+        return getEstimatedParallelism() == -1
+                ? (getRequestedParallelism() == -1 ? defaultParallelism : getRequestedParallelism())
                 : getEstimatedParallelism();
     }
 
@@ -405,6 +428,14 @@ public class TezOperator extends Operato
         feature.set(OPER_FEATURE.UNION.ordinal());
     }
 
+    public boolean isDistinct() {
+        return feature.get(OPER_FEATURE.DISTINCT.ordinal());
+    }
+
+    public void markDistinct() {
+        feature.set(OPER_FEATURE.DISTINCT.ordinal());
+    }
+
     public boolean isNative() {
         return feature.get(OPER_FEATURE.NATIVE.ordinal());
     }
@@ -413,6 +444,14 @@ public class TezOperator extends Operato
         feature.set(OPER_FEATURE.NATIVE.ordinal());
     }
 
+    public boolean isRankCounter() {
+        return feature.get(OPER_FEATURE.RANK_COUNTER.ordinal());
+    }
+
+    public void markRankCounter() {
+        feature.set(OPER_FEATURE.RANK_COUNTER.ordinal());
+    }
+
     public void copyFeatures(TezOperator copyFrom, List<OPER_FEATURE> excludeFeatures) {
         for (OPER_FEATURE opf : OPER_FEATURE.values()) {
             if (excludeFeatures != null && excludeFeatures.contains(opf)) {
@@ -440,7 +479,7 @@ public class TezOperator extends Operato
         this.useSecondaryKey = useSecondaryKey;
     }
 
-    public List<OperatorKey> getUnionPredecessors() {
+    public List<OperatorKey> getUnionMembers() {
         return vertexGroupMembers;
     }
 
@@ -462,7 +501,7 @@ public class TezOperator extends Operato
     // Union is the only operator that uses alias vertex (VertexGroup) now. But
     // more operators could be added to the list in the future.
     public boolean isVertexGroup() {
-        return vertexGroupInfo != null;
+        return isVertexGroup;
     }
 
     public VertexGroupInfo getVertexGroupInfo() {
@@ -471,6 +510,7 @@ public class TezOperator extends Operato
 
     public void setVertexGroupInfo(VertexGroupInfo vertexGroup) {
         this.vertexGroupInfo = vertexGroup;
+        this.isVertexGroup = true;
     }
 
     public void addVertexGroupStore(OperatorKey storeKey, OperatorKey vertexGroupKey) {
@@ -480,6 +520,16 @@ public class TezOperator extends Operato
         this.vertexGroupStores.put(storeKey, vertexGroupKey);
     }
 
+    public void removeVertexGroupStore(OperatorKey vertexGroupKey) {
+        Iterator<Entry<OperatorKey, OperatorKey>> iter = vertexGroupStores.entrySet().iterator();
+        while (iter.hasNext()) {
+            Entry<OperatorKey, OperatorKey> entry = iter.next();
+            if (entry.getValue().equals(vertexGroupKey)) {
+                iter.remove();
+            }
+        }
+    }
+
     public Map<OperatorKey, OperatorKey> getVertexGroupStores() {
         return this.vertexGroupStores;
     }
@@ -496,7 +546,7 @@ public class TezOperator extends Operato
     public String toString() {
         StringBuilder sb = new StringBuilder(name() + ":\n");
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        if (!plan.isEmpty()) {
+        if (plan!=null && !plan.isEmpty()) {
             plan.explain(baos);
             String mp = new String(baos.toByteArray());
             sb.append(shiftStringByTabs(mp, "|   "));
@@ -601,6 +651,52 @@ public class TezOperator extends Operato
         return loaderInfo;
     }
 
+    public void setUseGraceParallelism(boolean useGraceParallelism) {
+        this.useGraceParallelism = useGraceParallelism;
+    }
+    public boolean isUseGraceParallelism() {
+        return useGraceParallelism;
+    }
+
+    public double getParallelismFactor(TezOperator successor) throws VisitorException {
+        if (parallelismFactorPerSuccessor == null) {
+            parallelismFactorPerSuccessor = new HashMap<OperatorKey, Double>();
+        }
+        Double factor = parallelismFactorPerSuccessor.get(successor.getOperatorKey());
+        if (factor == null) {
+            // We determine different parallelism factors for different successors (edges).
+            // For example, if we have two successors, one with a combine plan and the other without,
+            // we want to compute a smaller parallelism factor for the one with the combine plan
+            // as that edge will get less data.
+            // TODO: To be more perfect, we need only look at the split sub-plan that
+            // writes to that successor edge. If there is a FILTER in one sub-plan it is accounted
+            // for all the successors now which is not right.
+            TezParallelismFactorVisitor parallelismFactorVisitor = new TezParallelismFactorVisitor(this, successor);
+            parallelismFactorVisitor.visit();
+            factor = parallelismFactorVisitor.getFactor();
+            parallelismFactorPerSuccessor.put(successor.getOperatorKey(), factor);
+        }
+        return factor;
+    }
+
+    public Boolean isIntermediateReducer() throws IOException {
+        if (intermediateReducer == null) {
+            intermediateReducer = false;
+            // set intermediateReducer to true if there are no loads or stores in a TezOperator
+            LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(plan, POStore.class);
+            // Not map and not final reducer
+            if (stores.size() <= 0 &&
+                    (getLoaderInfo().getLoads() == null || getLoaderInfo().getLoads().size() <= 0)) {
+                intermediateReducer = true;
+            }
+        }
+        return intermediateReducer;
+    }
+
+    public void setIntermediateReducer(Boolean intermediateReducer) {
+        this.intermediateReducer = intermediateReducer;
+    }
+
     public static class VertexGroupInfo {
 
         private List<OperatorKey> inputKeys;

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPOPackageAnnotator.java Fri Mar  4 18:17:39 2016
@@ -30,6 +30,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.VisitorException;
@@ -67,11 +68,13 @@ public class TezPOPackageAnnotator exten
         List<TezOperator> preds = this.mPlan.getPredecessors(pkgTezOp);
         for (Iterator<TezOperator> it = preds.iterator(); it.hasNext();) {
             TezOperator predTezOp = it.next();
+            TezOperator predTezOpVertexGrp = null;
             if (predTezOp.isVertexGroup()) {
+                predTezOpVertexGrp = predTezOp;
                 // Just get one of the inputs to vertex group
                 predTezOp = getPlan().getOperator(predTezOp.getVertexGroupMembers().get(0));
             }
-            lrFound += patchPackage(predTezOp, pkgTezOp, pkg);
+            lrFound += patchPackage(predTezOp, predTezOpVertexGrp, pkgTezOp, pkg);
             if(lrFound == pkg.getNumInps()) {
                 break;
             }
@@ -79,13 +82,19 @@ public class TezPOPackageAnnotator exten
 
         if(lrFound != pkg.getNumInps()) {
             int errCode = 2086;
-            String msg = "Unexpected problem during optimization. Could not find all LocalRearrange operators.";
+            String msg = "Unexpected problem during optimization. "
+                    + "Could not find all LocalRearrange operators. Expected: "
+                    + pkg.getNumInps() + ", Found: " + lrFound;
             throw new OptimizerException(msg, errCode, PigException.BUG);
         }
     }
 
-    private int patchPackage(TezOperator predTezOp, TezOperator pkgTezOp, POPackage pkg) throws VisitorException {
-        LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer(predTezOp.plan, pkgTezOp, pkg);
+    private int patchPackage(TezOperator predTezOp,
+            TezOperator predTezOpVertexGrp,
+            TezOperator pkgTezOp,
+            POPackage pkg) throws VisitorException {
+        LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer(
+                predTezOp.plan, predTezOpVertexGrp, pkgTezOp, pkg);
         lrDiscoverer.visit();
         // let our caller know if we managed to patch
         // the package
@@ -131,13 +140,24 @@ public class TezPOPackageAnnotator exten
         private int loRearrangeFound = 0;
         private TezOperator pkgTezOp;
         private POPackage pkg;
+        private TezOperator predTezOpVertexGrp;
+        private boolean isPOSplit;
 
-        public LoRearrangeDiscoverer(PhysicalPlan plan, TezOperator pkgTezOp, POPackage pkg) {
-            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+        public LoRearrangeDiscoverer(PhysicalPlan predPlan, TezOperator predTezOpVertexGrp, TezOperator pkgTezOp, POPackage pkg) {
+            super(predPlan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(predPlan));
             this.pkgTezOp = pkgTezOp;
             this.pkg = pkg;
+            this.predTezOpVertexGrp = predTezOpVertexGrp;
+        }
+
+        @Override
+        public void visitSplit(POSplit spl) throws VisitorException {
+            isPOSplit = true;
+            super.visitSplit(spl);
         }
 
+
+
         @Override
         public void visitLocalRearrange(POLocalRearrange lrearrange) throws VisitorException {
             POLocalRearrangeTez lr = (POLocalRearrangeTez) lrearrange;
@@ -160,17 +180,24 @@ public class TezPOPackageAnnotator exten
             if(keyInfo == null)
                 keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
 
-            if(keyInfo.get(Integer.valueOf(lrearrange.getIndex())) != null) {
-                // something is wrong - we should not be getting key info
-                // for the same index from two different Local Rearranges
-                int errCode = 2087;
-                String msg = "Unexpected problem during optimization." +
-                        " Found index:" + lrearrange.getIndex() +
-                        " in multiple LocalRearrange operators.";
-                throw new OptimizerException(msg, errCode, PigException.BUG);
+            Integer index = Integer.valueOf(lrearrange.getIndex());
+            if(keyInfo.get(index) != null) {
+                if (isPOSplit) {
+                    // Case of POSplit having more than one input in case of self join or union
+                    loRearrangeFound--;
+                } else {
+                    // something is wrong - we should not be getting key info
+                    // for the same index from two different Local Rearranges
+                    int errCode = 2087;
+                    String msg = "Unexpected problem during optimization." +
+                            " Found index:" + lrearrange.getIndex() +
+                            " in multiple LocalRearrange operators.";
+                    throw new OptimizerException(msg, errCode, PigException.BUG);
+                }
 
             }
-            keyInfo.put(Integer.valueOf(lrearrange.getIndex()),
+
+            keyInfo.put(index,
                     new Pair<Boolean, Map<Integer, Integer>>(
                             lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
             pkg.getPkgr().setKeyInfo(keyInfo);

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainer.java Fri Mar  4 18:17:39 2016
@@ -31,6 +31,7 @@ import java.util.Set;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezResourceManager;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.OperatorPlan;
 import org.apache.pig.impl.plan.PlanException;
@@ -152,38 +153,109 @@ public class TezPlanContainer extends Op
     }
 
     public void split(TezPlanContainerNode planNode) throws PlanException {
+
         TezOperPlan tezOperPlan = planNode.getTezOperPlan();
+        if (tezOperPlan.size() == 1) {
+            // Nothing to split. Only one operator in the plan
+            return;
+        }
+
         TezOperator operToSegment = null;
         List<TezOperator> succs = new ArrayList<TezOperator>();
-        for (TezOperator tezOper : tezOperPlan) {
-            if (tezOper.needSegmentBelow() && tezOperPlan.getSuccessors(tezOper)!=null) {
-                operToSegment = tezOper;
-                succs.addAll(tezOperPlan.getSuccessors(tezOper));
-                break;
-            }
+        try {
+            // Split top down from root to leaves
+            SegmentOperatorFinder finder = new SegmentOperatorFinder(tezOperPlan);
+            finder.visit();
+            operToSegment = finder.getOperatorToSegment();
+        } catch (VisitorException e) {
+            throw new PlanException(e);
         }
-        if (operToSegment != null) {
+
+        if (operToSegment != null && tezOperPlan.getSuccessors(operToSegment) != null) {
+            succs.addAll(tezOperPlan.getSuccessors(operToSegment));
             for (TezOperator succ : succs) {
                 tezOperPlan.disconnect(operToSegment, succ);
-                TezOperPlan newOperPlan = new TezOperPlan();
-                List<TezPlanContainerNode> containerSuccs = new ArrayList<TezPlanContainerNode>();
-                if (getSuccessors(planNode)!=null) {
-                    containerSuccs.addAll(getSuccessors(planNode));
-                }
-                tezOperPlan.moveTree(succ, newOperPlan);
-                TezPlanContainerNode newPlanNode = new TezPlanContainerNode(generateNodeOperatorKey(), newOperPlan);
-                add(newPlanNode);
-                for (TezPlanContainerNode containerNodeSucc : containerSuccs) {
-                    disconnect(planNode, containerNodeSucc);
-                    connect(newPlanNode, containerNodeSucc);
+            }
+            for (TezOperator succ : succs) {
+                try {
+                    if (tezOperPlan.getOperator(succ.getOperatorKey()) == null) {
+                        // Has already been moved to a new plan by previous successor
+                        // as part of dependency. It could have been further split.
+                        // So walk the full plan to find the new plan and connect
+                        TezOperatorFinder finder = new TezOperatorFinder(this, succ);
+                        finder.visit();
+                        connect(planNode, finder.getPlanContainerNode());
+                        continue;
+                    }
+                    TezOperPlan newOperPlan = new TezOperPlan();
+                    tezOperPlan.moveTree(succ, newOperPlan);
+                    TezPlanContainerNode newPlanNode = new TezPlanContainerNode(
+                            generateNodeOperatorKey(), newOperPlan);
+                    add(newPlanNode);
+                    connect(planNode, newPlanNode);
+                    split(newPlanNode);
+                    if (newPlanNode.getTezOperPlan().getOperator(succ.getOperatorKey()) == null) {
+                        // On further split, the successor moved to a new plan container.
+                        // Connect to that
+                        TezOperatorFinder finder = new TezOperatorFinder(this, succ);
+                        finder.visit();
+                        disconnect(planNode, newPlanNode);
+                        connect(planNode, finder.getPlanContainerNode());
+                    }
+                } catch (VisitorException e) {
+                    throw new PlanException(e);
                 }
-                connect(planNode, newPlanNode);
-                split(newPlanNode);
             }
             split(planNode);
         }
     }
 
+    private static class SegmentOperatorFinder extends TezOpPlanVisitor {
+
+        private TezOperator operToSegment;
+
+        public SegmentOperatorFinder(TezOperPlan plan) {
+            super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
+        }
+
+        public TezOperator getOperatorToSegment() {
+            return operToSegment;
+        }
+
+        @Override
+        public void visitTezOp(TezOperator tezOperator) throws VisitorException {
+            if (tezOperator.needSegmentBelow() && operToSegment == null) {
+                operToSegment = tezOperator;
+            }
+        }
+
+    }
+
+    private static class TezOperatorFinder extends TezPlanContainerVisitor {
+
+        private TezPlanContainerNode planContainerNode;
+        private TezOperator operatorToFind;
+
+        public TezOperatorFinder(TezPlanContainer plan, TezOperator operatorToFind) {
+            super(plan, new DependencyOrderWalker<TezPlanContainerNode, TezPlanContainer>(plan));
+            this.operatorToFind = operatorToFind;
+        }
+
+        public TezPlanContainerNode getPlanContainerNode() {
+            return planContainerNode;
+        }
+
+        @Override
+        public void visitTezPlanContainerNode(
+                TezPlanContainerNode tezPlanContainerNode)
+                throws VisitorException {
+            if (tezPlanContainerNode.getTezOperPlan().getOperatorKey(operatorToFind) != null) {
+                planContainerNode = tezPlanContainerNode;
+            }
+        }
+
+    }
+
     private synchronized OperatorKey generateNodeOperatorKey() {
         OperatorKey opKey = new OperatorKey(jobName + "-" + dagId + "_scope", scopeId);
         scopeId++;

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainerPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainerPrinter.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainerPrinter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPlanContainerPrinter.java Fri Mar  4 18:17:39 2016
@@ -19,7 +19,8 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.PrintStream;
 
-import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPrinter.TezGraphPrinter;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPrinter.TezDAGGraphPrinter;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezPrinter.TezVertexGraphPrinter;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.VisitorException;
 
@@ -37,6 +38,19 @@ public class TezPlanContainerPrinter ext
         mStream.println("#--------------------------------------------------");
         mStream.println("# There are " + planContainer.size() + " DAGs in the session");
         mStream.println("#--------------------------------------------------");
+        printContainerPlan(planContainer);
+    }
+
+    private void printContainerPlan(TezPlanContainer planContainer) {
+        try {
+            if (planContainer.size() > 1) {
+                TezDAGGraphPrinter graphPrinter = new TezDAGGraphPrinter(planContainer);
+                graphPrinter.visit();
+                mStream.print(graphPrinter.toString());
+            }
+        } catch (VisitorException e) {
+            throw new RuntimeException(e);
+        }
     }
 
     public void setVerbose(boolean verbose) {
@@ -48,7 +62,7 @@ public class TezPlanContainerPrinter ext
         mStream.println("#--------------------------------------------------");
         mStream.println("# TEZ DAG plan: " + tezPlanContainerNode.getOperatorKey());
         mStream.println("#--------------------------------------------------");
-        TezGraphPrinter graphPrinter = new TezGraphPrinter(tezPlanContainerNode.getTezOperPlan());
+        TezVertexGraphPrinter graphPrinter = new TezVertexGraphPrinter(tezPlanContainerNode.getTezOperPlan());
         graphPrinter.visit();
         mStream.print(graphPrinter.toString());
         TezPrinter printer = new TezPrinter(mStream, tezPlanContainerNode.getTezOperPlan());

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezPrinter.java Fri Mar  4 18:17:39 2016
@@ -55,9 +55,13 @@ public class TezPrinter extends TezOpPla
     public void visitTezOp(TezOperator tezOper) throws VisitorException {
         if (tezOper.isVertexGroup()) {
             VertexGroupInfo info = tezOper.getVertexGroupInfo();
-            mStream.println("Tez vertex group "
-                    + tezOper.getOperatorKey().toString() + "\t<-\t "
-                    + info.getInputs() + "\t->\t " + info.getOutput());
+            mStream.print("Tez vertex group "
+                    + tezOper.getOperatorKey().toString());
+            if (info!=null) {
+                mStream.println("\t<-\t " + info.getInputs() + "\t->\t " + info.getOutput());
+            } else {
+                mStream.println();
+            }
             mStream.println("# No plan on vertex group");
         } else {
             mStream.println("Tez vertex " + tezOper.getOperatorKey().toString());
@@ -86,29 +90,36 @@ public class TezPrinter extends TezOpPla
             printer.setVerbose(isVerbose);
             printer.visit();
             mStream.println();
+        } else if (!tezOper.isVertexGroup()) {
+            // For things like NativeTezOper
+            mStream.println("" + tezOper);
         }
     }
 
     /**
      * This class prints the Tez Vertex Graph
      */
-    public static class TezGraphPrinter extends TezOpPlanVisitor {
+    public static class TezVertexGraphPrinter extends TezOpPlanVisitor {
 
-        StringBuffer buf;
+        StringBuilder buf;
 
-        public TezGraphPrinter(TezOperPlan plan) {
+        public TezVertexGraphPrinter(TezOperPlan plan) {
             super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan, true));
-            buf = new StringBuffer();
+            buf = new StringBuilder();
         }
 
         @Override
         public void visitTezOp(TezOperator tezOper) throws VisitorException {
+            writePlan(mPlan, tezOper, buf);
+        }
+
+        public static void writePlan(TezOperPlan plan, TezOperator tezOper, StringBuilder buf) {
             if (tezOper.isVertexGroup()) {
                 buf.append("Tez vertex group " + tezOper.getOperatorKey().toString());
             } else {
                 buf.append("Tez vertex " + tezOper.getOperatorKey().toString());
             }
-            List<TezOperator> succs = mPlan.getSuccessors(tezOper);
+            List<TezOperator> succs = plan.getSuccessors(tezOper);
             if (succs != null) {
                 Collections.sort(succs);
                 buf.append("\t->\t");
@@ -121,6 +132,43 @@ public class TezPrinter extends TezOpPla
                 }
             }
             buf.append("\n");
+        }
+
+        @Override
+        public String toString() {
+            buf.append("\n");
+            return buf.toString();
+        }
+    }
+
+    /**
+     * This class prints the Tez DAG Graph
+     */
+    public static class TezDAGGraphPrinter extends TezPlanContainerVisitor {
+
+        StringBuilder buf;
+
+        public TezDAGGraphPrinter(TezPlanContainer plan) {
+            super(plan, new DependencyOrderWalker<TezPlanContainerNode, TezPlanContainer>(plan, true));
+            buf = new StringBuilder();
+        }
+
+        @Override
+        public void visitTezPlanContainerNode(TezPlanContainerNode tezPlanContainerNode) throws VisitorException {
+            writePlan(mPlan, tezPlanContainerNode, buf);
+        }
+
+        public static void writePlan(TezPlanContainer mPlan, TezPlanContainerNode tezPlanContainerNode, StringBuilder buf) {
+            buf.append("Tez DAG " + tezPlanContainerNode.getOperatorKey().toString());
+            List<TezPlanContainerNode> succs = mPlan.getSuccessors(tezPlanContainerNode);
+            if (succs != null) {
+                Collections.sort(succs);
+                buf.append("\t->\t");
+                for (TezPlanContainerNode op : succs) {
+                    buf.append("Tez DAG " + op.getOperatorKey().toString()).append(",");
+                }
+            }
+            buf.append("\n");
         }
 
         @Override

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java Fri Mar  4 18:17:39 2016
@@ -35,7 +35,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezInput;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.tez.runtime.api.LogicalInput;
@@ -56,6 +55,7 @@ public class POCounterStatsTez extends P
     // KeyValuesReader. After TEZ-661, switch to unsorted shuffle
     private transient KeyValuesReader reader;
     private transient KeyValueWriter writer;
+    private transient boolean finished = false;
 
     public POCounterStatsTez(OperatorKey k) {
         super(k);
@@ -123,6 +123,9 @@ public class POCounterStatsTez extends P
     @Override
     public Result getNextTuple() throws ExecException {
         try {
+            if (finished) {
+                return RESULT_EOP;
+            }
             Map<Integer, Long> counterRecords = new HashMap<Integer, Long>();
             Integer key = null;
             Long value = null;
@@ -148,9 +151,10 @@ public class POCounterStatsTez extends P
                 prevTasksCount += counterRecords.get(i);
             }
 
-            Tuple tuple = TupleFactory.getInstance().newTuple(1);
+            Tuple tuple = mTupleFactory.newTuple(1);
             tuple.set(0, counterOffsets);
             writer.write(POValueOutputTez.EMPTY_KEY, tuple);
+            finished = true;
             return RESULT_EOP;
         } catch (IOException e) {
             throw new ExecException(e);

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java Fri Mar  4 18:17:39 2016
@@ -53,11 +53,13 @@ public class POFRJoinTez extends POFRJoi
     private static final Log log = LogFactory.getLog(POFRJoinTez.class);
     private static final long serialVersionUID = 1L;
 
-    // For replicated tables
-    private List<LogicalInput> replInputs = Lists.newArrayList();
-    private List<KeyValueReader> replReaders = Lists.newArrayList();
     private List<String> inputKeys;
+
+    // For replicated tables
+    private transient List<LogicalInput> replInputs;
+    private transient List<KeyValueReader> replReaders;
     private transient boolean isInputCached;
+    private transient String cacheKey;
 
     public POFRJoinTez(POFRJoin copy, List<String> inputKeys) throws ExecException {
        super(copy);
@@ -71,14 +73,14 @@ public class POFRJoinTez extends POFRJoi
 
     @Override
     public void replaceInput(String oldInputKey, String newInputKey) {
-        if (inputKeys.remove(oldInputKey)) {
+        while (inputKeys.remove(oldInputKey)) {
             inputKeys.add(newInputKey);
         }
     }
 
     @Override
     public void addInputsToSkip(Set<String> inputsToSkip) {
-        String cacheKey = "replicatemap-" + getOperatorKey().toString();
+        cacheKey = "replicatemap-" + inputKeys.toString();
         Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
         if (cacheValue != null) {
             isInputCached = true;
@@ -93,10 +95,14 @@ public class POFRJoinTez extends POFRJoi
             return;
         }
         try {
+            this.replInputs = Lists.newArrayList();
+            this.replReaders = Lists.newArrayList();
             for (String key : inputKeys) {
                 LogicalInput input = inputs.get(key);
-                this.replInputs.add(input);
-                this.replReaders.add((KeyValueReader) input.getReader());
+                if (!this.replInputs.contains(input)) {
+                    this.replInputs.add(input);
+                    this.replReaders.add((KeyValueReader) input.getReader());
+                }
             }
         } catch (Exception e) {
             throw new ExecException(e);
@@ -110,10 +116,11 @@ public class POFRJoinTez extends POFRJoi
      */
     @Override
     protected void setUpHashMap() throws ExecException {
-        String cacheKey = "replicatemap-" + getOperatorKey().toString();
 
-        if (isInputCached) {
-            Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+        // Re-check again in case of Split + union + replicate join
+        // where same POFRJoinTez occurs in different Split sub-plans
+        Object cacheValue = ObjectCache.getInstance().retrieve(cacheKey);
+        if (cacheValue != null) {
             replicates = (TupleToMapKey[]) cacheValue;
             log.info("Found " + (replicates.length - 1) + " replication hash tables in Tez cache. cachekey=" + cacheKey);
             return;
@@ -208,4 +215,10 @@ public class POFRJoinTez extends POFRJoi
     public List<String> getInputKeys() {
         return inputKeys;
     }
+
+    @Override
+    public POFRJoinTez clone() throws CloneNotSupportedException {
+        return (POFRJoinTez) super.clone();
+    }
+
 }

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java Fri Mar  4 18:17:39 2016
@@ -56,10 +56,12 @@ public class POIdentityInOutTez extends
     private transient KeyValueReader reader;
     private transient KeyValuesReader shuffleReader;
     private transient boolean shuffleInput;
+    private transient boolean finished = false;
 
-    public POIdentityInOutTez(OperatorKey k, POLocalRearrange inputRearrange) {
+    public POIdentityInOutTez(OperatorKey k, POLocalRearrange inputRearrange, String inputKey) {
         super(inputRearrange);
         this.mKey = k;
+        this.inputKey = inputKey;
     }
 
     public void setInputKey(String inputKey) {
@@ -121,6 +123,9 @@ public class POIdentityInOutTez extends
     @Override
     public Result getNextTuple() throws ExecException {
         try {
+            if (finished) {
+                return RESULT_EOP;
+            }
             if (shuffleInput) {
                 while (shuffleReader.next()) {
                     Object curKey = shuffleReader.getCurrentKey();
@@ -152,6 +157,7 @@ public class POIdentityInOutTez extends
                     }
                 }
             }
+            finished = true;
             return RESULT_EOP;
         } catch (IOException e) {
             throw new ExecException(e);

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java?rev=1733627&r1=1733626&r2=1733627&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POLocalRearrangeTez.java Fri Mar  4 18:17:39 2016
@@ -31,9 +31,9 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.TezOutput;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullablePartitionWritable;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
-import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
@@ -48,11 +48,11 @@ public class POLocalRearrangeTez extends
     private static final Log LOG = LogFactory.getLog(POLocalRearrangeTez.class);
 
     protected String outputKey;
-    protected transient KeyValueWriter writer;
-
     protected boolean connectedToPackage = true;
     protected boolean isSkewedJoin = false;
 
+    protected transient KeyValueWriter writer;
+
     public POLocalRearrangeTez(OperatorKey k) {
         super(k);
     }
@@ -144,6 +144,13 @@ public class POLocalRearrangeTez extends
                     // assign the tuple to its slot in the projection.
                     key.setIndex(index);
                     val.setIndex(index);
+                    if (isSkewedJoin) {
+                        // Wrap into a NullablePartitionWritable to match the key
+                        // of the right table from POPartitionRearrangeTez for the skewed join
+                        NullablePartitionWritable wrappedKey = new NullablePartitionWritable(key);
+                        wrappedKey.setPartition(-1);
+                        key = wrappedKey;
+                    }
                     writer.write(key, val);
                 } else {
                     illustratorMarkup(res.result, res.result, 0);
@@ -164,20 +171,9 @@ public class POLocalRearrangeTez extends
         return inp;
     }
 
-    /**
-     * Make a deep copy of this operator.
-     * @throws CloneNotSupportedException
-     */
     @Override
     public POLocalRearrangeTez clone() throws CloneNotSupportedException {
-        POLocalRearrangeTez clone = new POLocalRearrangeTez(new OperatorKey(
-                mKey.scope, NodeIdGenerator.getGenerator().getNextNodeId(
-                        mKey.scope)), requestedParallelism);
-        deepCopyTo(clone);
-        clone.isSkewedJoin = isSkewedJoin;
-        clone.connectedToPackage = connectedToPackage;
-        clone.setOutputKey(outputKey);
-        return clone;
+        return (POLocalRearrangeTez) super.clone();
     }
 
     @Override



Mime
View raw message