hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From omal...@apache.org
Subject svn commit: r1077223 - in /hadoop/common/branches/branch-0.20-security-patches/src: contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ mapred/org/apache/hadoop/mapred/
Date Fri, 04 Mar 2011 03:53:24 GMT
Author: omalley
Date: Fri Mar  4 03:53:24 2011
New Revision: 1077223

URL: http://svn.apache.org/viewvc?rev=1077223&view=rev
Log:
commit dc981e7d0d74cc1ea1d0089b3fbc7d89af78a8b8
Author: Hemanth Yamijala <yhemanth@friendchild-lm.(none)>
Date:   Thu Feb 25 20:48:43 2010 +0530

    MAPREDUCE-1354 from https://issues.apache.org/jira/secure/attachment/12437010/mr-1354-y20.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-1354. Make incremental changes in jobtracker for
    +    improving scalability (acmurthy)
    +

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=1077223&r1=1077222&r2=1077223&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Fri Mar  4 03:53:24 2011
@@ -461,7 +461,24 @@ class CapacityTaskScheduler extends Task
       Collections.sort(qsiForAssigningTasks, queueComparator);
     }
 
-
+    /**
+     * Ceil of result of dividing two integers.
+     * 
+     * This is *not* a utility method. 
+     * Neither <code>a</code> nor <code>b</code> should be negative.
+     *  
+     * @param a
+     * @param b
+     * @return ceil of the result of a/b
+     */
+    private int divideAndCeil(int a, int b) {
+      if (b == 0) {
+        LOG.info("divideAndCeil called with a=" + a + " b=" + b);
+        return 0;
+      }
+      return (a + (b - 1)) / b;
+    }
+    
     private boolean isUserOverLimit(JobInProgress j, QueueSchedulingInfo qsi) {
       // what is our current capacity? It is equal to the queue-capacity if
       // we're running below capacity. If we're running over capacity, then its
@@ -475,13 +492,15 @@ class CapacityTaskScheduler extends Task
       else {
         currentCapacity = tsi.numSlotsOccupied + getSlotsPerTask(j);
       }
-      int limit = Math.max((int)(Math.ceil((double)currentCapacity/
-          (double)qsi.numJobsByUser.size())), 
-          (int)(Math.ceil((double)(qsi.ulMin*currentCapacity)/100.0)));
+      int limit = 
+        Math.max(divideAndCeil(currentCapacity, qsi.numJobsByUser.size()), 
+                 divideAndCeil(qsi.ulMin*currentCapacity, 100));
       String user = j.getProfile().getUser();
       if (tsi.numSlotsOccupiedByUser.get(user) >= limit) {
-        LOG.debug("User " + user + " is over limit, num slots occupied = " + 
-            tsi.numSlotsOccupiedByUser.get(user) + ", limit = " + limit);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("User " + user + " is over limit, num slots occupied=" + 
+                    tsi.numSlotsOccupiedByUser.get(user) + ", limit=" + limit);
+        }
         return true;
       }
       else {
@@ -814,8 +833,7 @@ class CapacityTaskScheduler extends Task
 
     @Override
     int getSlotsPerTask(JobInProgress job) {
-      return 
-        job.getJobConf().computeNumSlotsPerMap(scheduler.getMemSizeForMapSlot());
+      return job.getNumSlotsPerTask(TaskType.MAP);
     }
 
     @Override
@@ -831,7 +849,7 @@ class CapacityTaskScheduler extends Task
     boolean hasSpeculativeTask(JobInProgress job, TaskTrackerStatus tts) {
       //Check if job supports speculative map execution first then 
       //check if job has speculative maps.
-      return (job.getJobConf().getMapSpeculativeExecution())&& (
+      return (job.getMapSpeculativeExecution())&& (
           hasSpeculativeTask(job.getTasks(TaskType.MAP), 
               job.getStatus().mapProgress(), tts));
     }
@@ -877,8 +895,7 @@ class CapacityTaskScheduler extends Task
 
     @Override
     int getSlotsPerTask(JobInProgress job) {
-      return
-        job.getJobConf().computeNumSlotsPerReduce(scheduler.getMemSizeForReduceSlot()); 
  
+      return job.getNumSlotsPerTask(TaskType.REDUCE);    
     }
 
     @Override
@@ -894,7 +911,7 @@ class CapacityTaskScheduler extends Task
     boolean hasSpeculativeTask(JobInProgress job, TaskTrackerStatus tts) {
       //check if the job supports reduce speculative execution first then
       //check if the job has speculative tasks.
-      return (job.getJobConf().getReduceSpeculativeExecution()) && (
+      return (job.getReduceSpeculativeExecution()) && (
           hasSpeculativeTask(job.getTasks(TaskType.REDUCE), 
               job.getStatus().reduceProgress(), tts));
     }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java?rev=1077223&r1=1077222&r2=1077223&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java Fri Mar  4 03:53:24 2011
@@ -106,11 +106,11 @@ class MemoryMatcher {
     long totalMemUsableOnTT = 0;
     long memForThisTask = 0;
     if (taskType == TaskType.MAP) {
-      memForThisTask = job.getJobConf().getMemoryForMapTask();
+      memForThisTask = job.getMemoryForMapTask();
       totalMemUsableOnTT =
           scheduler.getMemSizeForMapSlot() * taskTracker.getMaxMapSlots();
     } else if (taskType == TaskType.REDUCE) {
-      memForThisTask = job.getJobConf().getMemoryForReduceTask();
+      memForThisTask = job.getMemoryForReduceTask();
       totalMemUsableOnTT =
           scheduler.getMemSizeForReduceSlot()
               * taskTracker.getMaxReduceSlots();

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077223&r1=1077222&r2=1077223&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar  4 03:53:24 2011
@@ -98,8 +98,11 @@ public class JobInProgress {
   TaskInProgress setup[] = new TaskInProgress[0];
   int numMapTasks = 0;
   int numReduceTasks = 0;
-  int numSlotsPerMap = 1;
-  int numSlotsPerReduce = 1;
+  final long memoryPerMap;
+  final long memoryPerReduce;
+  volatile int numSlotsPerMap = 1;
+  volatile int numSlotsPerReduce = 1;
+  final int maxTaskFailuresPerTracker;
   
   // Counters to track currently running/finished/failed Map/Reduce task-attempts
   int runningMapTasks = 0;
@@ -207,8 +210,8 @@ public class JobInProgress {
   private LocalFileSystem localFs;
   private FileSystem fs;
   private JobID jobId;
-  private boolean hasSpeculativeMaps;
-  private boolean hasSpeculativeReduces;
+  volatile private boolean hasSpeculativeMaps;
+  volatile private boolean hasSpeculativeReduces;
   private long inputLength = 0;
   private String user;
   private String historyFile = "";
@@ -299,6 +302,10 @@ public class JobInProgress {
     this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP);
     this.profile = new JobProfile(conf.getUser(), jobid, "", "",
                                   conf.getJobName(), conf.getQueueName());
+    this.memoryPerMap = conf.getMemoryForMapTask();
+    this.memoryPerReduce = conf.getMemoryForReduceTask();
+    this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker();
+
     this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
       (numMapTasks + numReduceTasks + 10);
     try {
@@ -374,12 +381,18 @@ public class JobInProgress {
 
     this.numMapTasks = conf.getNumMapTasks();
     this.numReduceTasks = conf.getNumReduceTasks();
+    
+    this.memoryPerMap = conf.getMemoryForMapTask();
+    this.memoryPerReduce = conf.getMemoryForReduceTask();
+    
     this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
        (numMapTasks + numReduceTasks + 10);
 
     this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
     this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
         
+    this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker();
+    
     MetricsContext metricsContext = MetricsUtil.getContext("mapred");
     this.jobMetrics = MetricsUtil.createRecord(metricsContext, "job");
     this.jobMetrics.setTag("user", conf.getUser());
@@ -499,11 +512,27 @@ public class JobInProgress {
     return restartCount > 0;
   }
 
+  boolean getMapSpeculativeExecution() {
+    return hasSpeculativeMaps;
+  }
+  
+  boolean getReduceSpeculativeExecution() {
+    return hasSpeculativeReduces;
+  }
+  
+  long getMemoryForMapTask() {
+    return memoryPerMap;
+  }
+  
+  long getMemoryForReduceTask() {
+    return memoryPerReduce;
+  }
+  
   /**
    * Get the number of slots required to run a single map task-attempt.
    * @return the number of slots required to run a single map task-attempt
    */
-  synchronized int getNumSlotsPerMap() {
+  int getNumSlotsPerMap() {
     return numSlotsPerMap;
   }
 
@@ -512,7 +541,7 @@ public class JobInProgress {
    * This is typically set by schedulers which support high-ram jobs.
    * @param slots the number of slots required to run a single map task-attempt
    */
-  synchronized void setNumSlotsPerMap(int numSlotsPerMap) {
+  void setNumSlotsPerMap(int numSlotsPerMap) {
     this.numSlotsPerMap = numSlotsPerMap;
   }
 
@@ -520,7 +549,7 @@ public class JobInProgress {
    * Get the number of slots required to run a single reduce task-attempt.
    * @return the number of slots required to run a single reduce task-attempt
    */
-  synchronized int getNumSlotsPerReduce() {
+  int getNumSlotsPerReduce() {
     return numSlotsPerReduce;
   }
 
@@ -530,7 +559,7 @@ public class JobInProgress {
    * @param slots the number of slots required to run a single reduce 
    *              task-attempt
    */
-  synchronized void setNumSlotsPerReduce(int numSlotsPerReduce) {
+  void setNumSlotsPerReduce(int numSlotsPerReduce) {
     this.numSlotsPerReduce = numSlotsPerReduce;
   }
 
@@ -724,7 +753,7 @@ public class JobInProgress {
     return numReduceTasks - runningReduceTasks - failedReduceTIPs - 
     finishedReduceTasks + speculativeReduceTasks;
   }
-  public synchronized int getNumSlotsPerTask(TaskType taskType) {
+  public int getNumSlotsPerTask(TaskType taskType) {
     if (taskType == TaskType.MAP) {
       return numSlotsPerMap;
     } else if (taskType == TaskType.REDUCE) {
@@ -1594,7 +1623,7 @@ public class JobInProgress {
       trackerToFailuresMap.put(trackerHostName, ++trackerFailures);
 
       // Check if this tasktracker has turned 'flaky'
-      if (trackerFailures.intValue() == conf.getMaxTaskFailuresPerTracker()) {
+      if (trackerFailures.intValue() == maxTaskFailuresPerTracker) {
         ++flakyTaskTrackers;
         
         // Cancel reservations if appropriate
@@ -1704,7 +1733,7 @@ public class JobInProgress {
   List<String> getBlackListedTrackers() {
     List<String> blackListedTrackers = new ArrayList<String>();
     for (Map.Entry<String,Integer> e : trackerToFailuresMap.entrySet()) {
-       if (e.getValue().intValue() >= conf.getMaxTaskFailuresPerTracker()) {
+       if (e.getValue().intValue() >= maxTaskFailuresPerTracker) {
          blackListedTrackers.add(e.getKey());
        }
     }
@@ -2261,7 +2290,7 @@ public class JobInProgress {
     //
     int taskTrackerFailedTasks = getTrackerTaskFailures(taskTracker);
     if ((flakyTaskTrackers < (clusterSize * CLUSTER_BLACKLIST_PERCENT)) && 
-        taskTrackerFailedTasks >= conf.getMaxTaskFailuresPerTracker()) {
+        taskTrackerFailedTasks >= maxTaskFailuresPerTracker) {
       if (LOG.isDebugEnabled()) {
         String flakyTracker = convertTrackerNameToHostName(taskTracker); 
         LOG.debug("Ignoring the black-listed tasktracker: '" + flakyTracker 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077223&r1=1077222&r2=1077223&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar  4 03:53:24 2011
@@ -1840,7 +1840,8 @@ public class JobTracker implements MRCon
   //
 
   // All the known jobs.  (jobid->JobInProgress)
-  Map<JobID, JobInProgress> jobs = new TreeMap<JobID, JobInProgress>();
+  Map<JobID, JobInProgress> jobs =  
+    Collections.synchronizedMap(new TreeMap<JobID, JobInProgress>());
 
   // (user -> list of JobInProgress)
   TreeMap<String, ArrayList<JobInProgress>> userToJobsMap =
@@ -3609,15 +3610,20 @@ public class JobTracker implements MRCon
    * of the JobTracker.  But JobInProgress adds info that's useful for
    * the JobTracker alone.
    */
-  public synchronized JobStatus submitJob(
-      JobID jobId, String jobSubmitDir, TokenStorage ts)  throws IOException {
-    if(jobs.containsKey(jobId)) {
-      //job already running, don't start twice
-      return jobs.get(jobId).getStatus();
+  public JobStatus submitJob(JobID jobId, String jobSubmitDir, TokenStorage ts)
+      throws IOException {
+    JobInfo jobInfo = null;
+    synchronized (this) {
+      if (jobs.containsKey(jobId)) {
+        // job already running, don't start twice
+        return jobs.get(jobId).getStatus();
+      }
+      UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
+      jobInfo = new JobInfo(jobId, new Text(ugi.getShortUserName()),
+          new Path(jobSubmitDir));
     }
-    UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
-    JobInfo jobInfo = new JobInfo(jobId, new Text(ugi.getShortUserName()),
-        new Path(jobSubmitDir));
+    // Create the JobInProgress, do not lock the JobTracker since
+    // we are about to copy job.xml from HDFS
     JobInProgress job = null;
     tokenStorage = ts;
     try {
@@ -3626,43 +3632,45 @@ public class JobTracker implements MRCon
       throw new IOException(e);
     }
     
-    String queue = job.getProfile().getQueueName();
-    if(!(queueManager.getQueues().contains(queue))) {      
-      job.fail();
-      throw new IOException("Queue \"" + queue + "\" does not exist");        
-    }
+    synchronized (this) {
+      String queue = job.getProfile().getQueueName();
+      if (!(queueManager.getQueues().contains(queue))) {
+        job.fail();
+        throw new IOException("Queue \"" + queue + "\" does not exist");
+      }
 
-    // check for access
-    try {
-      checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB);
-    } catch (IOException ioe) {
-       LOG.warn("Access denied for user " + job.getJobConf().getUser() 
-                + ". Ignoring job " + jobId, ioe);
-      job.fail();
-      throw ioe;
-    }
+      // check for access
+      try {
+        checkAccess(job, QueueManager.QueueOperation.SUBMIT_JOB);
+      } catch (IOException ioe) {
+        LOG.warn("Access denied for user " + job.getJobConf().getUser()
+            + ". Ignoring job " + jobId, ioe);
+        job.fail();
+        throw ioe;
+      }
 
-    // Check the job if it cannot run in the cluster because of invalid memory
-    // requirements.
-    try {
-      checkMemoryRequirements(job);
-    } catch (IOException ioe) {
-      throw ioe;
-    }
-    boolean recovered = true; //TODO: Once the Job recovery code is there,
-                              //(MAPREDUCE-873) we
-                              //must pass the "recovered" flag accurately.
-                              //This is handled in the trunk/0.22
-    if (!recovered) {
-      //Store the information in a file so that the job can be recovered
-      //later (if at all)
-      Path jobDir = getSystemDirectoryForJob(jobId);
-      FileSystem.mkdirs(fs, jobDir, new FsPermission(SYSTEM_DIR_PERMISSION));
-      FSDataOutputStream out = fs.create(getSystemFileForJob(jobId));
-      jobInfo.write(out);
-      out.close();
+      // Check the job if it cannot run in the cluster because of invalid memory
+      // requirements.
+      try {
+        checkMemoryRequirements(job);
+      } catch (IOException ioe) {
+        throw ioe;
+      }
+      boolean recovered = true; // TODO: Once the Job recovery code is there,
+      // (MAPREDUCE-873) we
+      // must pass the "recovered" flag accurately.
+      // This is handled in the trunk/0.22
+      if (!recovered) {
+        // Store the information in a file so that the job can be recovered
+        // later (if at all)
+        Path jobDir = getSystemDirectoryForJob(jobId);
+        FileSystem.mkdirs(fs, jobDir, new FsPermission(SYSTEM_DIR_PERMISSION));
+        FSDataOutputStream out = fs.create(getSystemFileForJob(jobId));
+        jobInfo.write(out);
+        out.close();
+      }
+      return addJob(jobId, job);
     }
-    return addJob(jobId, job);
   }
 
   /**
@@ -3941,10 +3949,23 @@ public class JobTracker implements MRCon
     completedJobStatusStore.store(job);
   }
 
+  /**
+   * Check if the <code>job</code> has been initialized.
+   * 
+   * @param job {@link JobInProgress} to be checked
+   * @return <code>true</code> if the job has been initialized,
+   *         <code>false</code> otherwise
+   */
+  private boolean isJobInited(JobInProgress job) {
+    return job.inited(); 
+  }
+  
   public JobProfile getJobProfile(JobID jobid) {
     synchronized (this) {
       JobInProgress job = jobs.get(jobid);
       if (job != null) {
+        // Safe to call JobInProgress.getProfile while holding the lock
+        // on the JobTracker since it isn't a synchronized method
         return job.getProfile();
       }  else {
         RetireJobInfo info = retireJobs.get(jobid);
@@ -3955,6 +3976,7 @@ public class JobTracker implements MRCon
     }
     return completedJobStatusStore.readJobProfile(jobid);
   }
+  
   public JobStatus getJobStatus(JobID jobid) {
     if (null == jobid) {
       LOG.warn("JobTracker.getJobStatus() cannot get status for null jobid");
@@ -3963,6 +3985,8 @@ public class JobTracker implements MRCon
     synchronized (this) {
       JobInProgress job = jobs.get(jobid);
       if (job != null) {
+        // Safe to call JobInProgress.getStatus while holding the lock
+        // on the JobTracker since it isn't a synchronized method
         return job.getStatus();
       } else {
         
@@ -3974,19 +3998,24 @@ public class JobTracker implements MRCon
     }
     return completedJobStatusStore.readJobStatus(jobid);
   }
+  
+  private static final Counters EMPTY_COUNTERS = new Counters();
   public Counters getJobCounters(JobID jobid) {
     synchronized (this) {
       JobInProgress job = jobs.get(jobid);
       if (job != null) {
-        return job.getCounters();
+        return isJobInited(job) ? job.getCounters() : EMPTY_COUNTERS;
       } 
     }
     return completedJobStatusStore.readCounters(jobid);
   }
+  
+  private static final TaskReport[] EMPTY_TASK_REPORTS = new TaskReport[0];
+  
   public synchronized TaskReport[] getMapTaskReports(JobID jobid) {
     JobInProgress job = jobs.get(jobid);
-    if (job == null) {
-      return new TaskReport[0];
+    if (job == null || !isJobInited(job)) {
+      return EMPTY_TASK_REPORTS;
     } else {
       Vector<TaskReport> reports = new Vector<TaskReport>();
       Vector<TaskInProgress> completeMapTasks =
@@ -4007,8 +4036,8 @@ public class JobTracker implements MRCon
 
   public synchronized TaskReport[] getReduceTaskReports(JobID jobid) {
     JobInProgress job = jobs.get(jobid);
-    if (job == null) {
-      return new TaskReport[0];
+    if (job == null || !isJobInited(job)) {
+      return EMPTY_TASK_REPORTS;
     } else {
       Vector<TaskReport> reports = new Vector<TaskReport>();
       Vector completeReduceTasks = job.reportTasksInProgress(false, true);
@@ -4027,8 +4056,8 @@ public class JobTracker implements MRCon
 
   public synchronized TaskReport[] getCleanupTaskReports(JobID jobid) {
     JobInProgress job = jobs.get(jobid);
-    if (job == null) {
-      return new TaskReport[0];
+    if (job == null || !isJobInited(job)) {
+      return EMPTY_TASK_REPORTS;
     } else {
       Vector<TaskReport> reports = new Vector<TaskReport>();
       Vector<TaskInProgress> completeTasks = job.reportCleanupTIPs(true);
@@ -4050,8 +4079,8 @@ public class JobTracker implements MRCon
   
   public synchronized TaskReport[] getSetupTaskReports(JobID jobid) {
     JobInProgress job = jobs.get(jobid);
-    if (job == null) {
-      return new TaskReport[0];
+    if (job == null || !isJobInited(job)) {
+      return EMPTY_TASK_REPORTS;
     } else {
       Vector<TaskReport> reports = new Vector<TaskReport>();
       Vector<TaskInProgress> completeTasks = job.reportSetupTIPs(true);
@@ -4070,8 +4099,6 @@ public class JobTracker implements MRCon
     }
   }
   
-  TaskCompletionEvent[] EMPTY_EVENTS = new TaskCompletionEvent[0];
-
   static final String MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY =
       "mapred.cluster.map.memory.mb";
   static final String MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY =
@@ -4087,21 +4114,22 @@ public class JobTracker implements MRCon
    * starting from fromEventId.
    * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getTaskCompletionEvents(java.lang.String, int, int)
    */
-  public synchronized TaskCompletionEvent[] getTaskCompletionEvents(
+  public TaskCompletionEvent[] getTaskCompletionEvents(
       JobID jobid, int fromEventId, int maxEvents) throws IOException{
-    synchronized (this) {
-      JobInProgress job = this.jobs.get(jobid);
-      if (null != job) {
-        if (job.inited()) {
-          return job.getTaskCompletionEvents(fromEventId, maxEvents);
-        } else {
-          return EMPTY_EVENTS;
-        }
-      }
+    JobInProgress job = this.jobs.get(jobid);
+      
+    if (null != job) {
+      return isJobInited(job) ? 
+          job.getTaskCompletionEvents(fromEventId, maxEvents) : 
+          TaskCompletionEvent.EMPTY_ARRAY;
     }
-    return completedJobStatusStore.readJobTaskCompletionEvents(jobid, fromEventId, maxEvents);
+
+    return completedJobStatusStore.readJobTaskCompletionEvents(jobid, 
+                                                               fromEventId, 
+                                                               maxEvents);
   }
 
+  private static final String[] EMPTY_TASK_DIAGNOSTICS = new String[0];
   /**
    * Get the diagnostics for a given task
    * @param taskId the id of the task
@@ -4113,7 +4141,7 @@ public class JobTracker implements MRCon
     JobID jobId = taskId.getJobID();
     TaskID tipId = taskId.getTaskID();
     JobInProgress job = jobs.get(jobId);
-    if (job != null) {
+    if (job != null && isJobInited(job)) {
       TaskInProgress tip = job.getTaskInProgress(tipId);
       if (tip != null) {
         taskDiagnosticInfo = tip.getDiagnosticInfo(taskId);
@@ -4121,8 +4149,8 @@ public class JobTracker implements MRCon
       
     }
     
-    return ((taskDiagnosticInfo == null) ? new String[0] 
-            : taskDiagnosticInfo.toArray(new String[0]));
+    return ((taskDiagnosticInfo == null) ? EMPTY_TASK_DIAGNOSTICS :
+             taskDiagnosticInfo.toArray(new String[taskDiagnosticInfo.size()]));
   }
     
   /** Get all the TaskStatuses from the tipid. */
@@ -4722,8 +4750,8 @@ public class JobTracker implements MRCon
 
     boolean invalidJob = false;
     String msg = "";
-    long maxMemForMapTask = job.getJobConf().getMemoryForMapTask();
-    long maxMemForReduceTask = job.getJobConf().getMemoryForReduceTask();
+    long maxMemForMapTask = job.getMemoryForMapTask();
+    long maxMemForReduceTask = job.getMemoryForReduceTask();
 
     if (maxMemForMapTask == JobConf.DISABLED_MEMORY_LIMIT
         || maxMemForReduceTask == JobConf.DISABLED_MEMORY_LIMIT) {



Mime
View raw message