Mailing-List: contact commits-help@pig.apache.org; run by ezmlm Reply-To: dev@pig.apache.org Subject: svn commit: r1603243 [1/2] - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ src/org/apache/pig/backend/hadoop/executionengine/tez/ src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/... 
Date: Tue, 17 Jun 2014 18:15:27 -0000 To: commits@pig.apache.org From: daijy@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140617181528.B1D7A238899C@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: daijy Date: Tue Jun 17 18:15:26 2014 New Revision: 1603243 URL: http://svn.apache.org/r1603243 Log: PIG-3846: Implement automatic reducer parallelism Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/FindQuantilesTez.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PartitionSkewedKeysTez.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PartitionerDefinedVertexManager.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperDependencyParallelismEstimator.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezParallelismEstimator.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/ParallelConstantVisitor.java pig/trunk/test/org/apache/pig/tez/TestTezAutoParallelism.java Modified: pig/trunk/CHANGES.txt pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java 
pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java pig/trunk/test/org/apache/pig/test/data/GoldenFiles/TEZC16.gld pig/trunk/test/org/apache/pig/test/data/GoldenFiles/TEZC17.gld pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2-OPTOFF.gld pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2.gld pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Rank-2.gld pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6-OPTOFF.gld pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7-OPTOFF.gld pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7.gld pig/trunk/test/org/apache/pig/tez/TestTezJobControlCompiler.java pig/trunk/test/tez-tests Modified: pig/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1603243&r1=1603242&r2=1603243&view=diff ============================================================================== --- pig/trunk/CHANGES.txt (original) +++ pig/trunk/CHANGES.txt Tue Jun 17 18:15:26 2014 @@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES IMPROVEMENTS +PIG-3846: Implement automatic reducer parallelism (daijy) + PIG-3939: SPRINTF function to format strings using a printf-style template (mrflip via cheolsoo) PIG-3970: Merge Tez branch into trunk (daijy) Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java?rev=1603243&r1=1603242&r2=1603243&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java (original) +++ 
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPoissonSample.java Tue Jun 17 18:15:26 2014 @@ -24,6 +24,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.data.Tuple; import org.apache.pig.data.TupleFactory; +import org.apache.pig.impl.builtin.PoissonSampleLoader; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.VisitorException; @@ -31,12 +32,6 @@ public class POPoissonSample extends Phy private static final long serialVersionUID = 1L; - // marker string to mark the last sample row, which has total number or rows - // seen by this map instance. this string will be in the 2nd last column of - // the last sample row it is used by GetMemNumRows. - public static final String NUMROWS_TUPLE_MARKER = - "\u4956\u3838_pig_inTeRnal-spEcial_roW_num_tuple3kt579CFLehkblah"; - private static final TupleFactory tf = TupleFactory.getInstance(); private static Result eop = new Result(POStatus.STATUS_EOP, null); @@ -226,7 +221,7 @@ public class POPoissonSample extends Phy } } - t.set(sz, NUMROWS_TUPLE_MARKER); + t.set(sz, PoissonSampleLoader.NUMROWS_TUPLE_MARKER); t.set(sz + 1, rowNum); numRowSplTupleReturned = true; return new Result(POStatus.STATUS_OK, t); Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java?rev=1603243&r1=1603242&r2=1603243&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java 
Tue Jun 17 18:15:26 2014 @@ -26,11 +26,15 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result; import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; import org.apache.pig.data.Tuple; +import org.apache.pig.data.TupleFactory; +import org.apache.pig.impl.builtin.PoissonSampleLoader; import org.apache.pig.impl.plan.OperatorKey; import org.apache.pig.impl.plan.VisitorException; public class POReservoirSample extends PhysicalOperator { + private static final TupleFactory tf = TupleFactory.getInstance(); + private static final long serialVersionUID = 1L; // number of samples to be sampled @@ -45,6 +49,9 @@ public class POReservoirSample extends P //array to store the result private transient Result[] samples = null; + // last sample result + private Result lastSample = null; + public POReservoirSample(OperatorKey k) { this(k, -1, null); } @@ -101,7 +108,6 @@ public class POReservoirSample extends P } } - int rowNum = rowProcessed; Random randGen = new Random(); while (true) { @@ -114,11 +120,11 @@ public class POReservoirSample extends P } // collect samples until input is exhausted - int rand = randGen.nextInt(rowNum); + int rand = randGen.nextInt(rowProcessed); if (rand < numSamples) { samples[rand] = res; } - rowNum++; + rowProcessed++; } if (this.parentPlan.endOfAllInput && res.returnStatus == POStatus.STATUS_EOP) { @@ -129,6 +135,26 @@ public class POReservoirSample extends P } private Result getSample() throws ExecException { + if (lastSample == null) { + lastSample = retrieveSample(); + } + if (lastSample.returnStatus==POStatus.STATUS_EOP) { + return lastSample; + } + + Result currentSample = retrieveSample(); + // If this is the last sample, tag with number of rows + if (currentSample.returnStatus == POStatus.STATUS_EOP) { + lastSample = createNumRowTuple((Tuple)lastSample.result); + } else if (currentSample.returnStatus == POStatus.STATUS_NULL) { + return 
currentSample; + } + Result result = lastSample; + lastSample = currentSample; + return result; + } + + private Result retrieveSample() throws ExecException { if(nextSampleIdx < samples.length){ if (illustrator != null) { illustratorMarkup(samples[nextSampleIdx].result, samples[nextSampleIdx].result, 0); @@ -140,7 +166,8 @@ public class POReservoirSample extends P return res; } else{ - Result res = new Result(POStatus.STATUS_EOP, null); + Result res; + res = new Result(POStatus.STATUS_EOP, null); return res; } } @@ -159,4 +186,24 @@ public class POReservoirSample extends P public String name() { return getAliasString() + "ReservoirSample - " + mKey.toString(); } + + /** + * @param sample - sample tuple + * @return - Tuple appended with special marker string column, num-rows column + * @throws ExecException + */ + private Result createNumRowTuple(Tuple sample) throws ExecException { + int sz = (sample == null) ? 0 : sample.size(); + Tuple t = tf.newTuple(sz + 2); + + if (sample != null) { + for (int i=0; i exec(Tuple in) throws IOException { + // In Tez, we also need to estimate the quantiles with regard to sample + // and the special tuple containing the total number of records + int estimatedNumReducers = -1; + boolean estimate_sample_quantile = PigMapReduce.sJobConfInternal.get().getBoolean + (PigProcessor.ESTIMATE_PARALLELISM, false); + DataBag mySamples = (DataBag)in.get(1); + this.samples = BagFactory.getInstance().newDefaultBag(); + Iterator iter = mySamples.iterator(); + Tuple t; + //total input rows for the order by + long totalInputRows = 0; + long sampleSize = 0; + while (iter.hasNext()) { + t = iter.next(); + if (t.get(t.size() - 1) != null) { + totalInputRows += (Long)t.get(t.size() - 1); + } + if (t.get(t.size() - 2) != null) { + sampleSize += getMemorySize(t); + } + if (t.size() > 2) { + Tuple newTuple = tf.newTuple(t.size()-2); + for (int i=0;i result = super.exec(in); + if (estimate_sample_quantile) { + 
result.put(PigProcessor.ESTIMATED_NUM_PARALLELISM, numQuantiles); + } + PigProcessor.sampleMap = result; + return result; + } + + // the last field of the tuple is a tuple for memory size and disk size + protected long getMemorySize(Tuple t) { + int s = t.size(); + try { + return (Long) t.get(s - 2); + } catch (ExecException e) { + throw new RuntimeException( + "Unable to retrive the size field from tuple.", e); + } + } +} Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java?rev=1603243&r1=1603242&r2=1603243&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java Tue Jun 17 18:15:26 2014 @@ -172,5 +172,8 @@ public class MultiQueryOptimizerTez exte parentOper.outEdges.put(entry.getKey(), entry.getValue()); } } + if (subPlanOper.isSampler()) { + parentOper.markSampler(); + } } } Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PartitionSkewedKeysTez.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PartitionSkewedKeysTez.java?rev=1603243&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PartitionSkewedKeysTez.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PartitionSkewedKeysTez.java Tue Jun 17 18:15:26 2014 @@ -0,0 +1,86 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.tez; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce; +import org.apache.pig.data.DataBag; +import org.apache.pig.data.Tuple; +import org.apache.pig.impl.builtin.PartitionSkewedKeys; + +public class PartitionSkewedKeysTez extends PartitionSkewedKeys { + private static final Log LOG = LogFactory.getLog(PartitionSkewedKeysTez.class); + + public PartitionSkewedKeysTez() { + super(); + } + + public PartitionSkewedKeysTez(String[] args) { + super(args); + } + + @Override + public Map exec(Tuple in) throws IOException { + if (in == null || in.size() == 0) { + return null; + } + + int estimatedNumReducers = -1; + boolean estimate_sample_quantile = PigMapReduce.sJobConfInternal.get().getBoolean + (PigProcessor.ESTIMATE_PARALLELISM, false); + if (estimate_sample_quantile) { + int specifiedNumReducer = (Integer) in.get(0); + DataBag samples = (DataBag) in.get(1); + + long totalSampleSize = 0; + long totalInputRows = 0; + Iterator iter = samples.iterator(); + while (iter.hasNext()) { + Tuple t = iter.next(); + totalInputRows += 
(Long)t.get(t.size() - 1); + totalSampleSize += getMemorySize(t); + } + long totalSampleCount_ = samples.size(); + + long estimatedInputSize = (long)((double)totalSampleSize/totalSampleCount_ * totalInputRows); + + long bytesPerTask = PigMapReduce.sJobConfInternal.get().getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, + InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER); + + estimatedNumReducers = (int)Math.ceil((double)estimatedInputSize/bytesPerTask); + estimatedNumReducers = Math.min(estimatedNumReducers, InputSizeReducerEstimator.DEFAULT_MAX_REDUCER_COUNT_PARAM); + + LOG.info("Estimating parallelism: estimatedInputSize is " + estimatedInputSize + ". bytesPerTask is " + bytesPerTask + ". estimatedNumReducers is " + estimatedNumReducers + "."); + + this.totalReducers_ = estimatedNumReducers; + LOG.info("Use estimated reducer instead:" + estimatedNumReducers + ", orig: " + specifiedNumReducer); + } + Map result = super.exec(in); + if (estimate_sample_quantile) { + result.put(PigProcessor.ESTIMATED_NUM_PARALLELISM, totalReducers_); + } + PigProcessor.sampleMap = result; + return result; + } +} Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PartitionerDefinedVertexManager.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PartitionerDefinedVertexManager.java?rev=1603243&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PartitionerDefinedVertexManager.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PartitionerDefinedVertexManager.java Tue Jun 17 18:15:26 2014 @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.tez; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.tez.dag.api.EdgeManagerDescriptor; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.VertexManagerPlugin; +import org.apache.tez.dag.api.VertexManagerPluginContext; +import org.apache.tez.dag.api.VertexManagerPluginContext.TaskWithLocationHint; +import org.apache.tez.dag.app.dag.impl.ScatterGatherEdgeManager; +import org.apache.tez.runtime.api.Event; +import org.apache.tez.runtime.api.events.VertexManagerEvent; + +import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; + +/** + * VertexManagerPlugin used by sorting job of order by and skewed join. + * What is does is to set parallelism of the sorting vertex + * according to numParallelism specified by the predecessor vertex. 
+ * The complex part is the PigOrderByEdgeManager, which specify how + * partition in the previous setting routes to the new vertex setting + */ +public class PartitionerDefinedVertexManager implements VertexManagerPlugin { + + private VertexManagerPluginContext context; + private boolean isParallelismSet = false; + + private static final Log LOG = LogFactory.getLog(PartitionerDefinedVertexManager.class); + + @Override + public void initialize(VertexManagerPluginContext context) { + this.context = context; + } + + @Override + public void onRootVertexInitialized(String inputName, InputDescriptor inputDescriptor, + List events) { + // Nothing to do + } + + @Override + public void onSourceTaskCompleted(String srcVertexName, Integer srcTaskId) { + // Nothing to do + } + + @Override + public void onVertexManagerEventReceived(VertexManagerEvent vmEvent) { + // There could be multiple partition vertex sending VertexManagerEvent + // Only need to setVertexParallelism once + if (isParallelismSet) { + return; + } + isParallelismSet = true; + int dynamicParallelism = -1; + // Need to distinguish from VertexManagerEventPayloadProto emitted by OnFileSortedOutput + if (vmEvent.getUserPayload().length==4) { + dynamicParallelism = Ints.fromByteArray(vmEvent.getUserPayload()); + } else { + return; + } + int currentParallelism = context.getVertexNumTasks(context.getVertexName()); + if (dynamicParallelism != -1) { + if (dynamicParallelism!=currentParallelism) { + LOG.info("Pig Partitioner Defined Vertex Manager: reset parallelism to " + dynamicParallelism + + " from " + currentParallelism); + Map edgeManagers = + new HashMap(); + for(String vertex : context.getInputVertexEdgeProperties().keySet()) { + EdgeManagerDescriptor edgeManagerDescriptor = + new EdgeManagerDescriptor(ScatterGatherEdgeManager.class.getName()); + edgeManagers.put(vertex, edgeManagerDescriptor); + } + context.setVertexParallelism(dynamicParallelism, null, edgeManagers); + } + List tasksToStart = 
Lists.newArrayListWithCapacity(dynamicParallelism); + for (int i=0; i> completions) { + // Nothing to do + } +} Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1603243&r1=1603242&r2=1603243&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Tue Jun 17 18:15:26 2014 @@ -57,14 +57,31 @@ import org.apache.tez.runtime.api.Logica import org.apache.tez.runtime.api.LogicalInput; import org.apache.tez.runtime.api.LogicalOutput; import org.apache.tez.runtime.api.TezProcessorContext; +import org.apache.tez.runtime.api.events.VertexManagerEvent; import org.apache.tez.runtime.library.api.KeyValueReader; +import com.google.common.collect.Lists; +import com.google.common.primitives.Ints; + public class PigProcessor implements LogicalIOProcessor { private static final Log LOG = LogFactory.getLog(PigProcessor.class); // Names of the properties that store serialized physical plans public static final String PLAN = "pig.exec.tez.plan"; public static final String COMBINE_PLAN = "pig.exec.tez.combine.plan"; + // This flag is used in both order by and skewed job. This is a configuration + // entry to instruct sample job to dynamically estimate parallelism + public static final String ESTIMATE_PARALLELISM = "pig.exec.estimate.parallelism"; + // This flag is used in both order by and skewed job. 
+ // This is the key in sampleMap of estimated parallelism + public static final String ESTIMATED_NUM_PARALLELISM = "pig.exec.estimated.num.parallelism"; + + // The operator key for sample vertex, used by partition vertex to collect sample + public static final String SAMPLE_VERTEX = "pig.sampleVertex"; + + // The operator key for sort vertex, used by sample vertex to send parallelism event + // if Pig need to estimate parallelism of sort vertex + public static final String SORT_VERTEX = "pig.sortVertex"; private PhysicalPlan execPlan; @@ -74,6 +91,8 @@ public class PigProcessor implements Log private Configuration conf; private PigHadoopLogger pigHadoopLogger; + + private TezProcessorContext processorContext; public static String sampleVertex; public static Map sampleMap; @@ -82,6 +101,8 @@ public class PigProcessor implements Log @Override public void initialize(TezProcessorContext processorContext) throws Exception { + this.processorContext = processorContext; + // Reset any static variables to avoid conflict in container-reuse. 
sampleVertex = null; sampleMap = null; @@ -191,6 +212,22 @@ public class PigProcessor implements Log fileOutput.commit(); } } + + // send event containing parallelism to sorting job of order by / skewed join + if (conf.getBoolean(ESTIMATE_PARALLELISM, false)) { + int parallelism = 1; + if (sampleMap!=null && sampleMap.containsKey(ESTIMATED_NUM_PARALLELISM)) { + parallelism = (Integer)sampleMap.get(ESTIMATED_NUM_PARALLELISM); + } + String sortingVertex = conf.get(SORT_VERTEX); + // Should contain only 1 output for sampleAggregation job + LOG.info("Sending numParallelism " + parallelism + " to " + sortingVertex); + VertexManagerEvent vmEvent = new VertexManagerEvent( + sortingVertex, Ints.toByteArray(parallelism)); + List events = Lists.newArrayListWithCapacity(1); + events.add(vmEvent); + processorContext.sendEvents(events); + } } catch (Exception e) { abortOutput(); LOG.error("Encountered exception while processing: ", e); @@ -304,8 +341,9 @@ public class PigProcessor implements Log @SuppressWarnings({ "unchecked" }) private void collectSample(String sampleVertex, LogicalInput logicalInput) throws Exception { - Boolean cached = (Boolean) ObjectCache.getInstance().retrieve("cached.sample." 
+ sampleVertex); - if (cached == Boolean.TRUE) { + String quantileMapCacheKey = "sample-" + sampleVertex + ".cached"; + sampleMap = (Map)ObjectCache.getInstance().retrieve(quantileMapCacheKey); + if (sampleMap != null) { return; } LOG.info("Starting fetch of input " + logicalInput + " from vertex " + sampleVertex); @@ -317,6 +355,9 @@ public class PigProcessor implements Log // Sample is not empty Tuple t = (Tuple) val; sampleMap = (Map) t.get(0); + ObjectCache.getInstance().cache(quantileMapCacheKey, sampleMap); + } else { + LOG.warn("Cannot fetch sample from " + sampleVertex); } } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java?rev=1603243&r1=1603242&r2=1603243&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java Tue Jun 17 18:15:26 2014 @@ -31,23 +31,9 @@ import org.apache.pig.impl.util.Pair; public class SkewedPartitionerTez extends SkewedPartitioner { private static final Log LOG = LogFactory.getLog(SkewedPartitionerTez.class); - @SuppressWarnings("unchecked") @Override protected void init() { - ObjectCache cache = ObjectCache.getInstance(); - String isCachedKey = "sample-" + PigProcessor.sampleVertex + ".cached"; - String totalReducersCacheKey = "sample-" + PigProcessor.sampleVertex + ".totalReducers"; - String reducerMapCacheKey = "sample-" + PigProcessor.sampleVertex + ".reducerMap"; - if (cache.retrieve(isCachedKey) == Boolean.TRUE) { - totalReducers = (Integer) cache.retrieve(totalReducersCacheKey); - reducerMap = (Map>) cache.retrieve(reducerMapCacheKey); - LOG.info("Found totalReducers and reducerMap in Tez cache. 
cachekey=" - + totalReducersCacheKey + "," + reducerMapCacheKey); - inited = true; - return; - } - Map distMap = null; if (PigProcessor.sampleMap != null) { // We've collected sampleMap in PigProcessor @@ -98,9 +84,6 @@ public class SkewedPartitionerTez extend throw new RuntimeException(e); } LOG.info("Initialized SkewedPartitionerTez. Time taken: " + (System.currentTimeMillis() - start)); - cache.cache(isCachedKey, Boolean.TRUE); - cache.cache(totalReducersCacheKey, totalReducers); - cache.cache(reducerMapCacheKey, reducerMap); inited = true; } } Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1603243&r1=1603242&r2=1603243&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Tue Jun 17 18:15:26 2014 @@ -88,7 +88,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.data.DataType; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.builtin.DefaultIndexableLoader; -import org.apache.pig.impl.builtin.FindQuantiles; import org.apache.pig.impl.builtin.GetMemNumRows; import org.apache.pig.impl.builtin.PartitionSkewedKeys; import org.apache.pig.impl.io.FileLocalizer; @@ -1183,6 +1182,7 @@ public class TezCompiler extends PhyPlan TezOperator prevOp = compiledInputs[0]; prevOp.plan.addAsLeaf(lrTez); prevOp.plan.addAsLeaf(poSample); + prevOp.markSampler(); MultiMap joinPlans = op.getJoinPlans(); List l = plan.getPredecessors(op); @@ -1230,23 +1230,21 @@ public class TezCompiler extends PhyPlan prevOp.plan.addAsLeaf(lrTezSample); prevOp.setClosed(true); - POSort sort = new POSort(op.getOperatorKey(), op.getRequestedParallelism(), + int rp = op.getRequestedParallelism(); + if 
(rp == -1) { + rp = pigContext.defaultParallel; + } + + POSort sort = new POSort(op.getOperatorKey(), rp, null, groups, ascCol, null); String per = pigProperties.getProperty("pig.skewedjoin.reduce.memusage", String.valueOf(PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE)); String mc = pigProperties.getProperty("pig.skewedjoin.reduce.maxtuple", "0"); - int rp = Math.max(op.getRequestedParallelism(), 1); Pair sampleJobPair = getSamplingAggregationJob(sort, rp, null, - PartitionSkewedKeys.class.getName(), new String[]{per, mc}); + PartitionSkewedKeysTez.class.getName(), new String[]{per, mc}); rp = sampleJobPair.second; - // Set parallelism of SkewedJoin as the value calculated by sampling - // job if "parallel" is specified in join statement, "rp" is equal - // to that number if not specified, use the value that sampling - // process calculated based on default. - op.setRequestedParallelism(rp); - TezOperator[] joinJobs = new TezOperator[] {null, compiledInputs[1], null}; TezOperator[] joinInputs = new TezOperator[] {compiledInputs[0], compiledInputs[1]}; TezOperator[] rearrangeOutputs = new TezOperator[2]; @@ -1259,12 +1257,6 @@ public class TezCompiler extends PhyPlan // It just partitions the data from first vertex based on the quantiles from sample vertex. joinJobs[0] = curTezOp; - // Run POLocalRearrange for first join table. Note we set the - // parallelism of POLocalRearrange to that of the load vertex. So - // its parallelism will be determined by the size of skewed table. - //TODO: Check if this really works as load vertex parallelism - // is determined during vertex construction. 
- lrTez.setRequestedParallelism(prevOp.getRequestedParallelism()); try { lrTez.setIndex(0); } catch (ExecException e) { @@ -1288,6 +1280,7 @@ public class TezCompiler extends PhyPlan identityInOutTez.setInputKey(prevOp.getOperatorKey().toString()); joinJobs[0].plan.addAsLeaf(identityInOutTez); joinJobs[0].setClosed(true); + joinJobs[0].markSampleBasedPartitioner(); rearrangeOutputs[0] = joinJobs[0]; compiledInputs = new TezOperator[] {joinInputs[1]}; @@ -1323,9 +1316,7 @@ public class TezCompiler extends PhyPlan gr.setResultType(DataType.TUPLE); gr.visit(this); joinJobs[2] = curTezOp; - if (gr.getRequestedParallelism() > joinJobs[2].getRequestedParallelism()) { - joinJobs[2].setRequestedParallelism(gr.getRequestedParallelism()); - } + joinJobs[2].setRequestedParallelism(rp); compiledInputs = new TezOperator[] {joinJobs[2]}; @@ -1396,6 +1387,11 @@ public class TezCompiler extends PhyPlan } joinJobs[2].setSkewedJoin(true); + sampleJobPair.first.sortOperator = joinJobs[2]; + + if (rp == -1) { + sampleJobPair.first.setNeedEstimatedQuantile(true); + } phyToTezOpMap.put(op, curTezOp); } catch (Exception e) { @@ -1511,14 +1507,52 @@ public class TezCompiler extends PhyPlan } } - // This foreach will pick the sort key columns from the RandomSampleLoader output - POForEach nfe1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)),-1,eps1,flat1); - oper.plan.addAsLeaf(nfe1); - String numSamples = pigContext.getProperties().getProperty(PigConfiguration.PIG_RANDOM_SAMPLER_SAMPLE_SIZE, "100"); POReservoirSample poSample = new POReservoirSample(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, null, Integer.parseInt(numSamples)); oper.plan.addAsLeaf(poSample); + + List sortPlans = sort.getSortPlans(); + // Set up transform plan to get keys and memory size of input + // tuples. It first adds all the plans to get key columns. 
+ List transformPlans = new ArrayList(); + transformPlans.addAll(sortPlans); + + // Then it adds a column for memory size + POProject prjStar = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope))); + prjStar.setResultType(DataType.TUPLE); + prjStar.setStar(true); + + List ufInps = new ArrayList(); + ufInps.add(prjStar); + + PhysicalPlan ep = new PhysicalPlan(); + POUserFunc uf = new POUserFunc(new OperatorKey(scope,nig.getNextNodeId(scope)), + -1, ufInps, new FuncSpec(GetMemNumRows.class.getName(), (String[])null)); + uf.setResultType(DataType.TUPLE); + ep.add(uf); + ep.add(prjStar); + ep.connect(prjStar, uf); + + transformPlans.add(ep); + + flat1 = new ArrayList(); + eps1 = new ArrayList(); + + for (int i=0; i(oper, rp); } @@ -1777,6 +1811,7 @@ public class TezCompiler extends PhyPlan identityInOutTez.setInputKey(inputOper.getOperatorKey().toString()); oper1.plan.addAsLeaf(identityInOutTez); oper1.setClosed(true); + oper1.markSampleBasedPartitioner(); TezOperator oper2 = getTezOp(); oper2.setGlobalSort(true); @@ -1897,11 +1932,13 @@ public class TezCompiler extends PhyPlan POLocalRearrangeTez lrSample = localRearrangeFactory.create(LocalRearrangeType.NULL); TezOperator prevOper = endSingleInputWithStoreAndSample(op, lr, lrSample, keyType, fields); + prevOper.markSampler(); - //TODO: Dynamic Reducer estimation or some equivalent of JobControlCompiler.calculateRuntimeReducers - // pigContext.defaultParallel to be taken into account - int rp = Math.max(op.getRequestedParallelism(), 1); - + int rp = op.getRequestedParallelism(); + if (rp == -1) { + rp = pigContext.defaultParallel; + } + // if rp is still -1, let it be, TezParallelismEstimator will set it to an estimated rp Pair quantJobParallelismPair = getOrderbySamplingAggregationJob(op, rp); TezOperator[] sortOpers = getSortJobs(prevOper, lr, op, keyType, fields); @@ -1914,6 +1951,9 @@ public class TezCompiler extends PhyPlan // If prevOper.requestedParallelism changes based on no. 
of input splits // it will reflect for sortOpers[0] so that 1-1 edge will work. sortOpers[0].setRequestedParallelismByReference(prevOper); + if (rp==-1) { + quantJobParallelismPair.first.setNeedEstimatedQuantile(true); + } sortOpers[1].setRequestedParallelism(quantJobParallelismPair.second); /* @@ -1945,6 +1985,7 @@ public class TezCompiler extends PhyPlan // curTezOp.UDFs.add(op.getMSortFunc().getFuncSpec().toString()); // curTezOp.isUDFComparatorUsed = true; // } + quantJobParallelismPair.first.sortOperator = sortOpers[1]; phyToTezOpMap.put(op, curTezOp); }catch(Exception e){ int errCode = 2034; @@ -2024,7 +2065,7 @@ public class TezCompiler extends PhyPlan // which unions input from the two predecessor vertices TezOperator unionTezOp = getTezOp(); tezPlan.add(unionTezOp); - unionTezOp.markUnion(); + unionTezOp.setUnion(); unionTezOp.setRequestedParallelism(op.getRequestedParallelism()); POShuffledValueInputTez unionInput = new POShuffledValueInputTez(OperatorKey.genOpKey(scope)); unionTezOp.plan.addAsLeaf(unionInput); Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1603243&r1=1603242&r2=1603243&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Tue Jun 17 18:15:26 2014 @@ -61,6 +61,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingPartitionWritableComparator; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingTupleWritableComparator; import 
org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigSecondaryKeyGroupComparator; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PhyPlanSetter; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigBigDecimalRawComparator; import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigBigIntegerRawComparator; @@ -91,6 +92,7 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.backend.hadoop.executionengine.tez.TezPOPackageAnnotator.LoRearrangeDiscoverer; import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper; import org.apache.pig.backend.hadoop.executionengine.tez.util.SecurityHelper; +import org.apache.pig.backend.hadoop.executionengine.util.ParallelConstantVisitor; import org.apache.pig.data.DataType; import org.apache.pig.impl.PigContext; import org.apache.pig.impl.io.FileLocalizer; @@ -107,6 +109,7 @@ import org.apache.tez.common.TezJobConfi import org.apache.tez.common.TezUtils; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.Edge; +import org.apache.tez.dag.api.EdgeManagerDescriptor; import org.apache.tez.dag.api.EdgeProperty; import org.apache.tez.dag.api.EdgeProperty.DataMovementType; import org.apache.tez.dag.api.GroupInputEdge; @@ -115,6 +118,8 @@ import org.apache.tez.dag.api.OutputDesc import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.VertexGroup; +import org.apache.tez.dag.api.VertexManagerPluginDescriptor; +import org.apache.tez.dag.library.vertexmanager.ShuffleVertexManager; import org.apache.tez.mapreduce.combine.MRCombiner; import org.apache.tez.mapreduce.committer.MROutputCommitter; import org.apache.tez.mapreduce.common.MRInputSplitDistributor; @@ -133,6 +138,9 @@ import org.apache.tez.runtime.library.in public class TezDagBuilder extends TezOpPlanVisitor { 
private static final Log log = LogFactory.getLog(TezJobControlCompiler.class); + private static final String REDUCER_ESTIMATOR_KEY = "pig.exec.reducer.estimator"; + private static final String REDUCER_ESTIMATOR_ARG_KEY = "pig.exec.reducer.estimator.arg"; + private DAG dag; private Map localResources; private PigContext pc; @@ -343,6 +351,12 @@ public class TezDagBuilder extends TezOp in.setUserPayload(TezUtils.createUserPayloadFromConf(conf)); out.setUserPayload(TezUtils.createUserPayloadFromConf(conf)); + if (to.getEstimatedParallelism()!=-1 && (to.isGlobalSort()||to.isSkewedJoin())) { + // Use custom edge + return new EdgeProperty((EdgeManagerDescriptor)null, + edge.dataSourceType, edge.schedulingType, out, in); + } + return new EdgeProperty(edge.dataMovementType, edge.dataSourceType, edge.schedulingType, out, in); } @@ -393,7 +407,11 @@ public class TezDagBuilder extends TezOp payloadConf = (JobConf) job.getConfiguration(); if (tezOp.sampleOperator != null) { - payloadConf.set("pig.sampleVertex", tezOp.sampleOperator.getOperatorKey().toString()); + payloadConf.set(PigProcessor.SAMPLE_VERTEX, tezOp.sampleOperator.getOperatorKey().toString()); + } + + if (tezOp.sortOperator != null) { + payloadConf.set(PigProcessor.SORT_VERTEX, tezOp.sortOperator.getOperatorKey().toString()); } String tmp; @@ -492,7 +510,7 @@ public class TezDagBuilder extends TezOp // TODO Need to fix multiple input key mapping TezOperator identityInOutPred = null; for (TezOperator pred : mPlan.getPredecessors(tezOp)) { - if (!pred.isSampler()) { + if (!pred.isSampleAggregation()) { identityInOutPred = pred; break; } @@ -544,12 +562,9 @@ public class TezDagBuilder extends TezOp ObjectSerializer.serialize(stores)); } - // Take our assembled configuration and create a vertex - byte[] userPayload = TezUtils.createUserPayloadFromConf(payloadConf); - procDesc.setUserPayload(userPayload); // Can only set parallelism here if the parallelism isn't derived from // splits - int parallelism = 
tezOp.getRequestedParallelism(); + int parallelism = -1; InputSplitInfo inputSplitInfo = null; if (loads != null && loads.size() > 0) { // Not using MRInputAMSplitGenerator because delegation tokens are @@ -559,28 +574,85 @@ public class TezDagBuilder extends TezOp // splits can be moved to if(loads) block below parallelism = inputSplitInfo.getNumTasks(); tezOp.setRequestedParallelism(parallelism); - } - if (tezOp.getRequestedParallelism() < 0) { - if (pc.defaultParallel > 0) { - parallelism = pc.defaultParallel; - } else { - // Rough estimation till we have Automatic Reducer Parallelism - // and Parallelism estimator. To be removed. - int sumOfPredParallelism = 0; - int predParallelism; - for (TezOperator pred : mPlan.getPredecessors(tezOp)) { - predParallelism = pred.getRequestedParallelism(); - if (predParallelism < 0) { - predParallelism = Math.max(pc.defaultParallel, 1); + } else { + int prevParallelism = -1; + boolean isOneToOneParallelism = false; + for (Map.Entry entry : tezOp.inEdges.entrySet()) { + if (entry.getValue().dataMovementType == DataMovementType.ONE_TO_ONE) { + TezOperator pred = mPlan.getOperator(entry.getKey()); + parallelism = pred.getEffectiveParallelism(); + if (prevParallelism == -1) { + prevParallelism = parallelism; + } else if (prevParallelism != parallelism) { + throw new IOException("one to one sources parallelism for vertex " + + tezOp.getOperatorKey().toString() + " are not equal"); + } + if (pred.getRequestedParallelism()!=-1) { + tezOp.setRequestedParallelism(pred.getRequestedParallelism()); + } else { + tezOp.setEstimatedParallelism(pred.getEstimatedParallelism()); } - sumOfPredParallelism += predParallelism; + isOneToOneParallelism = true; + parallelism = -1; } - sumOfPredParallelism = Math.min(sumOfPredParallelism, 20); - parallelism = Math.max(sumOfPredParallelism, 1); } - tezOp.setRequestedParallelism(parallelism); + if (!isOneToOneParallelism) { + if (tezOp.getRequestedParallelism()!=-1) { + parallelism = 
tezOp.getRequestedParallelism(); + } else if (pc.defaultParallel!=-1) { + parallelism = pc.defaultParallel; + } else { + parallelism = estimateParallelism(job, mPlan, tezOp); + tezOp.setEstimatedParallelism(parallelism); + if (tezOp.isGlobalSort()||tezOp.isSkewedJoin()) { + // Vertex manager will set parallelism + parallelism = -1; + } + } + } + } + + // Once we decide the parallelism of the sampler, propagate to + // downstream operators if necessary + if (tezOp.isSampler()) { + // There could be multiple sampler and share the same sample aggregation job + // and partitioner job + TezOperator sampleAggregationOper = null; + TezOperator sampleBasedPartionerOper = null; + TezOperator sortOper = null; + for (TezOperator succ : mPlan.getSuccessors(tezOp)) { + if (succ.isVertexGroup()) { + succ = mPlan.getSuccessors(succ).get(0); + } + if (succ.isSampleAggregation()) { + sampleAggregationOper = succ; + } else if (succ.isSampleBasedPartitioner()) { + sampleBasedPartionerOper = succ; + } + } + sortOper = mPlan.getSuccessors(sampleBasedPartionerOper).get(0); + + if (sortOper.getRequestedParallelism()==-1 && pc.defaultParallel==-1) { + // set estimate parallelism for order by/skewed join to sampler parallelism + // that include: + // 1. sort operator + // 2. constant for sample aggregation oper + sortOper.setEstimatedParallelism(parallelism); + ParallelConstantVisitor visitor = + new ParallelConstantVisitor(sampleAggregationOper.plan, parallelism); + visitor.visit(); + } } + if (tezOp.isNeedEstimateParallelism()) { + payloadConf.setBoolean(PigProcessor.ESTIMATE_PARALLELISM, true); + log.info("Estimate quantile for sample aggregation vertex " + tezOp.getOperatorKey().toString()); + } + + // Take our assembled configuration and create a vertex + byte[] userPayload = TezUtils.createUserPayloadFromConf(payloadConf); + procDesc.setUserPayload(userPayload); + Vertex vertex = new Vertex(tezOp.getOperatorKey().toString(), procDesc, parallelism, isMap ? 
MRHelpers.getMapResource(globalConf) : MRHelpers.getReduceResource(globalConf)); @@ -656,6 +728,47 @@ public class TezDagBuilder extends TezOp if (stores.size() > 0) { new PigOutputFormat().checkOutputSpecs(job); } + + // Set the right VertexManagerPlugin + if (tezOp.getEstimatedParallelism() != -1) { + if (tezOp.isGlobalSort()||tezOp.isSkewedJoin()) { + // Set VertexManagerPlugin to PartitionerDefinedVertexManager, which is able + // to decrease/increase parallelism of sorting vertex dynamically + // based on the numQuantiles calculated by sample aggregation vertex + vertex.setVertexManagerPlugin(new VertexManagerPluginDescriptor( + PartitionerDefinedVertexManager.class.getName())); + log.info("Set VertexManagerPlugin to PartitionerDefinedParallelismVertexManager for vertex " + tezOp.getOperatorKey().toString()); + } else { + boolean containScatterGather = false; + boolean containCustomPartitioner = false; + for (TezEdgeDescriptor edge : tezOp.inEdges.values()) { + if (edge.dataMovementType == DataMovementType.SCATTER_GATHER) { + containScatterGather = true; + } + if (edge.partitionerClass!=null) { + containCustomPartitioner = true; + } + } + if (containScatterGather && !containCustomPartitioner) { + // Use auto-parallelism feature of ShuffleVertexManager to dynamically + // reduce the parallelism of the vertex + VertexManagerPluginDescriptor vmPluginDescriptor = new VertexManagerPluginDescriptor( + ShuffleVertexManager.class.getName()); + Configuration vmPluginConf = ConfigurationUtil.toConfiguration(pc.getProperties(), false); + vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true); + if (vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, + InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER)!= + InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER) { + vmPluginConf.setLong(ShuffleVertexManager.TEZ_AM_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE, + 
vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, + InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER)); + } + vmPluginDescriptor.setUserPayload(TezUtils.createUserPayloadFromConf(vmPluginConf)); + vertex.setVertexManagerPlugin(vmPluginDescriptor); + log.info("Set auto parallelism for vertex " + tezOp.getOperatorKey().toString()); + } + } + } // Reset udfcontext jobconf. It is not supposed to be set in the front end UDFContext.getUDFContext().addJobConf(null); @@ -1034,5 +1147,19 @@ public class TezDagBuilder extends TezOp conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS, comparatorClass); } + + public static int estimateParallelism(Job job, TezOperPlan tezPlan, + TezOperator tezOp) throws IOException { + Configuration conf = job.getConfiguration(); + + TezParallelismEstimator estimator = conf.get(REDUCER_ESTIMATOR_KEY) == null ? new TezOperDependencyParallelismEstimator() + : PigContext.instantiateObjectFromParams(conf, + REDUCER_ESTIMATOR_KEY, REDUCER_ESTIMATOR_ARG_KEY, + TezParallelismEstimator.class); + + log.info("Using parallel estimator: " + estimator.getClass().getName()); + int numberOfReducers = estimator.estimateParallelism(tezPlan, tezOp, conf); + return numberOfReducers; + } } Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperDependencyParallelismEstimator.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperDependencyParallelismEstimator.java?rev=1603243&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperDependencyParallelismEstimator.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperDependencyParallelismEstimator.java Tue Jun 17 18:15:26 2014 @@ -0,0 +1,241 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.pig.backend.hadoop.executionengine.tez; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.Map.Entry; + +import org.apache.hadoop.conf.Configuration; +import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigReducerEstimator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin; 
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit; +import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper; +import org.apache.pig.backend.hadoop.executionengine.util.ParallelConstantVisitor; +import org.apache.pig.impl.plan.DepthFirstWalker; +import org.apache.pig.impl.plan.OperatorKey; +import org.apache.pig.impl.plan.VisitorException; +import org.apache.tez.dag.api.EdgeProperty.DataMovementType; + +/** + * Estimate the parallelism of the vertex using: + * 1. parallelism of the predecessors + * 2. bloating factor of the physical plan of the predecessor + * + * Since currently it is only possible to reduce the parallelism + * estimation is exaggerated and will rely on Tez runtime to + * descrease the parallelism + */ +public class TezOperDependencyParallelismEstimator implements TezParallelismEstimator { + + static private int maxTaskCount; + static final double DEFAULT_FLATTEN_FACTOR = 10; + static final double DEFAULT_FILTER_FACTOR = 0.7; + static final double DEFAULT_LIMIT_FACTOR = 0.1; + + @Override + public int estimateParallelism(TezOperPlan plan, TezOperator tezOper, Configuration conf) throws IOException { + + if (tezOper.isVertexGroup()) { + return -1; + } + + maxTaskCount = conf.getInt(PigReducerEstimator.MAX_REDUCER_COUNT_PARAM, + PigReducerEstimator.DEFAULT_MAX_REDUCER_COUNT_PARAM); + + // If parallelism is set explicitly, respect it + if (tezOper.getRequestedParallelism()!=-1) { + return tezOper.getRequestedParallelism(); + } + + // If we have already estimated parallelism, use that one + if (tezOper.getEstimatedParallelism()!=-1) { + return tezOper.getEstimatedParallelism(); + } + + List preds = plan.getPredecessors(tezOper); + if (preds==null) { + throw new IOException("Cannot estimate parallelism for source vertex"); + } + + double estimatedParallelism = 0; + + for (Entry entry 
: tezOper.inEdges.entrySet()) { + TezOperator pred = getPredecessorWithKey(plan, tezOper, entry.getKey().toString()); + + // Don't include broadcast edge, broadcast edge is used for + // replicated join (covered in TezParallelismFactorVisitor.visitFRJoin) + // and sample/scalar (does not impact parallelism) + if (entry.getValue().dataMovementType==DataMovementType.SCATTER_GATHER || + entry.getValue().dataMovementType==DataMovementType.ONE_TO_ONE) { + double predParallelism = pred.getEffectiveParallelism(); + if (predParallelism==-1) { + throw new IOException("Cannot estimate parallelism for " + tezOper.getOperatorKey().toString() + + ", effective parallelism for predecessor " + tezOper.getOperatorKey().toString() + + " is -1"); + } + if (pred.plan!=null) { // pred.plan can be null if it is a VertexGroup + TezParallelismFactorVisitor parallelismFactorVisitor = new TezParallelismFactorVisitor(pred.plan, tezOper.getOperatorKey().toString()); + parallelismFactorVisitor.visit(); + predParallelism = predParallelism * parallelismFactorVisitor.getFactor(); + } + estimatedParallelism += predParallelism; + } + } + + int roundedEstimatedParallelism = (int)Math.ceil(estimatedParallelism); + if (tezOper.isSampler()) { + TezOperator sampleAggregationOper = null; + TezOperator rangePartionerOper = null; + TezOperator sortOper = null; + for (TezOperator succ : plan.getSuccessors(tezOper)) { + if (succ.isSampleAggregation()) { + sampleAggregationOper = succ; + } else if (succ.isSampleBasedPartitioner()) { + rangePartionerOper = succ; + } + } + sortOper = plan.getSuccessors(rangePartionerOper).get(0); + + if (sortOper.getRequestedParallelism()!=-1) { + + ParallelConstantVisitor visitor = + new ParallelConstantVisitor(sampleAggregationOper.plan, roundedEstimatedParallelism); + visitor.visit(); + } + } + + return Math.min(roundedEstimatedParallelism, maxTaskCount); + } + + private static TezOperator getPredecessorWithKey(TezOperPlan plan, TezOperator tezOper, String inputKey) { + List 
preds = plan.getPredecessors(tezOper); + for (TezOperator pred : preds) { + if (pred.isVertexGroup()) { + for (OperatorKey unionPred : pred.getUnionPredecessors()) { + if (unionPred.toString().equals(inputKey)) { + return plan.getOperator(unionPred); + } + } + + } + else if (pred.getOperatorKey().toString().equals(inputKey)) { + return pred; + } + } + return null; + } + + public static class TezParallelismFactorVisitor extends PhyPlanVisitor { + private double factor = 1; + private String outputKey; + public TezParallelismFactorVisitor(PhysicalPlan plan, String outputKey) { + super(plan, new DepthFirstWalker(plan)); + this.outputKey = outputKey; + } + + @Override + public void visitFilter(POFilter fl) throws VisitorException { + if (fl.getPlan().size()==1 && fl.getPlan().getRoots().get(0) instanceof ConstantExpression) { + ConstantExpression cons = (ConstantExpression)fl.getPlan().getRoots().get(0); + if (cons.getValue().equals(Boolean.TRUE)) { + // skip all true condition + return; + } + } + factor *= DEFAULT_FILTER_FACTOR; + } + + @Override + public void visitPOForEach(POForEach nfe) throws VisitorException { + List flattens = nfe.getToBeFlattened(); + boolean containFlatten = false; + for (boolean flatten : flattens) { + if (flatten) { + containFlatten = true; + break; + } + } + if (containFlatten) { + factor *= DEFAULT_FLATTEN_FACTOR; + } + } + + @Override + public void visitLimit(POLimit lim) throws VisitorException { + factor = DEFAULT_LIMIT_FACTOR; + } + + public void visitFRJoin(POFRJoin join) throws VisitorException { + factor *= DEFAULT_FLATTEN_FACTOR; + } + + public void visitMergeJoin(POMergeJoin join) throws VisitorException { + factor *= DEFAULT_FLATTEN_FACTOR; + } + + @Override + public void visitPackage(POPackage pkg) throws VisitorException{ + // JoinPackager is equivalent to a foreach flatten after shuffle + if (pkg.getPkgr() instanceof JoinPackager) { + factor *= DEFAULT_FLATTEN_FACTOR; + } + } + + @Override + public void visitSplit(POSplit sp) 
throws VisitorException { + // Find the split branch connecting to current operator + // accumulating the bloating factor in this branch + PhysicalPlan plan = getSplitBranch(sp, outputKey); + pushWalker(mCurrentWalker.spawnChildWalker(plan)); + visit(); + popWalker(); + } + + private static PhysicalPlan getSplitBranch(POSplit split, String outputKey) throws VisitorException { + List plans = split.getPlans(); + for (PhysicalPlan plan : plans) { + LinkedList lrs = PlanHelper.getPhysicalOperators(plan, POLocalRearrangeTez.class); + if (!lrs.isEmpty()) { + return plan; + } + LinkedList vos = PlanHelper.getPhysicalOperators(plan, POValueOutputTez.class); + if (!vos.isEmpty()) { + return plan; + } + } + return null; + } + + public double getFactor() { + return factor; + } + + } +} Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java?rev=1603243&r1=1603242&r2=1603243&view=diff ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java (original) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java Tue Jun 17 18:15:26 2014 @@ -62,6 +62,8 @@ public class TezOperator extends Operato // even when parallelism of source vertex changes. // Can change to int and set to -1 if TEZ-800 gets fixed. private AtomicInteger requestedParallelism = new AtomicInteger(-1); + + private int estimatedParallelism = -1; // TODO: When constructing Tez vertex, we have to specify how much resource // the vertex will need. 
So we need to estimate these values while compiling @@ -89,6 +91,9 @@ public class TezOperator extends Operato //Indicates if this job is an order by job boolean globalSort = false; + //Indicate if this job is a union job + boolean union = false; + //The sort order of the columns; //asc is true and desc is false boolean[] sortOrder; @@ -104,21 +109,31 @@ public class TezOperator extends Operato // are NOT combinable for correctness. private boolean combineSmallSplits = true; - // If not null, need to collect sample sent from predecessor + // Used by partition vertex, if not null, need to collect sample sent from predecessor TezOperator sampleOperator = null; + // Used by sample vertex, send parallelism event to orderOperator + TezOperator sortOperator = null; + + // If the flag is set, FindQuantilesTez/PartitionSkewedKeysTez will use aggregated sample + // to calculate the number of parallelism at runtime, instead of the numQuantiles/totalReducers_ + // parameter set statically + private boolean needEstimateParallelism = false; + // If true, we will use secondary key sort in the job private boolean useSecondaryKey = false; // Types of blocking operators. For now, we only support the following ones. 
private static enum OPER_FEATURE { NONE, - // Indicate if this job is a union job - UNION, // Indicate if this job is a merge indexer INDEXER, // Indicate if this job is a sampling job SAMPLER, + // Indicate if this job is a sample aggregation job + SAMPLE_AGGREGATOR, + // Indicate if this job is a sample based partition job (order by/skewed join) + SAMPLE_BASED_PARTITIONER, // Indicate if this job is a group by job GROUPBY, // Indicate if this job is a cogroup job @@ -175,6 +190,18 @@ public class TezOperator extends Operato this.requestedParallelism = oper.requestedParallelism; } + public int getEstimatedParallelism() { + return estimatedParallelism; + } + + public void setEstimatedParallelism(int estimatedParallelism) { + this.estimatedParallelism = estimatedParallelism; + } + + public int getEffectiveParallelism() { + return getRequestedParallelism()!=-1? getRequestedParallelism() : getEstimatedParallelism(); + } + public OperatorKey getSplitParent() { return splitParent; } @@ -224,11 +251,11 @@ public class TezOperator extends Operato } public boolean isUnion() { - return (feature == OPER_FEATURE.UNION); + return union; } - public void markUnion() { - feature = OPER_FEATURE.UNION; + public void setUnion() { + union = true; } public boolean isIndexer() { @@ -247,6 +274,30 @@ public class TezOperator extends Operato feature = OPER_FEATURE.SAMPLER; } + public boolean isSampleAggregation() { + return (feature == OPER_FEATURE.SAMPLE_AGGREGATOR); + } + + public void markSampleAggregation() { + feature = OPER_FEATURE.SAMPLE_AGGREGATOR; + } + + public boolean isSampleBasedPartitioner() { + return (feature == OPER_FEATURE.SAMPLE_BASED_PARTITIONER); + } + + public void markSampleBasedPartitioner() { + feature = OPER_FEATURE.SAMPLE_BASED_PARTITIONER; + } + + public void setNeedEstimatedQuantile(boolean needEstimateParallelism) { + this.needEstimateParallelism = needEstimateParallelism; + } + + public boolean isNeedEstimateParallelism() { + return needEstimateParallelism; 
+ } + public boolean isUseSecondaryKey() { return useSecondaryKey; } Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezParallelismEstimator.java URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezParallelismEstimator.java?rev=1603243&view=auto ============================================================================== --- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezParallelismEstimator.java (added) +++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezParallelismEstimator.java Tue Jun 17 18:15:26 2014 @@ -0,0 +1,26 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+
+public interface TezParallelismEstimator {
+    public int estimateParallelism(TezOperPlan plan, TezOperator tezOper, Configuration conf) throws IOException;
+}

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java?rev=1603243&r1=1603242&r2=1603243&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/WeightedRangePartitionerTez.java Tue Jun 17 18:15:26 2014
@@ -22,6 +22,7 @@ import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.DiscreteProbabilitySampleGenerator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner;
 import org.apache.pig.data.DataBag;
@@ -33,30 +34,25 @@ import org.apache.pig.impl.io.PigNullabl
 public class WeightedRangePartitionerTez extends WeightedRangePartitioner {
     private static final Log LOG = LogFactory.getLog(WeightedRangePartitionerTez.class);
 
-    @SuppressWarnings("unchecked")
+    private Integer estimatedNumPartitions;
+
     @Override
-    public void init() {
-        ObjectCache cache = ObjectCache.getInstance();
-        String isCachedKey = "sample-" + PigProcessor.sampleVertex + ".cached";
-        String quantilesCacheKey = "sample-" + PigProcessor.sampleVertex + ".quantiles";
-        String weightedPartsCacheKey = "sample-" + PigProcessor.sampleVertex + ".weightedParts";
-        if (cache.retrieve(isCachedKey) == Boolean.TRUE) {
-            quantiles = (PigNullableWritable[]) cache
-                    .retrieve(quantilesCacheKey);
-            weightedParts = (Map) cache
-                    .retrieve(weightedPartsCacheKey);
-            LOG.info("Found quantiles and weightedParts in Tez cache. cachekey="
-                    + quantilesCacheKey + "," + weightedPartsCacheKey);
-            inited = true;
-            return;
+    public int getPartition(PigNullableWritable key, Writable value,
+            int numPartitions){
+        if (estimatedNumPartitions!=null) {
+            numPartitions = estimatedNumPartitions;
         }
+        return super.getPartition(key, value, numPartitions);
+    }
 
+    @Override
+    public void init() {
         Map quantileMap = null;
         if (PigProcessor.sampleMap != null) {
             // We've collected sampleMap in PigProcessor
             quantileMap = PigProcessor.sampleMap;
         } else {
-            LOG.info("Quantiles map is empty");
+            LOG.warn("Quantiles map is empty");
             inited = true;
             return;
         }
@@ -65,6 +61,7 @@ public class WeightedRangePartitionerTez
         try {
             DataBag quantilesList = (DataBag) quantileMap.get(FindQuantiles.QUANTILES_LIST);
             InternalMap weightedPartsData = (InternalMap) quantileMap.get(FindQuantiles.WEIGHTED_PARTS);
+            estimatedNumPartitions = (Integer)quantileMap.get(PigProcessor.ESTIMATED_NUM_PARALLELISM);
             convertToArray(quantilesList);
             for (Entry ent : weightedPartsData.entrySet()) {
                 Tuple key = (Tuple) ent.getKey(); // sample item which repeats
@@ -77,9 +74,6 @@ public class WeightedRangePartitionerTez
         }
         LOG.info("Initialized WeightedRangePartitionerTez. Time taken: " + (System.currentTimeMillis() - start));
-        cache.cache(isCachedKey, Boolean.TRUE);
-        cache.cache(quantilesCacheKey, quantiles);
-        cache.cache(weightedPartsCacheKey, weightedParts);
         inited = true;
     }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java?rev=1603243&r1=1603242&r2=1603243&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java Tue Jun 17 18:15:26 2014
@@ -273,6 +273,9 @@ public class UnionOptimizer extends TezO
         pred.setUseSecondaryKey(unionOp.isUseSecondaryKey());
         pred.UDFs.addAll(unionOp.UDFs);
         pred.scalars.addAll(unionOp.scalars);
+        if (unionOp.isSampler()) {
+            pred.markSampler();
+        }
     }
 
     public static PhysicalPlan getUnionPredPlanFromSplit(PhysicalPlan plan, String unionOpKey) throws VisitorException {

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/ParallelConstantVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/ParallelConstantVisitor.java?rev=1603243&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/ParallelConstantVisitor.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/ParallelConstantVisitor.java Tue Jun 17 18:15:26 2014
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.util;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+public class ParallelConstantVisitor extends PhyPlanVisitor {
+
+    private int rp;
+
+    private boolean replaced = false;
+
+    public ParallelConstantVisitor(PhysicalPlan plan, int rp) {
+        super(plan, new DepthFirstWalker(
+                plan));
+        this.rp = rp;
+    }
+
+    @Override
+    public void visitConstant(ConstantExpression cnst) throws VisitorException {
+        if (cnst.getRequestedParallelism() == -1) {
+            Object obj = cnst.getValue();
+            if (obj instanceof Integer) {
+                if (replaced) {
+                    // sample job should have only one ConstantExpression
+                    throw new VisitorException("Invalid reduce plan: more " +
+                            "than one ConstantExpression found in sampling job");
+                }
+                cnst.setValue(rp);
+                cnst.setRequestedParallelism(rp);
+                replaced = true;
+            }
+        }
+    }
+}

Modified: pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java?rev=1603243&r1=1603242&r2=1603243&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java (original)
+++ pig/trunk/src/org/apache/pig/impl/builtin/FindQuantiles.java Tue Jun 17 18:15:26 2014
@@ -53,6 +53,9 @@ public class FindQuantiles extends EvalF
     enum State { ALL_ASC, ALL_DESC, MIXED };
     State mState;
 
+    protected Integer numQuantiles = null;
+    protected DataBag samples = null;
+
     private class SortComparator implements Comparator {
         @Override
         @SuppressWarnings("unchecked")
@@ -155,17 +158,19 @@ public class FindQuantiles extends EvalF
         Map output = new HashMap();
         if(in==null || in.size()==0)
             return null;
-        Integer numQuantiles = null;
-        DataBag samples = null;
+
         ArrayList quantilesList = new ArrayList();
         InternalMap weightedParts = new InternalMap();
         // the sample file has a tuple as under:
         // (numQuantiles, bag of samples)
         // numQuantiles here is the reduce parallelism
         try{
-            numQuantiles = (Integer)in.get(0);
-            samples = (DataBag)in.get(1);
-
+            if (numQuantiles == null) {
+                numQuantiles = (Integer)in.get(0);
+            }
+            if (samples == null) {
+                samples = (DataBag)in.get(1);
+            }
             long numSamples = samples.size();
             long toSkip = numSamples / numQuantiles;
             if(toSkip == 0) {

Modified: pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java?rev=1603243&r1=1603242&r2=1603243&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java (original)
+++ pig/trunk/src/org/apache/pig/impl/builtin/PartitionSkewedKeys.java Tue Jun 17 18:15:26 2014
@@ -75,14 +75,14 @@ public class PartitionSkewedKeys extends
 
     private int currentIndex_;
 
-    private int totalReducers_;
-
     private long totalMemory_;
 
     private long totalSampleCount_;
 
     private double heapPercentage_;
 
+    protected int totalReducers_;
+
     // specify how many tuple a reducer can hold for a key
     // this is for testing purpose. If not specified, then
     // it is calculated based on memory size and size of tuple
@@ -135,7 +135,9 @@ public class PartitionSkewedKeys extends
         long totalInputRows = 0;
 
         try {
-            totalReducers_ = (Integer) in.get(0);
+            if (totalReducers_ == -1) {
+                totalReducers_ = (Integer) in.get(0);
+            }
             DataBag samples = (DataBag) in.get(1);
 
             totalSampleCount_ = samples.size();
@@ -271,7 +273,7 @@ public class PartitionSkewedKeys extends
     }
 
     // the last field of the tuple is a tuple for memory size and disk size
-    private long getMemorySize(Tuple t) {
+    protected long getMemorySize(Tuple t) {
         int s = t.size();
         try {
             return (Long) t.get(s - 2);