hive-commits mailing list archives

From gunt...@apache.org
Subject svn commit: r1513299 - in /hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql: exec/tez/DagUtils.java exec/tez/TezTask.java optimizer/GenMapRedUtils.java parse/GenTezWork.java
Date Mon, 12 Aug 2013 23:42:42 GMT
Author: gunther
Date: Mon Aug 12 23:42:42 2013
New Revision: 1513299

URL: http://svn.apache.org/r1513299
Log:
HIVE-5058: Fix NPE issue with DAG submission in TEZ (Gunther Hagleitner)

Modified:
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1513299&r1=1513298&r2=1513299&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Mon Aug 12 23:42:42 2013
@@ -82,10 +82,12 @@ import org.apache.tez.mapreduce.processo
  */
 public class DagUtils {
 
+  private static final String TEZ_DIR = "_tez_scratch_dir";
+
   /*
    * Creates the configuration object necessary to run a specific vertex from
    * map work. This includes input formats, input processor, etc.
-=  */
+   */
   private static JobConf initializeVertexConf(JobConf baseConf, MapWork mapWork) {
     JobConf conf = new JobConf(baseConf);
 
@@ -124,8 +126,8 @@ public class DagUtils {
       inpFormat = BucketizedHiveInputFormat.class.getName();
     }
 
-    conf.set(MRJobConfig.MAP_CLASS_ATTR, ExecMapper.class.getName());
-    conf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR, inpFormat);
+    conf.set("mapred.mapper.class", ExecMapper.class.getName());
+    conf.set("mapred.input.format.class", inpFormat);
 
     return conf;
   }
@@ -141,11 +143,16 @@ public class DagUtils {
    * @param w The second vertex (sink)
    * @return
    */
-  public static Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w) {
+  public static Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w) 
+      throws IOException {
 
     // Tez needs to set up output and subsequent input pairs correctly
     MultiStageMRConfToTezTranslator.translateVertexConfToTez(wConf, vConf);
 
+    // update payloads (configuration for the vertices might have changed)
+    v.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(vConf));
+    w.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(wConf));
+
     // all edges are of the same type right now
     EdgeProperty edgeProperty =
         new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
@@ -161,6 +168,8 @@ public class DagUtils {
       LocalResource appJarLr, List<LocalResource> additionalLr, FileSystem fs,
       Path mrScratchDir, Context ctx) throws Exception {
 
+    Path tezDir = getTezDir(mrScratchDir);
+
     // map work can contain localwork, i.e: hashtables for map-side joins
     Path hashTableArchive = createHashTables(mapWork, conf);
     LocalResource localWorkLr = null;
@@ -171,17 +180,24 @@ public class DagUtils {
     }
 
     // write out the operator plan
-    Path planPath = Utilities.setMapWork(conf, mapWork, mrScratchDir.toUri().toString(), false);
+    Path planPath = Utilities.setMapWork(conf, mapWork, 
+        mrScratchDir.toUri().toString(), false);
     LocalResource planLr = createLocalResource(fs,
         planPath, LocalResourceType.FILE,
         LocalResourceVisibility.APPLICATION);
 
     // setup input paths and split info
-    List<Path> inputPaths = Utilities.getInputPaths(conf, mapWork, mrScratchDir.toUri().toString(), ctx);
+    List<Path> inputPaths = Utilities.getInputPaths(conf, mapWork, 
+        mrScratchDir.toUri().toString(), ctx);
     Utilities.setInputPaths(conf, inputPaths);
 
-    InputSplitInfo inputSplitInfo = MRHelpers.generateInputSplits(conf, mrScratchDir);
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(conf, conf);
+    InputSplitInfo inputSplitInfo = MRHelpers.generateInputSplits(conf, tezDir);
+
+    // create the directories FileSinkOperators need
+    Utilities.createTmpDirs(conf, mapWork);
+
+    // Tez asks us to call this even if there's no preceding vertex
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(conf, null);
 
     // finally create the vertex
     Vertex map = null;
@@ -229,17 +245,13 @@ public class DagUtils {
   private static JobConf initializeVertexConf(JobConf baseConf, ReduceWork reduceWork) {
     JobConf conf = new JobConf(baseConf);
 
-    conf.set(MRJobConfig.REDUCE_CLASS_ATTR, ExecReducer.class.getName());
+    conf.set("mapred.reducer.class", ExecReducer.class.getName());
 
     boolean useSpeculativeExecReducers = HiveConf.getBoolVar(conf,
         HiveConf.ConfVars.HIVESPECULATIVEEXECREDUCERS);
     HiveConf.setBoolVar(conf, HiveConf.ConfVars.HADOOPSPECULATIVEEXECREDUCERS,
         useSpeculativeExecReducers);
 
-    // reducers should have been set at planning stage
-    // job.setNumberOfReducers(rWork.getNumberOfReducers())
-    conf.set(MRJobConfig.NUM_REDUCES, reduceWork.getNumReduceTasks().toString());
-
     return conf;
   }
 
@@ -252,10 +264,13 @@ public class DagUtils {
 
     // write out the operator plan
     Path planPath = Utilities.setReduceWork(conf, reduceWork,
-        mrScratchDir.getName(), false);
+        mrScratchDir.toUri().toString(), false);
     LocalResource planLr = createLocalResource(fs, planPath,
         LocalResourceType.FILE, LocalResourceVisibility.APPLICATION);
 
+    // create the directories FileSinkOperators need
+    Utilities.createTmpDirs(conf, reduceWork);
+
     // create the vertex
     Vertex reducer = new Vertex("Reducer "+seqNo,
         new ProcessorDescriptor(ReduceProcessor.class.getName(),
@@ -476,7 +491,7 @@ public class DagUtils {
    * @throws URISyntaxException when current jar location cannot be determined.
    */
   public static LocalResource createHiveExecLocalResource(HiveConf conf)
-  throws IOException, LoginException, URISyntaxException {
+      throws IOException, LoginException, URISyntaxException {
     String hiveJarDir = conf.getVar(HiveConf.ConfVars.HIVE_JAR_DIRECTORY);
     String currentVersionPathStr = getExecJarPathLocal();
     String currentJarName = getResourceBaseName(currentVersionPathStr);
@@ -560,20 +575,17 @@ public class DagUtils {
       }
     }
 
-    conf.set("mapreduce.framework.name","yarn-tez");
-    conf.set("mapreduce.job.output.committer.class", NullOutputCommitter.class.getName());
-
-    conf.setBoolean(MRJobConfig.SETUP_CLEANUP_NEEDED, false);
-    conf.setBoolean(MRJobConfig.TASK_CLEANUP_NEEDED, false);
+    conf.set("mapred.output.committer.class", NullOutputCommitter.class.getName());
 
-    conf.setClass(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, HiveOutputFormatImpl.class, OutputFormat.class);
+    conf.setBoolean("mapred.committer.job.setup.cleanup.needed", false);
+    conf.setBoolean("mapred.committer.job.task.cleanup.needed", false);
 
-    conf.set(MRJobConfig.MAP_CLASS_ATTR, ExecMapper.class.getName());
+    conf.setClass("mapred.output.format.class", HiveOutputFormatImpl.class, OutputFormat.class);
 
     conf.set(MRJobConfig.OUTPUT_KEY_CLASS, HiveKey.class.getName());
     conf.set(MRJobConfig.OUTPUT_VALUE_CLASS, BytesWritable.class.getName());
 
-    conf.set(MRJobConfig.PARTITIONER_CLASS_ATTR, HiveConf.getVar(conf, HiveConf.ConfVars.HIVEPARTITIONER));
+    conf.set("mapred.partitioner.class", HiveConf.getVar(conf, HiveConf.ConfVars.HIVEPARTITIONER));
 
     return conf;
   }
@@ -631,6 +643,25 @@ public class DagUtils {
     }
   }
 
+  /**
+   * createTezDir creates a temporary directory in the scratchDir folder to
+   * be used with Tez. Assumes scratchDir exists.
+   */
+  public static Path createTezDir(Path scratchDir, Configuration conf) 
+      throws IOException {
+    Path tezDir = getTezDir(scratchDir);
+    FileSystem fs = tezDir.getFileSystem(conf);
+    fs.mkdirs(tezDir);
+    return tezDir;
+  }
+
+  /**
+   * Gets the tez dir that belongs to the hive scratch dir
+   */
+  public static Path getTezDir(Path scratchDir) {
+    return new Path(scratchDir, TEZ_DIR);
+  }
+
   private DagUtils() {
     // don't instantiate
   }
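
For orientation, the two helpers added above compose as follows. This is a
minimal usage sketch, not code from the patch: the Configuration, the scratch
path, and the surrounding context are invented for illustration, and
IOException handling is elided.

    // Assumed context: a Hadoop Configuration and an existing scratch dir.
    Configuration conf = new Configuration();
    Path scratchDir = new Path("/tmp/hive/scratch");        // hypothetical path
    Path tezDir = DagUtils.createTezDir(scratchDir, conf);  // mkdirs <scratchDir>/_tez_scratch_dir
    assert tezDir.equals(DagUtils.getTezDir(scratchDir));   // getTezDir is the pure path variant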

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1513299&r1=1513298&r2=1513299&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Mon Aug 12 23:42:42 2013
@@ -24,16 +24,20 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.JobCloseFeedBack;
+import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.TezWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
@@ -60,15 +64,26 @@ public class TezTask extends Task<TezWor
   @Override
   public int execute(DriverContext driverContext) {
     int rc = 1;
-
-    Context ctx = driverContext.getCtx();
+    boolean cleanContext = false;
+    Context ctx = null;
+    DAGClient client = null;
 
     try {
+      // Get or create the Context object. If we create it, we have to clean
+      // it up later as well.
+      ctx = driverContext.getCtx();
+      if (ctx == null) {
+        ctx = new Context(conf);
+        cleanContext = true;
+      }
 
       // we will localize all the files (jars, plans, hashtables) to the
       // scratch dir. let's create this first.
       Path scratchDir = new Path(ctx.getMRScratchDir());
 
+      // create the tez tmp dir
+      DagUtils.createTezDir(scratchDir, conf);
+
       // jobConf will hold all the configuration for hadoop, tez, and hive
       JobConf jobConf = DagUtils.createConfiguration(conf);
 
@@ -80,7 +95,7 @@ public class TezTask extends Task<TezWor
       DAG dag = build(jobConf, work, scratchDir, appJarLr, ctx);
 
       // submit will send the job to the cluster and start executing
-      DAGClient client = submit(jobConf, dag, scratchDir, appJarLr);
+      client = submit(jobConf, dag, scratchDir, appJarLr);
 
       // finally monitor will print progress until the job is done
       TezJobMonitor monitor = new TezJobMonitor();
@@ -88,13 +103,21 @@ public class TezTask extends Task<TezWor
 
     } catch (Exception e) {
       LOG.error("Failed to execute tez graph.", e);
+      // rc will be 1 at this point indicating failure.
     } finally {
       Utilities.clearWork(conf);
-      try {
-        ctx.clear();
-      } catch (Exception e) {
-        /*best effort*/
-        LOG.warn("Failed to clean up after tez job");
+      if (cleanContext) {
+        try {
+          ctx.clear();
+        } catch (Exception e) {
+          /*best effort*/
+          LOG.warn("Failed to clean up after tez job");
+        }
+      }
+      // need to either move tmp files or remove them
+      if (client != null) {
+        // rc will only be overwritten if close errors out
+        rc = close(work, rc);
       }
     }
     return rc;
@@ -115,6 +138,9 @@ public class TezTask extends Task<TezWor
     List<BaseWork> ws = work.getAllWork();
     Collections.reverse(ws);
 
+    Path tezDir = DagUtils.getTezDir(scratchDir);
+    FileSystem fs = tezDir.getFileSystem(conf);
+
     // the name of the dag is what is displayed in the AM/Job UI
     DAG dag = new DAG(
         Utilities.abbreviate(HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYSTRING),
@@ -125,8 +151,8 @@ public class TezTask extends Task<TezWor
 
       // translate work to vertex
       JobConf wxConf = DagUtils.initializeVertexConf(conf, w);
-      Vertex wx = DagUtils.createVertex(wxConf, w, scratchDir, i--,
-          appJarLr, additionalLr, scratchDir.getFileSystem(conf), ctx);
+      Vertex wx = DagUtils.createVertex(wxConf, w, tezDir, 
+          i--, appJarLr, additionalLr, fs, ctx);
       dag.addVertex(wx);
       workToVertex.put(w, wx);
       workToConf.put(w, wxConf);
@@ -155,14 +181,42 @@ public class TezTask extends Task<TezWor
     Map<String, LocalResource> amLrs = new HashMap<String, LocalResource>();
     amLrs.put(DagUtils.getBaseName(appJarLr), appJarLr);
 
+    Path tezDir = DagUtils.getTezDir(scratchDir);
+
     // ready to start execution on the cluster
-    DAGClient dagClient = tezClient.submitDAGApplication(dag, scratchDir,
+    DAGClient dagClient = tezClient.submitDAGApplication(dag, tezDir,
         null, "default", Collections.singletonList(""), amEnv, amLrs,
         new TezConfiguration(conf));
 
     return dagClient;
   }
 
+  /*
+   * close will move the temp files into the right place for the fetch
+   * task. If the job has failed, it will clean up the files.
+   */
+  private int close(TezWork work, int rc) {
+    try {
+      JobCloseFeedBack feedBack = new JobCloseFeedBack();
+      List<BaseWork> ws = work.getAllWork();
+      for (BaseWork w: ws) {
+        List<Operator<?>> ops = w.getAllOperators();
+        for (Operator<?> op: ops) {
+          op.jobClose(conf, rc == 0, feedBack);
+        }
+      }
+    } catch (Exception e) {
+      // jobClose needs to execute successfully otherwise fail task
+      if (rc == 0) {
+        rc = 3;
+        String mesg = "Job Commit failed with exception '" 
+          + Utilities.getNameMessage(e) + "'";
+        console.printError(mesg, "\n" + StringUtils.stringifyException(e));
+      }
+    }
+    return rc;
+  }
+
   @Override
   public boolean isMapRedTask() {
     return true;
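
The resource-lifecycle change to execute() is easier to read in outline. A
simplified sketch follows (inside TezTask, with its conf and work fields in
scope; build/submit/monitor and the inner try/catch around ctx.clear() are
elided), mirroring the patch rather than reproducing the literal method body:

    int rc = 1;                    // assume failure until the monitor succeeds
    boolean cleanContext = false;
    Context ctx = null;
    DAGClient client = null;
    try {
      ctx = driverContext.getCtx();
      if (ctx == null) {           // no Context handed in: create one and own its cleanup
        ctx = new Context(conf);
        cleanContext = true;
      }
      // ... localize resources, build the DAG, client = submit(...), monitor ...
    } catch (Exception e) {
      // rc stays 1, indicating failure
    } finally {
      Utilities.clearWork(conf);
      if (cleanContext) {
        ctx.clear();               // only clear a Context this task created
      }
      if (client != null) {
        rc = close(work, rc);      // move or remove tmp files; overwrites rc only on error
      }
    }
    return rc;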

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1513299&r1=1513298&r2=1513299&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Mon Aug 12 23:42:42 2013
@@ -773,6 +773,21 @@ public final class GenMapRedUtils {
   }
 
   /**
+   * Set key and value descriptor
+   * @param work ReduceWork
+   * @param rs ReduceSinkOperator
+   */
+  public static void setKeyAndValueDesc(ReduceWork work, ReduceSinkOperator rs) {
+    work.setKeyDesc(rs.getConf().getKeySerializeInfo());
+    int tag = Math.max(0, rs.getConf().getTag());
+    List<TableDesc> tagToSchema = work.getTagToValueDesc();
+    while (tag + 1 > tagToSchema.size()) {
+      tagToSchema.add(null);
+    }
+    tagToSchema.set(tag, rs.getConf().getValueSerializeInfo());
+  }
+
+  /**
    * set key and value descriptor.
    *
    * @param plan
@@ -788,13 +803,7 @@ public final class GenMapRedUtils {
 
     if (topOp instanceof ReduceSinkOperator) {
       ReduceSinkOperator rs = (ReduceSinkOperator) topOp;
-      plan.setKeyDesc(rs.getConf().getKeySerializeInfo());
-      int tag = Math.max(0, rs.getConf().getTag());
-      List<TableDesc> tagToSchema = plan.getTagToValueDesc();
-      while (tag + 1 > tagToSchema.size()) {
-        tagToSchema.add(null);
-      }
-      tagToSchema.set(tag, rs.getConf().getValueSerializeInfo());
+      setKeyAndValueDesc(plan, rs);
     } else {
       List<Operator<? extends OperatorDesc>> children = topOp.getChildOperators();
       if (children != null) {
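
The extracted setKeyAndValueDesc grows the tag-to-schema list before writing
into it. A short walk-through with an invented tag value, assuming the work's
getTagToValueDesc() list starts out empty:

    // Suppose rs carries tag 2 (the third branch into the reducer).
    int tag = 2;
    List<TableDesc> tagToSchema = work.getTagToValueDesc();     // assume: []
    while (tag + 1 > tagToSchema.size()) {
      tagToSchema.add(null);                                    // pads to [null, null, null]
    }
    tagToSchema.set(tag, rs.getConf().getValueSerializeInfo()); // fills slot 2 only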

Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java?rev=1513299&r1=1513298&r2=1513299&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java Mon Aug 12 23:42:42 2013
@@ -109,6 +109,14 @@ public class GenTezWork implements NodeP
         = (ReduceSinkOperator)root.getParentOperators().get(0);
       reduceWork.setNumReduceTasks(reduceSink.getConf().getNumReducers());
 
+      // need to fill in information about the key and value in the reducer
+      GenMapRedUtils.setKeyAndValueDesc(reduceWork, reduceSink);
+
+      // needs to be fixed in HIVE-5052. This should be driven off of stats
+      if (reduceWork.getNumReduceTasks() <= 0) {
+        reduceWork.setNumReduceTasks(1);
+      }
+
       tezWork.add(reduceWork);
       tezWork.connect(
           context.preceedingWork,
@@ -129,6 +137,17 @@ public class GenTezWork implements NodeP
     // Also note: the concept of leaf and root is reversed in hive for historical
     // reasons. Roots are data sources, leaves are data sinks. I know.
     if (context.leafOperatorToFollowingWork.containsKey(operator)) {
+
+      BaseWork followingWork = context.leafOperatorToFollowingWork.get(operator);
+
+      // need to add this branch to the key + value info
+      assert operator instanceof ReduceSinkOperator 
+        && followingWork instanceof ReduceWork;
+      ReduceSinkOperator rs = (ReduceSinkOperator) operator;
+      ReduceWork rWork = (ReduceWork) followingWork;
+      GenMapRedUtils.setKeyAndValueDesc(rWork, rs);
+
+      // add dependency between the two work items
       tezWork.connect(work, context.leafOperatorToFollowingWork.get(operator));
     }
 
@@ -136,6 +155,7 @@ public class GenTezWork implements NodeP
     // we might have to connect parent work with this work later.
     for (Operator<?> parent: new ArrayList<Operator<?>>(root.getParentOperators())) {
       assert !context.leafOperatorToFollowingWork.containsKey(parent);
+      assert !(work instanceof MapWork);
       context.leafOperatorToFollowingWork.put(parent, work);
       LOG.debug("Removing " + parent + " as parent from " + root);
       root.removeParent(parent);
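
Two of the additions above are worth noting. The reducer-count guard is an
interim default (per the in-line comment, to be replaced under HIVE-5052 with
a stats-driven estimate):

    if (reduceWork.getNumReduceTasks() <= 0) {
      reduceWork.setNumReduceTasks(1);   // fall back to a single reducer
    }

And the leafOperatorToFollowingWork branch now calls setKeyAndValueDesc on the
downstream ReduceWork before connecting the two work items, so the reducer-side
key/value schema is populated for every incoming branch, not just the first.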


