hadoop-mapreduce-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r947112 - in /hadoop/mapreduce/trunk: ./ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/mumak/src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapred/
Date Fri, 21 May 2010 18:22:59 GMT
Author: acmurthy
Date: Fri May 21 18:22:59 2010
New Revision: 947112

URL: http://svn.apache.org/viewvc?rev=947112&view=rev
Log:
MAPREDUCE-1354. Enhancements to JobTracker for better performance and scalability. Contributed
by Arun C. Murthy & Richard King.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java
    hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=947112&r1=947111&r2=947112&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri May 21 18:22:59 2010
@@ -31,6 +31,9 @@ Trunk (unreleased changes)
 
   OPTIMIZATIONS
 
+    MAPREDUCE-1354. Enhancements to JobTracker for better performance and
+    scalability. (Arun C. Murthy & Richard King via acmurthy) 
+
   BUG FIXES
 
     MAPREDUCE-1707. TaskRunner can get NPE in getting ugi from TaskTracker.

Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=947112&r1=947111&r2=947112&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Fri May 21 18:22:59 2010
@@ -238,6 +238,25 @@ class CapacityTaskScheduler extends Task
     TaskSchedulingMgr(CapacityTaskScheduler sched) {
       scheduler = sched;
     }
+  
+    /**
+     * Ceil of result of dividing two integers.
+     * 
+     * This is *not* a utility method. 
+     * Neither <code>a</code> or <code>b</code> should be negative.
+     *  
+     * @param a
+     * @param b
+     * @return ceil of the result of a/b
+     */
+    private int divideAndCeil(int a, int b) {
+      if (b != 0) {
+        return (a + (b - 1)) / b;
+      }
+      
+      LOG.info("divideAndCeil called with a=" + a + " b=" + b);
+      return 0;
+    }
 
     private boolean isUserOverLimit(JobInProgress j,
                                     QueueSchedulingContext qsc) {
@@ -255,13 +274,14 @@ class CapacityTaskScheduler extends Task
           tsi.getNumSlotsOccupied() +
             TaskDataView.getTaskDataView(type).getSlotsPerTask(j);
       }
-      int limit = Math.max((int)(Math.ceil((double)currentCapacity/
-          (double) qsc.getNumJobsByUser().size())),
-          (int)(Math.ceil((double)(qsc.getUlMin() *currentCapacity)/100.0)));
+      int limit = Math.max(divideAndCeil(currentCapacity, qsc.getNumJobsByUser().size()),
+			   divideAndCeil(qsc.getUlMin() * currentCapacity, 100));
       String user = j.getProfile().getUser();
       if (tsi.getNumSlotsOccupiedByUser().get(user) >= limit) {
-        LOG.debug("User " + user + " is over limit, num slots occupied = " +
-            tsi.getNumSlotsOccupiedByUser().get(user) + ", limit = " + limit);
+	if (LOG.isDebugEnabled()) {
+          LOG.debug("User " + user + " is over limit, num slots occupied = " +
+              tsi.getNumSlotsOccupiedByUser().get(user) + ", limit = " + limit);
+	}
         return true;
       }
       else {
@@ -604,7 +624,7 @@ class CapacityTaskScheduler extends Task
     boolean hasSpeculativeTask(JobInProgress job, TaskTrackerStatus tts) {
       //Check if job supports speculative map execution first then
       //check if job has speculative maps.
-      return (job.getJobConf().getMapSpeculativeExecution())&& (
+      return (job.getMapSpeculativeExecution()) && (
           hasSpeculativeTask(job.getTasks(TaskType.MAP),
                              tts));
     }
@@ -651,7 +671,7 @@ class CapacityTaskScheduler extends Task
     boolean hasSpeculativeTask(JobInProgress job, TaskTrackerStatus tts) {
       //check if the job supports reduce speculative execution first then
       //check if the job has speculative tasks.
-      return (job.getJobConf().getReduceSpeculativeExecution()) && (
+      return (job.getReduceSpeculativeExecution()) && (
           hasSpeculativeTask(job.getTasks(TaskType.REDUCE),
                              tts));
     }

Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java?rev=947112&r1=947111&r2=947112&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java Fri May 21 18:22:59 2010
@@ -98,11 +98,11 @@ class MemoryMatcher {
     long totalMemUsableOnTT = 0;
     long memForThisTask = 0;
     if (taskType == TaskType.MAP) {
-      memForThisTask = job.getJobConf().getMemoryForMapTask();
+      memForThisTask = job.getMemoryForMapTask();
       totalMemUsableOnTT =
           getMemSizeForMapSlot() * taskTracker.getMaxMapSlots();
     } else if (taskType == TaskType.REDUCE) {
-      memForThisTask = job.getJobConf().getMemoryForReduceTask();
+      memForThisTask = job.getMemoryForReduceTask();
       totalMemUsableOnTT =
           getMemSizeForReduceSlot()
               * taskTracker.getMaxReduceSlots();

Modified: hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobInProgress.java?rev=947112&r1=947111&r2=947112&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/mumak/src/java/org/apache/hadoop/mapred/SimulatorJobInProgress.java Fri May 21 18:22:59 2010
@@ -53,7 +53,7 @@ public class SimulatorJobInProgress exte
   @SuppressWarnings("deprecation")
   public SimulatorJobInProgress(JobID jobid, String jobSubmitDir, JobTracker jobtracker,
       JobConf default_conf, JobStory jobStory) {
-    super();
+    super(default_conf);
     // jobSetupCleanupNeeded set to false in parent cstr, though
     // default is true
 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=947112&r1=947111&r2=947112&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Fri May 21 18:22:59 2010
@@ -121,8 +121,11 @@ public class JobInProgress {
   TaskInProgress setup[] = new TaskInProgress[0];
   int numMapTasks = 0;
   int numReduceTasks = 0;
-  int numSlotsPerMap = 1;
-  int numSlotsPerReduce = 1;
+  final long memoryPerMap;
+  final long memoryPerReduce;
+  volatile int numSlotsPerMap = 1;
+  volatile int numSlotsPerReduce = 1;
+  final int maxTaskFailuresPerTracker;
   
   // Counters to track currently running/finished/failed Map/Reduce task-attempts
   int runningMapTasks = 0;
@@ -234,8 +237,8 @@ public class JobInProgress {
   FileSystem fs;
   String user;
   JobID jobId;
-  private boolean hasSpeculativeMaps;
-  private boolean hasSpeculativeReduces;
+  volatile private boolean hasSpeculativeMaps;
+  volatile private boolean hasSpeculativeReduces;
   long inputLength = 0;
   
   Counters jobCounters = new Counters();
@@ -328,6 +331,11 @@ public class JobInProgress {
     this.profile = new JobProfile(conf.getUser(), jobid, "", "", 
                                   conf.getJobName(),conf.getQueueName());
 
+    this.memoryPerMap = conf.getMemoryForMapTask();
+    this.memoryPerReduce = conf.getMemoryForReduceTask();
+
+    this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker();
+
     
     hasSpeculativeMaps = conf.getMapSpeculativeExecution();
     hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
@@ -358,9 +366,14 @@ public class JobInProgress {
     this.tokenStorage = null;
   }
   
-  JobInProgress() {
+  JobInProgress(JobConf conf) {
     restartCount = 0;
     jobSetupCleanupNeeded = false;
+
+    this.memoryPerMap = conf.getMemoryForMapTask();
+    this.memoryPerReduce = conf.getMemoryForReduceTask();
+
+    this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker();
   }
   
   /**
@@ -425,6 +438,10 @@ public class JobInProgress {
 
     this.numMapTasks = conf.getNumMapTasks();
     this.numReduceTasks = conf.getNumReduceTasks();
+
+    this.memoryPerMap = conf.getMemoryForMapTask();
+    this.memoryPerReduce = conf.getMemoryForReduceTask();
+
     this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
        (numMapTasks + numReduceTasks + 10);
     JobContext jobContext = new JobContextImpl(conf, jobId);
@@ -435,6 +452,8 @@ public class JobInProgress {
 
     this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
     this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
+
+    this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker();
         
     hasSpeculativeMaps = conf.getMapSpeculativeExecution();
     hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
@@ -527,11 +546,27 @@ public class JobInProgress {
     return user;
   }
 
+  boolean getMapSpeculativeExecution() {
+    return hasSpeculativeMaps;
+  }
+  
+  boolean getReduceSpeculativeExecution() {
+    return hasSpeculativeReduces;
+  }
+  
+  long getMemoryForMapTask() {
+    return memoryPerMap;
+  }
+  
+  long getMemoryForReduceTask() {
+    return memoryPerReduce;
+  }
+  
   /**
    * Get the number of slots required to run a single map task-attempt.
    * @return the number of slots required to run a single map task-attempt
    */
-  synchronized int getNumSlotsPerMap() {
+  int getNumSlotsPerMap() {
     return numSlotsPerMap;
   }
 
@@ -540,7 +575,7 @@ public class JobInProgress {
    * This is typically set by schedulers which support high-ram jobs.
    * @param slots the number of slots required to run a single map task-attempt
    */
-  synchronized void setNumSlotsPerMap(int numSlotsPerMap) {
+  void setNumSlotsPerMap(int numSlotsPerMap) {
     this.numSlotsPerMap = numSlotsPerMap;
   }
 
@@ -548,7 +583,7 @@ public class JobInProgress {
    * Get the number of slots required to run a single reduce task-attempt.
    * @return the number of slots required to run a single reduce task-attempt
    */
-  synchronized int getNumSlotsPerReduce() {
+  int getNumSlotsPerReduce() {
     return numSlotsPerReduce;
   }
 
@@ -558,7 +593,7 @@ public class JobInProgress {
    * @param slots the number of slots required to run a single reduce 
    *              task-attempt
    */
-  synchronized void setNumSlotsPerReduce(int numSlotsPerReduce) {
+  void setNumSlotsPerReduce(int numSlotsPerReduce) {
     this.numSlotsPerReduce = numSlotsPerReduce;
   }
 
@@ -840,7 +875,7 @@ public class JobInProgress {
     finishedReduceTasks + speculativeReduceTasks;
   }
  
-  public synchronized int getNumSlotsPerTask(TaskType taskType) {
+  public int getNumSlotsPerTask(TaskType taskType) {
     if (taskType == TaskType.MAP) {
       return numSlotsPerMap;
     } else if (taskType == TaskType.REDUCE) {
@@ -1683,7 +1718,7 @@ public class JobInProgress {
       trackerToFailuresMap.put(trackerHostName, ++trackerFailures);
 
       // Check if this tasktracker has turned 'flaky'
-      if (trackerFailures.intValue() == conf.getMaxTaskFailuresPerTracker()) {
+      if (trackerFailures.intValue() == maxTaskFailuresPerTracker) {
         ++flakyTaskTrackers;
         
         // Cancel reservations if appropriate
@@ -1793,7 +1828,7 @@ public class JobInProgress {
   List<String> getBlackListedTrackers() {
     List<String> blackListedTrackers = new ArrayList<String>();
     for (Map.Entry<String,Integer> e : trackerToFailuresMap.entrySet()) {
-       if (e.getValue().intValue() >= conf.getMaxTaskFailuresPerTracker()) {
+       if (e.getValue().intValue() >= maxTaskFailuresPerTracker) {
          blackListedTrackers.add(e.getKey());
        }
     }
@@ -2518,7 +2553,7 @@ public class JobInProgress {
     //
     int taskTrackerFailedTasks = getTrackerTaskFailures(taskTracker);
     if ((flakyTaskTrackers < (clusterSize * CLUSTER_BLACKLIST_PERCENT)) && 
-        taskTrackerFailedTasks >= conf.getMaxTaskFailuresPerTracker()) {
+        taskTrackerFailedTasks >= maxTaskFailuresPerTracker) {
       if (LOG.isDebugEnabled()) {
         String flakyTracker = convertTrackerNameToHostName(taskTracker); 
         LOG.debug("Ignoring the black-listed tasktracker: '" + flakyTracker 

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=947112&r1=947111&r2=947112&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Fri May 21 18:22:59 2010
@@ -47,6 +47,7 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.Vector;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 
@@ -205,7 +206,7 @@ public class JobTracker implements MRCon
     }
   }
 
-  private int nextJobId = 1;
+  private final AtomicInteger nextJobId = new AtomicInteger(1);
 
   public static final Log LOG = LogFactory.getLog(JobTracker.class);
     
@@ -1194,7 +1195,8 @@ public class JobTracker implements MRCon
   //
 
   // All the known jobs.  (jobid->JobInProgress)
-  Map<JobID, JobInProgress> jobs = new TreeMap<JobID, JobInProgress>();
+  Map<JobID, JobInProgress> jobs =  
+    Collections.synchronizedMap(new TreeMap<JobID, JobInProgress>());
 
   // (trackerID --> list of jobs to cleanup)
   Map<String, Set<JobID>> trackerToJobsToCleanup = 
@@ -2925,17 +2927,16 @@ public class JobTracker implements MRCon
    * @deprecated use {@link #getNewJobID()} instead
    */
   @Deprecated
-  public synchronized JobID getNewJobId() throws IOException {
+  public JobID getNewJobId() throws IOException {
     return JobID.downgrade(getNewJobID());
   }
 
   /**
    * Allocates a new JobId string.
    */
-  public synchronized org.apache.hadoop.mapreduce.JobID getNewJobID()
-      throws IOException {
-    return new org.apache.hadoop.mapreduce.JobID(
-      getTrackerIdentifier(), nextJobId++);
+  public org.apache.hadoop.mapreduce.JobID getNewJobID() throws IOException {
+    return new org.apache.hadoop.mapreduce.JobID
+      (getTrackerIdentifier(), nextJobId.getAndIncrement());
   }
 
   /**
@@ -2966,11 +2967,10 @@ public class JobTracker implements MRCon
    *  instead
    */
   @Deprecated
-  public synchronized JobStatus submitJob(JobID jobId, 
-                                          String jobSubmitDir,
-                                          TokenStorage ts
-                                         ) throws IOException, 
-                                                  InterruptedException {
+  public JobStatus submitJob(JobID jobId, String jobSubmitDir,
+			     TokenStorage ts)
+      throws IOException, InterruptedException {
+
     return submitJob(jobId, 0, UserGroupInformation.getCurrentUser(), 
                      jobSubmitDir, false, ts);
   }
@@ -2978,60 +2978,73 @@ public class JobTracker implements MRCon
   /**
    * Submits either a new job or a job from an earlier run.
    */
-  private synchronized 
-  JobStatus submitJob(org.apache.hadoop.mapreduce.JobID jobID, 
-                      int restartCount, UserGroupInformation ugi, 
-                      String jobSubmitDir, boolean recovered, TokenStorage ts
-                      ) throws IOException, InterruptedException {
-    JobID jobId = JobID.downgrade(jobID);
-    if(jobs.containsKey(jobId)) {
-      //job already running, don't start twice
-      return jobs.get(jobId).getStatus();
-    }
-
-    //the conversion from String to Text for the UGI's username will
-    //not be required when we have the UGI to return us the username as
-    //Text.
-    JobInfo jobInfo = new JobInfo(jobId, new Text(ugi.getShortUserName()), 
-        new Path(jobSubmitDir));
-    JobInProgress job = 
-      new JobInProgress(this, this.conf, restartCount, jobInfo, ts);
-    
-    String queue = job.getProfile().getQueueName();
-    if(!(queueManager.getLeafQueueNames().contains(queue))) {
-      throw new IOException("Queue \"" + queue + "\" does not exist");        
-    }
-    
-    //check if queue is RUNNING
-    if(!queueManager.isRunning(queue)) {
-      throw new IOException("Queue \"" + queue + "\" is not running");
-    }
-    try {
-      checkAccess(job, ugi, Queue.QueueOperation.SUBMIT_JOB, null);
-    } catch (AccessControlException ace) {
-      LOG.warn("Access denied for user " + job.getJobConf().getUser() 
-          + ". Ignoring job " + jobId, ace);
-      throw ace;
-    }
+  private JobStatus submitJob(org.apache.hadoop.mapreduce.JobID jobID, 
+			      int restartCount, UserGroupInformation ugi, 
+			      String jobSubmitDir, boolean recovered, TokenStorage ts
+			      )
+      throws IOException, InterruptedException {
 
-    // Check the job if it cannot run in the cluster because of invalid memory
-    // requirements.
-    try {
-      checkMemoryRequirements(job);
-    } catch (IOException ioe) {
-      throw ioe;
-    }
+    JobID jobId = null;
 
-    if (!recovered) {
-      //Store the information in a file so that the job can be recovered
-      //later (if at all)
-      Path jobDir = getSystemDirectoryForJob(jobId);
-      FileSystem.mkdirs(fs, jobDir, new FsPermission(SYSTEM_DIR_PERMISSION));
-      FSDataOutputStream out = fs.create(getSystemFileForJob(jobId));
-      jobInfo.write(out);
-      out.close();
+    JobInfo jobInfo;
+
+    synchronized (this) {
+      jobId = JobID.downgrade(jobID);
+      if (jobs.containsKey(jobId)) {
+        // job already running, don't start twice
+        return jobs.get(jobId).getStatus();
+      }
+
+      // the conversion from String to Text for the UGI's username will
+      // not be required when we have the UGI to return us the username as
+      // Text.
+      jobInfo =
+          new JobInfo(jobId, new Text(ugi.getShortUserName()), new Path(
+              jobSubmitDir));
+    }
+
+    // Create the JobInProgress, temporarily unlock the JobTracker since
+    // we are about to copy job.xml from HDFSJobInProgress
+    JobInProgress job =
+        new JobInProgress(this, this.conf, restartCount, jobInfo, ts);
+
+    synchronized (this) {
+      String queue = job.getProfile().getQueueName();
+      if (!(queueManager.getLeafQueueNames().contains(queue))) {
+        throw new IOException("Queue \"" + queue + "\" does not exist");
+      }
+
+      // check if queue is RUNNING
+      if (!queueManager.isRunning(queue)) {
+        throw new IOException("Queue \"" + queue + "\" is not running");
+      }
+      try {
+        checkAccess(job, ugi, Queue.QueueOperation.SUBMIT_JOB, null);
+      } catch (AccessControlException ace) {
+        LOG.warn("Access denied for user " + job.getJobConf().getUser()
+            + ". Ignoring job " + jobId, ace);
+        throw ace;
+      }
+
+      // Check the job if it cannot run in the cluster because of invalid memory
+      // requirements.
+      try {
+        checkMemoryRequirements(job);
+      } catch (IOException ioe) {
+        throw ioe;
+      }
+
+      if (!recovered) {
+        // Store the information in a file so that the job can be recovered
+        // later (if at all)
+        Path jobDir = getSystemDirectoryForJob(jobId);
+        FileSystem.mkdirs(fs, jobDir, new FsPermission(SYSTEM_DIR_PERMISSION));
+        FSDataOutputStream out = fs.create(getSystemFileForJob(jobId));
+        jobInfo.write(out);
+        out.close();
+      }
+      return addJob(jobId, job);
     }
-    return addJob(jobId, job); 
   }
 
   /**
@@ -3375,6 +3388,18 @@ public class JobTracker implements MRCon
   }
   
   /**
+   * Check if the <code>job</code> has been initialized.
+   * 
+   * @param job {@link JobInProgress} to be checked
+   * @return <code>true</code> if the job has been initialized,
+   *         <code>false</code> otherwise
+   */
+  private boolean isJobInited(JobInProgress job) {
+    return job.inited(); 
+  }
+  
+
+  /**
    * @deprecated Use {@link #getJobProfile(org.apache.hadoop.mapreduce.JobID)} 
    * instead
    */
@@ -3383,6 +3408,8 @@ public class JobTracker implements MRCon
     synchronized (this) {
       JobInProgress job = jobs.get(jobid);
       if (job != null) {
+        // Safe to call JobInProgress.getProfile while holding the lock
+        // on the JobTracker since it isn't a synchronized method
         return job.getProfile();
       } 
     }
@@ -3410,6 +3437,8 @@ public class JobTracker implements MRCon
     synchronized (this) {
       JobInProgress job = jobs.get(jobid);
       if (job != null) {
+        // Safe to call JobInProgress.getStatus while holding the lock
+        // on the JobTracker since it isn't a synchronized method
         return job.getStatus();
       } else {
         JobStatus status = retireJobs.get(jobid);
@@ -3421,6 +3450,9 @@ public class JobTracker implements MRCon
     return completedJobStatusStore.readJobStatus(jobid);
   }
 
+  private static final org.apache.hadoop.mapreduce.Counters EMPTY_COUNTERS
+      = new org.apache.hadoop.mapreduce.Counters();
+
   /**
    * see
    * {@link ClientProtocol#getJobCounters(org.apache.hadoop.mapreduce.JobID)}
@@ -3438,11 +3470,14 @@ public class JobTracker implements MRCon
     synchronized (this) {
       JobInProgress job = jobs.get(oldJobID);
       if (job != null) {
-
-        // check the job-access
+	// check the job-access
         job.checkAccess(UserGroupInformation.getCurrentUser(),
             JobACL.VIEW_JOB);
 
+        if (!isJobInited(job)) {
+	  return EMPTY_COUNTERS;
+	}
+
         Counters counters = job.getCounters();
         if (counters != null) {
           return new org.apache.hadoop.mapreduce.Counters(counters);
@@ -3473,7 +3508,9 @@ public class JobTracker implements MRCon
       return null;
     } 
   }
-  
+
+  private static final TaskReport[] EMPTY_TASK_REPORTS = new TaskReport[0];
+
   /**
    * @param jobid
    * @return array of TaskReport
@@ -3484,8 +3521,8 @@ public class JobTracker implements MRCon
   @Deprecated
   public synchronized TaskReport[] getMapTaskReports(JobID jobid) {
     JobInProgress job = jobs.get(jobid);
-    if (job == null) {
-      return new TaskReport[0];
+    if (job == null || !isJobInited(job)) {
+      return EMPTY_TASK_REPORTS;
     } else {
       Vector<TaskReport> reports = new Vector<TaskReport>();
       Vector<TaskInProgress> completeMapTasks =
@@ -3514,8 +3551,8 @@ public class JobTracker implements MRCon
   @Deprecated
   public synchronized TaskReport[] getReduceTaskReports(JobID jobid) {
     JobInProgress job = jobs.get(jobid);
-    if (job == null) {
-      return new TaskReport[0];
+    if (job == null || !isJobInited(job)) {
+      return EMPTY_TASK_REPORTS;
     } else {
       Vector<TaskReport> reports = new Vector<TaskReport>();
       Vector completeReduceTasks = job.reportTasksInProgress(false, true);
@@ -3542,8 +3579,8 @@ public class JobTracker implements MRCon
   @Deprecated
   public synchronized TaskReport[] getCleanupTaskReports(JobID jobid) {
     JobInProgress job = jobs.get(jobid);
-    if (job == null) {
-      return new TaskReport[0];
+    if (job == null || !isJobInited(job)) {
+      return EMPTY_TASK_REPORTS;
     } else {
       Vector<TaskReport> reports = new Vector<TaskReport>();
       Vector<TaskInProgress> completeTasks = job.reportCleanupTIPs(true);
@@ -3573,8 +3610,8 @@ public class JobTracker implements MRCon
   @Deprecated
   public synchronized TaskReport[] getSetupTaskReports(JobID jobid) {
     JobInProgress job = jobs.get(jobid);
-    if (job == null) {
-      return new TaskReport[0];
+    if (job == null || !isJobInited(job)) {
+      return EMPTY_TASK_REPORTS;
     } else {
       Vector<TaskReport> reports = new Vector<TaskReport>();
       Vector<TaskInProgress> completeTasks = job.reportSetupTIPs(true);
@@ -3610,7 +3647,7 @@ public class JobTracker implements MRCon
       job.checkAccess(UserGroupInformation.getCurrentUser(),
           JobACL.VIEW_JOB);
     } else { 
-      return new TaskReport[0];
+      return EMPTY_TASK_REPORTS;
     }
 
     switch (type) {
@@ -3623,16 +3660,14 @@ public class JobTracker implements MRCon
       case JOB_SETUP :
         return getSetupTaskReports(JobID.downgrade(jobid));
     }
-    return new TaskReport[0];
+    return EMPTY_TASK_REPORTS;
   }
 
-  TaskCompletionEvent[] EMPTY_EVENTS = new TaskCompletionEvent[0];
-
   /* 
    * Returns a list of TaskCompletionEvent for the given job, 
    * starting from fromEventId.
    */
-  public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
+  public TaskCompletionEvent[] getTaskCompletionEvents(
       org.apache.hadoop.mapreduce.JobID jobid, int fromEventId, int maxEvents)
       throws IOException {
     return getTaskCompletionEvents(JobID.downgrade(jobid),
@@ -3645,21 +3680,20 @@ public class JobTracker implements MRCon
   * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getTaskCompletionEvents(java.lang.String, int, int)
    */
   @Deprecated
-  public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
-      JobID jobid, int fromEventId, int maxEvents) throws IOException{
-    synchronized (this) {
-      JobInProgress job = this.jobs.get(jobid);
-      if (null != job) {
-        if (job.inited()) {
-          return job.getTaskCompletionEvents(fromEventId, maxEvents);
-        } else {
-          return EMPTY_EVENTS;
-        }
-      }
+  public TaskCompletionEvent[] getTaskCompletionEvents(
+    JobID jobid, int fromEventId, int maxEvents) throws IOException{
+
+    JobInProgress job = this.jobs.get(jobid);
+    if (null != job) {
+      return job.inited() ? job.getTaskCompletionEvents(fromEventId, maxEvents)
+	  : TaskCompletionEvent.EMPTY_ARRAY;
     }
+
     return completedJobStatusStore.readJobTaskCompletionEvents(jobid, fromEventId, maxEvents);
   }
 
+  private static final String[] EMPTY_TASK_DIAGNOSTICS = new String[0];
+
   /**
    * Get the diagnostics for a given task
    * @param taskId the id of the task
@@ -3689,15 +3723,16 @@ public class JobTracker implements MRCon
       job.checkAccess(UserGroupInformation.getCurrentUser(),
           JobACL.VIEW_JOB);
 
-      TaskInProgress tip = job.getTaskInProgress(tipId);
-      if (tip != null) {
-        taskDiagnosticInfo = tip.getDiagnosticInfo(taskId);
-      }
-      
+      if (isJobInited(job)) {
+        TaskInProgress tip = job.getTaskInProgress(tipId);
+        if (tip != null) {
+          taskDiagnosticInfo = tip.getDiagnosticInfo(taskId);
+        }
+      }      
     }
     
-    return ((taskDiagnosticInfo == null) ? new String[0] 
-            : taskDiagnosticInfo.toArray(new String[0]));
+    return ((taskDiagnosticInfo == null) ? EMPTY_TASK_DIAGNOSTICS :
+             taskDiagnosticInfo.toArray(new String[taskDiagnosticInfo.size()]));
   }
     
   /** Get all the TaskStatuses from the tipid. */
@@ -4401,8 +4436,8 @@ public class JobTracker implements MRCon
 
     boolean invalidJob = false;
     String msg = "";
-    long maxMemForMapTask = job.getJobConf().getMemoryForMapTask();
-    long maxMemForReduceTask = job.getJobConf().getMemoryForReduceTask();
+    long maxMemForMapTask = job.getMemoryForMapTask();
+    long maxMemForReduceTask = job.getMemoryForReduceTask();
 
     if (maxMemForMapTask == JobConf.DISABLED_MEMORY_LIMIT
         || maxMemForReduceTask == JobConf.DISABLED_MEMORY_LIMIT) {



Mime
View raw message