Subject: svn commit: r1513299 - in /hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql: exec/tez/DagUtils.java exec/tez/TezTask.java optimizer/GenMapRedUtils.java parse/GenTezWork.java
Date: Mon, 12 Aug 2013 23:42:42 -0000
To: commits@hive.apache.org
From: gunther@apache.org

Author: gunther
Date: Mon Aug 12 23:42:42 2013
New Revision: 1513299

URL: http://svn.apache.org/r1513299
Log:
HIVE-5058: Fix NPE issue with DAG submission in TEZ (Gunther Hagleitner)

Modified:
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
    hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
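
The core of the change is a dedicated Tez staging directory, _tez_scratch_dir, created under Hive's MR scratch dir and used both for split generation and for DAG submission. A minimal sketch of the layout, mirroring the TEZ_DIR constant and getTezDir() added to DagUtils below (the class name and main() driver here are illustrative only, not part of the patch):

    import org.apache.hadoop.fs.Path;

    // Illustrative only: the Tez staging dir is a fixed child of the MR
    // scratch dir, so it can be recomputed anywhere the scratch dir is known.
    public class TezScratchDirLayout {
      private static final String TEZ_DIR = "_tez_scratch_dir"; // same constant as DagUtils

      static Path tezDirFor(Path mrScratchDir) {
        return new Path(mrScratchDir, TEZ_DIR);
      }

      public static void main(String[] args) {
        System.out.println(tezDirFor(new Path("/tmp/hive-scratch")));
        // prints: /tmp/hive-scratch/_tez_scratch_dir
      }
    }
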
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java?rev=1513299&r1=1513298&r2=1513299&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/DagUtils.java Mon Aug 12 23:42:42 2013
@@ -82,10 +82,12 @@ import org.apache.tez.mapreduce.processo
  */
 public class DagUtils {
 
+  private static final String TEZ_DIR = "_tez_scratch_dir";
+
   /*
    * Creates the configuration object necessary to run a specific vertex from
    * map work. This includes input formats, input processor, etc.
-= */
+ */
   private static JobConf initializeVertexConf(JobConf baseConf, MapWork mapWork) {
     JobConf conf = new JobConf(baseConf);
@@ -124,8 +126,8 @@ public class DagUtils {
       inpFormat = BucketizedHiveInputFormat.class.getName();
     }
 
-    conf.set(MRJobConfig.MAP_CLASS_ATTR, ExecMapper.class.getName());
-    conf.set(MRJobConfig.INPUT_FORMAT_CLASS_ATTR, inpFormat);
+    conf.set("mapred.mapper.class", ExecMapper.class.getName());
+    conf.set("mapred.input.format.class", inpFormat);
 
     return conf;
   }
@@ -141,11 +143,16 @@ public class DagUtils {
    * @param w The second vertex (sink)
    * @return
    */
-  public static Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w) {
+  public static Edge createEdge(JobConf vConf, Vertex v, JobConf wConf, Vertex w)
+      throws IOException {
 
     // Tez needs to setup output subsequent input pairs correctly
     MultiStageMRConfToTezTranslator.translateVertexConfToTez(wConf, vConf);
 
+    // update payloads (configuration for the vertices might have changed)
+    v.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(vConf));
+    w.getProcessorDescriptor().setUserPayload(MRHelpers.createUserPayloadFromConf(wConf));
+
     // all edges are of the same type right now
     EdgeProperty edgeProperty = new EdgeProperty(ConnectionPattern.BIPARTITE, SourceType.STABLE,
@@ -161,6 +168,8 @@ public class DagUtils {
       LocalResource appJarLr, List<LocalResource> additionalLr, FileSystem fs,
       Path mrScratchDir, Context ctx) throws Exception {
 
+    Path tezDir = getTezDir(mrScratchDir);
+
     // map work can contain localwork, i.e: hashtables for map-side joins
     Path hashTableArchive = createHashTables(mapWork, conf);
     LocalResource localWorkLr = null;
@@ -171,17 +180,24 @@ public class DagUtils {
     }
 
     // write out the operator plan
-    Path planPath = Utilities.setMapWork(conf, mapWork, mrScratchDir.toUri().toString(), false);
+    Path planPath = Utilities.setMapWork(conf, mapWork,
+        mrScratchDir.toUri().toString(), false);
     LocalResource planLr = createLocalResource(fs,
        planPath, LocalResourceType.FILE,
        LocalResourceVisibility.APPLICATION);
 
     // setup input paths and split info
-    List<Path> inputPaths = Utilities.getInputPaths(conf, mapWork, mrScratchDir.toUri().toString(), ctx);
+    List<Path> inputPaths = Utilities.getInputPaths(conf, mapWork,
+        mrScratchDir.toUri().toString(), ctx);
     Utilities.setInputPaths(conf, inputPaths);
-    InputSplitInfo inputSplitInfo = MRHelpers.generateInputSplits(conf, mrScratchDir);
-    MultiStageMRConfToTezTranslator.translateVertexConfToTez(conf, conf);
+    InputSplitInfo inputSplitInfo = MRHelpers.generateInputSplits(conf, tezDir);
+
+    // create the directories FileSinkOperators need
+    Utilities.createTmpDirs(conf, mapWork);
+
+    // Tez asks us to call this even if there's no preceding vertex
+    MultiStageMRConfToTezTranslator.translateVertexConfToTez(conf, null);
 
     // finally create the vertex
     Vertex map = null;
@@ -229,17 +245,13 @@ public class DagUtils {
   private static JobConf initializeVertexConf(JobConf baseConf, ReduceWork reduceWork) {
     JobConf conf = new JobConf(baseConf);
 
-    conf.set(MRJobConfig.REDUCE_CLASS_ATTR, ExecReducer.class.getName());
+    conf.set("mapred.reducer.class", ExecReducer.class.getName());
 
     boolean useSpeculativeExecReducers = HiveConf.getBoolVar(conf,
         HiveConf.ConfVars.HIVESPECULATIVEEXECREDUCERS);
     HiveConf.setBoolVar(conf, HiveConf.ConfVars.HADOOPSPECULATIVEEXECREDUCERS,
         useSpeculativeExecReducers);
 
-    // reducers should have been set at planning stage
-    // job.setNumberOfReducers(rWork.getNumberOfReducers())
-    conf.set(MRJobConfig.NUM_REDUCES, reduceWork.getNumReduceTasks().toString());
-
     return conf;
   }
 
@@ -252,10 +264,13 @@ public class DagUtils {
 
     // write out the operator plan
     Path planPath = Utilities.setReduceWork(conf, reduceWork,
-        mrScratchDir.getName(), false);
+        mrScratchDir.toUri().toString(), false);
     LocalResource planLr = createLocalResource(fs, planPath,
         LocalResourceType.FILE,
         LocalResourceVisibility.APPLICATION);
 
+    // create the directories FileSinkOperators need
+    Utilities.createTmpDirs(conf, reduceWork);
+
     // create the vertex
     Vertex reducer = new Vertex("Reducer "+seqNo,
         new ProcessorDescriptor(ReduceProcessor.class.getName(),
@@ -476,7 +491,7 @@ public class DagUtils {
    * @throws URISyntaxException when current jar location cannot be determined.
    */
   public static LocalResource createHiveExecLocalResource(HiveConf conf)
-    throws IOException, LoginException, URISyntaxException {
+      throws IOException, LoginException, URISyntaxException {
     String hiveJarDir = conf.getVar(HiveConf.ConfVars.HIVE_JAR_DIRECTORY);
     String currentVersionPathStr = getExecJarPathLocal();
     String currentJarName = getResourceBaseName(currentVersionPathStr);
@@ -560,20 +575,17 @@ public class DagUtils {
       }
     }
 
-    conf.set("mapreduce.framework.name","yarn-tez");
-    conf.set("mapreduce.job.output.committer.class", NullOutputCommitter.class.getName());
-
-    conf.setBoolean(MRJobConfig.SETUP_CLEANUP_NEEDED, false);
-    conf.setBoolean(MRJobConfig.TASK_CLEANUP_NEEDED, false);
+    conf.set("mapred.output.committer.class", NullOutputCommitter.class.getName());
 
-    conf.setClass(MRJobConfig.OUTPUT_FORMAT_CLASS_ATTR, HiveOutputFormatImpl.class, OutputFormat.class);
+    conf.setBoolean("mapred.committer.job.setup.cleanup.needed", false);
+    conf.setBoolean("mapred.committer.job.task.cleanup.needed", false);
 
-    conf.set(MRJobConfig.MAP_CLASS_ATTR, ExecMapper.class.getName());
+    conf.setClass("mapred.output.format.class", HiveOutputFormatImpl.class, OutputFormat.class);
 
     conf.set(MRJobConfig.OUTPUT_KEY_CLASS, HiveKey.class.getName());
     conf.set(MRJobConfig.OUTPUT_VALUE_CLASS, BytesWritable.class.getName());
 
-    conf.set(MRJobConfig.PARTITIONER_CLASS_ATTR, HiveConf.getVar(conf, HiveConf.ConfVars.HIVEPARTITIONER));
+    conf.set("mapred.partitioner.class", HiveConf.getVar(conf, HiveConf.ConfVars.HIVEPARTITIONER));
 
     return conf;
   }
@@ -631,6 +643,25 @@ public class DagUtils {
     }
   }
 
+  /**
+   * createTezDir creates a temporary directory in the scratchDir folder to
+   * be used with Tez. Assumes scratchDir exists.
+   */
+  public static Path createTezDir(Path scratchDir, Configuration conf)
+      throws IOException {
+    Path tezDir = getTezDir(scratchDir);
+    FileSystem fs = tezDir.getFileSystem(conf);
+    fs.mkdirs(tezDir);
+    return tezDir;
+  }
+
+  /**
+   * Gets the tez dir that belongs to the hive scratch dir
+   */
+  public static Path getTezDir(Path scratchDir) {
+    return new Path(scratchDir, TEZ_DIR);
+  }
+
   private DagUtils() {
     // don't instantiate
   }
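
A note on the two helpers above: createTezDir() does the one-time mkdirs, while getTezDir() is a pure path computation, so callers that only know the scratch dir (split generation in createVertex(), submission in TezTask) all resolve the same directory. A short usage sketch, assuming the patched DagUtils is on the classpath (the wrapper class and prepare() method here are hypothetical):

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.ql.exec.tez.DagUtils;

    // Hypothetical caller: create the staging dir once up front, then
    // recompute its location later without touching the filesystem.
    public class TezDirUsageSketch {
      public static Path prepare(Path scratchDir, Configuration conf) throws IOException {
        Path tezDir = DagUtils.createTezDir(scratchDir, conf); // mkdirs _tez_scratch_dir
        assert tezDir.equals(DagUtils.getTezDir(scratchDir));  // deterministic layout
        return tezDir;
      }
    }
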
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1513299&r1=1513298&r2=1513299&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Mon Aug 12 23:42:42 2013
@@ -24,16 +24,20 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.DriverContext;
+import org.apache.hadoop.hive.ql.exec.JobCloseFeedBack;
+import org.apache.hadoop.hive.ql.exec.Operator;
 import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.exec.Utilities;
 import org.apache.hadoop.hive.ql.plan.BaseWork;
 import org.apache.hadoop.hive.ql.plan.TezWork;
 import org.apache.hadoop.hive.ql.plan.api.StageType;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.DAG;
@@ -60,15 +64,26 @@ public class TezTask extends Task
     List<BaseWork> ws = work.getAllWork();
     Collections.reverse(ws);
 
+    Path tezDir = DagUtils.getTezDir(scratchDir);
+    FileSystem fs = tezDir.getFileSystem(conf);
+
     // the name of the dag is what is displayed in the AM/Job UI
     DAG dag = new DAG(
         Utilities.abbreviate(HiveConf.getVar(conf, HiveConf.ConfVars.HIVEQUERYSTRING),
@@ -125,8 +151,8 @@ public class TezTask extends Task
     Map<String, LocalResource> amLrs = new HashMap<String, LocalResource>();
     amLrs.put(DagUtils.getBaseName(appJarLr), appJarLr);
 
+    Path tezDir = DagUtils.getTezDir(scratchDir);
+
     // ready to start execution on the cluster
-    DAGClient dagClient = tezClient.submitDAGApplication(dag, scratchDir,
+    DAGClient dagClient = tezClient.submitDAGApplication(dag, tezDir,
         null, "default", Collections.singletonList(""), amEnv, amLrs,
         new TezConfiguration(conf));
     return dagClient;
   }
 
+  /*
+   * close will move the temp files into the right place for the fetch
+   * task. If the job has failed it will clean up the files.
+   */
+  private int close(TezWork work, int rc) {
+    try {
+      JobCloseFeedBack feedBack = new JobCloseFeedBack();
+      List<BaseWork> ws = work.getAllWork();
+      for (BaseWork w: ws) {
+        List<Operator<?>> ops = w.getAllOperators();
+        for (Operator<?> op: ops) {
+          op.jobClose(conf, rc == 0, feedBack);
+        }
+      }
+    } catch (Exception e) {
+      // jobClose needs to execute successfully otherwise fail task
+      if (rc == 0) {
+        rc = 3;
+        String mesg = "Job Commit failed with exception '"
+            + Utilities.getNameMessage(e) + "'";
+        console.printError(mesg, "\n" + StringUtils.stringifyException(e));
+      }
+    }
+    return rc;
+  }
+
   @Override
   public boolean isMapRedTask() {
     return true;
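
The error handling in close() encodes a rule that is easy to miss: a commit-time failure only turns a successful run into a failure (code 3) and never overwrites an earlier failure code. A self-contained illustration; Runnable stands in for the op.jobClose(...) loop, and only the rc == 0 check and the code 3 come from the patch:

    // Stand-alone sketch of close()'s return-code rule.
    public class JobCloseRcSketch {
      static int close(Runnable commitWork, int rc) {
        try {
          commitWork.run();      // in TezTask: op.jobClose(conf, rc == 0, feedBack)
        } catch (Exception e) {
          if (rc == 0) {         // only a successful job is downgraded
            rc = 3;              // the "Job Commit failed" code from the patch
          }                      // an already-failed job keeps its original rc
        }
        return rc;
      }

      public static void main(String[] args) {
        Runnable failing = new Runnable() {
          public void run() { throw new RuntimeException("commit failed"); }
        };
        System.out.println(close(failing, 0)); // 3
        System.out.println(close(failing, 2)); // 2: earlier failure preserved
      }
    }
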
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java?rev=1513299&r1=1513298&r2=1513299&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java Mon Aug 12 23:42:42 2013
@@ -773,6 +773,21 @@ public final class GenMapRedUtils {
   }
 
   /**
+   * Set key and value descriptor
+   * @param work ReduceWork
+   * @param rs ReduceSinkOperator
+   */
+  public static void setKeyAndValueDesc(ReduceWork work, ReduceSinkOperator rs) {
+    work.setKeyDesc(rs.getConf().getKeySerializeInfo());
+    int tag = Math.max(0, rs.getConf().getTag());
+    List<TableDesc> tagToSchema = work.getTagToValueDesc();
+    while (tag + 1 > tagToSchema.size()) {
+      tagToSchema.add(null);
+    }
+    tagToSchema.set(tag, rs.getConf().getValueSerializeInfo());
+  }
+
+  /**
    * set key and value descriptor.
    *
    * @param plan
@@ -788,13 +803,7 @@ public final class GenMapRedUtils {
 
     if (topOp instanceof ReduceSinkOperator) {
       ReduceSinkOperator rs = (ReduceSinkOperator) topOp;
-      plan.setKeyDesc(rs.getConf().getKeySerializeInfo());
-      int tag = Math.max(0, rs.getConf().getTag());
-      List<TableDesc> tagToSchema = plan.getTagToValueDesc();
-      while (tag + 1 > tagToSchema.size()) {
-        tagToSchema.add(null);
-      }
-      tagToSchema.set(tag, rs.getConf().getValueSerializeInfo());
+      setKeyAndValueDesc(plan, rs);
     } else {
       List<Operator<? extends OperatorDesc>> children = topOp.getChildOperators();
       if (children != null) {
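
The extracted setKeyAndValueDesc() carries the one non-obvious idiom in this commit: the tag-indexed value-descriptor list is grown with null placeholders because the branches feeding a reducer (e.g. the two sides of a join, tags 0 and 1) can register in any order, so the list cannot be pre-sized. A stand-alone model using plain strings instead of TableDesc; the class and method names here are illustrative:

    import java.util.ArrayList;
    import java.util.List;

    // Models the padding loop in setKeyAndValueDesc().
    public class TagPaddingSketch {
      static void setAtTag(List<String> tagToSchema, int tag, String valueDesc) {
        while (tag + 1 > tagToSchema.size()) {
          tagToSchema.add(null); // placeholder for a branch not seen yet
        }
        tagToSchema.set(tag, valueDesc);
      }

      public static void main(String[] args) {
        List<String> tagToSchema = new ArrayList<String>();
        setAtTag(tagToSchema, 1, "rightValueDesc"); // second branch arrives first
        setAtTag(tagToSchema, 0, "leftValueDesc");
        System.out.println(tagToSchema);            // [leftValueDesc, rightValueDesc]
      }
    }
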
Modified: hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java
URL: http://svn.apache.org/viewvc/hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java?rev=1513299&r1=1513298&r2=1513299&view=diff
==============================================================================
--- hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java (original)
+++ hive/branches/tez/ql/src/java/org/apache/hadoop/hive/ql/parse/GenTezWork.java Mon Aug 12 23:42:42 2013
@@ -109,6 +109,14 @@ public class GenTezWork implements NodeP
         = (ReduceSinkOperator)root.getParentOperators().get(0);
       reduceWork.setNumReduceTasks(reduceSink.getConf().getNumReducers());
 
+      // need to fill in information about the key and value in the reducer
+      GenMapRedUtils.setKeyAndValueDesc(reduceWork, reduceSink);
+
+      // needs to be fixed in HIVE-5052. This should be driven off of stats
+      if (reduceWork.getNumReduceTasks() <= 0) {
+        reduceWork.setNumReduceTasks(1);
+      }
+
       tezWork.add(reduceWork);
       tezWork.connect(
           context.preceedingWork,
@@ -129,6 +137,17 @@ public class GenTezWork implements NodeP
     // Also note: the concept of leaf and root is reversed in hive for historical
     // reasons. Roots are data sources, leaves are data sinks. I know.
     if (context.leafOperatorToFollowingWork.containsKey(operator)) {
+
+      BaseWork followingWork = context.leafOperatorToFollowingWork.get(operator);
+
+      // need to add this branch to the key + value info
+      assert operator instanceof ReduceSinkOperator
+        && followingWork instanceof ReduceWork;
+      ReduceSinkOperator rs = (ReduceSinkOperator) operator;
+      ReduceWork rWork = (ReduceWork) followingWork;
+      GenMapRedUtils.setKeyAndValueDesc(rWork, rs);
+
+      // add dependency between the two work items
       tezWork.connect(work, context.leafOperatorToFollowingWork.get(operator));
     }
 
@@ -136,6 +155,7 @@ public class GenTezWork implements NodeP
     // we might have to connect parent work with this work later.
     for (Operator<?> parent: new ArrayList<Operator<?>>(root.getParentOperators())) {
       assert !context.leafOperatorToFollowingWork.containsKey(parent);
+      assert !(work instanceof MapWork);
       context.leafOperatorToFollowingWork.put(parent, work);
       LOG.debug("Removing " + parent + " as parent from " + root);
       root.removeParent(parent);