pig-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From roh...@apache.org
Subject svn commit: r1565502 - in /pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez: ./ util/
Date Fri, 07 Feb 2014 00:28:17 GMT
Author: rohini
Date: Fri Feb  7 00:28:17 2014
New Revision: 1565502

URL: http://svn.apache.org/r1565502
Log:
PIG-3742: Set MR runtime settings on tez runtime

Added:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/util/
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/util/SecurityHelper.java
      - copied, changed from r1565495, pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SecurityHelper.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
      - copied, changed from r1565495, pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompilerUtil.java
Removed:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SecurityHelper.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompilerUtil.java
Modified:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java?rev=1565502&r1=1565501&r2=1565502&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java	(original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POIdentityInOutTez.java	Fri Feb  7 00:28:17 2014
@@ -31,6 +31,8 @@ import org.apache.tez.runtime.api.Logica
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.library.api.KeyValueReader;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.apache.tez.runtime.library.input.ShuffledMergedInput;
 
 /**
  * POIdentityInOutTez is used to pass through tuples as is to next vertex from
@@ -44,6 +46,8 @@ public class POIdentityInOutTez extends 
     private static final long serialVersionUID = 1L;
     private String inputKey;
     private transient KeyValueReader reader;
+    private transient KeyValuesReader shuffleReader;
+    private transient boolean shuffleInput;
 
     public POIdentityInOutTez(OperatorKey k, POLocalRearrange inputRearrange) {
         super(inputRearrange);
@@ -62,7 +66,13 @@ public class POIdentityInOutTez extends 
             throw new ExecException("Input from vertex " + inputKey + " is missing");
         }
         try {
-            reader = (KeyValueReader) input.getReader();
+            if (input instanceof ShuffledMergedInput) {
+                shuffleInput = true;
+                ShuffledMergedInput smInput = (ShuffledMergedInput) input;
+                shuffleReader = smInput.getReader();
+            } else {
+                reader = (KeyValueReader) input.getReader();
+            }
         } catch (Exception e) {
             throw new ExecException(e);
         }
@@ -85,12 +95,21 @@ public class POIdentityInOutTez extends 
     @Override
     public Result getNextTuple() throws ExecException {
         try {
-            if (!reader.next()) {
-                return RESULT_EOP;
+            if (shuffleInput) {
+                while (shuffleReader.next()) {
+                    Object curKey = shuffleReader.getCurrentKey();
+                    Iterable<Object> vals = shuffleReader.getCurrentValues();
+                    for (Object val : vals) {
+                        writer.write(curKey, val);
+                    }
+                }
             } else {
-                writer.write(reader.getCurrentKey(), reader.getCurrentValue());
+                while (reader.next()) {
+                    writer.write(reader.getCurrentKey(),
+                            reader.getCurrentValue());
+                }
             }
-            return RESULT_EMPTY;
+            return RESULT_EOP;
         } catch (IOException e) {
             throw new ExecException(e);
         }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1565502&r1=1565501&r2=1565502&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java	(original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java	Fri Feb  7 00:28:17 2014
@@ -77,6 +77,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager.PackageType;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.tez.POLocalRearrangeTezFactory.LocalRearrangeType;
+import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.builtin.DefaultIndexableLoader;
@@ -251,9 +252,10 @@ public class TezCompiler extends PhyPlan
     }
 
    private void addSubPlanPropertiesToParent(TezOperator parentOper, TezOperator subPlanOper) {
-        if (subPlanOper.requestedParallelism > parentOper.requestedParallelism) {
-            parentOper.requestedParallelism = subPlanOper.requestedParallelism;
+        if (subPlanOper.getRequestedParallelism() > parentOper.getRequestedParallelism()) {
+            parentOper.setRequestedParallelism(subPlanOper.getRequestedParallelism());
         }
+        subPlanOper.setRequestedParallelismByReference(parentOper);
         if (subPlanOper.UDFs != null) {
             parentOper.UDFs.addAll(subPlanOper.UDFs);
         }
@@ -357,8 +359,8 @@ public class TezCompiler extends PhyPlan
 
         // Now we have the inputs compiled. Do something with the input oper op.
         op.visit(this);
-        if (op.getRequestedParallelism() > curTezOp.requestedParallelism) {
-            curTezOp.requestedParallelism = op.getRequestedParallelism();
+        if (op.getRequestedParallelism() > curTezOp.getRequestedParallelism()) {
+            curTezOp.setRequestedParallelism(op.getRequestedParallelism());
         }
         compiledInputs = prevCompInp;
     }
@@ -564,8 +566,8 @@ public class TezCompiler extends PhyPlan
             tezPlan.connect(it.next(), ret);
         }
         for (TezOperator tezOp : toBeRemoved) {
-            if (tezOp.requestedParallelism > ret.requestedParallelism) {
-                ret.requestedParallelism = tezOp.requestedParallelism;
+            if (tezOp.getRequestedParallelism() > ret.getRequestedParallelism()) {
+                ret.setRequestedParallelism(tezOp.getRequestedParallelism());
             }
             for (String udf : tezOp.UDFs) {
                 if (!ret.UDFs.contains(udf)) {
@@ -802,7 +804,7 @@ public class TezCompiler extends PhyPlan
 
             // If the parallelism of the current vertex is one and it doesn't do a LOAD (whose
             // parallelism is determined by the InputFormat), we don't need another vertex.
-            if (curTezOp.requestedParallelism == 1) {
+            if (curTezOp.getRequestedParallelism() == 1) {
                 boolean canStop = true;
                 for (PhysicalOperator planOp : curTezOp.plan.getRoots()) {
                     if (planOp instanceof POLoad) {
@@ -849,7 +851,7 @@ public class TezCompiler extends PhyPlan
             }
 
             // Explicitly set the parallelism for the new vertex to 1.
-            curTezOp.requestedParallelism = 1;
+            curTezOp.setRequestedParallelism(1);
         } catch (Exception e) {
             int errCode = 2034;
             String msg = "Error compiling operator " + op.getClass().getSimpleName();
@@ -1064,7 +1066,7 @@ public class TezCompiler extends PhyPlan
                 rightTezOprAggr = getTezOp();
                 tezPlan.add(rightTezOprAggr);
                TezCompilerUtil.simpleConnectTwoVertex(tezPlan, rightTezOpr, rightTezOprAggr, scope, nig);
-                rightTezOprAggr.requestedParallelism = 1; // we need exactly one task for indexing job.
+                rightTezOprAggr.setRequestedParallelism(1); // we need exactly one task for indexing job.
 
                 POStore st = TezCompilerUtil.getStore(scope, nig);
                 FileSpec strFile = getTempFileSpec();
@@ -1291,9 +1293,11 @@ public class TezCompiler extends PhyPlan
             // Run POLocalRearrange for first join table. Note we set the
             // parallelism of POLocalRearrange to that of the load vertex. So
             // its parallelism will be determined by the size of skewed table.
+            //TODO: Check if this really works as load vertex parallelism
+            // is determined during vertex construction.
             POLocalRearrangeTez lr =
                     new POLocalRearrangeTez(new OperatorKey(scope,nig.getNextNodeId(scope)),
-                            prevOp.requestedParallelism);
+                            Math.max(prevOp.getRequestedParallelism(), 1));
             try {
                 lr.setIndex(0);
             } catch (ExecException e) {
@@ -1350,8 +1354,8 @@ public class TezCompiler extends PhyPlan
             gr.setResultType(DataType.TUPLE);
             gr.visit(this);
             joinJobs[2] = curTezOp;
-            if (gr.getRequestedParallelism() > joinJobs[2].requestedParallelism) {
-                joinJobs[2].requestedParallelism = gr.getRequestedParallelism();
+            if (gr.getRequestedParallelism() > joinJobs[2].getRequestedParallelism()) {
+                joinJobs[2].setRequestedParallelism(gr.getRequestedParallelism());
             }
 
             compiledInputs = new TezOperator[] {joinJobs[2]};
@@ -1757,7 +1761,7 @@ public class TezCompiler extends PhyPlan
         }
         oper.setClosed(true);
 
-        oper.requestedParallelism = 1;
+        oper.setRequestedParallelism(1);
         oper.markSampler();
         return new Pair<TezOperator, Integer>(oper, rp);
     }
@@ -1938,6 +1942,8 @@ public class TezCompiler extends PhyPlan
 
            TezOperator prevOper = endSingleInputWithStoreAndSample(op, lr, lrSample, keyType, fields);
 
+            //TODO: Dynamic Reducer estimation or some equivalent of JobControlCompiler.calculateRuntimeReducers
+            // pigContext.defaultParallel to be taken into account
             int rp = Math.max(op.getRequestedParallelism(), 1);
 
            Pair<TezOperator, Integer> quantJobParallelismPair = getQuantileJobs(op, rp);
@@ -1945,21 +1951,23 @@ public class TezCompiler extends PhyPlan
 
             TezEdgeDescriptor edge = handleSplitAndConnect(tezPlan, prevOper, sortOpers[0]);
 
-            // TODO: Convert to unsorted shuffle after TEZ-661
-            // edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
-            // edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
-            // edge.partitionerClass = RoundRobinPartitioner.class;
-
-            // TODO: Test which is better - ONE_TO_ONE from prevOper to same number of
-            // tasks in sortOpers[0] and then sortOpers[1] with requestedParallelism
-            // or unsorted shuffled output (TEZ-661) from prevOper to
-            // sortOpers[0] with requestedParallelism and then sortOpers[1] with
-            // requestedParallelism
-            sortOpers[0].requestedParallelism = prevOper.requestedParallelism;
-            sortOpers[1].requestedParallelism = quantJobParallelismPair.second;
+            // Use 1-1 edge
             edge.dataMovementType = DataMovementType.ONE_TO_ONE;
             edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
             edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
+            // If prevOper.requestedParallelism changes based on no. of input splits
+            // it will reflect for sortOpers[0] so that 1-1 edge will work.
+            sortOpers[0].setRequestedParallelismByReference(prevOper);
+            sortOpers[1].setRequestedParallelism(quantJobParallelismPair.second);
+
+            /*
+            // TODO: Convert to unsorted shuffle after TEZ-661
+            // edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
+            // edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
+            edge.partitionerClass = RoundRobinPartitioner.class;
+            sortOpers[0].setRequestedParallelism(quantJobParallelismPair.second);
+            sortOpers[1].setRequestedParallelism(quantJobParallelismPair.second);
+            */
 
             handleSplitAndConnect(tezPlan, prevOper, quantJobParallelismPair.first, false);
             lr.setOutputKey(sortOpers[0].getOperatorKey().toString());

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1565502&r1=1565501&r2=1565502&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java	(original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java	Fri Feb  7 00:28:17 2014
@@ -35,9 +35,7 @@ import org.apache.hadoop.io.WritableComp
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
@@ -75,6 +73,8 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezPOPackageAnnotator.LoRearrangeDiscoverer;
+import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
+import org.apache.pig.backend.hadoop.executionengine.tez.util.SecurityHelper;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
@@ -94,7 +94,6 @@ import org.apache.tez.dag.api.EdgeProper
 import org.apache.tez.dag.api.InputDescriptor;
 import org.apache.tez.dag.api.OutputDescriptor;
 import org.apache.tez.dag.api.ProcessorDescriptor;
-import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.mapreduce.combine.MRCombiner;
 import org.apache.tez.mapreduce.committer.MROutputCommitter;
@@ -133,10 +132,14 @@ public class TezDagBuilder extends TezOp
 
     @Override
     public void visitTezOp(TezOperator tezOp) throws VisitorException {
+        TezOperPlan tezPlan = getPlan();
+        List<TezOperator> predecessors = tezPlan.getPredecessors(tezOp);
+
         // Construct vertex for the current Tez operator
         Vertex to = null;
         try {
-            to = newVertex(tezOp);
+            boolean isMap = (predecessors == null || predecessors.isEmpty()) ? true : false;
+            to = newVertex(tezOp, isMap);
             dag.addVertex(to);
         } catch (Exception e) {
             throw new VisitorException("Cannot create vertex for "
@@ -144,8 +147,6 @@ public class TezDagBuilder extends TezOp
         }
 
         // Connect the new vertex with predecessor vertices
-        TezOperPlan tezPlan = getPlan();
-        List<TezOperator> predecessors = tezPlan.getPredecessors(tezOp);
         if (predecessors != null) {
             for (TezOperator predecessor : predecessors) {
                 // Since this is a dependency order walker, predecessor vertices
@@ -238,6 +239,7 @@ public class TezDagBuilder extends TezOp
                     edge.partitionerClass.getName());
         }
 
+        MRToTezHelper.convertMRToTezRuntimeConf(conf, globalConf);
         in.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
         out.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
 
@@ -271,7 +273,7 @@ public class TezDagBuilder extends TezOp
                 .serialize(new byte[] { combRearrange.getKeyType() }));
     }
 
-    private Vertex newVertex(TezOperator tezOp) throws IOException,
+    private Vertex newVertex(TezOperator tezOp, boolean isMap) throws IOException,
             ClassNotFoundException, InterruptedException {
         ProcessorDescriptor procDesc = new ProcessorDescriptor(
                 tezOp.getProcessorName());
@@ -383,12 +385,14 @@ public class TezDagBuilder extends TezOp
 
         UDFContext.getUDFContext().serialize(payloadConf);
 
+        MRToTezHelper.convertMRToTezRuntimeConf(payloadConf, globalConf);
+
         // Take our assembled configuration and create a vertex
         byte[] userPayload = TezUtils.createUserPayloadFromConf(payloadConf);
         procDesc.setUserPayload(userPayload);
         // Can only set parallelism here if the parallelism isn't derived from
         // splits
-        int parallelism = tezOp.requestedParallelism;
+        int parallelism = Math.max(tezOp.getRequestedParallelism(), 1);
         InputSplitInfo inputSplitInfo = null;
         if (loads != null && loads.size() > 0) {
             // Not using MRInputAMSplitGenerator because delegation tokens are
@@ -397,20 +401,25 @@ public class TezDagBuilder extends TezOp
             // TODO: Can be set to -1 if TEZ-601 gets fixed and getting input
             // splits can be moved to if(loads) block below
             parallelism = inputSplitInfo.getNumTasks();
+            tezOp.setRequestedParallelism(parallelism);
         }
         Vertex vertex = new Vertex(tezOp.getOperatorKey().toString(), procDesc, parallelism,
-                Resource.newInstance(tezOp.requestedMemory, tezOp.requestedCpu));
+                isMap ? MRHelpers.getMapResource(globalConf) : MRHelpers.getReduceResource(globalConf));
 
-        Map<String, String> env = new HashMap<String, String>();
-        MRHelpers.updateEnvironmentForMRTasks(globalConf, env, true);
-        vertex.setTaskEnvironment(env);
+        Map<String, String> taskEnv = new HashMap<String, String>();
+        MRHelpers.updateEnvironmentForMRTasks(globalConf, taskEnv, isMap);
+        vertex.setTaskEnvironment(taskEnv);
 
         vertex.setTaskLocalResources(localResources);
 
-        // This could also be reduce, but we need to choose one
-        // TODO: Create new or use existing settings that are specifically for
-        // Tez.
-        vertex.setJavaOpts(MRHelpers.getMapJavaOpts(globalConf));
+        vertex.setJavaOpts(isMap ? MRHelpers.getMapJavaOpts(globalConf)
+                : MRHelpers.getReduceJavaOpts(globalConf));
+
+        log.info("For vertex - " + tezOp.getOperatorKey().toString()
+                + ": parallelism=" + parallelism
+                + ", memory=" + vertex.getTaskResource().getMemory()
+                + ", java opts=" + vertex.getJavaOpts()
+                );
 
         // Right now there can only be one of each of these. Will need to be
         // more generic when there can be more.
@@ -459,15 +468,6 @@ public class TezDagBuilder extends TezOp
         // specified in mapreduce.job.hdfs-servers
         SecurityHelper.populateTokenCache(job.getConfiguration(), dag.getCredentials());
 
-        // Unlike MR which gets staging dir from RM, tez AM is picking it up
-        // from configuration
-        Path submitJobDir = new Path(payloadConf.get(
-                TezConfiguration.TEZ_AM_STAGING_DIR,
-                TezConfiguration.TEZ_AM_STAGING_DIR_DEFAULT));
-        // Get delegation token for the job submission dir
-        TokenCache.obtainTokensForNamenodes(dag.getCredentials(),
-                new Path[] { submitJobDir }, payloadConf);
-
         return vertex;
     }
 

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1565502&r1=1565501&r2=1565502&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java	(original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java	Fri Feb  7 00:28:17 2014
@@ -21,12 +21,10 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.UUID;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
 import org.apache.pig.PigConfiguration;
@@ -35,6 +33,7 @@ import org.apache.pig.backend.hadoop.dat
 import org.apache.pig.backend.hadoop.executionengine.Launcher;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
@@ -56,8 +55,7 @@ public class TezLauncher extends Launche
        aggregateWarning = Boolean.parseBoolean(pc.getProperties().getProperty("aggregate.warning", "false"));
         Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties(), true);
 
-        FileSystem fs = FileSystem.get(conf);
-        Path stagingDir = new Path(fs.getWorkingDirectory(), UUID.randomUUID().toString());
+        Path stagingDir = FileLocalizer.getTemporaryPath(pc, "-tez");
 
         TezResourceManager.initialize(stagingDir, pc, conf);
 

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java?rev=1565502&r1=1565501&r2=1565502&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java	(original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java	Fri Feb  7 00:28:17 2014
@@ -20,6 +20,7 @@ package org.apache.pig.backend.hadoop.ex
 import java.io.ByteArrayOutputStream;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -50,14 +51,16 @@ public class TezOperator extends Operato
 
     // TODO: We need to specify parallelism per vertex in Tez. For now, we set
     // them all to 1.
-    int requestedParallelism = 1;
+    // Use AtomicInteger for access by reference and being able to reset in
+    // TezDAGBuilder based on number of input splits. We just need mutability and not concurrency
+    private AtomicInteger requestedParallelism = new AtomicInteger(-1);
 
     // TODO: When constructing Tez vertex, we have to specify how much resource
     // the vertex will need. So we need to estimate these values while compiling
     // physical plan into tez plan. For now, we're using default values - 1G mem
     // and 1 core.
-    int requestedMemory = 1024;
-    int requestedCpu = 1;
+    //int requestedMemory = 1024;
+    //int requestedCpu = 1;
 
     // Presence indicates that this TezOper is sub-plan of a POSplit.
     // Only POStore or POLocalRearrange leaf can be a sub-plan of POSplit
@@ -144,6 +147,18 @@ public class TezOperator extends Operato
         return true;
     }
 
+    public int getRequestedParallelism() {
+        return requestedParallelism.get();
+    }
+
+    public void setRequestedParallelism(int requestedParallelism) {
+        this.requestedParallelism.set(requestedParallelism);
+    }
+
+    public void setRequestedParallelismByReference(TezOperator oper) {
+        this.requestedParallelism = oper.requestedParallelism;
+    }
+
     public OperatorKey getSplitOperatorKey() {
         return splitOperatorKey;
     }
@@ -309,5 +324,6 @@ public class TezOperator extends Operato
     public boolean combineSmallSplits() {
         return combineSmallSplits;
     }
+
 }
 

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java?rev=1565502&r1=1565501&r2=1565502&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java	(original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java	Fri Feb  7 00:28:17 2014
@@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
 import org.apache.pig.impl.PigContext;
 import org.apache.tez.client.AMConfiguration;
 import org.apache.tez.client.TezClient;
@@ -38,6 +39,7 @@ import org.apache.tez.client.TezSessionC
 import org.apache.tez.client.TezSessionStatus;
 import org.apache.tez.dag.api.TezConfiguration;
 import org.apache.tez.dag.api.TezException;
+import org.apache.tez.mapreduce.hadoop.MRHelpers;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -99,19 +101,26 @@ public class TezSessionManager {
 
    private static SessionInfo createSession(Configuration conf, Map<String, LocalResource> requestedAMResources, Credentials creds) throws TezException, IOException {
         TezConfiguration tezConf = new TezConfiguration(conf);
+        AMConfiguration amConfig = getAMConfig(tezConf, requestedAMResources, creds);
+        TezSessionConfiguration sessionConfig = new TezSessionConfiguration(amConfig, tezConf);
+
+        String jobName = conf.get(PigContext.JOB_NAME, "pig");
         TezClient tezClient = new TezClient(tezConf);
         ApplicationId appId = tezClient.createApplication();
 
-        Map<String, LocalResource> resources = new HashMap<String, LocalResource>();
-        resources.putAll(requestedAMResources);
-
-        String jobName = conf.get(PigContext.JOB_NAME, "pig");
-        AMConfiguration amConfig = new AMConfiguration(null, resources, tezConf, creds);
-        TezSessionConfiguration sessionConfig = new TezSessionConfiguration(amConfig, tezConf);
         TezSession tezSession = new TezSession(jobName, appId, sessionConfig);
         tezSession.start();
         waitForTezSessionReady(tezSession);
-        return new SessionInfo(tezSession, resources);
+        return new SessionInfo(tezSession, requestedAMResources);
+    }
+
+    private static AMConfiguration getAMConfig(TezConfiguration tezConf, Map<String, LocalResource> resources, Credentials creds) {
+        TezConfiguration dagAMConf = MRToTezHelper.getDAGAMConfFromMRConf(tezConf);
+        Map<String, String> amEnv = new HashMap<String, String>();
+        MRHelpers.updateEnvironmentForMRAM(tezConf, amEnv);
+
+        AMConfiguration amConfig = new AMConfiguration(amEnv, resources, dagAMConf, creds);
+        return amConfig;
     }
 
    private static boolean validateSessionResources(SessionInfo currentSession, Map<String, LocalResource> requestedAMResources) throws TezException, IOException {

Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java?rev=1565502&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java	(added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/util/MRToTezHelper.java	Fri Feb  7 00:28:17 2014
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez.util;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.impl.util.Pair;
+import org.apache.tez.common.TezJobConfig;
+import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.mapreduce.hadoop.DeprecatedKeys;
+import org.apache.tez.mapreduce.hadoop.MRJobConfig;
+
+/**
+ * Helper methods that translate MapReduce configuration settings into their
+ * Tez equivalents.
+ */
+@InterfaceAudience.Private
+public final class MRToTezHelper {
+
+    private static final Log LOG = LogFactory.getLog(MRToTezHelper.class);
+
+    /**
+     * Keys which are used across an edge, i.e. by an Output-Input pair. Each MR
+     * key maps to a (Tez output key, Tez input key) pair.
+     */
+    private static final Map<String, Pair<String, String>> mrToTezIOParamMap =
+            new HashMap<String, Pair<String, String>>();
+
+    private MRToTezHelper() {
+    }
+
+    static {
+        populateMRToTezIOParamMap();
+    }
+
+    private static void populateMRToTezIOParamMap() {
+        mrToTezIOParamMap.put(MRJobConfig.MAP_OUTPUT_COMPRESS,
+                new Pair<String, String>(
+                        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_SHOULD_COMPRESS,
+                        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_IS_COMPRESSED));
+
+        mrToTezIOParamMap.put(MRJobConfig.MAP_OUTPUT_COMPRESS_CODEC,
+                new Pair<String, String>(
+                        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_COMPRESS_CODEC,
+                        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_COMPRESS_CODEC));
+    }
+
+    /**
+     * Builds the DAG AM configuration from a configuration that may contain
+     * MapReduce AM settings.
+     *
+     * <p>MR keys listed in {@link DeprecatedKeys#getMRToDAGParamMap()} are
+     * renamed to their Tez keys. The AM java opts, queue name, memory, vcores
+     * and max attempts then fall back to the MR AM values, but only where the
+     * Tez key is not already set.
+     *
+     * @param tezConf source configuration; it is copied, not modified
+     * @return a new configuration holding the translated settings
+     */
+    public static TezConfiguration getDAGAMConfFromMRConf(
+            TezConfiguration tezConf) {
+
+        // Set Tez parameters based on MR parameters.
+        TezConfiguration dagAMConf = new TezConfiguration(tezConf);
+        Map<String, String> mrParamToDAGParamMap = DeprecatedKeys
+                .getMRToDAGParamMap();
+
+        for (Entry<String, String> entry : mrParamToDAGParamMap.entrySet()) {
+            String mrValue = dagAMConf.get(entry.getKey());
+            if (mrValue != null) {
+                dagAMConf.set(entry.getValue(), mrValue);
+                dagAMConf.unset(entry.getKey());
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("MR->DAG Translating MR key: " + entry.getKey()
+                            + " to Tez key: " + entry.getValue()
+                            + " with value " + dagAMConf.get(entry.getValue()));
+                }
+            }
+        }
+
+        // The settings below are always read from tezConf, because the loop
+        // above may have unset MR keys on dagAMConf.
+        dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_JAVA_OPTS,
+                org.apache.tez.mapreduce.hadoop.MRHelpers
+                        .getMRAMJavaOpts(tezConf));
+
+        String queueName = tezConf.get(JobContext.QUEUE_NAME,
+                YarnConfiguration.DEFAULT_QUEUE_NAME);
+        dagAMConf.setIfUnset(TezConfiguration.TEZ_QUEUE_NAME, queueName);
+
+        int amMemMB = tezConf.getInt(MRJobConfig.MR_AM_VMEM_MB,
+                MRJobConfig.DEFAULT_MR_AM_VMEM_MB);
+        int amCores = tezConf.getInt(MRJobConfig.MR_AM_CPU_VCORES,
+                MRJobConfig.DEFAULT_MR_AM_CPU_VCORES);
+        dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB,
+                Integer.toString(amMemMB));
+        dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_RESOURCE_CPU_VCORES,
+                Integer.toString(amCores));
+
+        int amMaxAttempts = tezConf.getInt(MRJobConfig.MR_AM_MAX_ATTEMPTS,
+                MRJobConfig.DEFAULT_MR_AM_MAX_ATTEMPTS);
+        dagAMConf.setIfUnset(TezConfiguration.TEZ_AM_MAX_APP_ATTEMPTS,
+                Integer.toString(amMaxAttempts));
+
+        return dagAMConf;
+    }
+
+    /**
+     * Copies MR runtime settings from {@code baseConf} into {@code conf}
+     * under their Tez runtime keys.
+     *
+     * <p>For each MR key set in {@code baseConf}, the MR key is removed from
+     * {@code conf} and every corresponding Tez key is set in {@code conf} to
+     * the MR value unless it is already set there. Keys in
+     * {@link DeprecatedKeys#getMRToTezRuntimeParamMap()} map to a single Tez
+     * key; keys used across an edge map to both a Tez output and input key.
+     *
+     * <p>Each MR value is read once, before {@code conf} is modified, so this
+     * also works when {@code conf} and {@code baseConf} are the same object.
+     *
+     * @param conf configuration to update
+     * @param baseConf configuration the MR settings are read from
+     */
+    public static void convertMRToTezRuntimeConf(Configuration conf, Configuration baseConf) {
+        for (Entry<String, String> dep : DeprecatedKeys
+                .getMRToTezRuntimeParamMap().entrySet()) {
+            String mrValue = baseConf.get(dep.getKey());
+            if (mrValue != null) {
+                conf.unset(dep.getKey());
+                setTezKeyIfUnset(conf, dep.getValue(), mrValue, dep.getKey());
+            }
+        }
+
+        for (Entry<String, Pair<String, String>> dep : mrToTezIOParamMap.entrySet()) {
+            String mrValue = baseConf.get(dep.getKey());
+            if (mrValue != null) {
+                conf.unset(dep.getKey());
+                setTezKeyIfUnset(conf, dep.getValue().first, mrValue, dep.getKey());
+                setTezKeyIfUnset(conf, dep.getValue().second, mrValue, dep.getKey());
+            }
+        }
+    }
+
+    /**
+     * Sets {@code tezKey} to {@code value} in {@code conf} unless it is
+     * already set, and logs the translation only when it is applied.
+     */
+    private static void setTezKeyIfUnset(Configuration conf, String tezKey,
+            String value, String mrKey) {
+        if (conf.get(tezKey) == null) {
+            LOG.info("Setting " + tezKey + " to " + value + " from MR setting "
+                    + mrKey);
+            conf.set(tezKey, value);
+        }
+    }
+
+}

Copied: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/util/SecurityHelper.java
(from r1565495, pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SecurityHelper.java)
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/util/SecurityHelper.java?p2=pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/util/SecurityHelper.java&p1=pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SecurityHelper.java&r1=1565495&r2=1565502&rev=1565502&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SecurityHelper.java
(original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/util/SecurityHelper.java
Fri Feb  7 00:28:17 2014
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.pig.backend.hadoop.executionengine.tez;
+package org.apache.pig.backend.hadoop.executionengine.tez.util;
 
 import java.io.File;
 import java.io.IOException;

Copied: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
(from r1565495, pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompilerUtil.java)
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?p2=pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java&p1=pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompilerUtil.java&r1=1565495&r2=1565502&rev=1565502&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompilerUtil.java
(original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
Fri Feb  7 00:28:17 2014
@@ -1,4 +1,4 @@
-package org.apache.pig.backend.hadoop.executionengine.tez;
+package org.apache.pig.backend.hadoop.executionengine.tez.util;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -11,6 +11,11 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.tez.POLocalRearrangeTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.POStoreTez;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezEdgeDescriptor;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezOperator;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.NodeIdGenerator;



Mime
View raw message