Return-Path: Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: (qmail 68031 invoked from network); 21 May 2010 18:23:22 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 21 May 2010 18:23:22 -0000 Received: (qmail 75346 invoked by uid 500); 21 May 2010 18:23:22 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 75298 invoked by uid 500); 21 May 2010 18:23:22 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 75281 invoked by uid 99); 21 May 2010 18:23:21 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 May 2010 18:23:21 +0000 X-ASF-Spam-Status: No, hits=-1750.3 required=10.0 tests=ALL_TRUSTED,AWL X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 May 2010 18:23:20 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id EBB2623889D2; Fri, 21 May 2010 18:22:59 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r947112 - in /hadoop/mapreduce/trunk: ./ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/mumak/src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapred/ Date: Fri, 21 May 2010 18:22:59 -0000 To: mapreduce-commits@hadoop.apache.org From: acmurthy@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20100521182259.EBB2623889D2@eris.apache.org> Author: acmurthy Date: Fri May 21 18:22:59 2010 New Revision: 947112 URL: http://svn.apache.org/viewvc?rev=947112&view=rev Log: MAPREDUCE-1354. Enhancements to JobTracker for better performance and scalability. Contributed by Arun C. Murthy & Richard King. Modified: hadoop/mapreduce/trunk/CHANGES.txt hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobInProgress.java hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Modified: hadoop/mapreduce/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=947112&r1=947111&r2=947112&view=diff ============================================================================== --- hadoop/mapreduce/trunk/CHANGES.txt (original) +++ hadoop/mapreduce/trunk/CHANGES.txt Fri May 21 18:22:59 2010 @@ -31,6 +31,9 @@ Trunk (unreleased changes) OPTIMIZATIONS + MAPREDUCE-1354. Enhancements to JobTracker for better performance and + scalability. (Arun C. Murthy & Richard King via acmurthy) + BUG FIXES MAPREDUCE-1707. TaskRunner can get NPE in getting ugi from TaskTracker. Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=947112&r1=947111&r2=947112&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original) +++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Fri May 21 18:22:59 2010 @@ -238,6 +238,25 @@ class CapacityTaskScheduler extends Task TaskSchedulingMgr(CapacityTaskScheduler sched) { scheduler = sched; } + + /** + * Ceil of result of dividing two integers. + * + * This is *not* a utility method. + * Neither a or b should be negative. + * + * @param a + * @param b + * @return ceil of the result of a/b + */ + private int divideAndCeil(int a, int b) { + if (b != 0) { + return (a + (b - 1)) / b; + } + + LOG.info("divideAndCeil called with a=" + a + " b=" + b); + return 0; + } private boolean isUserOverLimit(JobInProgress j, QueueSchedulingContext qsc) { @@ -255,13 +274,14 @@ class CapacityTaskScheduler extends Task tsi.getNumSlotsOccupied() + TaskDataView.getTaskDataView(type).getSlotsPerTask(j); } - int limit = Math.max((int)(Math.ceil((double)currentCapacity/ - (double) qsc.getNumJobsByUser().size())), - (int)(Math.ceil((double)(qsc.getUlMin() *currentCapacity)/100.0))); + int limit = Math.max(divideAndCeil(currentCapacity, qsc.getNumJobsByUser().size()), + divideAndCeil(qsc.getUlMin() * currentCapacity, 100)); String user = j.getProfile().getUser(); if (tsi.getNumSlotsOccupiedByUser().get(user) >= limit) { - LOG.debug("User " + user + " is over limit, num slots occupied = " + - tsi.getNumSlotsOccupiedByUser().get(user) + ", limit = " + limit); + if (LOG.isDebugEnabled()) { + LOG.debug("User " + user + " is over limit, num slots occupied = " + + tsi.getNumSlotsOccupiedByUser().get(user) + ", limit = " + limit); + } return true; } else { @@ -604,7 +624,7 @@ class CapacityTaskScheduler extends Task boolean hasSpeculativeTask(JobInProgress job, TaskTrackerStatus tts) { //Check if job supports speculative map execution first then //check if job has speculative maps. - return (job.getJobConf().getMapSpeculativeExecution())&& ( + return (job.getMapSpeculativeExecution()) && ( hasSpeculativeTask(job.getTasks(TaskType.MAP), tts)); } @@ -651,7 +671,7 @@ class CapacityTaskScheduler extends Task boolean hasSpeculativeTask(JobInProgress job, TaskTrackerStatus tts) { //check if the job supports reduce speculative execution first then //check if the job has speculative tasks. - return (job.getJobConf().getReduceSpeculativeExecution()) && ( + return (job.getReduceSpeculativeExecution()) && ( hasSpeculativeTask(job.getTasks(TaskType.REDUCE), tts)); } Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java?rev=947112&r1=947111&r2=947112&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java (original) +++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java Fri May 21 18:22:59 2010 @@ -98,11 +98,11 @@ class MemoryMatcher { long totalMemUsableOnTT = 0; long memForThisTask = 0; if (taskType == TaskType.MAP) { - memForThisTask = job.getJobConf().getMemoryForMapTask(); + memForThisTask = job.getMemoryForMapTask(); totalMemUsableOnTT = getMemSizeForMapSlot() * taskTracker.getMaxMapSlots(); } else if (taskType == TaskType.REDUCE) { - memForThisTask = job.getJobConf().getMemoryForReduceTask(); + memForThisTask = job.getMemoryForReduceTask(); totalMemUsableOnTT = getMemSizeForReduceSlot() * taskTracker.getMaxReduceSlots(); Modified: hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobInProgress.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobInProgress.java?rev=947112&r1=947111&r2=947112&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobInProgress.java (original) +++ hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobInProgress.java Fri May 21 18:22:59 2010 @@ -53,7 +53,7 @@ public class SimulatorJobInProgress exte @SuppressWarnings("deprecation") public SimulatorJobInProgress(JobID jobid, String jobSubmitDir, JobTracker jobtracker, JobConf default_conf, JobStory jobStory) { - super(); + super(default_conf); // jobSetupCleanupNeeded set to false in parent cstr, though // default is true Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=947112&r1=947111&r2=947112&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Fri May 21 18:22:59 2010 @@ -121,8 +121,11 @@ public class JobInProgress { TaskInProgress setup[] = new TaskInProgress[0]; int numMapTasks = 0; int numReduceTasks = 0; - int numSlotsPerMap = 1; - int numSlotsPerReduce = 1; + final long memoryPerMap; + final long memoryPerReduce; + volatile int numSlotsPerMap = 1; + volatile int numSlotsPerReduce = 1; + final int maxTaskFailuresPerTracker; // Counters to track currently running/finished/failed Map/Reduce task-attempts int runningMapTasks = 0; @@ -234,8 +237,8 @@ public class JobInProgress { FileSystem fs; String user; JobID jobId; - private boolean hasSpeculativeMaps; - private boolean hasSpeculativeReduces; + volatile private boolean hasSpeculativeMaps; + volatile private boolean hasSpeculativeReduces; long inputLength = 0; Counters jobCounters = new Counters(); @@ -328,6 +331,11 @@ public class JobInProgress { this.profile = new JobProfile(conf.getUser(), jobid, "", "", conf.getJobName(),conf.getQueueName()); + this.memoryPerMap = conf.getMemoryForMapTask(); + this.memoryPerReduce = conf.getMemoryForReduceTask(); + + this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker(); + hasSpeculativeMaps = conf.getMapSpeculativeExecution(); hasSpeculativeReduces = conf.getReduceSpeculativeExecution(); @@ -358,9 +366,14 @@ public class JobInProgress { this.tokenStorage = null; } - JobInProgress() { + JobInProgress(JobConf conf) { restartCount = 0; jobSetupCleanupNeeded = false; + + this.memoryPerMap = conf.getMemoryForMapTask(); + this.memoryPerReduce = conf.getMemoryForReduceTask(); + + this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker(); } /** @@ -425,6 +438,10 @@ public class JobInProgress { this.numMapTasks = conf.getNumMapTasks(); this.numReduceTasks = conf.getNumReduceTasks(); + + this.memoryPerMap = conf.getMemoryForMapTask(); + this.memoryPerReduce = conf.getMemoryForReduceTask(); + this.taskCompletionEvents = new ArrayList (numMapTasks + numReduceTasks + 10); JobContext jobContext = new JobContextImpl(conf, jobId); @@ -435,6 +452,8 @@ public class JobInProgress { this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent(); this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent(); + + this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker(); hasSpeculativeMaps = conf.getMapSpeculativeExecution(); hasSpeculativeReduces = conf.getReduceSpeculativeExecution(); @@ -527,11 +546,27 @@ public class JobInProgress { return user; } + boolean getMapSpeculativeExecution() { + return hasSpeculativeMaps; + } + + boolean getReduceSpeculativeExecution() { + return hasSpeculativeReduces; + } + + long getMemoryForMapTask() { + return memoryPerMap; + } + + long getMemoryForReduceTask() { + return memoryPerReduce; + } + /** * Get the number of slots required to run a single map task-attempt. * @return the number of slots required to run a single map task-attempt */ - synchronized int getNumSlotsPerMap() { + int getNumSlotsPerMap() { return numSlotsPerMap; } @@ -540,7 +575,7 @@ public class JobInProgress { * This is typically set by schedulers which support high-ram jobs. * @param slots the number of slots required to run a single map task-attempt */ - synchronized void setNumSlotsPerMap(int numSlotsPerMap) { + void setNumSlotsPerMap(int numSlotsPerMap) { this.numSlotsPerMap = numSlotsPerMap; } @@ -548,7 +583,7 @@ public class JobInProgress { * Get the number of slots required to run a single reduce task-attempt. * @return the number of slots required to run a single reduce task-attempt */ - synchronized int getNumSlotsPerReduce() { + int getNumSlotsPerReduce() { return numSlotsPerReduce; } @@ -558,7 +593,7 @@ public class JobInProgress { * @param slots the number of slots required to run a single reduce * task-attempt */ - synchronized void setNumSlotsPerReduce(int numSlotsPerReduce) { + void setNumSlotsPerReduce(int numSlotsPerReduce) { this.numSlotsPerReduce = numSlotsPerReduce; } @@ -840,7 +875,7 @@ public class JobInProgress { finishedReduceTasks + speculativeReduceTasks; } - public synchronized int getNumSlotsPerTask(TaskType taskType) { + public int getNumSlotsPerTask(TaskType taskType) { if (taskType == TaskType.MAP) { return numSlotsPerMap; } else if (taskType == TaskType.REDUCE) { @@ -1683,7 +1718,7 @@ public class JobInProgress { trackerToFailuresMap.put(trackerHostName, ++trackerFailures); // Check if this tasktracker has turned 'flaky' - if (trackerFailures.intValue() == conf.getMaxTaskFailuresPerTracker()) { + if (trackerFailures.intValue() == maxTaskFailuresPerTracker) { ++flakyTaskTrackers; // Cancel reservations if appropriate @@ -1793,7 +1828,7 @@ public class JobInProgress { List getBlackListedTrackers() { List blackListedTrackers = new ArrayList(); for (Map.Entry e : trackerToFailuresMap.entrySet()) { - if (e.getValue().intValue() >= conf.getMaxTaskFailuresPerTracker()) { + if (e.getValue().intValue() >= maxTaskFailuresPerTracker) { blackListedTrackers.add(e.getKey()); } } @@ -2518,7 +2553,7 @@ public class JobInProgress { // int taskTrackerFailedTasks = getTrackerTaskFailures(taskTracker); if ((flakyTaskTrackers < (clusterSize * CLUSTER_BLACKLIST_PERCENT)) && - taskTrackerFailedTasks >= conf.getMaxTaskFailuresPerTracker()) { + taskTrackerFailedTasks >= maxTaskFailuresPerTracker) { if (LOG.isDebugEnabled()) { String flakyTracker = convertTrackerNameToHostName(taskTracker); LOG.debug("Ignoring the black-listed tasktracker: '" + flakyTracker Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=947112&r1=947111&r2=947112&view=diff ============================================================================== --- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original) +++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Fri May 21 18:22:59 2010 @@ -47,6 +47,7 @@ import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; import java.util.Vector; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -205,7 +206,7 @@ public class JobTracker implements MRCon } } - private int nextJobId = 1; + private final AtomicInteger nextJobId = new AtomicInteger(1); public static final Log LOG = LogFactory.getLog(JobTracker.class); @@ -1194,7 +1195,8 @@ public class JobTracker implements MRCon // // All the known jobs. (jobid->JobInProgress) - Map jobs = new TreeMap(); + Map jobs = + Collections.synchronizedMap(new TreeMap()); // (trackerID --> list of jobs to cleanup) Map> trackerToJobsToCleanup = @@ -2925,17 +2927,16 @@ public class JobTracker implements MRCon * @deprecated use {@link #getNewJobID()} instead */ @Deprecated - public synchronized JobID getNewJobId() throws IOException { + public JobID getNewJobId() throws IOException { return JobID.downgrade(getNewJobID()); } /** * Allocates a new JobId string. */ - public synchronized org.apache.hadoop.mapreduce.JobID getNewJobID() - throws IOException { - return new org.apache.hadoop.mapreduce.JobID( - getTrackerIdentifier(), nextJobId++); + public org.apache.hadoop.mapreduce.JobID getNewJobID() throws IOException { + return new org.apache.hadoop.mapreduce.JobID + (getTrackerIdentifier(), nextJobId.getAndIncrement()); } /** @@ -2966,11 +2967,10 @@ public class JobTracker implements MRCon * instead */ @Deprecated - public synchronized JobStatus submitJob(JobID jobId, - String jobSubmitDir, - TokenStorage ts - ) throws IOException, - InterruptedException { + public JobStatus submitJob(JobID jobId, String jobSubmitDir, + TokenStorage ts) + throws IOException, InterruptedException { + return submitJob(jobId, 0, UserGroupInformation.getCurrentUser(), jobSubmitDir, false, ts); } @@ -2978,60 +2978,73 @@ public class JobTracker implements MRCon /** * Submits either a new job or a job from an earlier run. */ - private synchronized - JobStatus submitJob(org.apache.hadoop.mapreduce.JobID jobID, - int restartCount, UserGroupInformation ugi, - String jobSubmitDir, boolean recovered, TokenStorage ts - ) throws IOException, InterruptedException { - JobID jobId = JobID.downgrade(jobID); - if(jobs.containsKey(jobId)) { - //job already running, don't start twice - return jobs.get(jobId).getStatus(); - } - - //the conversion from String to Text for the UGI's username will - //not be required when we have the UGI to return us the username as - //Text. - JobInfo jobInfo = new JobInfo(jobId, new Text(ugi.getShortUserName()), - new Path(jobSubmitDir)); - JobInProgress job = - new JobInProgress(this, this.conf, restartCount, jobInfo, ts); - - String queue = job.getProfile().getQueueName(); - if(!(queueManager.getLeafQueueNames().contains(queue))) { - throw new IOException("Queue \"" + queue + "\" does not exist"); - } - - //check if queue is RUNNING - if(!queueManager.isRunning(queue)) { - throw new IOException("Queue \"" + queue + "\" is not running"); - } - try { - checkAccess(job, ugi, Queue.QueueOperation.SUBMIT_JOB, null); - } catch (AccessControlException ace) { - LOG.warn("Access denied for user " + job.getJobConf().getUser() - + ". Ignoring job " + jobId, ace); - throw ace; - } + private JobStatus submitJob(org.apache.hadoop.mapreduce.JobID jobID, + int restartCount, UserGroupInformation ugi, + String jobSubmitDir, boolean recovered, TokenStorage ts + ) + throws IOException, InterruptedException { - // Check the job if it cannot run in the cluster because of invalid memory - // requirements. - try { - checkMemoryRequirements(job); - } catch (IOException ioe) { - throw ioe; - } + JobID jobId = null; - if (!recovered) { - //Store the information in a file so that the job can be recovered - //later (if at all) - Path jobDir = getSystemDirectoryForJob(jobId); - FileSystem.mkdirs(fs, jobDir, new FsPermission(SYSTEM_DIR_PERMISSION)); - FSDataOutputStream out = fs.create(getSystemFileForJob(jobId)); - jobInfo.write(out); - out.close(); + JobInfo jobInfo; + + synchronized (this) { + jobId = JobID.downgrade(jobID); + if (jobs.containsKey(jobId)) { + // job already running, don't start twice + return jobs.get(jobId).getStatus(); + } + + // the conversion from String to Text for the UGI's username will + // not be required when we have the UGI to return us the username as + // Text. + jobInfo = + new JobInfo(jobId, new Text(ugi.getShortUserName()), new Path( + jobSubmitDir)); + } + + // Create the JobInProgress, temporarily unlock the JobTracker since + // we are about to copy job.xml from HDFSJobInProgress + JobInProgress job = + new JobInProgress(this, this.conf, restartCount, jobInfo, ts); + + synchronized (this) { + String queue = job.getProfile().getQueueName(); + if (!(queueManager.getLeafQueueNames().contains(queue))) { + throw new IOException("Queue \"" + queue + "\" does not exist"); + } + + // check if queue is RUNNING + if (!queueManager.isRunning(queue)) { + throw new IOException("Queue \"" + queue + "\" is not running"); + } + try { + checkAccess(job, ugi, Queue.QueueOperation.SUBMIT_JOB, null); + } catch (AccessControlException ace) { + LOG.warn("Access denied for user " + job.getJobConf().getUser() + + ". Ignoring job " + jobId, ace); + throw ace; + } + + // Check the job if it cannot run in the cluster because of invalid memory + // requirements. + try { + checkMemoryRequirements(job); + } catch (IOException ioe) { + throw ioe; + } + + if (!recovered) { + // Store the information in a file so that the job can be recovered + // later (if at all) + Path jobDir = getSystemDirectoryForJob(jobId); + FileSystem.mkdirs(fs, jobDir, new FsPermission(SYSTEM_DIR_PERMISSION)); + FSDataOutputStream out = fs.create(getSystemFileForJob(jobId)); + jobInfo.write(out); + out.close(); + } + return addJob(jobId, job); } - return addJob(jobId, job); } /** @@ -3375,6 +3388,18 @@ public class JobTracker implements MRCon } /** + * Check if the job has been initialized. + * + * @param job {@link JobInProgress} to be checked + * @return true if the job has been initialized, + * false otherwise + */ + private boolean isJobInited(JobInProgress job) { + return job.inited(); + } + + + /** * @deprecated Use {@link #getJobProfile(org.apache.hadoop.mapreduce.JobID)} * instead */ @@ -3383,6 +3408,8 @@ public class JobTracker implements MRCon synchronized (this) { JobInProgress job = jobs.get(jobid); if (job != null) { + // Safe to call JobInProgress.getProfile while holding the lock + // on the JobTracker since it isn't a synchronized method return job.getProfile(); } } @@ -3410,6 +3437,8 @@ public class JobTracker implements MRCon synchronized (this) { JobInProgress job = jobs.get(jobid); if (job != null) { + // Safe to call JobInProgress.getStatus while holding the lock + // on the JobTracker since it isn't a synchronized method return job.getStatus(); } else { JobStatus status = retireJobs.get(jobid); @@ -3421,6 +3450,9 @@ public class JobTracker implements MRCon return completedJobStatusStore.readJobStatus(jobid); } + private static final org.apache.hadoop.mapreduce.Counters EMPTY_COUNTERS + = new org.apache.hadoop.mapreduce.Counters(); + /** * see * {@link ClientProtocol#getJobCounters(org.apache.hadoop.mapreduce.JobID)} @@ -3438,11 +3470,14 @@ public class JobTracker implements MRCon synchronized (this) { JobInProgress job = jobs.get(oldJobID); if (job != null) { - - // check the job-access + // check the job-access job.checkAccess(UserGroupInformation.getCurrentUser(), JobACL.VIEW_JOB); + if (!isJobInited(job)) { + return EMPTY_COUNTERS; + } + Counters counters = job.getCounters(); if (counters != null) { return new org.apache.hadoop.mapreduce.Counters(counters); @@ -3473,7 +3508,9 @@ public class JobTracker implements MRCon return null; } } - + + private static final TaskReport[] EMPTY_TASK_REPORTS = new TaskReport[0]; + /** * @param jobid * @return array of TaskReport @@ -3484,8 +3521,8 @@ public class JobTracker implements MRCon @Deprecated public synchronized TaskReport[] getMapTaskReports(JobID jobid) { JobInProgress job = jobs.get(jobid); - if (job == null) { - return new TaskReport[0]; + if (job == null || !isJobInited(job)) { + return EMPTY_TASK_REPORTS; } else { Vector reports = new Vector(); Vector completeMapTasks = @@ -3514,8 +3551,8 @@ public class JobTracker implements MRCon @Deprecated public synchronized TaskReport[] getReduceTaskReports(JobID jobid) { JobInProgress job = jobs.get(jobid); - if (job == null) { - return new TaskReport[0]; + if (job == null || !isJobInited(job)) { + return EMPTY_TASK_REPORTS; } else { Vector reports = new Vector(); Vector completeReduceTasks = job.reportTasksInProgress(false, true); @@ -3542,8 +3579,8 @@ public class JobTracker implements MRCon @Deprecated public synchronized TaskReport[] getCleanupTaskReports(JobID jobid) { JobInProgress job = jobs.get(jobid); - if (job == null) { - return new TaskReport[0]; + if (job == null || !isJobInited(job)) { + return EMPTY_TASK_REPORTS; } else { Vector reports = new Vector(); Vector completeTasks = job.reportCleanupTIPs(true); @@ -3573,8 +3610,8 @@ public class JobTracker implements MRCon @Deprecated public synchronized TaskReport[] getSetupTaskReports(JobID jobid) { JobInProgress job = jobs.get(jobid); - if (job == null) { - return new TaskReport[0]; + if (job == null || !isJobInited(job)) { + return EMPTY_TASK_REPORTS; } else { Vector reports = new Vector(); Vector completeTasks = job.reportSetupTIPs(true); @@ -3610,7 +3647,7 @@ public class JobTracker implements MRCon job.checkAccess(UserGroupInformation.getCurrentUser(), JobACL.VIEW_JOB); } else { - return new TaskReport[0]; + return EMPTY_TASK_REPORTS; } switch (type) { @@ -3623,16 +3660,14 @@ public class JobTracker implements MRCon case JOB_SETUP : return getSetupTaskReports(JobID.downgrade(jobid)); } - return new TaskReport[0]; + return EMPTY_TASK_REPORTS; } - TaskCompletionEvent[] EMPTY_EVENTS = new TaskCompletionEvent[0]; - /* * Returns a list of TaskCompletionEvent for the given job, * starting from fromEventId. */ - public synchronized TaskCompletionEvent[] getTaskCompletionEvents( + public TaskCompletionEvent[] getTaskCompletionEvents( org.apache.hadoop.mapreduce.JobID jobid, int fromEventId, int maxEvents) throws IOException { return getTaskCompletionEvents(JobID.downgrade(jobid), @@ -3645,21 +3680,20 @@ public class JobTracker implements MRCon * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getTaskCompletionEvents(java.lang.String, int, int) */ @Deprecated - public synchronized TaskCompletionEvent[] getTaskCompletionEvents( - JobID jobid, int fromEventId, int maxEvents) throws IOException{ - synchronized (this) { - JobInProgress job = this.jobs.get(jobid); - if (null != job) { - if (job.inited()) { - return job.getTaskCompletionEvents(fromEventId, maxEvents); - } else { - return EMPTY_EVENTS; - } - } + public TaskCompletionEvent[] getTaskCompletionEvents( + JobID jobid, int fromEventId, int maxEvents) throws IOException{ + + JobInProgress job = this.jobs.get(jobid); + if (null != job) { + return job.inited() ? job.getTaskCompletionEvents(fromEventId, maxEvents) + : TaskCompletionEvent.EMPTY_ARRAY; } + return completedJobStatusStore.readJobTaskCompletionEvents(jobid, fromEventId, maxEvents); } + private static final String[] EMPTY_TASK_DIAGNOSTICS = new String[0]; + /** * Get the diagnostics for a given task * @param taskId the id of the task @@ -3689,15 +3723,16 @@ public class JobTracker implements MRCon job.checkAccess(UserGroupInformation.getCurrentUser(), JobACL.VIEW_JOB); - TaskInProgress tip = job.getTaskInProgress(tipId); - if (tip != null) { - taskDiagnosticInfo = tip.getDiagnosticInfo(taskId); - } - + if (isJobInited(job)) { + TaskInProgress tip = job.getTaskInProgress(tipId); + if (tip != null) { + taskDiagnosticInfo = tip.getDiagnosticInfo(taskId); + } + } } - return ((taskDiagnosticInfo == null) ? new String[0] - : taskDiagnosticInfo.toArray(new String[0])); + return ((taskDiagnosticInfo == null) ? EMPTY_TASK_DIAGNOSTICS : + taskDiagnosticInfo.toArray(new String[taskDiagnosticInfo.size()])); } /** Get all the TaskStatuses from the tipid. */ @@ -4401,8 +4436,8 @@ public class JobTracker implements MRCon boolean invalidJob = false; String msg = ""; - long maxMemForMapTask = job.getJobConf().getMemoryForMapTask(); - long maxMemForReduceTask = job.getJobConf().getMemoryForReduceTask(); + long maxMemForMapTask = job.getMemoryForMapTask(); + long maxMemForReduceTask = job.getMemoryForReduceTask(); if (maxMemForMapTask == JobConf.DISABLED_MEMORY_LIMIT || maxMemForReduceTask == JobConf.DISABLED_MEMORY_LIMIT) {