Return-Path: X-Original-To: apmail-tez-commits-archive@minotaur.apache.org Delivered-To: apmail-tez-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DBD85101B8 for ; Thu, 18 Apr 2013 23:56:28 +0000 (UTC) Received: (qmail 86008 invoked by uid 500); 18 Apr 2013 23:56:28 -0000 Delivered-To: apmail-tez-commits-archive@tez.apache.org Received: (qmail 85984 invoked by uid 500); 18 Apr 2013 23:56:28 -0000 Mailing-List: contact commits-help@tez.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@tez.apache.org Delivered-To: mailing list commits@tez.apache.org Received: (qmail 85975 invoked by uid 99); 18 Apr 2013 23:56:28 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 Apr 2013 23:56:28 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 Apr 2013 23:56:26 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 72A8B2388CB2; Thu, 18 Apr 2013 23:54:58 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1469642 [36/36] - in /incubator/tez/branches/TEZ-1: ./ example_jobs/ example_jobs/sampleInput/ example_jobs/wc_mr_6m_1r/ example_jobs/wc_mrr_6m_3r_3r/ ljr_helper/ tez-common/ tez-common/src/ tez-common/src/main/ tez-common/src/main/java/ t... Date: Thu, 18 Apr 2013 23:54:28 -0000 To: commits@tez.incubator.apache.org From: hitesh@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130418235458.72A8B2388CB2@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java?rev=1469642&view=auto ============================================================================== --- incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java (added) +++ incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java Thu Apr 18 23:54:18 2013 @@ -0,0 +1,1030 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.tez.mapreduce; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.Vector; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileContext; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.UnsupportedFileSystemException; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.io.DataOutputBuffer; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.ipc.ProtocolSignature; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.TaskLog; +import org.apache.hadoop.mapred.TaskLog.LogName; +import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus; +import org.apache.hadoop.mapreduce.ClusterMetrics; +import org.apache.hadoop.mapreduce.Counters; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.JobStatus; +import org.apache.tez.dag.api.DAG; +import org.apache.tez.dag.api.DAGConfiguration; +import org.apache.tez.dag.api.Edge; +import org.apache.tez.dag.api.EdgeProperty; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.Vertex; +import org.apache.tez.dag.api.VertexLocationHint.TaskLocationHint; +import org.apache.tez.mapreduce.hadoop.MRJobConfig; +import org.apache.tez.mapreduce.hadoop.MultiStageMRConfigUtil; +import org.apache.zookeeper.Environment.Entry; +import org.apache.hadoop.mapreduce.QueueAclsInfo; +import org.apache.hadoop.mapreduce.QueueInfo; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskCompletionEvent; +import org.apache.hadoop.mapreduce.TaskReport; +import org.apache.hadoop.mapreduce.TaskTrackerInfo; +import org.apache.hadoop.mapreduce.TaskType; +import org.apache.hadoop.mapreduce.TypeConverter; +import org.apache.hadoop.mapreduce.protocol.ClientProtocol; +import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo; +import org.apache.hadoop.mapreduce.split.SplitMetaInfoReader; +import org.apache.hadoop.mapreduce.v2.LogParams; +import org.apache.hadoop.mapreduce.v2.api.MRClientProtocol; +import org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils; +import org.apache.hadoop.mapreduce.v2.util.MRApps; +import org.apache.hadoop.security.Credentials; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.yarn.YarnException; +import org.apache.hadoop.yarn.api.ApplicationConstants; +import org.apache.hadoop.yarn.api.ApplicationConstants.Environment; +import org.apache.hadoop.yarn.api.records.ApplicationAccessType; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; +import org.apache.hadoop.yarn.api.records.LocalResource; +import org.apache.hadoop.yarn.api.records.LocalResourceType; +import org.apache.hadoop.yarn.api.records.LocalResourceVisibility; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.URL; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.util.Apps; +import org.apache.hadoop.yarn.util.BuilderUtils; +import org.apache.hadoop.yarn.util.ConverterUtils; + +import com.google.common.annotations.VisibleForTesting; + +/** + * This class enables the current JobClient (0.22 hadoop) to run on YARN-TEZ. + */ +@SuppressWarnings({ "unchecked" }) +public class YARNRunner implements ClientProtocol { + + private static final Log LOG = LogFactory.getLog(YARNRunner.class); + + private final RecordFactory recordFactory = + RecordFactoryProvider.getRecordFactory(null); + private ResourceMgrDelegate resMgrDelegate; + private ClientCache clientCache; + private Configuration conf; + private final FileContext defaultFileContext; + + private static final Object classpathLock = new Object(); + private static AtomicBoolean initialClasspathFlag = new AtomicBoolean(); + private static String initialClasspath = null; + + final public static FsPermission DAG_FILE_PERMISSION = + FsPermission.createImmutable((short) 0644); + + /** + * Yarn runner incapsulates the client interface of + * yarn + * @param conf the configuration object for the client + */ + public YARNRunner(Configuration conf) { + this(conf, new ResourceMgrDelegate(new YarnConfiguration(conf))); + } + + /** + * Similar to {@link #YARNRunner(Configuration)} but allowing injecting + * {@link ResourceMgrDelegate}. Enables mocking and testing. + * @param conf the configuration object for the client + * @param resMgrDelegate the resourcemanager client handle. + */ + public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate) { + this(conf, resMgrDelegate, new ClientCache(conf, resMgrDelegate)); + } + + /** + * Similar to {@link YARNRunner#YARNRunner(Configuration, ResourceMgrDelegate)} + * but allowing injecting {@link ClientCache}. Enable mocking and testing. + * @param conf the configuration object + * @param resMgrDelegate the resource manager delegate + * @param clientCache the client cache object. + */ + public YARNRunner(Configuration conf, ResourceMgrDelegate resMgrDelegate, + ClientCache clientCache) { + this.conf = conf; + try { + this.resMgrDelegate = resMgrDelegate; + this.clientCache = clientCache; + this.defaultFileContext = FileContext.getFileContext(this.conf); + } catch (UnsupportedFileSystemException ufe) { + throw new RuntimeException("Error in instantiating YarnClient", ufe); + } + } + + @Private + /** + * Used for testing mostly. + * @param resMgrDelegate the resource manager delegate to set to. + */ + public void setResourceMgrDelegate(ResourceMgrDelegate resMgrDelegate) { + this.resMgrDelegate = resMgrDelegate; + } + + @Override + public void cancelDelegationToken(Token arg0) + throws IOException, InterruptedException { + throw new UnsupportedOperationException("Use Token.renew instead"); + } + + @Override + public TaskTrackerInfo[] getActiveTrackers() throws IOException, + InterruptedException { + return resMgrDelegate.getActiveTrackers(); + } + + @Override + public JobStatus[] getAllJobs() throws IOException, InterruptedException { + return resMgrDelegate.getAllJobs(); + } + + @Override + public TaskTrackerInfo[] getBlacklistedTrackers() throws IOException, + InterruptedException { + return resMgrDelegate.getBlacklistedTrackers(); + } + + @Override + public ClusterMetrics getClusterMetrics() throws IOException, + InterruptedException { + return resMgrDelegate.getClusterMetrics(); + } + + @VisibleForTesting + Token getDelegationTokenFromHS(MRClientProtocol hsProxy) + throws IOException, InterruptedException { + throw new UnsupportedOperationException("No HistoryServer for Tez"); + } + + @Override + public Token getDelegationToken(Text renewer) + throws IOException, InterruptedException { + // The token is only used for serialization. So the type information + // mismatch should be fine. + return resMgrDelegate.getDelegationToken(renewer); + } + + @Override + public String getFilesystemName() throws IOException, InterruptedException { + return resMgrDelegate.getFilesystemName(); + } + + @Override + public JobID getNewJobID() throws IOException, InterruptedException { + return resMgrDelegate.getNewJobID(); + } + + @Override + public QueueInfo getQueue(String queueName) throws IOException, + InterruptedException { + return resMgrDelegate.getQueue(queueName); + } + + @Override + public QueueAclsInfo[] getQueueAclsForCurrentUser() throws IOException, + InterruptedException { + return resMgrDelegate.getQueueAclsForCurrentUser(); + } + + @Override + public QueueInfo[] getQueues() throws IOException, InterruptedException { + return resMgrDelegate.getQueues(); + } + + @Override + public QueueInfo[] getRootQueues() throws IOException, InterruptedException { + return resMgrDelegate.getRootQueues(); + } + + @Override + public QueueInfo[] getChildQueues(String parent) throws IOException, + InterruptedException { + return resMgrDelegate.getChildQueues(parent); + } + + @Override + public String getStagingAreaDir() throws IOException, InterruptedException { + return resMgrDelegate.getStagingAreaDir(); + } + + @Override + public String getSystemDir() throws IOException, InterruptedException { + return resMgrDelegate.getSystemDir(); + } + + @Override + public long getTaskTrackerExpiryInterval() throws IOException, + InterruptedException { + return resMgrDelegate.getTaskTrackerExpiryInterval(); + } + + private Map createJobLocalResources( + Configuration jobConf, String jobSubmitDir) + throws IOException { + + // Setup LocalResources + Map localResources = + new HashMap(); + + Path jobConfPath = new Path(jobSubmitDir, MRJobConfig.JOB_CONF_FILE); + + URL yarnUrlForJobSubmitDir = ConverterUtils + .getYarnUrlFromPath(defaultFileContext.getDefaultFileSystem() + .resolvePath( + defaultFileContext.makeQualified(new Path(jobSubmitDir)))); + LOG.debug("Creating setup context, jobSubmitDir url is " + + yarnUrlForJobSubmitDir); + + localResources.put(MRJobConfig.JOB_CONF_FILE, + createApplicationResource(defaultFileContext, + jobConfPath, LocalResourceType.FILE)); + if (jobConf.get(MRJobConfig.JAR) != null) { + Path jobJarPath = new Path(jobConf.get(MRJobConfig.JAR)); + LocalResource rc = createApplicationResource(defaultFileContext, + jobJarPath, + LocalResourceType.FILE); + // FIXME fix pattern support + // String pattern = conf.getPattern(JobContext.JAR_UNPACK_PATTERN, + // JobConf.UNPACK_JAR_PATTERN_DEFAULT).pattern(); + // rc.setPattern(pattern); + localResources.put(MRJobConfig.JOB_JAR, rc); + } else { + // Job jar may be null. For e.g, for pipes, the job jar is the hadoop + // mapreduce jar itself which is already on the classpath. + LOG.info("Job jar is not present. " + + "Not adding any jar to the list of resources."); + } + + // TODO gross hack + for (String s : new String[] { + MRJobConfig.JOB_SPLIT, + MRJobConfig.JOB_SPLIT_METAINFO, + MRJobConfig.APPLICATION_TOKENS_FILE }) { + localResources.put(s, + createApplicationResource(defaultFileContext, + new Path(jobSubmitDir, s), LocalResourceType.FILE)); + } + + MRApps.setupDistributedCache(jobConf, localResources); + + return localResources; + } + + // FIXME isn't this a nice mess of a client? + // read input, write splits, read splits again + private TaskLocationHint[] getMapLocationHintsFromInputSplits(JobID jobId, + FileSystem fs, Configuration conf, + String jobSubmitDir) throws IOException { + LOG.info("XXXX Reading splits information"); + TaskSplitMetaInfo[] splitsInfo = + SplitMetaInfoReader.readSplitMetaInfo(jobId, fs, conf, + new Path(jobSubmitDir)); + int splitsCount = splitsInfo.length; + LOG.info("XXXX Found splits information, splitCount=" + splitsCount); + TaskLocationHint[] locationHints = + new TaskLocationHint[splitsCount]; + for (int i = 0; i < splitsCount; ++i) { + TaskLocationHint locationHint = + new TaskLocationHint(splitsInfo[i].getLocations(), null); + locationHints[i] = locationHint; + } + return locationHints; + } + + private static String getInitialClasspath(Configuration conf) + throws IOException { + synchronized (classpathLock) { + if (initialClasspathFlag.get()) { + return initialClasspath; + } + Map env = new HashMap(); + MRApps.setClasspath(env, conf); + initialClasspath = env.get(Environment.CLASSPATH.name()); + initialClasspathFlag.set(true); + return initialClasspath; + } + } + + private void setupCommonChildEnv(Configuration conf, + Map environment) throws IOException { + + Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), + getInitialClasspath(conf)); + + // Shell + environment.put(Environment.SHELL.name(), conf.get( + MRJobConfig.MAPRED_ADMIN_USER_SHELL, MRJobConfig.DEFAULT_SHELL)); + + // Add pwd to LD_LIBRARY_PATH, add this before adding anything else + Apps.addToEnvironment(environment, Environment.LD_LIBRARY_PATH.name(), + Environment.PWD.$()); + + // Add the env variables passed by the admin + Apps.setEnvFromInputString(environment, conf.get( + MRJobConfig.MAPRED_ADMIN_USER_ENV, + MRJobConfig.DEFAULT_MAPRED_ADMIN_USER_ENV)); + + // FIXME is this really required? + // Add stdout/stderr env + environment.put( + MRJobConfig.STDOUT_LOGFILE_ENV, + getTaskLogFile(TaskLog.LogName.STDOUT) + ); + environment.put( + MRJobConfig.STDERR_LOGFILE_ENV, + getTaskLogFile(TaskLog.LogName.STDERR) + ); + + } + + private static String getChildEnv(Configuration jobConf, boolean isMap) { + if (isMap) { + return jobConf.get(MRJobConfig.MAP_ENV, ""); + } + return jobConf.get(MRJobConfig.REDUCE_ENV, ""); + } + + private static String getChildLogLevel(Configuration conf, boolean isMap) { + if (isMap) { + return conf.get( + MRJobConfig.MAP_LOG_LEVEL, + JobConf.DEFAULT_LOG_LEVEL.toString() + ); + } else { + return conf.get( + MRJobConfig.REDUCE_LOG_LEVEL, + JobConf.DEFAULT_LOG_LEVEL.toString() + ); + } + } + + private static String getTaskLogFile(LogName filter) { + return ApplicationConstants.LOG_DIR_EXPANSION_VAR + Path.SEPARATOR + + filter.toString(); + } + + private static void setupLog4jProperties(Configuration jobConf, + boolean isMap, + Vector vargs, + long logSize) { + String logLevel = getChildLogLevel(jobConf, isMap); + MRApps.addLog4jSystemProperties(logLevel, logSize, vargs); + } + + private void setupMapReduceEnv(Configuration jobConf, + Map environment, boolean isMap) throws IOException { + + if (isMap) { + warnForJavaLibPath( + conf.get(MRJobConfig.MAP_JAVA_OPTS,""), + "map", + MRJobConfig.MAP_JAVA_OPTS, + MRJobConfig.MAP_ENV); + warnForJavaLibPath( + conf.get(MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS,""), + "map", + MRJobConfig.MAPRED_MAP_ADMIN_JAVA_OPTS, + MRJobConfig.MAPRED_ADMIN_USER_ENV); + } else { + warnForJavaLibPath( + conf.get(MRJobConfig.REDUCE_JAVA_OPTS,""), + "reduce", + MRJobConfig.REDUCE_JAVA_OPTS, + MRJobConfig.REDUCE_ENV); + warnForJavaLibPath( + conf.get(MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS,""), + "reduce", + MRJobConfig.MAPRED_REDUCE_ADMIN_JAVA_OPTS, + MRJobConfig.MAPRED_ADMIN_USER_ENV); + } + + setupCommonChildEnv(jobConf, environment); + + // Add the env variables passed by the user + String mapredChildEnv = getChildEnv(jobConf, isMap); + Apps.setEnvFromInputString(environment, mapredChildEnv); + + // Set logging level in the environment. + // This is so that, if the child forks another "bin/hadoop" (common in + // streaming) it will have the correct loglevel. + environment.put( + "HADOOP_ROOT_LOGGER", + getChildLogLevel(jobConf, isMap) + ",CLA"); + + // FIXME: don't think this is also needed given we already set java + // properties. + // TODO Change this not to use JobConf. + long logSize = TaskLog.getTaskLogLength(new JobConf(jobConf)); + Vector logProps = new Vector(4); + setupLog4jProperties(jobConf, isMap, logProps, logSize); + Iterator it = logProps.iterator(); + StringBuffer buffer = new StringBuffer(); + while (it.hasNext()) { + buffer.append(" " + it.next()); + } + + // FIXME supposedly required for streaming, should we remove it and let + // YARN set it for all containers? + String hadoopClientOpts = System.getenv("HADOOP_CLIENT_OPTS"); + if (hadoopClientOpts == null) { + hadoopClientOpts = ""; + } else { + hadoopClientOpts = hadoopClientOpts + " "; + } + hadoopClientOpts = hadoopClientOpts + buffer.toString(); + // environment.put("HADOOP_CLIENT_OPTS", hadoopClientOpts); + + // FIXME for this to work, we need YARN-561 and the task runtime changed + // to use YARN-561 + + addTezClasspathToEnv(conf, environment); + Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), + getInitialClasspath(conf)); + + LOG.info("XXXX Dumping out env for child, isMap=" + isMap); + for (Map.Entry entry : environment.entrySet()) { + LOG.info("XXXX env entry: " + + entry.getKey() + + "=" + entry.getValue()); + } + } + + private Vertex configureReduceStage(FileSystem fs, JobID jobId, + Configuration jobConf, String jobSubmitDir, Credentials ts, + Map jobLocalResources, int iReduceIndex) + throws IOException { + int stageNum = iReduceIndex + 1; + Configuration conf = MultiStageMRConfigUtil.getIntermediateStageConf(jobConf, stageNum); + int numTasks = conf.getInt(MRJobConfig.NUM_REDUCES, 0); + Vertex vertex = new Vertex( + MultiStageMRConfigUtil.getIntermediateReduceVertexName(stageNum), + "org.apache.tez.mapreduce.task.IntermediateTask", numTasks); + + Map reduceEnv = new HashMap(); + setupMapReduceEnv(conf, reduceEnv, false); + + Resource reduceResource = BuilderUtils.newResource(conf.getInt( + MRJobConfig.REDUCE_MEMORY_MB, MRJobConfig.DEFAULT_REDUCE_MEMORY_MB), + conf.getInt(MRJobConfig.REDUCE_CPU_VCORES, + MRJobConfig.DEFAULT_REDUCE_CPU_VCORES)); + + Map reduceLocalResources = new TreeMap(); + reduceLocalResources.putAll(jobLocalResources); + // TODO MRR Don't bother localizing the input splits for the reduce vertices. + + vertex.setTaskEnvironment(reduceEnv); + vertex.setTaskLocalResources(reduceLocalResources); + vertex.setTaskLocationsHint(null); + vertex.setTaskResource(reduceResource); + + return vertex; + } + + private Vertex[] configureMultStageMRR(FileSystem fs, JobID jobId, + JobConf jobConf, String jobSubmitDir, Credentials ts, + Map jobLocalResources, DAG dag) throws IOException { + + int numIntermediateStages = MultiStageMRConfigUtil + .getNumIntermediateStages(jobConf); + + Vertex[] vertices = new Vertex[numIntermediateStages]; + + for (int i = 0; i < numIntermediateStages; i++) { + vertices[i] = configureReduceStage(fs, jobId, jobConf, jobSubmitDir, ts, + jobLocalResources, i); + dag.addVertex(vertices[i]); + } + return vertices; + } + + private DAG createDAG(FileSystem fs, JobID jobId, JobConf jobConf, + String jobSubmitDir, Credentials ts, + Map jobLocalResources) throws IOException { + + DAG dag = new DAG(); + + int numMaps = jobConf.getInt(MRJobConfig.NUM_MAPS, 0); + int numReduces = jobConf.getInt(MRJobConfig.NUM_REDUCES, 0); + int intermediateReduces = jobConf.getInt( + MRJobConfig.MRR_INTERMEDIATE_STAGES, 0); + + boolean mapOnly = + (numMaps > 0 && numReduces == 0 && intermediateReduces == 0); + boolean isMRR = (intermediateReduces > 0); + + LOG.info("XXXX Parsing job config" + + ", numMaps=" + numMaps + + ", numReduces=" + numReduces + + ", intermediateReduces=" + intermediateReduces); + + // configure map vertex + String mapProcessor = mapOnly ? + "org.apache.tez.mapreduce.task.MapOnlyTask" + : "org.apache.tez.mapreduce.task.InitialTask"; + Vertex mapVertex = new Vertex("map", mapProcessor, numMaps); + + // FIXME set up map environment + Map mapEnv = new HashMap(); + setupMapReduceEnv(jobConf, mapEnv, true); + + TaskLocationHint[] inputSplitLocations = + getMapLocationHintsFromInputSplits(jobId, fs, jobConf, jobSubmitDir); + + Resource mapResource = BuilderUtils.newResource( + jobConf.getInt(MRJobConfig.MAP_MEMORY_MB, + MRJobConfig.DEFAULT_MAP_MEMORY_MB), + jobConf.getInt(MRJobConfig.MAP_CPU_VCORES, + MRJobConfig.DEFAULT_MAP_CPU_VCORES)); + + Map mapLocalResources = + new TreeMap(); + mapLocalResources.putAll(jobLocalResources); + + mapVertex.setTaskEnvironment(mapEnv); + mapVertex.setTaskLocalResources(mapLocalResources); + mapVertex.setTaskLocationsHint(inputSplitLocations); + mapVertex.setTaskResource(mapResource); + + LOG.info("XXXX Adding map vertex to DAG" + + ", vertexName=" + mapVertex.getVertexName() + + ", processor=" + mapVertex.getProcessorName() + + ", parrellism=" + mapVertex.getParallelism()); + dag.addVertex(mapVertex); + + Vertex[] intermediateVertices = null; + // configure intermediate reduces + if (isMRR) { + intermediateVertices = configureMultStageMRR(fs, jobId, jobConf, + jobSubmitDir, ts, jobLocalResources, dag); + } + + // configure final reduce vertex + if (numReduces > 0) { + String reduceProcessor = + "org.apache.tez.mapreduce.task.FinalTask"; + Vertex reduceVertex = new Vertex("reduce", reduceProcessor, numReduces); + + // FIXME set up reduce environment + Map reduceEnv = new HashMap(); + setupMapReduceEnv(jobConf, reduceEnv, false); + + Resource reduceResource = BuilderUtils.newResource( + jobConf.getInt(MRJobConfig.REDUCE_MEMORY_MB, + MRJobConfig.DEFAULT_REDUCE_MEMORY_MB), + jobConf.getInt(MRJobConfig.REDUCE_CPU_VCORES, + MRJobConfig.DEFAULT_REDUCE_CPU_VCORES)); + + Map reduceLocalResources = + new TreeMap(); + reduceLocalResources.putAll(jobLocalResources); + + reduceVertex.setTaskEnvironment(reduceEnv); + reduceVertex.setTaskLocalResources(reduceLocalResources); + reduceVertex.setTaskLocationsHint(null); + reduceVertex.setTaskResource(reduceResource); + + LOG.info("XXXX Adding reduce vertex to DAG" + + ", vertexName=" + reduceVertex.getVertexName() + + ", processor=" + reduceVertex.getProcessorName() + + ", parrellism=" + reduceVertex.getParallelism()); + dag.addVertex(reduceVertex); + + EdgeProperty edgeProperty = new EdgeProperty(); + Edge edge = null; + if (!isMRR) { + edge = new Edge(mapVertex, reduceVertex, edgeProperty); + dag.addEdge(edge); + } else { + + Edge initialEdge = new Edge(mapVertex, intermediateVertices[0], + edgeProperty); + dag.addEdge(initialEdge); + + int numIntermediateEdges = intermediateVertices.length - 1; + for (int i = 0; i < numIntermediateEdges; i++) { + Edge iEdge = new Edge(intermediateVertices[i], + intermediateVertices[i + 1], edgeProperty); + dag.addEdge(iEdge); + } + + Edge finalEdge = new Edge( + intermediateVertices[intermediateVertices.length - 1], + reduceVertex, edgeProperty); + dag.addEdge(finalEdge); + } + + } + + return dag; + } + + private void addTezClasspathToEnv(Configuration conf, + Map environment) { + for (String c : conf.getStrings( + TezConfiguration.TEZ_APPLICATION_CLASSPATH, + TezConfiguration.DEFAULT_TEZ_APPLICATION_CLASSPATH)) { + Apps.addToEnvironment(environment, + ApplicationConstants.Environment.CLASSPATH.name(), c.trim()); + } + } + + private ApplicationSubmissionContext createApplicationSubmissionContext( + FileSystem fs, DAG dag, + Configuration jobConf, String jobSubmitDir, Credentials ts, + Map jobLocalResources) throws IOException { + + ApplicationId applicationId = resMgrDelegate.getApplicationId(); + + // Setup resource requirements + Resource capability = recordFactory.newRecordInstance(Resource.class); + capability.setMemory( + conf.getInt(TezConfiguration.DAG_AM_RESOURCE_MEMORY_MB, + TezConfiguration.DEFAULT_DAG_AM_RESOURCE_MEMORY_MB)); + capability.setVirtualCores( + conf.getInt(TezConfiguration.DAG_AM_RESOURCE_CPU_VCORES, + TezConfiguration.DEFAULT_DAG_AM_RESOURCE_CPU_VCORES)); + LOG.debug("AppMaster capability = " + capability); + + // Setup security tokens + DataOutputBuffer dob = new DataOutputBuffer(); + ts.writeTokenStorageToStream(dob); + ByteBuffer securityTokens = ByteBuffer.wrap(dob.getData(), + 0, dob.getLength()); + + // Setup the command to run the AM + List vargs = new ArrayList(8); + vargs.add(Environment.JAVA_HOME.$() + "/bin/java"); + + // FIXME set up logging related properties + // TODO -Dtez.root.logger?? + // MRApps.addLog4jSystemProperties(logLevel, logSize, vargs); + + // FIXME admin command opts and user command opts for tez? + String mrAppMasterAdminOptions = conf.get(MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, + MRJobConfig.DEFAULT_MR_AM_ADMIN_COMMAND_OPTS); + warnForJavaLibPath(mrAppMasterAdminOptions, "app master", + MRJobConfig.MR_AM_ADMIN_COMMAND_OPTS, MRJobConfig.MR_AM_ADMIN_USER_ENV); + vargs.add(mrAppMasterAdminOptions); + + // Add AM user command opts + String mrAppMasterUserOptions = conf.get(MRJobConfig.MR_AM_COMMAND_OPTS, + MRJobConfig.DEFAULT_MR_AM_COMMAND_OPTS); + warnForJavaLibPath(mrAppMasterUserOptions, "app master", + MRJobConfig.MR_AM_COMMAND_OPTS, MRJobConfig.MR_AM_ENV); + vargs.add(mrAppMasterUserOptions); + + vargs.add(MRJobConfig.APPLICATION_MASTER_CLASS); + vargs.add("1>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + + Path.SEPARATOR + ApplicationConstants.STDOUT); + vargs.add("2>" + ApplicationConstants.LOG_DIR_EXPANSION_VAR + + Path.SEPARATOR + ApplicationConstants.STDERR); + + + Vector vargsFinal = new Vector(8); + // Final command + StringBuilder mergedCommand = new StringBuilder(); + for (CharSequence str : vargs) { + mergedCommand.append(str).append(" "); + } + vargsFinal.add(mergedCommand.toString()); + + LOG.debug("Command to launch container for ApplicationMaster is : " + + mergedCommand); + + // Setup the CLASSPATH in environment + // i.e. add { Hadoop jars, job jar, CWD } to classpath. + Map environment = new HashMap(); + addTezClasspathToEnv(conf, environment); + Apps.addToEnvironment(environment, Environment.CLASSPATH.name(), + getInitialClasspath(conf)); + + // Setup the environment variables for Admin first + MRApps.setEnvFromInputString(environment, + conf.get(MRJobConfig.MR_AM_ADMIN_USER_ENV)); + // Setup the environment variables (LD_LIBRARY_PATH, etc) + MRApps.setEnvFromInputString(environment, + conf.get(MRJobConfig.MR_AM_ENV)); + + // FIXME remove this + Map localResources = + new TreeMap(); + + localResources.putAll(jobLocalResources); + + // FIXME add serialized dag conf + DAGConfiguration dagConf = dag.serializeDag(); + + Path dagConfFilePath = new Path(jobSubmitDir, + TezConfiguration.DAG_AM_PLAN_CONFIG_XML); + FSDataOutputStream dagConfOut = + FileSystem.create(fs, dagConfFilePath, + new FsPermission(DAG_FILE_PERMISSION)); + try { + dagConf.writeXml(dagConfOut); + } finally { + dagConfOut.close(); + } + localResources.put(TezConfiguration.DAG_AM_PLAN_CONFIG_XML, + createApplicationResource(defaultFileContext, + dagConfFilePath, LocalResourceType.FILE)); + + // FIXME add tez conf if needed + + // FIXME are we using MR acls for tez? + Map acls + = new HashMap(); + + // Setup ContainerLaunchContext for AM container + ContainerLaunchContext amContainer = BuilderUtils + .newContainerLaunchContext(UserGroupInformation + .getCurrentUser().getShortUserName(), localResources, + environment, vargsFinal, null, securityTokens, acls); + + // Set up the ApplicationSubmissionContext + ApplicationSubmissionContext appContext = + recordFactory.newRecordInstance(ApplicationSubmissionContext.class); + appContext.setApplicationId(applicationId); // ApplicationId + appContext.setResource(capability); // resource + appContext.setQueue( // Queue name + jobConf.get(JobContext.QUEUE_NAME, + YarnConfiguration.DEFAULT_QUEUE_NAME)); + appContext.setApplicationName( // Job name + jobConf.get(JobContext.JOB_NAME, + YarnConfiguration.DEFAULT_APPLICATION_NAME)); + appContext.setCancelTokensWhenComplete( + conf.getBoolean(MRJobConfig.JOB_CANCEL_DELEGATION_TOKEN, true)); + appContext.setAMContainerSpec(amContainer); // AM Container + + return appContext; + } + + @Override + public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) + throws IOException, InterruptedException { + + // Upload only in security mode: TODO + Path applicationTokensFile = + new Path(jobSubmitDir, MRJobConfig.APPLICATION_TOKENS_FILE); + try { + ts.writeTokenStorageFile(applicationTokensFile, conf); + } catch (IOException e) { + throw new YarnException(e); + } + + FileSystem fs = FileSystem.get(conf); + JobConf jobConf = new JobConf(new TezConfiguration(conf)); + + // FIXME set up job resources + Map jobLocalResources = + createJobLocalResources(jobConf, jobSubmitDir); + DAG dag = createDAG(fs, jobId, jobConf, jobSubmitDir, ts, + jobLocalResources); + ApplicationSubmissionContext appContext = + createApplicationSubmissionContext(fs, dag, jobConf, jobSubmitDir, ts, + jobLocalResources); + + // Submit to ResourceManager + ApplicationId applicationId = resMgrDelegate.submitApplication(appContext); + + ApplicationReport appMasterReport = resMgrDelegate + .getApplicationReport(applicationId); + String diagnostics = + (appMasterReport == null ? + "application report is null" : appMasterReport.getDiagnostics()); + if (appMasterReport == null + || appMasterReport.getYarnApplicationState() == YarnApplicationState.FAILED + || appMasterReport.getYarnApplicationState() == YarnApplicationState.KILLED) { + throw new IOException("Failed to run job : " + + diagnostics); + } + + while (true) { + appMasterReport = resMgrDelegate + .getApplicationReport(applicationId); + diagnostics = + (appMasterReport == null ? + "application report is null" : appMasterReport.getDiagnostics()); + if (appMasterReport == null) { + throw new IOException("Failed to run job : " + + diagnostics); + } + YarnApplicationState state = appMasterReport.getYarnApplicationState(); + if (state.equals(YarnApplicationState.FAILED) + || state.equals(YarnApplicationState.FINISHED) + || state.equals(YarnApplicationState.KILLED)) { + LOG.info("Job completed" + + ", finalStatus=" + appMasterReport.getFinalApplicationStatus() + + ", finalState=" + appMasterReport.getYarnApplicationState() + + ", diagnostics=" + diagnostics); + break; + } else { + LOG.info("Job in progress" + + ", finalStatus=" + appMasterReport.getFinalApplicationStatus() + + ", finalState=" + appMasterReport.getYarnApplicationState() + + ", diagnostics=" + diagnostics); + } + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } + } + + // FIXME + return clientCache.getClient(jobId).getJobStatus(jobId); + } + + private LocalResource createApplicationResource(FileContext fs, Path p, LocalResourceType type) + throws IOException { + LocalResource rsrc = recordFactory.newRecordInstance(LocalResource.class); + FileStatus rsrcStat = fs.getFileStatus(p); + rsrc.setResource(ConverterUtils.getYarnUrlFromPath(fs + .getDefaultFileSystem().resolvePath(rsrcStat.getPath()))); + rsrc.setSize(rsrcStat.getLen()); + rsrc.setTimestamp(rsrcStat.getModificationTime()); + rsrc.setType(type); + rsrc.setVisibility(LocalResourceVisibility.APPLICATION); + return rsrc; + } + + @Override + public void setJobPriority(JobID arg0, String arg1) throws IOException, + InterruptedException { + resMgrDelegate.setJobPriority(arg0, arg1); + } + + @Override + public long getProtocolVersion(String arg0, long arg1) throws IOException { + return resMgrDelegate.getProtocolVersion(arg0, arg1); + } + + @Override + public long renewDelegationToken(Token arg0) + throws IOException, InterruptedException { + throw new UnsupportedOperationException("Use Token.renew instead"); + } + + + @Override + public Counters getJobCounters(JobID arg0) throws IOException, + InterruptedException { + return clientCache.getClient(arg0).getJobCounters(arg0); + } + + @Override + public String getJobHistoryDir() throws IOException, InterruptedException { + return JobHistoryUtils.getConfiguredHistoryServerDoneDirPrefix(conf); + } + + @Override + public JobStatus getJobStatus(JobID jobID) throws IOException, + InterruptedException { + JobStatus status = clientCache.getClient(jobID).getJobStatus(jobID); + return status; + } + + @Override + public TaskCompletionEvent[] getTaskCompletionEvents(JobID arg0, int arg1, + int arg2) throws IOException, InterruptedException { + return clientCache.getClient(arg0).getTaskCompletionEvents(arg0, arg1, arg2); + } + + @Override + public String[] getTaskDiagnostics(TaskAttemptID arg0) throws IOException, + InterruptedException { + return clientCache.getClient(arg0.getJobID()).getTaskDiagnostics(arg0); + } + + @Override + public TaskReport[] getTaskReports(JobID jobID, TaskType taskType) + throws IOException, InterruptedException { + return clientCache.getClient(jobID) + .getTaskReports(jobID, taskType); + } + + @Override + public void killJob(JobID arg0) throws IOException, InterruptedException { + /* check if the status is not running, if not send kill to RM */ + JobStatus status = clientCache.getClient(arg0).getJobStatus(arg0); + if (status.getState() != JobStatus.State.RUNNING) { + resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId()); + return; + } + + try { + /* send a kill to the AM */ + clientCache.getClient(arg0).killJob(arg0); + long currentTimeMillis = System.currentTimeMillis(); + long timeKillIssued = currentTimeMillis; + while ((currentTimeMillis < timeKillIssued + 10000L) && (status.getState() + != JobStatus.State.KILLED)) { + try { + Thread.sleep(1000L); + } catch(InterruptedException ie) { + /** interrupted, just break */ + break; + } + currentTimeMillis = System.currentTimeMillis(); + status = clientCache.getClient(arg0).getJobStatus(arg0); + } + } catch(IOException io) { + LOG.debug("Error when checking for application status", io); + } + if (status.getState() != JobStatus.State.KILLED) { + resMgrDelegate.killApplication(TypeConverter.toYarn(arg0).getAppId()); + } + } + + @Override + public boolean killTask(TaskAttemptID arg0, boolean arg1) throws IOException, + InterruptedException { + return clientCache.getClient(arg0.getJobID()).killTask(arg0, arg1); + } + + @Override + public AccessControlList getQueueAdmins(String arg0) throws IOException { + return new AccessControlList("*"); + } + + @Override + public JobTrackerStatus getJobTrackerStatus() throws IOException, + InterruptedException { + return JobTrackerStatus.RUNNING; + } + + @Override + public ProtocolSignature getProtocolSignature(String protocol, + long clientVersion, int clientMethodsHash) throws IOException { + return ProtocolSignature.getProtocolSignature(this, protocol, clientVersion, + clientMethodsHash); + } + + @Override + public LogParams getLogFileParams(JobID jobID, TaskAttemptID taskAttemptID) + throws IOException { + return clientCache.getClient(jobID).getLogFilePath(jobID, taskAttemptID); + } + + private static void warnForJavaLibPath(String opts, String component, + String javaConf, String envConf) { + if (opts != null && opts.contains("-Djava.library.path")) { + LOG.warn("Usage of -Djava.library.path in " + javaConf + " can cause " + + "programs to no longer function if hadoop native libraries " + + "are used. These values should be set as part of the " + + "LD_LIBRARY_PATH in the " + component + " JVM env using " + + envConf + " config settings."); + } + } +} Propchange: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YARNRunner.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YarnTezClientProtocolProvider.java URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YarnTezClientProtocolProvider.java?rev=1469642&view=auto ============================================================================== --- incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YarnTezClientProtocolProvider.java (added) +++ incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YarnTezClientProtocolProvider.java Thu Apr 18 23:54:18 2013 @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.mapreduce; + +import java.io.IOException; +import java.net.InetSocketAddress; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.protocol.ClientProtocol; +import org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider; +import org.apache.tez.mapreduce.hadoop.MRConfig; + +public class YarnTezClientProtocolProvider extends ClientProtocolProvider { + + @Override + public ClientProtocol create(Configuration conf) throws IOException { + if (MRConfig.YARN_TEZ_FRAMEWORK_NAME.equals(conf.get(MRConfig.FRAMEWORK_NAME))) { + return new YARNRunner(conf); + } + return null; + } + + @Override + public ClientProtocol create(InetSocketAddress addr, Configuration conf) + throws IOException { + return create(conf); + } + + @Override + public void close(ClientProtocol clientProtocol) throws IOException { + // nothing to do + } + +} Propchange: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/java/org/apache/tez/mapreduce/YarnTezClientProtocolProvider.java ------------------------------------------------------------------------------ svn:eol-style = native Added: incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider URL: http://svn.apache.org/viewvc/incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider?rev=1469642&view=auto ============================================================================== --- incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider (added) +++ incubator/tez/branches/TEZ-1/tez-yarn-client/src/main/resources/META-INF/services/org.apache.hadoop.mapreduce.protocol.ClientProtocolProvider Thu Apr 18 23:54:18 2013 @@ -0,0 +1,14 @@ +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +org.apache.tez.mapreduce.YarnTezClientProtocolProvider