hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acmur...@apache.org
Subject svn commit: r1390016 - in /hadoop/common/branches/branch-1.1: ./ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/contrib/fairscheduler/src/java/org/apache/hadoop/ma...
Date Tue, 25 Sep 2012 17:46:35 GMT
Author: acmurthy
Date: Tue Sep 25 17:46:34 2012
New Revision: 1390016

URL: http://svn.apache.org/viewvc?rev=1390016&view=rev
Log:
Merge -c 1377714 from branch-1 to branch-1.1 to fix MAPREDUCE-4328. Add a JobTracker safemode
to allow it to be resilient to NameNode failures.

Added:
    hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/HDFSMonitorThread.java
      - copied unchanged from r1377714, hadoop/common/branches/branch-1/src/mapred/org/apache/hadoop/mapred/HDFSMonitorThread.java
    hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestJobTrackerQuiescence.java
      - copied unchanged from r1377714, hadoop/common/branches/branch-1/src/test/org/apache/hadoop/mapred/TestJobTrackerQuiescence.java
Modified:
    hadoop/common/branches/branch-1.1/CHANGES.txt
    hadoop/common/branches/branch-1.1/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/common/branches/branch-1.1/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
    hadoop/common/branches/branch-1.1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
    hadoop/common/branches/branch-1.1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
    hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/AdminOperationsProtocol.java
    hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/AuditLogger.java
    hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
    hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java
    hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/tools/MRAdmin.java
    hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
    hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java
    hadoop/common/branches/branch-1.1/src/webapps/job/jobtracker.jsp

Modified: hadoop/common/branches/branch-1.1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/CHANGES.txt?rev=1390016&r1=1390015&r2=1390016&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1.1/CHANGES.txt Tue Sep 25 17:46:34 2012
@@ -54,6 +54,13 @@ Release 1.1.0 - 2012.09.16
     configured timeout and are selected as the last location to read from.
     (Jing Zhao via szetszwo)
 
+    MAPREDUCE-4328. Add a JobTracker safemode to allow it to be resilient to
+    NameNode failures. The safemode can be entered either automatically via
+    the configurable background thread to monitor the NameNode or by the
+    admin. In the safemode the JobTracker doesn't schedule new tasks, marks
+    all failed tasks as KILLED for future retries and doesn't accept new job
+    submissions. (acmurthy)
+
   IMPROVEMENTS
 
     HADOOP-8656. Backport forced daemon shutdown of HADOOP-8353 into branch-1

Modified: hadoop/common/branches/branch-1.1/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=1390016&r1=1390015&r2=1390016&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/common/branches/branch-1.1/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Tue Sep 25 17:46:34 2012
@@ -1042,10 +1042,20 @@ class CapacityTaskScheduler extends Task
      */ 
     updateAllQueues(mapClusterCapacity, reduceClusterCapacity);
     
-    // schedule tasks
+    /*
+     * Schedule tasks
+     */
+    
     List<Task> result = new ArrayList<Task>();
-    addMapTasks(taskTracker, result, maxMapSlots, currentMapSlots);
-    addReduceTask(taskTracker, result, maxReduceSlots, currentReduceSlots);
+    
+    // Check for JT safe-mode
+    if (taskTrackerManager.isInSafeMode()) {
+      LOG.info("JobTracker is in safe-mode, not scheduling any tasks.");
+    } else {
+      addMapTasks(taskTracker, result, maxMapSlots, currentMapSlots);
+      addReduceTask(taskTracker, result, maxReduceSlots, currentReduceSlots);
+    }
+    
     return result;
   }
 

Modified: hadoop/common/branches/branch-1.1/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=1390016&r1=1390015&r2=1390016&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-1.1/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Tue Sep 25 17:46:34 2012
@@ -688,6 +688,12 @@ public class TestCapacityScheduler exten
     public QueueManager getQueueManager() {
       return qm;
     }
+
+    @Override
+    public boolean isInSafeMode() {
+      // TODO Auto-generated method stub
+      return false;
+    }
   }
   
   // represents a fake queue configuration info

Modified: hadoop/common/branches/branch-1.1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=1390016&r1=1390015&r2=1390016&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/common/branches/branch-1.1/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Tue Sep 25 17:46:34 2012
@@ -412,7 +412,13 @@ public class FairScheduler extends TaskS
 
     // Update time waited for local maps for jobs skipped on last heartbeat
     updateLocalityWaitTimes(currentTime);
-    
+
+    // Check for JT safe-mode
+    if (taskTrackerManager.isInSafeMode()) {
+      LOG.info("JobTracker is in safe-mode, not scheduling any tasks.");
+      return null;
+    } 
+
     TaskTrackerStatus tts = tracker.getStatus();
 
     int mapsAssigned = 0; // loop counter for map in the below while loop

Modified: hadoop/common/branches/branch-1.1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=1390016&r1=1390015&r2=1390016&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/common/branches/branch-1.1/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Tue Sep 25 17:46:34 2012
@@ -513,6 +513,12 @@ public class TestFairScheduler extends T
       trackerForTip.get(attemptIdStr).getTaskReports().remove(status);
       return true;
     }
+
+    @Override
+    public boolean isInSafeMode() {
+      // TODO Auto-generated method stub
+      return false;
+    }
   }
   
   protected JobConf conf;

Modified: hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/AdminOperationsProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/AdminOperationsProtocol.java?rev=1390016&r1=1390015&r2=1390016&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/AdminOperationsProtocol.java (original)
+++ hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/AdminOperationsProtocol.java Tue Sep 25 17:46:34 2012
@@ -50,4 +50,13 @@ public interface AdminOperationsProtocol
    * Refresh the node list at the {@link JobTracker} 
    */
   void refreshNodes() throws IOException;
+  
+  /**
+   * Set safe mode for the JobTracker.
+   * @param safeModeAction safe mode action
+   * @return current safemode
+   * @throws IOException
+   */
+  boolean setSafeMode(JobTracker.SafeModeAction safeModeAction) 
+      throws IOException;
 }

Modified: hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/AuditLogger.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/AuditLogger.java?rev=1390016&r1=1390015&r2=1390016&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/AuditLogger.java (original)
+++ hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/AuditLogger.java Tue Sep 25 17:46:34 2012
@@ -34,6 +34,9 @@ class AuditLogger {
     static final String REFRESH_QUEUE = "REFRESH_QUEUE";
     static final String REFRESH_NODES = "REFRESH_NODES";
     
+    static final String GET_SAFEMODE = "GET_SAFEMODE";
+    static final String SET_SAFEMODE = "SET_SAFEMODE";
+    
     // Some commonly used descriptions
     static final String UNAUTHORIZED_USER = "Unauthorized user";
   }

Modified: hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java?rev=1390016&r1=1390015&r2=1390016&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java (original)
+++ hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java Tue Sep 25 17:46:34 2012
@@ -80,6 +80,12 @@ class JobQueueTaskScheduler extends Task
   @Override
   public synchronized List<Task> assignTasks(TaskTracker taskTracker)
       throws IOException {
+    // Check for JT safe-mode
+    if (taskTrackerManager.isInSafeMode()) {
+      LOG.info("JobTracker is in safe-mode, not scheduling any tasks.");
+      return null;
+    } 
+
     TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus(); 
     ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
     final int numTaskTrackers = clusterStatus.getTaskTrackers();

Modified: hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1390016&r1=1390015&r2=1390016&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/JobTracker.java Tue Sep 25 17:46:34 2012
@@ -18,13 +18,9 @@
 package org.apache.hadoop.mapred;
 
 
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintWriter;
-import java.io.InputStreamReader;
 import java.io.Writer;
 import java.lang.management.ManagementFactory;
 import java.net.BindException;
@@ -53,6 +49,7 @@ import java.util.TreeSet;
 import java.util.Vector;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -61,27 +58,33 @@ import org.apache.hadoop.fs.FSDataInputS
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.LocalDirAllocator;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.mapred.JobSubmissionProtocol;
-import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.RPC.VersionMismatch;
+import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.mapred.AuditLogger.Constants;
-import org.apache.hadoop.mapred.Counters.CountersExceededException;
 import org.apache.hadoop.mapred.JobHistory.Keys;
-import org.apache.hadoop.mapred.JobHistory.Listener;
 import org.apache.hadoop.mapred.JobHistory.Values;
 import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 import org.apache.hadoop.mapred.QueueManager.QueueACL;
 import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
+import org.apache.hadoop.mapreduce.ClusterMetrics;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
+import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.mapreduce.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
@@ -89,6 +92,7 @@ import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.Groups;
 import org.apache.hadoop.security.RefreshUserMappingsProtocol;
 import org.apache.hadoop.security.SecurityUtil;
@@ -96,6 +100,7 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.authorize.AuthorizationException;
+import org.apache.hadoop.security.authorize.PolicyProvider;
 import org.apache.hadoop.security.authorize.ProxyUsers;
 import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
@@ -104,16 +109,6 @@ import org.apache.hadoop.util.HostsFileR
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
-
-import org.apache.hadoop.mapreduce.ClusterMetrics;
-import org.apache.hadoop.mapreduce.JobSubmissionFiles;
-import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
-import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
-import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
-import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
-import org.apache.hadoop.metrics2.util.MBeans;
-import org.apache.hadoop.security.Credentials;
 import org.mortbay.util.ajax.JSON;
 
 /*******************************************************
@@ -203,7 +198,7 @@ public class JobTracker implements MRCon
   
   public static enum State { INITIALIZING, RUNNING }
   State state = State.INITIALIZING;
-  private static final int FS_ACCESS_RETRY_PERIOD = 10000;
+  private static final int FS_ACCESS_RETRY_PERIOD = 1000;
   static final String JOB_INFO_FILE = "job-info";
   static final String JOB_TOKEN_FILE = "jobToken";
   private DNSToSwitchMapping dnsToSwitchMapping;
@@ -276,6 +271,16 @@ public class JobTracker implements MRCon
     return clock;
   }
     
+  static final String JT_HDFS_MONITOR_ENABLE = 
+      "mapreduce.jt.hdfs.monitor.enable";
+  static final boolean DEFAULT_JT_HDFS_MONITOR_THREAD_ENABLE = false;
+  
+  static final String JT_HDFS_MONITOR_THREAD_INTERVAL = 
+      "mapreduce.jt.hdfs.monitor.interval.ms";
+  static final int DEFAULT_JT_HDFS_MONITOR_THREAD_INTERVAL_MS = 5000;
+  
+  private Thread hdfsMonitor;
+
   /**
    * Start the JobTracker with given configuration.
    * 
@@ -1872,8 +1877,179 @@ public class JobTracker implements MRCon
     this(conf, identifier, clock, new QueueManager(new Configuration(conf)));
   } 
   
+  private void initJTConf(JobConf conf) {
+    if (conf.getBoolean(
+        DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, false)) {
+      LOG.warn(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY + 
+          " is enabled, disabling it");
+      conf.setBoolean(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, false);
+    }
+  }
+  
+  private void initializeFilesystem() throws IOException, InterruptedException {
+    // Connect to HDFS NameNode
+    while (!Thread.currentThread().isInterrupted() && fs == null) {
+      try {
+        fs = getMROwner().doAs(new PrivilegedExceptionAction<FileSystem>() {
+          public FileSystem run() throws IOException {
+            return FileSystem.get(conf);
+          }});
+      } catch (IOException ie) {
+        fs = null;
+        LOG.info("Problem connecting to HDFS Namenode... re-trying", ie);
+        Thread.sleep(FS_ACCESS_RETRY_PERIOD);
+      }
+    }
+    
+    if (Thread.currentThread().isInterrupted()) {
+      throw new InterruptedException();
+    }
+    
+    // Ensure HDFS is healthy
+    if ("hdfs".equalsIgnoreCase(fs.getUri().getScheme())) {
+      while (!DistributedFileSystem.isHealthy(fs.getUri())) {
+        LOG.info("HDFS initialized but not 'healthy' yet, waiting...");
+        Thread.sleep(FS_ACCESS_RETRY_PERIOD);
+      }
+    }
+  }
+  
+  private void initialize() 
+      throws IOException, InterruptedException {
+    // initialize history parameters.
+    final JobTracker jtFinal = this;
+    
+    getMROwner().doAs(new PrivilegedExceptionAction<Boolean>() {
+      @Override
+      public Boolean run() throws Exception {
+        JobHistory.init(jtFinal, conf, jtFinal.localMachine, 
+            jtFinal.startTime);
+        return true;
+      }
+    });
+
+    // start the recovery manager
+    recoveryManager = new RecoveryManager();
+    
+    while (!Thread.currentThread().isInterrupted()) {
+      try {
+        // if we haven't contacted the namenode go ahead and do it
+        // clean up the system dir, which will only work if hdfs is out of 
+        // safe mode
+        if(systemDir == null) {
+          systemDir = new Path(getSystemDir());    
+        }
+        try {
+          FileStatus systemDirStatus = fs.getFileStatus(systemDir);
+          if (!systemDirStatus.getOwner().equals(
+              getMROwner().getShortUserName())) {
+            throw new AccessControlException("The systemdir " + systemDir +
+                " is not owned by " + getMROwner().getShortUserName());
+          }
+          if (!systemDirStatus.getPermission().equals(SYSTEM_DIR_PERMISSION)) {
+            LOG.warn("Incorrect permissions on " + systemDir +
+                ". Setting it to " + SYSTEM_DIR_PERMISSION);
+            fs.setPermission(systemDir,new FsPermission(SYSTEM_DIR_PERMISSION));
+          }
+        } catch (FileNotFoundException fnf) {} //ignore
+        // Make sure that the backup data is preserved
+        FileStatus[] systemDirData = fs.listStatus(this.systemDir);
+        // Check if the history is enabled .. as we cant have persistence with 
+        // history disabled
+        if (conf.getBoolean("mapred.jobtracker.restart.recover", false) 
+            && systemDirData != null) {
+          for (FileStatus status : systemDirData) {
+            try {
+              recoveryManager.checkAndAddJob(status);
+            } catch (Throwable t) {
+              LOG.warn("Failed to add the job " + status.getPath().getName(), 
+                       t);
+            }
+          }
+          
+          // Check if there are jobs to be recovered
+          hasRestarted = recoveryManager.shouldRecover();
+          if (hasRestarted) {
+            break; // if there is something to recover else clean the sys dir
+          }
+        }
+        LOG.info("Cleaning up the system directory");
+        fs.delete(systemDir, true);
+        if (FileSystem.mkdirs(fs, systemDir, 
+            new FsPermission(SYSTEM_DIR_PERMISSION))) {
+          break;
+        }
+        LOG.error("Mkdirs failed to create " + systemDir);
+      } catch (AccessControlException ace) {
+        LOG.warn("Failed to operate on mapred.system.dir (" + systemDir 
+                 + ") because of permissions.");
+        LOG.warn("Manually delete the mapred.system.dir (" + systemDir 
+                 + ") and then start the JobTracker.");
+        LOG.warn("Bailing out ... ", ace);
+        throw ace;
+      } catch (IOException ie) {
+        LOG.info("problem cleaning system directory: " + systemDir, ie);
+      }
+      Thread.sleep(FS_ACCESS_RETRY_PERIOD);
+    }
+    
+    if (Thread.currentThread().isInterrupted()) {
+      throw new InterruptedException();
+    }
+    
+    // Same with 'localDir' except it's always on the local disk.
+    if (!hasRestarted) {
+      conf.deleteLocalFiles(SUBDIR);
+    }
+
+    // Initialize history DONE folder
+    FileSystem historyFS = getMROwner().doAs(
+        new PrivilegedExceptionAction<FileSystem>() {
+      public FileSystem run() throws IOException {
+        JobHistory.initDone(conf, fs);
+        final String historyLogDir = 
+          JobHistory.getCompletedJobHistoryLocation().toString();
+        infoServer.setAttribute("historyLogDir", historyLogDir);
+
+        infoServer.setAttribute
+          ("serialNumberDirectoryDigits",
+           Integer.valueOf(JobHistory.serialNumberDirectoryDigits()));
+
+        infoServer.setAttribute
+          ("serialNumberTotalDigits",
+           Integer.valueOf(JobHistory.serialNumberTotalDigits()));
+        
+        return new Path(historyLogDir).getFileSystem(conf);
+      }
+    });
+    infoServer.setAttribute("fileSys", historyFS);
+    infoServer.setAttribute("jobConf", conf);
+    infoServer.setAttribute("aclManager", aclsManager);
+
+    if (JobHistoryServer.isEmbedded(conf)) {
+      LOG.info("History server being initialized in embedded mode");
+      jobHistoryServer = new JobHistoryServer(conf, aclsManager, infoServer);
+      jobHistoryServer.start();
+      LOG.info("Job History Server web address: " + JobHistoryServer.getAddress(conf));
+    }
+
+    //initializes the job status store
+    completedJobStatusStore = new CompletedJobStatusStore(conf, aclsManager);
+    
+    // Setup HDFS monitoring
+    if (this.conf.getBoolean(
+        JT_HDFS_MONITOR_ENABLE, DEFAULT_JT_HDFS_MONITOR_THREAD_ENABLE)) {
+      hdfsMonitor = new HDFSMonitorThread(this.conf, this, this.fs);
+      hdfsMonitor.start();
+    }
+
+  }
+  
   JobTracker(final JobConf conf, String identifier, Clock clock, QueueManager qm) 
-  throws IOException, InterruptedException { 
+  throws IOException, InterruptedException {
+    
+    initJTConf(conf);
+    
     this.queueManager = qm;
     this.clock = clock;
     // Set ports, start RPC servers, setup security policy etc.
@@ -1979,7 +2155,12 @@ public class JobTracker implements MRCon
     // Set service-level authorization security policy
     if (conf.getBoolean(
           ServiceAuthorizationManager.SERVICE_AUTHORIZATION_CONFIG, false)) {
-      ServiceAuthorizationManager.refresh(conf, new MapReducePolicyProvider());
+      PolicyProvider policyProvider = 
+          (PolicyProvider)(ReflectionUtils.newInstance(
+              conf.getClass(PolicyProvider.POLICY_PROVIDER_CONFIG, 
+                  MapReducePolicyProvider.class, PolicyProvider.class), 
+              conf));
+        ServiceAuthorizationManager.refresh(conf, policyProvider);
     }
     
     int handlerCount = conf.getInt("mapred.job.tracker.handler.count", 10);
@@ -2006,16 +2187,6 @@ public class JobTracker implements MRCon
     infoServer = new HttpServer("job", infoBindAddress, tmpInfoPort, 
         tmpInfoPort == 0, conf, aclsManager.getAdminsAcl());
     infoServer.setAttribute("job.tracker", this);
-    // initialize history parameters.
-    final JobTracker jtFinal = this;
-    getMROwner().doAs(new PrivilegedExceptionAction<Boolean>() {
-      @Override
-      public Boolean run() throws Exception {
-        JobHistory.init(jtFinal, conf,jtFinal.localMachine, 
-            jtFinal.startTime);
-        return true;
-      }
-    });
     
     infoServer.addServlet("reducegraph", "/taskgraph", TaskGraphServlet.class);
     infoServer.start();
@@ -2035,125 +2206,12 @@ public class JobTracker implements MRCon
         infoBindAddress + ":" + this.infoPort); 
     LOG.info("JobTracker webserver: " + this.infoServer.getPort());
     
-    // start the recovery manager
-    recoveryManager = new RecoveryManager();
-    
-    while (!Thread.currentThread().isInterrupted()) {
-      try {
-        // if we haven't contacted the namenode go ahead and do it
-        if (fs == null) {
-          fs = getMROwner().doAs(new PrivilegedExceptionAction<FileSystem>() {
-            public FileSystem run() throws IOException {
-              return FileSystem.get(conf);
-          }});
-        }
-        // clean up the system dir, which will only work if hdfs is out of 
-        // safe mode
-        if(systemDir == null) {
-          systemDir = new Path(getSystemDir());    
-        }
-        try {
-          FileStatus systemDirStatus = fs.getFileStatus(systemDir);
-          if (!systemDirStatus.getOwner().equals(
-              getMROwner().getShortUserName())) {
-            throw new AccessControlException("The systemdir " + systemDir +
-                " is not owned by " + getMROwner().getShortUserName());
-          }
-          if (!systemDirStatus.getPermission().equals(SYSTEM_DIR_PERMISSION)) {
-            LOG.warn("Incorrect permissions on " + systemDir +
-                ". Setting it to " + SYSTEM_DIR_PERMISSION);
-            fs.setPermission(systemDir,new FsPermission(SYSTEM_DIR_PERMISSION));
-          }
-        } catch (FileNotFoundException fnf) {} //ignore
-        // Make sure that the backup data is preserved
-        FileStatus[] systemDirData = fs.listStatus(this.systemDir);
-        // Check if the history is enabled .. as we cant have persistence with 
-        // history disabled
-        if (conf.getBoolean("mapred.jobtracker.restart.recover", false) 
-            && systemDirData != null) {
-          for (FileStatus status : systemDirData) {
-            try {
-              recoveryManager.checkAndAddJob(status);
-            } catch (Throwable t) {
-              LOG.warn("Failed to add the job " + status.getPath().getName(), 
-                       t);
-            }
-          }
-          
-          // Check if there are jobs to be recovered
-          hasRestarted = recoveryManager.shouldRecover();
-          if (hasRestarted) {
-            break; // if there is something to recover else clean the sys dir
-          }
-        }
-        LOG.info("Cleaning up the system directory");
-        fs.delete(systemDir, true);
-        if (FileSystem.mkdirs(fs, systemDir, 
-            new FsPermission(SYSTEM_DIR_PERMISSION))) {
-          break;
-        }
-        LOG.error("Mkdirs failed to create " + systemDir);
-      } catch (AccessControlException ace) {
-        LOG.warn("Failed to operate on mapred.system.dir (" + systemDir 
-                 + ") because of permissions.");
-        LOG.warn("Manually delete the mapred.system.dir (" + systemDir 
-                 + ") and then start the JobTracker.");
-        LOG.warn("Bailing out ... ", ace);
-        throw ace;
-      } catch (IOException ie) {
-        LOG.info("problem cleaning system directory: " + systemDir, ie);
-      }
-      Thread.sleep(FS_ACCESS_RETRY_PERIOD);
-    }
-    
-    if (Thread.currentThread().isInterrupted()) {
-      throw new InterruptedException();
-    }
-    
-    // Same with 'localDir' except it's always on the local disk.
-    if (!hasRestarted) {
-      jobConf.deleteLocalFiles(SUBDIR);
-    }
-
-    // Initialize history DONE folder
-    FileSystem historyFS = getMROwner().doAs(
-        new PrivilegedExceptionAction<FileSystem>() {
-      public FileSystem run() throws IOException {
-        JobHistory.initDone(conf, fs);
-        final String historyLogDir = 
-          JobHistory.getCompletedJobHistoryLocation().toString();
-        infoServer.setAttribute("historyLogDir", historyLogDir);
-
-        infoServer.setAttribute
-          ("serialNumberDirectoryDigits",
-           Integer.valueOf(JobHistory.serialNumberDirectoryDigits()));
-
-        infoServer.setAttribute
-          ("serialNumberTotalDigits",
-           Integer.valueOf(JobHistory.serialNumberTotalDigits()));
-        
-        return new Path(historyLogDir).getFileSystem(conf);
-      }
-    });
-    infoServer.setAttribute("fileSys", historyFS);
-    infoServer.setAttribute("jobConf", conf);
-    infoServer.setAttribute("aclManager", aclsManager);
-
-    if (JobHistoryServer.isEmbedded(conf)) {
-      LOG.info("History server being initialized in embedded mode");
-      jobHistoryServer = new JobHistoryServer(conf, aclsManager, infoServer);
-      jobHistoryServer.start();
-      LOG.info("Job History Server web address: " + JobHistoryServer.getAddress(conf));
-    }
-
     this.dnsToSwitchMapping = ReflectionUtils.newInstance(
         conf.getClass("topology.node.switch.mapping.impl", ScriptBasedMapping.class,
             DNSToSwitchMapping.class), conf);
     this.numTaskCacheLevels = conf.getInt("mapred.task.cache.levels", 
         NetworkTopology.DEFAULT_HOST_LEVEL);
 
-    //initializes the job status store
-    completedJobStatusStore = new CompletedJobStatusStore(conf, aclsManager);
   }
 
   private static SimpleDateFormat getDateFormat() {
@@ -2244,6 +2302,17 @@ public class JobTracker implements MRCon
    * Run forever
    */
   public void offerService() throws InterruptedException, IOException {
+    // start the inter-tracker server 
+    this.interTrackerServer.start();
+    
+    // Initialize the JobTracker FileSystem within safemode
+    setSafeModeInternal(SafeModeAction.SAFEMODE_ENTER);
+    initializeFilesystem();
+    setSafeModeInternal(SafeModeAction.SAFEMODE_LEAVE);
+    
+    // Initialize JobTracker
+    initialize();
+    
     // Prepare for recovery. This is done irrespective of the status of restart
     // flag.
     while (true) {
@@ -2283,12 +2352,10 @@ public class JobTracker implements MRCon
       completedJobsStoreThread.start();
     }
 
-    // start the inter-tracker server once the jt is ready
-    this.interTrackerServer.start();
-    
     synchronized (this) {
       state = State.RUNNING;
     }
+    
     LOG.info("Starting RUNNING");
     
     this.interTrackerServer.join();
@@ -3488,6 +3555,12 @@ public class JobTracker implements MRCon
   // returns cleanup tasks first, then setup tasks.
   synchronized List<Task> getSetupAndCleanupTasks(
     TaskTrackerStatus taskTracker) throws IOException {
+    
+    // Don't assign *any* new task in safemode
+    if (isInSafeMode()) {
+      return null;
+    }
+    
     int maxMapTasks = taskTracker.getMaxMapSlots();
     int maxReduceTasks = taskTracker.getMaxReduceSlots();
     int numMaps = taskTracker.countOccupiedMapSlots();
@@ -3622,6 +3695,10 @@ public class JobTracker implements MRCon
   public JobStatus submitJob(JobID jobId, String jobSubmitDir,
       UserGroupInformation ugi, Credentials ts, boolean recovered)
       throws IOException {
+    if (isInSafeMode()) {
+      throw new IOException("JobTracker in safemode");
+    }
+    
     JobInfo jobInfo = null;
     if (ugi == null) {
       ugi = UserGroupInformation.getCurrentUser();
@@ -4315,6 +4392,11 @@ public class JobTracker implements MRCon
    * @see org.apache.hadoop.mapred.JobSubmissionProtocol#getSystemDir()
    */
   public String getSystemDir() {
+    // Might not be initialized yet, TT handles this
+    if (isInSafeMode()) {
+      return null;
+    }
+    
     Path sysDir = new Path(conf.get("mapred.system.dir", "/tmp/hadoop/mapred/system")); 

     return fs.makeQualified(sysDir).toString();
   }
@@ -5083,4 +5165,81 @@ public class JobTracker implements MRCon
     return map;
   }
   // End MXbean implementaiton
+
+  /**
+   * JobTracker SafeMode
+   */
+  // SafeMode actions
+  public enum SafeModeAction{ SAFEMODE_LEAVE, SAFEMODE_ENTER, SAFEMODE_GET; }
+  
+  private AtomicBoolean safeMode = new AtomicBoolean(false);
+  private AtomicBoolean adminSafeMode = new AtomicBoolean(false);
+  private String adminSafeModeUser = "";
+  
+  public boolean setSafeMode(JobTracker.SafeModeAction safeModeAction) 
+      throws IOException {
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+
+    // Anyone can check JT safe-mode
+    if (safeModeAction == SafeModeAction.SAFEMODE_GET) {
+      boolean safeMode = this.safeMode.get();
+      LOG.info("Getting safemode information: safemode=" + safeMode + ". " +
+          "Requested by : " +
+          UserGroupInformation.getCurrentUser().getShortUserName());
+      AuditLogger.logSuccess(user, Constants.GET_SAFEMODE, 
+          Constants.JOBTRACKER);
+      return safeMode;
+    }
+    
+    // Check access for modifications to safe-mode
+    if (!aclsManager.isMRAdmin(UserGroupInformation.getCurrentUser())) {
+      AuditLogger.logFailure(user, Constants.SET_SAFEMODE, 
+          aclsManager.getAdminsAcl().toString(), Constants.JOBTRACKER, 
+          Constants.UNAUTHORIZED_USER);
+      throw new AccessControlException(user + 
+                                       " is not authorized to set " +
+                                       " JobTracker safemode.");
+    }
+    AuditLogger.logSuccess(user, Constants.SET_SAFEMODE, Constants.JOBTRACKER);
+
+    boolean currSafeMode = setSafeModeInternal(safeModeAction);
+    adminSafeMode.set(currSafeMode);
+    adminSafeModeUser = user;
+    return currSafeMode;
+  }
+  
+  boolean isInAdminSafeMode() {
+    return adminSafeMode.get();
+  }
+  
+  boolean setSafeModeInternal(JobTracker.SafeModeAction safeModeAction) 
+      throws IOException {
+    if (safeModeAction != SafeModeAction.SAFEMODE_GET) {
+      boolean safeMode = false;
+      if (safeModeAction == SafeModeAction.SAFEMODE_ENTER) {
+        safeMode = true;
+      } else if (safeModeAction == SafeModeAction.SAFEMODE_LEAVE) {
+        safeMode = false;
+      }
+      LOG.info("Setting safe mode to " + safeMode + ". Requested by : " +
+          UserGroupInformation.getCurrentUser().getShortUserName());
+      this.safeMode.set(safeMode);
+    }
+    return this.safeMode.get();
+  }
+
+  public boolean isInSafeMode() {
+    return safeMode.get();
+  }
+  
+  String getSafeModeText() {
+    if (!isInSafeMode())
+      return "OFF";
+    String safeModeInfo = 
+        adminSafeMode.get() ? 
+            "Set by admin <strong>" + adminSafeModeUser + "</strong>": 
+            "HDFS unavailable";
+    return "<em>ON - " + safeModeInfo + "</em>";
+  }
+  
 }

Modified: hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=1390016&r1=1390015&r2=1390016&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Tue Sep 25 17:46:34 2012
@@ -31,6 +31,7 @@ import java.util.TreeSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapred.SortedRanges.Range;
+import org.apache.hadoop.mapred.TaskStatus.State;
 import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.net.Node;
@@ -604,10 +605,18 @@ class TaskInProgress {
           
       changed = oldState != newState;
     }
+    
     // if task is a cleanup attempt, do not replace the complete status,
     // update only specific fields.
     // For example, startTime should not be updated, 
     // but finishTime has to be updated.
+    
+    // Don't fail tasks when JobTracker is in safe-mode
+    if (status.getRunState() == State.FAILED && jobtracker.isInSafeMode()) {
+      LOG.info("JT is in safe-mode; marking " + taskid + " as KILLED");
+      status.setRunState(State.KILLED);
+    }
+
     if (!isCleanupAttempt(taskid)) {
       taskStatuses.put(taskid, status);
     } else {

Modified: hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1390016&r1=1390015&r2=1390016&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Tue Sep 25 17:46:34 2012
@@ -1704,8 +1704,17 @@ public class TaskTracker implements MRCo
           }
           
           String dir = jobClient.getSystemDir();
-          if (dir == null) {
-            throw new IOException("Failed to get system directory");
+          while (dir == null) {
+            LOG.info("Failed to get system directory...");
+            
+            // Re-try
+            try {
+              // Sleep interval: 1000 ms - 5000 ms
+              int sleepInterval = 1000 + r.nextInt(4000);
+              Thread.sleep(sleepInterval);
+            } catch (InterruptedException ie) 
+            {}
+            dir = jobClient.getSystemDir();
           }
           systemDirectory = new Path(dir);
           systemFS = systemDirectory.getFileSystem(fConf);

Modified: hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java?rev=1390016&r1=1390015&r2=1390016&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java (original)
+++ hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/TaskTrackerManager.java Tue Sep 25 17:46:34 2012
@@ -112,4 +112,10 @@ interface TaskTrackerManager {
    * @param job JobInProgress object
    */
   public void failJob(JobInProgress job);
+  
+  /**
+   * Get safe mode.
+   * @return
+   */
+  public boolean isInSafeMode();
 }

Modified: hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/tools/MRAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/tools/MRAdmin.java?rev=1390016&r1=1390015&r2=1390016&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/tools/MRAdmin.java (original)
+++ hadoop/common/branches/branch-1.1/src/mapred/org/apache/hadoop/mapred/tools/MRAdmin.java Tue Sep 25 17:46:34 2012
@@ -22,7 +22,6 @@ import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.mapred.AdminOperationsProtocol;
@@ -57,7 +56,9 @@ public class MRAdmin extends Configured 
     "The full syntax is: \n\n" +
     "hadoop mradmin [-refreshServiceAcl] [-refreshQueues] " +
     "[-refreshNodes] [-refreshUserToGroupsMappings] " +
-    "[-refreshSuperUserGroupsConfiguration] [-help [cmd]]\n";
+    "[-refreshSuperUserGroupsConfiguration] " +
+    "[-safemode <enter | leave | wait | get> " +
+    "[-help [cmd]]\n";
 
   String refreshServiceAcl = "-refreshServiceAcl: Reload the service-level authorization policy file\n" +
     "\t\tJobtracker will reload the authorization policy file.\n";
@@ -74,6 +75,14 @@ public class MRAdmin extends Configured 
   String refreshNodes =
     "-refreshNodes: Refresh the hosts information at the jobtracker.\n";
   
+  String safemode = "-safemode <enter|leave|get|wait>:  Safe mode maintenance command.\n" +
+      "\t\tSafe mode is a JobTracker state in which it\n" +
+      "\t\t\t1.  does not accept new job submissions\n" +
+      "\t\t\t2.  does not schedule any new tasks\n" +
+      "\t\t\t3.  does not fail any tasks due to any error\n" +
+      "\t\tSafe mode can be entered manually, but then\n" +
+      "\t\tit can only be turned off manually as well.\n";
+      
   String help = "-help [cmd]: \tDisplays help for the given command or all commands if none\n" +
     "\t\tis specified.\n";
 
@@ -87,6 +96,8 @@ public class MRAdmin extends Configured 
     System.out.println(refreshSuperUserGroupsConfiguration);
   }  else if ("refreshNodes".equals(cmd)) {
     System.out.println(refreshNodes);
+  }  else if ("safemode".equals(cmd)) {
+    System.out.println(safemode);
   } else if ("help".equals(cmd)) {
     System.out.println(help);
   } else {
@@ -125,7 +136,8 @@ public class MRAdmin extends Configured 
       System.err.println("           [-refreshQueues]");
       System.err.println("           [-refreshUserToGroupsMappings]");
       System.err.println("           [-refreshSuperUserGroupsConfiguration]");
-      System.err.println("           [-refreshNodes]");
+      System.err.println("           [-refreshNodes]");      
+      System.err.println("           [-safemode <enter | leave | get | wait>]");
       System.err.println("           [-help [cmd]]");
       System.err.println();
       ToolRunner.printGenericCommandUsage(System.err);
@@ -208,6 +220,58 @@ public class MRAdmin extends Configured 
     return 0;
   }
 
+  private int setSafeMode(String actionString) throws IOException {
+    JobTracker.SafeModeAction action;
+    Boolean waitExitSafe = false;
+
+    if ("leave".equalsIgnoreCase(actionString)) {
+      action = JobTracker.SafeModeAction.SAFEMODE_LEAVE;
+    } else if ("enter".equalsIgnoreCase(actionString)) {
+      action = JobTracker.SafeModeAction.SAFEMODE_ENTER;
+    } else if ("get".equalsIgnoreCase(actionString)) {
+      action = JobTracker.SafeModeAction.SAFEMODE_GET;
+    } else if ("wait".equalsIgnoreCase(actionString)) {
+      action = JobTracker.SafeModeAction.SAFEMODE_GET;
+      waitExitSafe = true;
+    } else {
+      printUsage("-safemode");
+      return -1;
+    }
+
+    // Get the current configuration
+    Configuration conf = getConf();
+    
+    // Create the client
+    AdminOperationsProtocol adminOperationsProtocol = 
+      (AdminOperationsProtocol) 
+      RPC.getProxy(AdminOperationsProtocol.class, 
+                   AdminOperationsProtocol.versionID, 
+                   JobTracker.getAddress(conf), getUGI(conf), conf,
+                   NetUtils.getSocketFactory(conf, 
+                                             AdminOperationsProtocol.class));
+    
+
+    boolean inSafeMode = adminOperationsProtocol.setSafeMode(action);
+
+    //
+    // If we are waiting for safemode to exit, then poll and
+    // sleep till we are out of safemode.
+    //
+    if (waitExitSafe) {
+      while (inSafeMode) {
+        try {
+          Thread.sleep(3000);
+        } catch (java.lang.InterruptedException e) {
+          throw new IOException("Wait Interrupted");
+        }
+        inSafeMode = adminOperationsProtocol.setSafeMode(action);
+      }
+    }
+
+    System.out.println("Safe mode is " + (inSafeMode ? "ON" : "OFF"));
+
+    return 0;
+  }
   
   /**
    * refreshSuperUserGroupsConfiguration {@link JobTracker}.
@@ -297,6 +361,12 @@ public class MRAdmin extends Configured 
         return exitCode;
       }
     }
+    if ("-safemode".equals(cmd)) {
+      if (args.length != 2) {
+        printUsage(cmd);
+        return exitCode;
+      }
+    }
     
     exitCode = 0;
     try {
@@ -310,6 +380,8 @@ public class MRAdmin extends Configured 
         exitCode = refreshSuperUserGroupsConfiguration();
       } else if ("-refreshNodes".equals(cmd)) {
         exitCode = refreshNodes();
+      } else if ("-safemode".equals(cmd)) {
+        exitCode = setSafeMode(args[i++]);
       } else if ("-help".equals(cmd)) {
         if (i < args.length) {
           printUsage(args[i]);

Modified: hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=1390016&r1=1390015&r2=1390016&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original)
+++ hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Tue Sep 25 17:46:34 2012
@@ -238,6 +238,12 @@ public class TestJobQueueTaskScheduler e
       status.setRunState(TaskStatus.State.RUNNING);
       trackers.get(taskTrackerName).getStatus().getTaskReports().add(status);
     }
+
+    @Override
+    public boolean isInSafeMode() {
+      // TODO Auto-generated method stub
+      return false;
+    }
     
   }
   

Modified: hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java?rev=1390016&r1=1390015&r2=1390016&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java (original)
+++ hadoop/common/branches/branch-1.1/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java Tue Sep 25 17:46:34 2012
@@ -183,6 +183,12 @@ public class TestParallelInitialization 
         listener.jobAdded(job);
       }
     }
+
+    @Override
+    public boolean isInSafeMode() {
+      // TODO Auto-generated method stub
+      return false;
+    }
   }
   
   protected JobConf jobConf;

Modified: hadoop/common/branches/branch-1.1/src/webapps/job/jobtracker.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/webapps/job/jobtracker.jsp?rev=1390016&r1=1390015&r2=1390016&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/webapps/job/jobtracker.jsp (original)
+++ hadoop/common/branches/branch-1.1/src/webapps/job/jobtracker.jsp Tue Sep 25 17:46:34 2012
@@ -107,7 +107,7 @@
 <b>Compiled:</b> <%= VersionInfo.getDate()%> by 
                  <%= VersionInfo.getUser()%><br>
 <b>Identifier:</b> <%= tracker.getTrackerIdentifier()%><br>     
           
-                   
+<b>SafeMode:</b> <%= tracker.getSafeModeText()%><br>            
       
 <hr>
 <h2>Cluster Summary (Heap Size is <%= StringUtils.byteDesc(Runtime.getRuntime().totalMemory())
%>/<%= StringUtils.byteDesc(Runtime.getRuntime().maxMemory()) %>)</h2>
 <% 



Mime
View raw message