pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject svn commit: r1603243 [1/2] - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ src/org/apache/pig/backend/hadoop/executionengine/tez/ src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/...
Date Tue, 17 Jun 2014 18:15:27 GMT
Author: daijy
Date: Tue Jun 17 18:15:26 2014
New Revision: 1603243

URL: http://svn.apache.org/r1603243
Log:
PIG-3846: Implement automatic reducer parallelism

Added:
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/FindQuantilesTez.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PartitionSkewedKeysTez.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PartitionerDefinedVertexManager.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperDependencyParallelismEstimator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezParallelismEstimator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/ParallelConstantVisitor.java
    pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java
    pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java
    pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/TEZC16.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/TEZC17.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2-OPTOFF.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Rank-2.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7-OPTOFF.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7.gld
    pig/trunk/test/org/apache/pig/tez/TestTezJobControlCompiler.java
    pig/trunk/test/tez-tests

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1603243&r1=1603242&r2=1603243&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Tue Jun 17 18:15:26 2014
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
  
 IMPROVEMENTS
 
+PIG-3846: Implement automatic reducer parallelism (daijy)
+
 PIG-3939: SPRINTF function to format strings using a printf-style template (mrflip via cheolsoo)
 
 PIG-3970: Merge Tez branch into trunk (daijy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java?rev=1603243&r1=1603242&r2=1603243&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java Tue Jun 17 18:15:26 2014
@@ -24,6 +24,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.builtin.PoissonSampleLoader;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 
@@ -31,12 +32,6 @@ public class POPoissonSample extends Phy
 
     private static final long serialVersionUID = 1L;
 
-    // marker string to mark the last sample row, which has total number or rows
-    // seen by this map instance. this string will be in the 2nd last column of
-    // the last sample row it is used by GetMemNumRows.
-    public static final String NUMROWS_TUPLE_MARKER =
-        "\u4956\u3838_pig_inTeRnal-spEcial_roW_num_tuple3kt579CFLehkblah";
-
     private static final TupleFactory tf = TupleFactory.getInstance();
     private static Result eop = new Result(POStatus.STATUS_EOP, null);
 
@@ -226,7 +221,7 @@ public class POPoissonSample extends Phy
             }
         }
 
-        t.set(sz, NUMROWS_TUPLE_MARKER);
+        t.set(sz, PoissonSampleLoader.NUMROWS_TUPLE_MARKER);
         t.set(sz + 1, rowNum);
         numRowSplTupleReturned = true;
         return new Result(POStatus.STATUS_OK, t);

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java?rev=1603243&r1=1603242&r2=1603243&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java Tue Jun 17 18:15:26 2014
@@ -26,11 +26,15 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.builtin.PoissonSampleLoader;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 
 public class POReservoirSample extends PhysicalOperator {
 
+    private static final TupleFactory tf = TupleFactory.getInstance();
+
     private static final long serialVersionUID = 1L;
 
     // number of samples to be sampled
@@ -45,6 +49,9 @@ public class POReservoirSample extends P
     //array to store the result
     private transient Result[] samples = null;
 
+    // last sample result
+    private Result lastSample = null;
+
     public POReservoirSample(OperatorKey k) {
         this(k, -1, null);
     }
@@ -101,7 +108,6 @@ public class POReservoirSample extends P
             }
         }
 
-        int rowNum = rowProcessed;
         Random randGen = new Random();
 
         while (true) {
@@ -114,11 +120,11 @@ public class POReservoirSample extends P
             }
 
             // collect samples until input is exhausted
-            int rand = randGen.nextInt(rowNum);
+            int rand = randGen.nextInt(rowProcessed);
             if (rand < numSamples) {
                 samples[rand] = res;
             }
-            rowNum++;
+            rowProcessed++;
         }
 
         if (this.parentPlan.endOfAllInput && res.returnStatus == POStatus.STATUS_EOP) {
@@ -129,6 +135,26 @@ public class POReservoirSample extends P
     }
 
     private Result getSample() throws ExecException {
+        if (lastSample == null) { // first call: prime the one-sample look-ahead buffer
+            lastSample = retrieveSample();
+        }
+        if (lastSample.returnStatus==POStatus.STATUS_EOP) { // nothing (left) to emit
+            return lastSample;
+        }
+        // Fetch one sample ahead so the final sample can be recognised before it is returned.
+        Result currentSample = retrieveSample();
+        // If this is the last sample, tag with number of rows
+        if (currentSample.returnStatus == POStatus.STATUS_EOP) {
+            lastSample = createNumRowTuple((Tuple)lastSample.result);
+        } else if (currentSample.returnStatus == POStatus.STATUS_NULL) {
+            return currentSample; // passed through as-is; lastSample stays buffered
+        }
+        Result result = lastSample; // emit the buffered sample ...
+        lastSample = currentSample; // ... and buffer the one just fetched (may be EOP)
+        return result;
+    }
+
+    private Result retrieveSample() throws ExecException {
         if(nextSampleIdx < samples.length){
             if (illustrator != null) {
                 illustratorMarkup(samples[nextSampleIdx].result, samples[nextSampleIdx].result, 0);
@@ -140,7 +166,8 @@ public class POReservoirSample extends P
             return res;
         }
         else{
-            Result res = new Result(POStatus.STATUS_EOP, null);
+            Result res;
+            res = new Result(POStatus.STATUS_EOP, null);
             return res;
         }
     }
@@ -159,4 +186,24 @@ public class POReservoirSample extends P
     public String name() {
         return getAliasString() + "ReservoirSample - " + mKey.toString();
     }
+
+    /**
+     * @param sample - sample tuple
+     * @return - Tuple appended with special marker string column, num-rows column
+     * @throws ExecException
+     */
+    private Result createNumRowTuple(Tuple sample) throws ExecException {
+        int sz = (sample == null) ? 0 : sample.size();
+        Tuple t = tf.newTuple(sz + 2);
+
+        if (sample != null) {
+            for (int i=0; i<sample.size(); i++){
+                t.set(i, sample.get(i));
+            }
+        }
+
+        t.set(sz, PoissonSampleLoader.NUMROWS_TUPLE_MARKER);
+        t.set(sz + 1, (long)rowProcessed);
+        return new Result(POStatus.STATUS_OK, t);
+    }
 }

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/FindQuantilesTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/FindQuantilesTez.java?rev=1603243&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/FindQuantilesTez.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/FindQuantilesTez.java Tue Jun 17 18:15:26 2014
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.builtin.FindQuantiles;
+
+/**
+ * Tez variant of FindQuantiles. exec() keeps all but the last two fields of each sample tuple and,
+ * when PigProcessor.ESTIMATE_PARALLELISM is set, uses those two fields to estimate parallelism.
+ */
+public class FindQuantilesTez extends FindQuantiles {
+    private static final Log LOG = LogFactory.getLog(FindQuantilesTez.class);
+    private static final TupleFactory tf = TupleFactory.getInstance();
+
+    public FindQuantilesTez() {
+        super();
+    }
+
+    public FindQuantilesTez(String[] args) {
+        super(args);
+    }
+
+    /** Returns the parent's map, adding ESTIMATED_NUM_PARALLELISM when estimation is enabled. */
+    @Override
+    public Map<String, Object> exec(Tuple in) throws IOException {
+        // In Tez, we also need to estimate the quantiles with regard to sample
+        // and the special tuple containing the total number of records
+        boolean estimate_sample_quantile = PigMapReduce.sJobConfInternal.get().getBoolean
+                (PigProcessor.ESTIMATE_PARALLELISM, false);
+        DataBag mySamples = (DataBag)in.get(1);
+        this.samples = BagFactory.getInstance().newDefaultBag();
+        Iterator<Tuple> iter = mySamples.iterator();
+        //total input rows for the order by
+        long totalInputRows = 0;
+        long sampleSize = 0;
+        while (iter.hasNext()) {
+            Tuple t = iter.next();
+            if (t.get(t.size() - 1) != null) {
+                totalInputRows += (Long)t.get(t.size() - 1);
+            }
+            if (t.get(t.size() - 2) != null) {
+                sampleSize += getMemorySize(t);
+            }
+            if (t.size() > 2) {
+                Tuple newTuple = tf.newTuple(t.size()-2);
+                for (int i=0;i<t.size()-2;i++) {
+                    newTuple.set(i, t.get(i));
+                }
+                this.samples.add(newTuple);
+            }
+        }
+        if (estimate_sample_quantile) {
+            long bytesPerTask = PigMapReduce.sJobConfInternal.get().getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+                    InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER);
+            // An empty sample bag would turn the average into NaN; treat it as zero input.
+            long numSamples = mySamples.size();
+            long estimatedInputSize = (numSamples == 0) ? 0 : (long)((double)sampleSize/numSamples * totalInputRows);
+            int estimatedNumReducers = (int)Math.ceil((double)estimatedInputSize/bytesPerTask);
+            estimatedNumReducers = Math.min(estimatedNumReducers, InputSizeReducerEstimator.DEFAULT_MAX_REDUCER_COUNT_PARAM);
+            // Tiny or empty input rounds down to 0; the sort still needs at least one task.
+            estimatedNumReducers = Math.max(estimatedNumReducers, 1);
+            LOG.info("Estimating parallelism: estimatedInputSize is " + estimatedInputSize + ". bytesPerTask is " + bytesPerTask + ". estimatedNumQuantiles is " + estimatedNumReducers + ".");
+            this.numQuantiles = estimatedNumReducers;
+            LOG.info("Use estimated parallelism instead:" + estimatedNumReducers);
+        }
+        Map<String, Object> result = super.exec(in);
+        if (estimate_sample_quantile) {
+            result.put(PigProcessor.ESTIMATED_NUM_PARALLELISM, numQuantiles);
+        }
+        PigProcessor.sampleMap = result;
+        return result;
+    }
+
+    /** Returns the second-to-last field of the tuple, read as a Long size value. */
+    protected long getMemorySize(Tuple t) {
+        int s = t.size();
+        try {
+            return (Long) t.get(s - 2);
+        } catch (ExecException e) {
+            throw new RuntimeException(
+                    "Unable to retrive the size field from tuple.", e);
+        }
+    }
+}

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java?rev=1603243&r1=1603242&r2=1603243&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java Tue Jun 17 18:15:26 2014
@@ -172,5 +172,8 @@ public class MultiQueryOptimizerTez exte
                 parentOper.outEdges.put(entry.getKey(), entry.getValue());
             }
         }
+        if (subPlanOper.isSampler()) {
+            parentOper.markSampler();
+        }
     }
 }

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PartitionSkewedKeysTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PartitionSkewedKeysTez.java?rev=1603243&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PartitionSkewedKeysTez.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PartitionSkewedKeysTez.java Tue Jun 17 18:15:26 2014
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.builtin.PartitionSkewedKeys;
+
+/** Tez variant of PartitionSkewedKeys that can estimate the reducer count from the samples. */
+public class PartitionSkewedKeysTez extends PartitionSkewedKeys {
+    private static final Log LOG = LogFactory.getLog(PartitionSkewedKeysTez.class);
+
+    public PartitionSkewedKeysTez() {
+        super();
+    }
+
+    public PartitionSkewedKeysTez(String[] args) {
+        super(args);
+    }
+
+    /** Returns the parent's map (plus ESTIMATED_NUM_PARALLELISM when estimating); null if in is null or empty. */
+    @Override
+    public Map<String, Object> exec(Tuple in) throws IOException {
+        if (in == null || in.size() == 0) {
+            return null;
+        }
+
+        boolean estimate_sample_quantile = PigMapReduce.sJobConfInternal.get().getBoolean
+                (PigProcessor.ESTIMATE_PARALLELISM, false);
+        if (estimate_sample_quantile) {
+            int specifiedNumReducer = (Integer) in.get(0);
+            DataBag samples = (DataBag) in.get(1);
+            long totalSampleSize = 0;
+            long totalInputRows = 0;
+            Iterator<Tuple> iter = samples.iterator();
+            while (iter.hasNext()) {
+                Tuple t = iter.next();
+                totalInputRows += (Long)t.get(t.size() - 1);
+                totalSampleSize += getMemorySize(t);
+            }
+            // No samples would turn the average into NaN; treat that as zero input.
+            long totalSampleCount_ = samples.size();
+            long estimatedInputSize = (totalSampleCount_ == 0) ? 0 : (long)((double)totalSampleSize/totalSampleCount_ * totalInputRows);
+            long bytesPerTask = PigMapReduce.sJobConfInternal.get().getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+                    InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER);
+
+            int estimatedNumReducers = (int)Math.ceil((double)estimatedInputSize/bytesPerTask);
+            estimatedNumReducers = Math.min(estimatedNumReducers, InputSizeReducerEstimator.DEFAULT_MAX_REDUCER_COUNT_PARAM);
+            // Tiny or empty input rounds down to 0; at least one reducer is always needed.
+            estimatedNumReducers = Math.max(estimatedNumReducers, 1);
+
+            LOG.info("Estimating parallelism: estimatedInputSize is " + estimatedInputSize + ". bytesPerTask is " + bytesPerTask + ". estimatedNumReducers is " + estimatedNumReducers + ".");
+            this.totalReducers_ = estimatedNumReducers;
+            LOG.info("Use estimated reducer instead:" + estimatedNumReducers + ", orig: " + specifiedNumReducer);
+        }
+        Map<String, Object> result = super.exec(in);
+        if (estimate_sample_quantile) {
+            result.put(PigProcessor.ESTIMATED_NUM_PARALLELISM, totalReducers_);
+        }
+        PigProcessor.sampleMap = result;
+        return result;
+    }
+}

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PartitionerDefinedVertexManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PartitionerDefinedVertexManager.java?rev=1603243&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PartitionerDefinedVertexManager.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PartitionerDefinedVertexManager.java Tue Jun 17 18:15:26 2014
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.tez.dag.api.EdgeManagerDescriptor;
+import org.apache.tez.dag.api.InputDescriptor;
+import org.apache.tez.dag.api.VertexManagerPlugin;
+import org.apache.tez.dag.api.VertexManagerPluginContext;
+import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint;
+import org.apache.tez.dag.app.dag.impl.ScatterGatherEdgeManager;
+import org.apache.tez.runtime.api.Event;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
+
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
+
+/**
+ * VertexManagerPlugin used by the sorting job of order by and skewed join.
+ * What it does is set the parallelism of the sorting vertex according to
+ * the numParallelism sent by a predecessor vertex as a 4-byte int payload.
+ * When that differs from the current task count, every input edge is given
+ * a ScatterGatherEdgeManager descriptor before the tasks are scheduled.
+ */
+public class PartitionerDefinedVertexManager implements VertexManagerPlugin {
+
+    private VertexManagerPluginContext context;
+    private boolean isParallelismSet = false;
+
+    private static final Log LOG = LogFactory.getLog(PartitionerDefinedVertexManager.class);
+
+    @Override
+    public void initialize(VertexManagerPluginContext context) {
+        this.context = context;
+    }
+
+    @Override
+    public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor,
+            List<Event> events) {
+        // Nothing to do
+    }
+
+    @Override
+    public void onSourceTaskCompleted(String srcVertexName, Integer srcTaskId) {
+        // Nothing to do
+    }
+
+    /** Applies the parallelism carried by a 4-byte int payload and schedules all tasks, once. */
+    @Override
+    public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) {
+        // There could be multiple partition vertex sending VertexManagerEvent
+        // Only need to setVertexParallelism once
+        if (isParallelismSet) {
+            return;
+        }
+        // Need to distinguish from VertexManagerEventPayloadProto emitted by OnFileSortedOutput.
+        // Such events are ignored WITHOUT latching isParallelismSet; latching first would drop
+        // the real parallelism event that follows and the tasks would never be scheduled.
+        byte[] payload = vmEvent.getUserPayload();
+        if (payload == null || payload.length != 4) {
+            return;
+        }
+        isParallelismSet = true;
+        int dynamicParallelism = Ints.fromByteArray(payload);
+        if (dynamicParallelism == -1) {
+            return;
+        }
+        int currentParallelism = context.getVertexNumTasks(context.getVertexName());
+        if (dynamicParallelism != currentParallelism) {
+            LOG.info("Pig Partitioner Defined Vertex Manager: reset parallelism to " + dynamicParallelism
+                    + " from " + currentParallelism);
+            Map<String, EdgeManagerDescriptor> edgeManagers = new HashMap<String, EdgeManagerDescriptor>();
+            for (String vertex : context.getInputVertexEdgeProperties().keySet()) {
+                edgeManagers.put(vertex, new EdgeManagerDescriptor(ScatterGatherEdgeManager.class.getName()));
+            }
+            context.setVertexParallelism(dynamicParallelism, null, edgeManagers);
+        }
+        List<TaskWithLocationHint> tasksToStart = Lists.newArrayListWithCapacity(dynamicParallelism);
+        for (int i = 0; i < dynamicParallelism; ++i) {
+            tasksToStart.add(new TaskWithLocationHint(Integer.valueOf(i), null));
+        }
+        context.scheduleVertexTasks(tasksToStart);
+    }
+
+    @Override
+    public void onVertexStarted(Map<String, List<Integer>> completions) {
+        // Nothing to do
+    }
+}

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1603243&r1=1603242&r2=1603243&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Tue Jun 17 18:15:26 2014
@@ -57,14 +57,31 @@ import org.apache.tez.runtime.api.Logica
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.api.TezProcessorContext;
+import org.apache.tez.runtime.api.events.VertexManagerEvent;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 
+import com.google.common.collect.Lists;
+import com.google.common.primitives.Ints;
+
 public class PigProcessor implements LogicalIOProcessor {
 
     private static final Log LOG = LogFactory.getLog(PigProcessor.class);
     // Names of the properties that store serialized physical plans
     public static final String PLAN = "pig.exec.tez.plan";
     public static final String COMBINE_PLAN = "pig.exec.tez.combine.plan";
+    // This flag is used in both order by and skewed job. This is a configuration
+    // entry to instruct sample job to dynamically estimate parallelism
+    public static final String ESTIMATE_PARALLELISM = "pig.exec.estimate.parallelism";
+    // This flag is used in both order by and skewed job.
+    // This is the key in sampleMap of estimated parallelism
+    public static final String ESTIMATED_NUM_PARALLELISM = "pig.exec.estimated.num.parallelism";
+
+    // The operator key for sample vertex, used by partition vertex to collect sample
+    public static final String SAMPLE_VERTEX = "pig.sampleVertex";
+
+    // The operator key for sort vertex, used by sample vertex to send parallelism event
+    // if Pig need to estimate parallelism of sort vertex
+    public static final String SORT_VERTEX = "pig.sortVertex";
 
     private PhysicalPlan execPlan;
 
@@ -74,6 +91,8 @@ public class PigProcessor implements Log
 
     private Configuration conf;
     private PigHadoopLogger pigHadoopLogger;
+    
+    private TezProcessorContext processorContext;
 
     public static String sampleVertex;
     public static Map<String, Object> sampleMap;
@@ -82,6 +101,8 @@ public class PigProcessor implements Log
     @Override
     public void initialize(TezProcessorContext processorContext)
             throws Exception {
+        this.processorContext = processorContext;
+        
         // Reset any static variables to avoid conflict in container-reuse.
         sampleVertex = null;
         sampleMap = null;
@@ -191,6 +212,22 @@ public class PigProcessor implements Log
                     fileOutput.commit();
                 }
             }
+
+            // send event containing parallelism to sorting job of order by / skewed join
+            if (conf.getBoolean(ESTIMATE_PARALLELISM, false)) {
+                int parallelism = 1;
+                if (sampleMap!=null && sampleMap.containsKey(ESTIMATED_NUM_PARALLELISM)) {
+                    parallelism = (Integer)sampleMap.get(ESTIMATED_NUM_PARALLELISM);
+                }
+                String sortingVertex = conf.get(SORT_VERTEX);
+                // Should contain only 1 output for sampleAggregation job
+                LOG.info("Sending numParallelism " + parallelism + " to " + sortingVertex);
+                VertexManagerEvent vmEvent = new VertexManagerEvent(
+                        sortingVertex, Ints.toByteArray(parallelism));
+                List<Event> events = Lists.newArrayListWithCapacity(1);
+                events.add(vmEvent);
+                processorContext.sendEvents(events);
+            }
         } catch (Exception e) {
             abortOutput();
             LOG.error("Encountered exception while processing: ", e);
@@ -304,8 +341,9 @@ public class PigProcessor implements Log
 
     @SuppressWarnings({ "unchecked" })
     private void collectSample(String sampleVertex, LogicalInput logicalInput) throws Exception {
-        Boolean cached = (Boolean) ObjectCache.getInstance().retrieve("cached.sample." + sampleVertex);
-        if (cached == Boolean.TRUE) {
+        String quantileMapCacheKey = "sample-" + sampleVertex  + ".cached";
+        sampleMap =  (Map<String, Object>)ObjectCache.getInstance().retrieve(quantileMapCacheKey);
+        if (sampleMap != null) {
             return;
         }
         LOG.info("Starting fetch of input " + logicalInput + " from vertex " + sampleVertex);
@@ -317,6 +355,9 @@ public class PigProcessor implements Log
             // Sample is not empty
             Tuple t = (Tuple) val;
             sampleMap = (Map<String, Object>) t.get(0);
+            ObjectCache.getInstance().cache(quantileMapCacheKey, sampleMap);
+        } else {
+            LOG.warn("Cannot fetch sample from " + sampleVertex);
         }
     }
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java?rev=1603243&r1=1603242&r2=1603243&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java Tue Jun 17 18:15:26 2014
@@ -31,23 +31,9 @@ import org.apache.pig.impl.util.Pair;
 public class SkewedPartitionerTez extends SkewedPartitioner {
     private static final Log LOG = LogFactory.getLog(SkewedPartitionerTez.class);
 
-    @SuppressWarnings("unchecked")
     @Override
     protected void init() {
 
-        ObjectCache cache = ObjectCache.getInstance();
-        String isCachedKey = "sample-" + PigProcessor.sampleVertex + ".cached";
-        String totalReducersCacheKey = "sample-" + PigProcessor.sampleVertex + ".totalReducers";
-        String reducerMapCacheKey = "sample-" + PigProcessor.sampleVertex + ".reducerMap";
-        if (cache.retrieve(isCachedKey) == Boolean.TRUE) {
-            totalReducers = (Integer) cache.retrieve(totalReducersCacheKey);
-            reducerMap = (Map<Tuple, Pair<Integer, Integer>>) cache.retrieve(reducerMapCacheKey);
-            LOG.info("Found totalReducers and reducerMap in Tez cache. cachekey="
-                    + totalReducersCacheKey + "," + reducerMapCacheKey);
-            inited = true;
-            return;
-        }
-
         Map<String, Object> distMap = null;
         if (PigProcessor.sampleMap != null) {
             // We've collected sampleMap in PigProcessor
@@ -98,9 +84,6 @@ public class SkewedPartitionerTez extend
             throw new RuntimeException(e);
         }
         LOG.info("Initialized SkewedPartitionerTez. Time taken: " + (System.currentTimeMillis() - start));
-        cache.cache(isCachedKey, Boolean.TRUE);
-        cache.cache(totalReducersCacheKey, totalReducers);
-        cache.cache(reducerMapCacheKey, reducerMap);
         inited = true;
     }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1603243&r1=1603242&r2=1603243&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Tue Jun 17 18:15:26 2014
@@ -88,7 +88,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.builtin.DefaultIndexableLoader;
-import org.apache.pig.impl.builtin.FindQuantiles;
 import org.apache.pig.impl.builtin.GetMemNumRows;
 import org.apache.pig.impl.builtin.PartitionSkewedKeys;
 import org.apache.pig.impl.io.FileLocalizer;
@@ -1183,6 +1182,7 @@ public class TezCompiler extends PhyPlan
             TezOperator prevOp = compiledInputs[0];
             prevOp.plan.addAsLeaf(lrTez);
             prevOp.plan.addAsLeaf(poSample);
+            prevOp.markSampler();
 
             MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = op.getJoinPlans();
             List<PhysicalOperator> l = plan.getPredecessors(op);
@@ -1230,23 +1230,21 @@ public class TezCompiler extends PhyPlan
             prevOp.plan.addAsLeaf(lrTezSample);
             prevOp.setClosed(true);
 
-            POSort sort = new POSort(op.getOperatorKey(), op.getRequestedParallelism(),
+            int rp = op.getRequestedParallelism();
+            if (rp == -1) {
+                rp = pigContext.defaultParallel;
+            }
+
+            POSort sort = new POSort(op.getOperatorKey(), rp,
                     null, groups, ascCol, null);
             String per = pigProperties.getProperty("pig.skewedjoin.reduce.memusage",
                     String.valueOf(PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE));
             String mc = pigProperties.getProperty("pig.skewedjoin.reduce.maxtuple", "0");
 
-            int rp = Math.max(op.getRequestedParallelism(), 1);
             Pair<TezOperator, Integer> sampleJobPair = getSamplingAggregationJob(sort, rp, null,
-                    PartitionSkewedKeys.class.getName(), new String[]{per, mc});
+                    PartitionSkewedKeysTez.class.getName(), new String[]{per, mc});
             rp = sampleJobPair.second;
 
-            // Set parallelism of SkewedJoin as the value calculated by sampling
-            // job if "parallel" is specified in join statement, "rp" is equal
-            // to that number if not specified, use the value that sampling
-            // process calculated based on default.
-            op.setRequestedParallelism(rp);
-
             TezOperator[] joinJobs = new TezOperator[] {null, compiledInputs[1], null};
             TezOperator[] joinInputs = new TezOperator[] {compiledInputs[0], compiledInputs[1]};
             TezOperator[] rearrangeOutputs = new TezOperator[2];
@@ -1259,12 +1257,6 @@ public class TezCompiler extends PhyPlan
             // It just partitions the data from first vertex based on the quantiles from sample vertex.
             joinJobs[0] = curTezOp;
 
-            // Run POLocalRearrange for first join table. Note we set the
-            // parallelism of POLocalRearrange to that of the load vertex. So
-            // its parallelism will be determined by the size of skewed table.
-            //TODO: Check if this really works as load vertex parallelism
-            // is determined during vertex construction.
-            lrTez.setRequestedParallelism(prevOp.getRequestedParallelism());
             try {
                 lrTez.setIndex(0);
             } catch (ExecException e) {
@@ -1288,6 +1280,7 @@ public class TezCompiler extends PhyPlan
             identityInOutTez.setInputKey(prevOp.getOperatorKey().toString());
             joinJobs[0].plan.addAsLeaf(identityInOutTez);
             joinJobs[0].setClosed(true);
+            joinJobs[0].markSampleBasedPartitioner();
             rearrangeOutputs[0] = joinJobs[0];
 
             compiledInputs = new TezOperator[] {joinInputs[1]};
@@ -1323,9 +1316,7 @@ public class TezCompiler extends PhyPlan
             gr.setResultType(DataType.TUPLE);
             gr.visit(this);
             joinJobs[2] = curTezOp;
-            if (gr.getRequestedParallelism() > joinJobs[2].getRequestedParallelism()) {
-                joinJobs[2].setRequestedParallelism(gr.getRequestedParallelism());
-            }
+            joinJobs[2].setRequestedParallelism(rp);
 
             compiledInputs = new TezOperator[] {joinJobs[2]};
 
@@ -1396,6 +1387,11 @@ public class TezCompiler extends PhyPlan
             }
 
             joinJobs[2].setSkewedJoin(true);
+            sampleJobPair.first.sortOperator = joinJobs[2];
+
+            if (rp == -1) {
+                sampleJobPair.first.setNeedEstimatedQuantile(true);
+            }
 
             phyToTezOpMap.put(op, curTezOp);
         } catch (Exception e) {
@@ -1511,14 +1507,52 @@ public class TezCompiler extends PhyPlan
                 }
             }
 
-            // This foreach will pick the sort key columns from the RandomSampleLoader output
-            POForEach nfe1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)),-1,eps1,flat1);
-            oper.plan.addAsLeaf(nfe1);
-
             String numSamples = pigContext.getProperties().getProperty(PigConfiguration.PIG_RANDOM_SAMPLER_SAMPLE_SIZE, "100");
             POReservoirSample poSample = new POReservoirSample(new OperatorKey(scope,nig.getNextNodeId(scope)),
                     -1, null, Integer.parseInt(numSamples));
             oper.plan.addAsLeaf(poSample);
+
+            List<PhysicalPlan> sortPlans = sort.getSortPlans();
+            // Set up transform plan to get keys and memory size of input
+            // tuples. It first adds all the plans to get key columns.
+            List<PhysicalPlan> transformPlans = new ArrayList<PhysicalPlan>();
+            transformPlans.addAll(sortPlans);
+
+            // Then it adds a column for memory size
+            POProject prjStar = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+            prjStar.setResultType(DataType.TUPLE);
+            prjStar.setStar(true);
+
+            List<PhysicalOperator> ufInps = new ArrayList<PhysicalOperator>();
+            ufInps.add(prjStar);
+
+            PhysicalPlan ep = new PhysicalPlan();
+            POUserFunc uf = new POUserFunc(new OperatorKey(scope,nig.getNextNodeId(scope)),
+                    -1, ufInps, new FuncSpec(GetMemNumRows.class.getName(), (String[])null));
+            uf.setResultType(DataType.TUPLE);
+            ep.add(uf);
+            ep.add(prjStar);
+            ep.connect(prjStar, uf);
+
+            transformPlans.add(ep);
+
+            flat1 = new ArrayList<Boolean>();
+            eps1 = new ArrayList<PhysicalPlan>();
+
+            for (int i=0; i<transformPlans.size(); i++) {
+                eps1.add(transformPlans.get(i));
+                if (i<sortPlans.size()) {
+                    flat1.add(false);
+                } else {
+                    flat1.add(true);
+                }
+            }
+
+            // This foreach will pick the sort key columns and the memory size column from the POReservoirSample output
+            POForEach nfe1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)),
+                    -1, eps1, flat1);
+            oper.plan.addAsLeaf(nfe1);
+
             lrSample.setOutputKey(curTezOp.getOperatorKey().toString());
             oper.plan.addAsLeaf(lrSample);
         } else {
@@ -1556,7 +1590,7 @@ public class TezCompiler extends PhyPlan
             }
         }
 
-        return getSamplingAggregationJob(sort, rp, null, FindQuantiles.class.getName(), ctorArgs);
+        return getSamplingAggregationJob(sort, rp, null, FindQuantilesTez.class.getName(), ctorArgs);
     }
 
     /**
@@ -1717,7 +1751,7 @@ public class TezCompiler extends PhyPlan
         oper.setClosed(true);
 
         oper.setRequestedParallelism(1);
-        oper.markSampler();
+        oper.markSampleAggregation();
         return new Pair<TezOperator, Integer>(oper, rp);
     }
 
@@ -1777,6 +1811,7 @@ public class TezCompiler extends PhyPlan
         identityInOutTez.setInputKey(inputOper.getOperatorKey().toString());
         oper1.plan.addAsLeaf(identityInOutTez);
         oper1.setClosed(true);
+        oper1.markSampleBasedPartitioner();
 
         TezOperator oper2 = getTezOp();
         oper2.setGlobalSort(true);
@@ -1897,11 +1932,13 @@ public class TezCompiler extends PhyPlan
             POLocalRearrangeTez lrSample = localRearrangeFactory.create(LocalRearrangeType.NULL);
 
             TezOperator prevOper = endSingleInputWithStoreAndSample(op, lr, lrSample, keyType, fields);
+            prevOper.markSampler();
 
-            //TODO: Dynamic Reducer estimation or some equivalent of JobControlCompiler.calculateRuntimeReducers
-            // pigContext.defaultParallel to be taken into account
-            int rp = Math.max(op.getRequestedParallelism(), 1);
-
+            int rp = op.getRequestedParallelism();
+            if (rp == -1) {
+                rp = pigContext.defaultParallel;
+            }
+            // if rp is still -1, let it be, TezParallelismEstimator will set it to an estimated rp
             Pair<TezOperator, Integer> quantJobParallelismPair = getOrderbySamplingAggregationJob(op, rp);
             TezOperator[] sortOpers = getSortJobs(prevOper, lr, op, keyType, fields);
 
@@ -1914,6 +1951,9 @@ public class TezCompiler extends PhyPlan
             // If prevOper.requestedParallelism changes based on no. of input splits
             // it will reflect for sortOpers[0] so that 1-1 edge will work.
             sortOpers[0].setRequestedParallelismByReference(prevOper);
+            if (rp==-1) {
+                quantJobParallelismPair.first.setNeedEstimatedQuantile(true);
+            }
             sortOpers[1].setRequestedParallelism(quantJobParallelismPair.second);
 
             /*
@@ -1945,6 +1985,7 @@ public class TezCompiler extends PhyPlan
 //                curTezOp.UDFs.add(op.getMSortFunc().getFuncSpec().toString());
 //                curTezOp.isUDFComparatorUsed = true;
 //            }
+            quantJobParallelismPair.first.sortOperator = sortOpers[1];
             phyToTezOpMap.put(op, curTezOp);
         }catch(Exception e){
             int errCode = 2034;
@@ -2024,7 +2065,7 @@ public class TezCompiler extends PhyPlan
             // which unions input from the two predecessor vertices
             TezOperator unionTezOp = getTezOp();
             tezPlan.add(unionTezOp);
-            unionTezOp.markUnion();
+            unionTezOp.setUnion();
             unionTezOp.setRequestedParallelism(op.getRequestedParallelism());
             POShuffledValueInputTez unionInput =  new POShuffledValueInputTez(OperatorKey.genOpKey(scope));
             unionTezOp.plan.addAsLeaf(unionInput);

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1603243&r1=1603242&r2=1603243&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Tue Jun 17 18:15:26 2014
@@ -61,6 +61,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingPartitionWritableComparator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingTupleWritableComparator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigSecondaryKeyGroupComparator;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PhyPlanSetter;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigBigDecimalRawComparator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigBigIntegerRawComparator;
@@ -91,6 +92,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.TezPOPackageAnnotator.LoRearrangeDiscoverer;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.SecurityHelper;
+import org.apache.pig.backend.hadoop.executionengine.util.ParallelConstantVisitor;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
@@ -107,6 +109,7 @@ import org.apache.tez.common.TezJobConfi
 import org.apache.tez.common.TezUtils;
 import org.apache.tez.dag.api.DAG;
 import org.apache.tez.dag.api.Edge;
+import org.apache.tez.dag.api.EdgeManagerDescriptor;
 import org.apache.tez.dag.api.EdgeProperty;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.dag.api.GroupInputEdge;
@@ -115,6 +118,8 @@ import org.apache.tez.dag.api.OutputDesc
 import org.apache.tez.dag.api.ProcessorDescriptor;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.VertexGroup;
+import org.apache.tez.dag.api.VertexManagerPluginDescriptor;
+import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager;
 import org.apache.tez.mapreduce.combine.MRCombiner;
 import org.apache.tez.mapreduce.committer.MROutputCommitter;
 import org.apache.tez.mapreduce.common.MRInputSplitDistributor;
@@ -133,6 +138,9 @@ import org.apache.tez.runtime.library.in
 public class TezDagBuilder extends TezOpPlanVisitor {
     private static final Log log = LogFactory.getLog(TezJobControlCompiler.class);
 
+    private static final String REDUCER_ESTIMATOR_KEY = "pig.exec.reducer.estimator";
+    private static final String REDUCER_ESTIMATOR_ARG_KEY =  "pig.exec.reducer.estimator.arg";
+
     private DAG dag;
     private Map<String, LocalResource> localResources;
     private PigContext pc;
@@ -343,6 +351,12 @@ public class TezDagBuilder extends TezOp
         in.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
         out.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
 
+        if (to.getEstimatedParallelism()!=-1 && (to.isGlobalSort()||to.isSkewedJoin())) {
+            // Use custom edge
+            return new EdgeProperty((EdgeManagerDescriptor)null,
+                    edge.dataSourceType, edge.schedulingType, out, in);
+            }
+
         return new EdgeProperty(edge.dataMovementType, edge.dataSourceType,
                 edge.schedulingType, out, in);
     }
@@ -393,7 +407,11 @@ public class TezDagBuilder extends TezOp
         payloadConf = (JobConf) job.getConfiguration();
 
         if (tezOp.sampleOperator != null) {
-            payloadConf.set("pig.sampleVertex", tezOp.sampleOperator.getOperatorKey().toString());
+            payloadConf.set(PigProcessor.SAMPLE_VERTEX, tezOp.sampleOperator.getOperatorKey().toString());
+        }
+
+        if (tezOp.sortOperator != null) {
+            payloadConf.set(PigProcessor.SORT_VERTEX, tezOp.sortOperator.getOperatorKey().toString());
         }
 
         String tmp;
@@ -492,7 +510,7 @@ public class TezDagBuilder extends TezOp
             // TODO Need to fix multiple input key mapping
             TezOperator identityInOutPred = null;
             for (TezOperator pred : mPlan.getPredecessors(tezOp)) {
-                if (!pred.isSampler()) {
+                if (!pred.isSampleAggregation()) {
                     identityInOutPred = pred;
                     break;
                 }
@@ -544,12 +562,9 @@ public class TezDagBuilder extends TezOp
                     ObjectSerializer.serialize(stores));
         }
 
-        // Take our assembled configuration and create a vertex
-        byte[] userPayload = TezUtils.createUserPayloadFromConf(payloadConf);
-        procDesc.setUserPayload(userPayload);
         // Can only set parallelism here if the parallelism isn't derived from
         // splits
-        int parallelism = tezOp.getRequestedParallelism();
+        int parallelism = -1;
         InputSplitInfo inputSplitInfo = null;
         if (loads != null && loads.size() > 0) {
             // Not using MRInputAMSplitGenerator because delegation tokens are
@@ -559,28 +574,85 @@ public class TezDagBuilder extends TezOp
             // splits can be moved to if(loads) block below
             parallelism = inputSplitInfo.getNumTasks();
             tezOp.setRequestedParallelism(parallelism);
-        }
-        if (tezOp.getRequestedParallelism() < 0) {
-            if (pc.defaultParallel > 0) {
-                parallelism = pc.defaultParallel;
-            } else {
-                // Rough estimation till we have Automatic Reducer Parallelism
-                // and Parallelism estimator. To be removed.
-                int sumOfPredParallelism = 0;
-                int predParallelism;
-                for (TezOperator pred : mPlan.getPredecessors(tezOp)) {
-                    predParallelism = pred.getRequestedParallelism();
-                    if (predParallelism < 0) {
-                        predParallelism = Math.max(pc.defaultParallel, 1);
+        } else {
+            int prevParallelism = -1;
+            boolean isOneToOneParallelism = false;
+            for (Map.Entry<OperatorKey,TezEdgeDescriptor> entry : tezOp.inEdges.entrySet()) {
+                if (entry.getValue().dataMovementType == DataMovementType.ONE_TO_ONE) {
+                    TezOperator pred = mPlan.getOperator(entry.getKey());
+                    parallelism = pred.getEffectiveParallelism();
+                    if (prevParallelism == -1) {
+                        prevParallelism = parallelism;
+                    } else if (prevParallelism != parallelism) {
+                        throw new IOException("one to one sources parallelism for vertex "
+                                + tezOp.getOperatorKey().toString() + " are not equal");
+                    }
+                    if (pred.getRequestedParallelism()!=-1) {
+                        tezOp.setRequestedParallelism(pred.getRequestedParallelism());
+                    } else {
+                        tezOp.setEstimatedParallelism(pred.getEstimatedParallelism());
                     }
-                    sumOfPredParallelism += predParallelism;
+                    isOneToOneParallelism = true;
+                    parallelism = -1;
                 }
-                sumOfPredParallelism = Math.min(sumOfPredParallelism, 20);
-                parallelism = Math.max(sumOfPredParallelism, 1);
             }
-            tezOp.setRequestedParallelism(parallelism);
+            if (!isOneToOneParallelism) {
+                if (tezOp.getRequestedParallelism()!=-1) {
+                    parallelism = tezOp.getRequestedParallelism();
+                } else if (pc.defaultParallel!=-1) {
+                    parallelism = pc.defaultParallel;
+                } else {
+                    parallelism = estimateParallelism(job, mPlan, tezOp);
+                    tezOp.setEstimatedParallelism(parallelism);
+                    if (tezOp.isGlobalSort()||tezOp.isSkewedJoin()) {
+                        // Vertex manager will set parallelism
+                        parallelism = -1;
+                    }
+                }
+            }
+        }
+
+        // Once we decide the parallelism of the sampler, propagate to
+        // downstream operators if necessary
+        if (tezOp.isSampler()) {
+            // There could be multiple samplers that share the same sample aggregation job
+            // and partitioner job
+            TezOperator sampleAggregationOper = null;
+            TezOperator sampleBasedPartionerOper = null;
+            TezOperator sortOper = null;
+            for (TezOperator succ : mPlan.getSuccessors(tezOp)) {
+                if (succ.isVertexGroup()) {
+                    succ = mPlan.getSuccessors(succ).get(0);
+                }
+                if (succ.isSampleAggregation()) {
+                    sampleAggregationOper = succ;
+                } else if (succ.isSampleBasedPartitioner()) {
+                    sampleBasedPartionerOper = succ;
+                }
+            }
+            sortOper = mPlan.getSuccessors(sampleBasedPartionerOper).get(0);
+            
+            if (sortOper.getRequestedParallelism()==-1 && pc.defaultParallel==-1) {
+                // set the estimated parallelism for order by/skewed join to the sampler parallelism;
+                // this includes:
+                // 1. sort operator
+                // 2. constant for sample aggregation oper
+                sortOper.setEstimatedParallelism(parallelism);
+                ParallelConstantVisitor visitor =
+                        new ParallelConstantVisitor(sampleAggregationOper.plan, parallelism);
+                visitor.visit();
+            }
         }
 
+        if (tezOp.isNeedEstimateParallelism()) {
+            payloadConf.setBoolean(PigProcessor.ESTIMATE_PARALLELISM, true);
+            log.info("Estimate quantile for sample aggregation vertex " + tezOp.getOperatorKey().toString());
+        }
+
+        // Take our assembled configuration and create a vertex
+        byte[] userPayload = TezUtils.createUserPayloadFromConf(payloadConf);
+        procDesc.setUserPayload(userPayload);
+        
         Vertex vertex = new Vertex(tezOp.getOperatorKey().toString(), procDesc, parallelism,
                 isMap ? MRHelpers.getMapResource(globalConf) : MRHelpers.getReduceResource(globalConf));
 
@@ -656,6 +728,47 @@ public class TezDagBuilder extends TezOp
         if (stores.size() > 0) {
             new PigOutputFormat().checkOutputSpecs(job);
         }
+        
+        // Set the right VertexManagerPlugin
+        if (tezOp.getEstimatedParallelism() != -1) {
+            if (tezOp.isGlobalSort()||tezOp.isSkewedJoin()) {
+                // Set VertexManagerPlugin to PartitionerDefinedVertexManager, which is able
+                // to decrease/increase parallelism of sorting vertex dynamically
+                // based on the numQuantiles calculated by sample aggregation vertex
+                vertex.setVertexManagerPlugin(new VertexManagerPluginDescriptor(
+                        PartitionerDefinedVertexManager.class.getName()));
+                log.info("Set VertexManagerPlugin to PartitionerDefinedParallelismVertexManager for vertex " + tezOp.getOperatorKey().toString());
+            } else {
+                boolean containScatterGather = false;
+                boolean containCustomPartitioner = false;
+                for (TezEdgeDescriptor edge : tezOp.inEdges.values()) {
+                    if (edge.dataMovementType == DataMovementType.SCATTER_GATHER) {
+                        containScatterGather = true;
+                    }
+                    if (edge.partitionerClass!=null) {
+                        containCustomPartitioner = true;
+                    }
+                }
+                if (containScatterGather && !containCustomPartitioner) {
+                    // Use auto-parallelism feature of ShuffleVertexManager to dynamically
+                    // reduce the parallelism of the vertex
+                    VertexManagerPluginDescriptor vmPluginDescriptor = new VertexManagerPluginDescriptor(
+                            ShuffleVertexManager.class.getName());
+                    Configuration vmPluginConf = ConfigurationUtil.toConfiguration(pc.getProperties(), false);
+                    vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
+                    if (vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+                            InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER)!=
+                                    InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER) {
+                        vmPluginConf.setLong(ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
+                                vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+                                        InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
+                    }
+                    vmPluginDescriptor.setUserPayload(TezUtils.createUserPayloadFromConf(vmPluginConf));
+                    vertex.setVertexManagerPlugin(vmPluginDescriptor);
+                    log.info("Set auto parallelism for vertex " + tezOp.getOperatorKey().toString());
+                }
+            }
+        }
 
         // Reset udfcontext jobconf. It is not supposed to be set in the front end
         UDFContext.getUDFContext().addJobConf(null);
@@ -1034,5 +1147,19 @@ public class TezDagBuilder extends TezOp
         conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS,
                 comparatorClass);
     }
+    
+    public static int estimateParallelism(Job job, TezOperPlan tezPlan,
+            TezOperator tezOp) throws IOException {
+        Configuration conf = job.getConfiguration();
+
+        TezParallelismEstimator estimator = conf.get(REDUCER_ESTIMATOR_KEY) == null ? new TezOperDependencyParallelismEstimator()
+                : PigContext.instantiateObjectFromParams(conf,
+                        REDUCER_ESTIMATOR_KEY, REDUCER_ESTIMATOR_ARG_KEY,
+                        TezParallelismEstimator.class);
+
+        log.info("Using parallel estimator: " + estimator.getClass().getName());
+        int numberOfReducers = estimator.estimateParallelism(tezPlan, tezOp, conf);
+        return numberOfReducers;
+    }
 
 }

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperDependencyParallelismEstimator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperDependencyParallelismEstimator.java?rev=1603243&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperDependencyParallelismEstimator.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperDependencyParallelismEstimator.java Tue Jun 17 18:15:26 2014
@@ -0,0 +1,303 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map.Entry;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigReducerEstimator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.util.ParallelConstantVisitor;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+
+/**
+ * Estimate the parallelism of the vertex using:
+ * 1. parallelism of the predecessors
+ * 2. bloating factor of the physical plan of the predecessor
+ *
+ * Since currently it is only possible to reduce the parallelism,
+ * the estimation is exaggerated and relies on the Tez runtime to
+ * decrease the parallelism.
+ */
+public class TezOperDependencyParallelismEstimator implements TezParallelismEstimator {
+
+    // Multipliers applied to a predecessor's parallelism for operators in its
+    // plan: flatten/join use the flatten factor, filter and limit their own.
+    static final double DEFAULT_FLATTEN_FACTOR = 10;
+    static final double DEFAULT_FILTER_FACTOR = 0.7;
+    static final double DEFAULT_LIMIT_FACTOR = 0.1;
+
+    /**
+     * Estimates the parallelism of a vertex by summing the effective
+     * parallelism of its scatter-gather and one-to-one predecessors, each
+     * scaled by the factor computed from the predecessor's physical plan.
+     *
+     * @param plan the Tez plan containing {@code tezOper}
+     * @param tezOper the vertex to estimate
+     * @param conf supplies the cap on the estimate
+     *        ({@link PigReducerEstimator#MAX_REDUCER_COUNT_PARAM})
+     * @return -1 for a vertex group; the requested or previously estimated
+     *         parallelism if either is set; otherwise the new estimate,
+     *         capped at the configured maximum
+     * @throws IOException if the plan reports no predecessors, a predecessor
+     *         cannot be resolved or has no effective parallelism, or the
+     *         vertices following a sampler cannot be found
+     */
+    @Override
+    public int estimateParallelism(TezOperPlan plan, TezOperator tezOper, Configuration conf) throws IOException {
+
+        if (tezOper.isVertexGroup()) {
+            return -1;
+        }
+
+        // If parallelism is set explicitly, respect it
+        if (tezOper.getRequestedParallelism()!=-1) {
+            return tezOper.getRequestedParallelism();
+        }
+
+        // If we have already estimated parallelism, use that one
+        if (tezOper.getEstimatedParallelism()!=-1) {
+            return tezOper.getEstimatedParallelism();
+        }
+
+        List<TezOperator> preds = plan.getPredecessors(tezOper);
+        if (preds==null) {
+            throw new IOException("Cannot estimate parallelism for source vertex");
+        }
+
+        String operKey = tezOper.getOperatorKey().toString();
+        double estimatedParallelism = 0;
+
+        for (Entry<OperatorKey, TezEdgeDescriptor> entry : tezOper.inEdges.entrySet()) {
+            // Don't include broadcast edge, broadcast edge is used for
+            // replicated join (covered in TezParallelismFactorVisitor.visitFRJoin)
+            // and sample/scalar (does not impact parallelism)
+            DataMovementType movementType = entry.getValue().dataMovementType;
+            if (movementType!=DataMovementType.SCATTER_GATHER &&
+                    movementType!=DataMovementType.ONE_TO_ONE) {
+                continue;
+            }
+
+            String inputKey = entry.getKey().toString();
+            TezOperator pred = getPredecessorWithKey(plan, tezOper, inputKey);
+            if (pred==null) {
+                throw new IOException("Cannot estimate parallelism for " + operKey
+                        + ", predecessor " + inputKey + " is not found in the plan");
+            }
+
+            double predParallelism = pred.getEffectiveParallelism();
+            if (predParallelism==-1) {
+                throw new IOException("Cannot estimate parallelism for " + operKey
+                        + ", effective parallelism for predecessor " + pred.getOperatorKey().toString()
+                        + " is -1");
+            }
+            if (pred.plan!=null) { // pred.plan can be null if it is a VertexGroup
+                TezParallelismFactorVisitor parallelismFactorVisitor = new TezParallelismFactorVisitor(pred.plan, operKey);
+                parallelismFactorVisitor.visit();
+                predParallelism = predParallelism * parallelismFactorVisitor.getFactor();
+            }
+            estimatedParallelism += predParallelism;
+        }
+
+        int roundedEstimatedParallelism = (int)Math.ceil(estimatedParallelism);
+        if (tezOper.isSampler()) {
+            updateSampleAggregationConstant(plan, tezOper, roundedEstimatedParallelism);
+        }
+
+        // Read the cap per call instead of caching it in a shared static field
+        int maxTaskCount = conf.getInt(PigReducerEstimator.MAX_REDUCER_COUNT_PARAM,
+                PigReducerEstimator.DEFAULT_MAX_REDUCER_COUNT_PARAM);
+        return Math.min(roundedEstimatedParallelism, maxTaskCount);
+    }
+
+    /**
+     * For a sampler vertex, rewrites the Integer constant in the sample
+     * aggregation plan to the estimated parallelism. This is only done when
+     * the vertex following the sample based partitioner has an explicitly
+     * requested parallelism.
+     */
+    private static void updateSampleAggregationConstant(TezOperPlan plan, TezOperator sampler,
+            int parallelism) throws IOException {
+        String samplerKey = sampler.getOperatorKey().toString();
+        TezOperator sampleAggregationOper = null;
+        TezOperator rangePartionerOper = null;
+        List<TezOperator> succs = plan.getSuccessors(sampler);
+        if (succs!=null) {
+            for (TezOperator succ : succs) {
+                if (succ.isSampleAggregation()) {
+                    sampleAggregationOper = succ;
+                } else if (succ.isSampleBasedPartitioner()) {
+                    rangePartionerOper = succ;
+                }
+            }
+        }
+        if (rangePartionerOper==null) {
+            throw new IOException("Cannot find sample based partitioner vertex for sampler "
+                    + samplerKey);
+        }
+
+        List<TezOperator> partitionerSuccs = plan.getSuccessors(rangePartionerOper);
+        if (partitionerSuccs==null || partitionerSuccs.isEmpty()) {
+            throw new IOException("Cannot find successor of sample based partitioner vertex "
+                    + rangePartionerOper.getOperatorKey().toString());
+        }
+        TezOperator sortOper = partitionerSuccs.get(0);
+
+        if (sortOper.getRequestedParallelism()!=-1) {
+            if (sampleAggregationOper==null) {
+                throw new IOException("Cannot find sample aggregation vertex for sampler "
+                        + samplerKey);
+            }
+            ParallelConstantVisitor visitor =
+                    new ParallelConstantVisitor(sampleAggregationOper.plan, parallelism);
+            visitor.visit();
+        }
+    }
+
+    /**
+     * Finds the predecessor of tezOper whose operator key equals inputKey,
+     * looking through vertex groups to their union predecessors.
+     *
+     * @return the matching operator, or null if there is none
+     */
+    private static TezOperator getPredecessorWithKey(TezOperPlan plan, TezOperator tezOper, String inputKey) {
+        List<TezOperator> preds = plan.getPredecessors(tezOper);
+        for (TezOperator pred : preds) {
+            if (pred.isVertexGroup()) {
+                for (OperatorKey unionPred : pred.getUnionPredecessors()) {
+                    if (unionPred.toString().equals(inputKey)) {
+                        return plan.getOperator(unionPred);
+                    }
+                }
+            } else if (pred.getOperatorKey().toString().equals(inputKey)) {
+                return pred;
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Walks a predecessor's physical plan and accumulates the factor by
+     * which its operators are assumed to change the data volume.
+     */
+    public static class TezParallelismFactorVisitor extends PhyPlanVisitor {
+        private double factor = 1;
+        private final String outputKey;
+
+        public TezParallelismFactorVisitor(PhysicalPlan plan, String outputKey) {
+            super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+            this.outputKey = outputKey;
+        }
+
+        @Override
+        public void visitFilter(POFilter fl) throws VisitorException {
+            if (fl.getPlan().size()==1 && fl.getPlan().getRoots().get(0) instanceof ConstantExpression) {
+                ConstantExpression cons = (ConstantExpression)fl.getPlan().getRoots().get(0);
+                if (Boolean.TRUE.equals(cons.getValue())) {
+                    // skip all true condition
+                    return;
+                }
+            }
+            factor *= DEFAULT_FILTER_FACTOR;
+        }
+
+        @Override
+        public void visitPOForEach(POForEach nfe) throws VisitorException {
+            List<Boolean> flattens = nfe.getToBeFlattened();
+            if (flattens!=null && flattens.contains(Boolean.TRUE)) {
+                factor *= DEFAULT_FLATTEN_FACTOR;
+            }
+        }
+
+        @Override
+        public void visitLimit(POLimit lim) throws VisitorException {
+            // NOTE(review): this assigns rather than multiplies, discarding the
+            // factor accumulated so far; confirm that is intended.
+            factor = DEFAULT_LIMIT_FACTOR;
+        }
+
+        @Override
+        public void visitFRJoin(POFRJoin join) throws VisitorException {
+            factor *= DEFAULT_FLATTEN_FACTOR;
+        }
+
+        @Override
+        public void visitMergeJoin(POMergeJoin join) throws VisitorException {
+            factor *= DEFAULT_FLATTEN_FACTOR;
+        }
+
+        @Override
+        public void visitPackage(POPackage pkg) throws VisitorException{
+            // JoinPackager is equivalent to a foreach flatten after shuffle
+            if (pkg.getPkgr() instanceof JoinPackager) {
+                factor *= DEFAULT_FLATTEN_FACTOR;
+            }
+        }
+
+        @Override
+        public void visitSplit(POSplit sp) throws VisitorException {
+            // Find the split branch connecting to current operator
+            // accumulating the bloating factor in this branch
+            PhysicalPlan plan = getSplitBranch(sp, outputKey);
+            pushWalker(mCurrentWalker.spawnChildWalker(plan));
+            visit();
+            popWalker();
+        }
+
+        // NOTE(review): outputKey is not consulted here; the first branch that
+        // contains any POLocalRearrangeTez or POValueOutputTez is returned.
+        // Confirm whether the branch should be matched against outputKey.
+        private static PhysicalPlan getSplitBranch(POSplit split, String outputKey) throws VisitorException {
+            List<PhysicalPlan> plans = split.getPlans();
+            for (PhysicalPlan plan : plans) {
+                LinkedList<POLocalRearrangeTez> lrs = PlanHelper.getPhysicalOperators(plan, POLocalRearrangeTez.class);
+                if (!lrs.isEmpty()) {
+                    return plan;
+                }
+                LinkedList<POValueOutputTez> vos = PlanHelper.getPhysicalOperators(plan, POValueOutputTez.class);
+                if (!vos.isEmpty()) {
+                    return plan;
+                }
+            }
+            return null;
+        }
+
+        public double getFactor() {
+            return factor;
+        }
+
+    }
+}

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java?rev=1603243&r1=1603242&r2=1603243&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java Tue Jun 17 18:15:26 2014
@@ -62,6 +62,8 @@ public class TezOperator extends Operato
     // even when parallelism of source vertex changes.
     // Can change to int and set to -1 if TEZ-800 gets fixed.
     private AtomicInteger requestedParallelism = new AtomicInteger(-1);
+    
+    private int estimatedParallelism = -1;
 
     // TODO: When constructing Tez vertex, we have to specify how much resource
     // the vertex will need. So we need to estimate these values while compiling
@@ -89,6 +91,9 @@ public class TezOperator extends Operato
     //Indicates if this job is an order by job
     boolean globalSort = false;
 
+    //Indicate if this job is a union job 
+    boolean union = false;
+
     //The sort order of the columns;
     //asc is true and desc is false
     boolean[] sortOrder;
@@ -104,21 +109,31 @@ public class TezOperator extends Operato
     // are NOT combinable for correctness.
     private boolean combineSmallSplits = true;
 
-    // If not null, need to collect sample sent from predecessor
+    // Used by partition vertex, if not null, need to collect sample sent from predecessor
     TezOperator sampleOperator = null;
 
+    // Used by sample vertex, send parallelism event to orderOperator
+    TezOperator sortOperator = null;
+
+    // If the flag is set, FindQuantilesTez/PartitionSkewedKeysTez will use the aggregated sample
+    // to calculate the parallelism at runtime, instead of using the statically set
+    // numQuantiles/totalReducers_ parameter
+    private boolean needEstimateParallelism = false;
+
     // If true, we will use secondary key sort in the job
     private boolean useSecondaryKey = false;
 
     // Types of blocking operators. For now, we only support the following ones.
     private static enum OPER_FEATURE {
         NONE,
-        // Indicate if this job is a union job
-        UNION,
         // Indicate if this job is a merge indexer
         INDEXER,
         // Indicate if this job is a sampling job
         SAMPLER,
+        // Indicate if this job is a sample aggregation job
+        SAMPLE_AGGREGATOR,
+        // Indicate if this job is a sample based partition job (order by/skewed join)
+        SAMPLE_BASED_PARTITIONER,
         // Indicate if this job is a group by job
         GROUPBY,
         // Indicate if this job is a cogroup job
@@ -175,6 +190,18 @@ public class TezOperator extends Operato
         this.requestedParallelism = oper.requestedParallelism;
     }
 
+    public int getEstimatedParallelism() {
+        return estimatedParallelism;
+    }
+
+    public void setEstimatedParallelism(int estimatedParallelism) {
+        this.estimatedParallelism = estimatedParallelism;
+    }
+
+    public int getEffectiveParallelism() {
+        return getRequestedParallelism()!=-1? getRequestedParallelism() : getEstimatedParallelism();
+    }
+
     public OperatorKey getSplitParent() {
         return splitParent;
     }
@@ -224,11 +251,11 @@ public class TezOperator extends Operato
     }
 
     public boolean isUnion() {
-        return (feature == OPER_FEATURE.UNION);
+        return union;
     }
 
-    public void markUnion() {
-        feature = OPER_FEATURE.UNION;
+    public void setUnion() {
+        union = true;
     }
 
     public boolean isIndexer() {
@@ -247,6 +274,30 @@ public class TezOperator extends Operato
         feature = OPER_FEATURE.SAMPLER;
     }
 
+    public boolean isSampleAggregation() {
+        return (feature == OPER_FEATURE.SAMPLE_AGGREGATOR);
+    }
+
+    public void markSampleAggregation() {
+        feature = OPER_FEATURE.SAMPLE_AGGREGATOR;
+    }
+    
+    public boolean isSampleBasedPartitioner() {
+        return (feature == OPER_FEATURE.SAMPLE_BASED_PARTITIONER);
+    }
+
+    public void markSampleBasedPartitioner() {
+        feature = OPER_FEATURE.SAMPLE_BASED_PARTITIONER;
+    }
+
+    public void setNeedEstimatedQuantile(boolean needEstimateParallelism) {
+        this.needEstimateParallelism = needEstimateParallelism;
+    }
+
+    public boolean isNeedEstimateParallelism() {
+        return needEstimateParallelism;
+    }
+
     public boolean isUseSecondaryKey() {
         return useSecondaryKey;
     }

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezParallelismEstimator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezParallelismEstimator.java?rev=1603243&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezParallelismEstimator.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezParallelismEstimator.java Tue Jun 17 18:15:26 2014
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Strategy for estimating the parallelism of a vertex in a Tez plan.
+ */
+public interface TezParallelismEstimator {
+    /**
+     * Estimates the parallelism of {@code tezOper} within {@code plan}.
+     *
+     * @param plan the Tez plan containing the vertex
+     * @param tezOper the vertex whose parallelism is estimated
+     * @param conf the configuration made available to the estimator
+     * @return the estimated parallelism
+     * @throws IOException if the parallelism cannot be estimated
+     */
+    int estimateParallelism(TezOperPlan plan, TezOperator tezOper, Configuration conf) throws IOException;
+}

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java?rev=1603243&r1=1603242&r2=1603243&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java Tue Jun 17 18:15:26 2014
@@ -22,6 +22,7 @@ import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.DiscreteProbabilitySampleGenerator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner;
 import org.apache.pig.data.DataBag;
@@ -33,30 +34,25 @@ import org.apache.pig.impl.io.PigNullabl
 public class WeightedRangePartitionerTez extends WeightedRangePartitioner {
     private static final Log LOG = LogFactory.getLog(WeightedRangePartitionerTez.class);
 
-    @SuppressWarnings("unchecked")
+    private Integer estimatedNumPartitions;
+
     @Override
-    public void init() {
-        ObjectCache cache = ObjectCache.getInstance();
-        String isCachedKey = "sample-" + PigProcessor.sampleVertex + ".cached";
-        String quantilesCacheKey = "sample-" + PigProcessor.sampleVertex + ".quantiles";
-        String weightedPartsCacheKey = "sample-" + PigProcessor.sampleVertex + ".weightedParts";
-        if (cache.retrieve(isCachedKey) == Boolean.TRUE) {
-            quantiles = (PigNullableWritable[]) cache
-                    .retrieve(quantilesCacheKey);
-            weightedParts = (Map<PigNullableWritable, DiscreteProbabilitySampleGenerator>) cache
-                    .retrieve(weightedPartsCacheKey);
-            LOG.info("Found quantiles and weightedParts in Tez cache. cachekey="
-                    + quantilesCacheKey + "," + weightedPartsCacheKey);
-            inited = true;
-            return;
+    public int getPartition(PigNullableWritable key, Writable value,
+            int numPartitions){
+        if (estimatedNumPartitions!=null) {
+            numPartitions = estimatedNumPartitions;
         }
+        return super.getPartition(key, value, numPartitions);
+    }
 
+    @Override
+    public void init() {
         Map<String, Object> quantileMap = null;
         if (PigProcessor.sampleMap != null) {
             // We've collected sampleMap in PigProcessor
             quantileMap = PigProcessor.sampleMap;
         } else {
-            LOG.info("Quantiles map is empty");
+            LOG.warn("Quantiles map is empty");
             inited = true;
             return;
         }
@@ -65,6 +61,7 @@ public class WeightedRangePartitionerTez
         try {
             DataBag quantilesList = (DataBag) quantileMap.get(FindQuantiles.QUANTILES_LIST);
             InternalMap weightedPartsData = (InternalMap) quantileMap.get(FindQuantiles.WEIGHTED_PARTS);
+            estimatedNumPartitions = (Integer)quantileMap.get(PigProcessor.ESTIMATED_NUM_PARALLELISM);
             convertToArray(quantilesList);
             for (Entry<Object, Object> ent : weightedPartsData.entrySet()) {
                 Tuple key = (Tuple) ent.getKey(); // sample item which repeats
@@ -77,9 +74,6 @@ public class WeightedRangePartitionerTez
         }
 
         LOG.info("Initialized WeightedRangePartitionerTez. Time taken: " + (System.currentTimeMillis() - start));
-        cache.cache(isCachedKey, Boolean.TRUE);
-        cache.cache(quantilesCacheKey, quantiles);
-        cache.cache(weightedPartsCacheKey, weightedParts);
         inited = true;
     }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java?rev=1603243&r1=1603242&r2=1603243&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java Tue Jun 17 18:15:26 2014
@@ -273,6 +273,9 @@ public class UnionOptimizer extends TezO
         pred.setUseSecondaryKey(unionOp.isUseSecondaryKey());
         pred.UDFs.addAll(unionOp.UDFs);
         pred.scalars.addAll(unionOp.scalars);
+        if (unionOp.isSampler()) {
+            pred.markSampler();
+        }
     }
 
     public static PhysicalPlan getUnionPredPlanFromSplit(PhysicalPlan plan, String unionOpKey) throws VisitorException {

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/ParallelConstantVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/ParallelConstantVisitor.java?rev=1603243&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/ParallelConstantVisitor.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/ParallelConstantVisitor.java Tue Jun 17 18:15:26 2014
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.util;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Visitor that replaces the single Integer constant of a sampling plan
+ * with the given parallelism.
+ */
+public class ParallelConstantVisitor extends PhyPlanVisitor {
+
+    private final int rp;
+
+    private boolean replaced = false;
+
+    public ParallelConstantVisitor(PhysicalPlan plan, int rp) {
+        super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
+        this.rp = rp;
+    }
+
+    /**
+     * Overwrites the value and requested parallelism of an Integer constant
+     * whose requested parallelism is unset (-1).
+     *
+     * @throws VisitorException if a second such constant is encountered
+     */
+    @Override
+    public void visitConstant(ConstantExpression cnst) throws VisitorException {
+        if (cnst.getRequestedParallelism() != -1) {
+            return;
+        }
+        if (!(cnst.getValue() instanceof Integer)) {
+            return;
+        }
+        if (replaced) {
+            // sample job should have only one ConstantExpression
+            throw new VisitorException("Invalid reduce plan: more " +
+                    "than one ConstantExpression found in sampling job");
+        }
+        cnst.setValue(rp);
+        cnst.setRequestedParallelism(rp);
+        replaced = true;
+    }
+}

Modified: pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java?rev=1603243&r1=1603242&r2=1603243&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java (original)
+++ pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java Tue Jun 17 18:15:26 2014
@@ -53,6 +53,9 @@ public class FindQuantiles extends EvalF
     enum State { ALL_ASC, ALL_DESC, MIXED };
     State mState;
     
+    protected Integer numQuantiles = null;
+    protected DataBag samples = null;
+    
     private class SortComparator implements Comparator<Tuple> {
         @Override
         @SuppressWarnings("unchecked")
@@ -155,17 +158,19 @@ public class FindQuantiles extends EvalF
         Map<String, Object> output = new HashMap<String, Object>();
         if(in==null || in.size()==0)
             return null;
-        Integer numQuantiles = null;
-        DataBag samples = null;
+        
         ArrayList<Tuple> quantilesList = new ArrayList<Tuple>();
         InternalMap weightedParts = new InternalMap();
         // the sample file has a tuple as under:
         // (numQuantiles, bag of samples) 
         // numQuantiles here is the reduce parallelism
         try{
-            numQuantiles = (Integer)in.get(0);
-            samples = (DataBag)in.get(1);
-            
+            if (numQuantiles == null) {
+                numQuantiles = (Integer)in.get(0);
+            }
+            if (samples == null) {
+                samples = (DataBag)in.get(1);
+            }
             long numSamples = samples.size();
             long toSkip = numSamples / numQuantiles;
             if(toSkip == 0) {

Modified: pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java?rev=1603243&r1=1603242&r2=1603243&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java (original)
+++ pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java Tue Jun 17 18:15:26 2014
@@ -75,14 +75,14 @@ public class PartitionSkewedKeys extends
 
     private int currentIndex_;
 
-    private int totalReducers_;
-
     private long totalMemory_;
 
     private long totalSampleCount_;
 
     private double heapPercentage_;
 
+    protected int totalReducers_;
+
     // specify how many tuple a reducer can hold for a key
     // this is for testing purpose. If not specified, then
     // it is calculated based on memory size and size of tuple
@@ -135,7 +135,9 @@ public class PartitionSkewedKeys extends
         long totalInputRows = 0;
 
         try {
-            totalReducers_ = (Integer) in.get(0);
+            if (totalReducers_ == -1) {
+                totalReducers_ = (Integer) in.get(0);
+            }
             DataBag samples = (DataBag) in.get(1);
 
             totalSampleCount_ = samples.size();
@@ -271,7 +273,7 @@ public class PartitionSkewedKeys extends
     }
 
     // the last field of the tuple is a tuple for memory size and disk size
-    private long getMemorySize(Tuple t) {
+    protected long getMemorySize(Tuple t) {
         int s = t.size();
         try {
             return (Long) t.get(s - 2);



Mime
View raw message