hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r448371 - in /lucene/hadoop/trunk: ./ conf/ src/java/org/apache/hadoop/dfs/ src/java/org/apache/hadoop/io/ src/webapps/dfs/
Date Wed, 20 Sep 2006 22:23:10 GMT
Author: cutting
Date: Wed Sep 20 15:23:08 2006
New Revision: 448371

URL: http://svn.apache.org/viewvc?view=rev&rev=448371
Log:
HADOOP-306.  Add a safe mode to DFS.  Contributed by Konstantin.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/SafeModeException.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/conf/hadoop-default.xml
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSShell.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/JspHelper.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/io/ObjectWritable.java
    lucene/hadoop/trunk/src/webapps/dfs/dfshealth.jsp

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=448371&r1=448370&r2=448371
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Sep 20 15:23:08 2006
@@ -43,6 +43,14 @@
     a maximum of one update per one percent of progress.
     (omalley via cutting)
 
+13. HADOOP-306.  Add a "safe" mode to DFS.  The name node enters this
+    when less than a specified percentage of file data is complete.
+    Currently safe mode is only used on startup, but eventually it
+    will also be entered when datanodes disconnect and file data
+    becomes incomplete.  While in safe mode no filesystem
+    modifications are permitted and block replication is inhibited.
+    (Konstantin Shvachko via cutting)
+
 
 Release 0.6.2 (unreleased)
 

Modified: lucene/hadoop/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/conf/hadoop-default.xml?view=diff&rev=448371&r1=448370&r2=448371
==============================================================================
--- lucene/hadoop/trunk/conf/hadoop-default.xml (original)
+++ lucene/hadoop/trunk/conf/hadoop-default.xml Wed Sep 20 15:23:08 2006
@@ -241,6 +241,32 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.blockreport.intervalMsec</name>
+  <value>3600000</value>
+  <description>Determines block reporting interval.</description>
+</property>
+
+<property>
+  <name>dfs.safemode.threshold.pct</name>
+  <value>0.95f</value>
+  <description>
+  	Specifies the percentage of blocks that should satisfy 
+  	the minimal replication requirement defined by dfs.replication.min.
+  	Values less than or equal to 0 mean not to start in safe mode.
+  	Values greater than 1 will make safe mode permanent.
+ 	</description>
+</property>
+
+<property>
+  <name>dfs.safemode.extension</name>
+  <value>30000</value>
+  <description>
+  	Determines extension of safe mode in milliseconds 
+  	after the threshold level is reached.
+ 	</description>
+</property>
+
 
 <!-- map/reduce properties -->
 
@@ -552,7 +578,7 @@
 <property>
   <name>ipc.client.idlethreshold</name>
   <value>4000</value>
-  <description>Defines the threshold numner of connections after which
+  <description>Defines the threshold number of connections after which
                connections will be inspected for idleness.
   </description>
 </property>

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java?view=diff&rev=448371&r1=448370&r2=448371
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java Wed Sep 20 15:23:08
2006
@@ -27,8 +27,7 @@
  **********************************************************************/
 interface ClientProtocol extends VersionedProtocol {
 
-  public static final long versionID = 2L;  // infoPort added to DatanodeID
-                                            // affected: DatanodeInfo, LocatedBlock
+  public static final long versionID = 3L;  // setSafeMode() added
   
     ///////////////////////////////////////
     // File contents
@@ -244,4 +243,59 @@
      * @throws IOException
      */
     public long getBlockSize(String filename) throws IOException;
+
+    /**
+     * Enter, leave or get safe mode.
+     * <p>
+     * Safe mode is a name node state when it
+     * <ol><li>does not accept changes to name space (read-only), and</li>
+     * <li>does not replicate or delete blocks.</li></ol>
+     * 
+     * <p>
+     * Safe mode is entered automatically at name node startup.
+     * Safe mode can also be entered manually using
+     * {@link #setSafeMode(FSConstants.SafeModeAction) setSafeMode( SafeModeAction.SAFEMODE_GET
)}.
+     * <p>
+     * At startup the name node accepts data node reports collecting
+     * information about block locations.
+     * In order to leave safe mode it needs to collect a configurable
+     * percentage called threshold of blocks, which satisfy the minimal 
+     * replication condition.
+     * The minimal replication condition is that each block must have at least
+     * <tt>dfs.replication.min</tt> replicas.
+     * When the threshold is reached the name node extends safe mode
+     * for a configurable amount of time
+     * to let the remaining data nodes to check in before it
+     * will start replicating missing blocks.
+     * Then the name node leaves safe mode.
+     * <p>
+     * If safe mode is turned on manually using
+     * {@link #setSafeMode(FSConstants.SafeModeAction) setSafeMode( SafeModeAction.SAFEMODE_ENTER
)}
+     * then the name node stays in safe mode until it is manually turned off
+     * using {@link #setSafeMode(FSConstants.SafeModeAction) setSafeMode( SafeModeAction.SAFEMODE_LEAVE
)}.
+     * Current state of the name node can be verified using
+     * {@link #setSafeMode(FSConstants.SafeModeAction) setSafeMode( SafeModeAction.SAFEMODE_GET
)}
+     * <h4>Configuration parameters:</h4>
+     * <tt>dfs.safemode.threshold.pct</tt> is the threshold parameter.<br>
+     * <tt>dfs.safemode.extension</tt> is the safe mode extension parameter.<br>
+     * <tt>dfs.replication.min</tt> is the minimal replication parameter.
+     * 
+     * <h4>Special cases:</h4>
+     * The name node does not enter safe mode at startup if the threshold is 
+     * set to 0 or if the name space is empty.<br>
+     * If the threshold is set to 1 then all blocks need to have at least 
+     * minimal replication.<br>
+     * If the threshold value is greater than 1 then the name node will not be 
+     * able to turn off safe mode automatically.<br>
+     * Safe mode can always be turned off manually.
+     * 
+     * @param action  <ul> <li>0 leave safe mode;</li>
+     *                <li>1 enter safe mode;</li>
+     *                <li>2 get safe mode state.</li></ul>
+     * @return <ul><li>0 if the safe mode is OFF or</li> 
+     *         <li>1 if the safe mode is ON.</li></ul>
+     * @throws IOException
+     * @author Konstantin Shvachko
+     */
+    public boolean setSafeMode( FSConstants.SafeModeAction action ) throws IOException;
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?view=diff&rev=448371&r1=448370&r2=448371
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Wed Sep 20 15:23:08
2006
@@ -351,6 +351,17 @@
     public DatanodeInfo[] datanodeReport() throws IOException {
         return namenode.getDatanodeReport();
     }
+    
+    /**
+     * Enter, leave or get safe mode.
+     * See {@link ClientProtocol#setSafeMode(FSConstants.SafeModeAction)} 
+     * for more details.
+     * 
+     * @see ClientProtocol#setSafeMode(FSConstants.SafeModeAction)
+     */
+    public boolean setSafeMode( SafeModeAction action ) throws IOException {
+      return namenode.setSafeMode( action );
+    }
 
     /**
      */

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSShell.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSShell.java?view=diff&rev=448371&r1=448370&r2=448371
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSShell.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSShell.java Wed Sep 20 15:23:08 2006
@@ -300,7 +300,10 @@
         long raw = dfs.getRawCapacity();
         long rawUsed = dfs.getRawUsed();
         long used = dfs.getUsed();
+        boolean mode = dfs.setSafeMode( FSConstants.SafeModeAction.SAFEMODE_GET );
 
+        if( mode )
+          System.out.println("Safe mode is ON" );
         System.out.println("Total raw bytes: " + raw + " (" + byteDesc(raw) + ")");
         System.out.println("Used raw bytes: " + rawUsed + " (" + byteDesc(rawUsed) + ")");
         System.out.println("% used: " + limitDecimal(((1.0 * rawUsed) / raw) * 100, 2) +
"%");
@@ -318,18 +321,65 @@
         }
       }
     }
+    
+    /**
+     * Safe mode maintenance command.
+     * 
+     * Usage: java DFSShell -safemode [enter | leave | get]
+     */
+    public void setSafeMode( String argv[], int idx ) throws IOException {
+      final String safeModeUsage = "Usage: java DFSShell -safemode [enter | leave | get]";
+      if( ! (fs instanceof DistributedFileSystem) ) {
+        System.out.println( "FileSystem is " + fs.getName() );
+        return;
+      }
+      if( idx != argv.length-1 ) {
+        System.out.println( safeModeUsage );
+        return;
+      }
+      FSConstants.SafeModeAction action;
+      if( "leave".equalsIgnoreCase(argv[idx]) )
+        action = FSConstants.SafeModeAction.SAFEMODE_LEAVE;
+      else if( "enter".equalsIgnoreCase(argv[idx]) )
+        action = FSConstants.SafeModeAction.SAFEMODE_ENTER;
+      else if( "get".equalsIgnoreCase(argv[idx]) )
+        action = FSConstants.SafeModeAction.SAFEMODE_GET;
+      else {
+        System.out.println( safeModeUsage );
+        return;
+      }
+      DistributedFileSystem dfs = (DistributedFileSystem)fs;
+      boolean mode = dfs.setSafeMode( action );
+      System.out.println( "Safe mode is " + ( mode ? "ON" : "OFF" ));
+    }
 
     /**
      * run
      */
     public int run( String argv[] ) throws Exception {
         if (argv.length < 1) {
-            System.out.println("Usage: java DFSShell [-fs <local | namenode:port>]"+
-                    " [-conf <configuration file>] [-D <[property=value>]"+
-                    " [-ls <path>] [-lsr <path>] [-du <path>] [-mv <src>
<dst>] [-cp <src> <dst>] [-rm <src>]" +
-                    " [-put <localsrc> <dst>] [-copyFromLocal <localsrc>
<dst>] [-moveFromLocal <localsrc> <dst>]" + 
-                    " [-get <src> <localdst>] [-getmerge <src> <localdst>
[addnl]] [-cat <src>] [-copyToLocal <src> <localdst>]" +
-                    " [-moveToLocal <src> <localdst>] [-mkdir <path>] [-report]
[-setrep [-R] <rep> <path/file>]");
+            System.out.println("Usage: java DFSShell" + 
+                " [-fs <local | namenode:port>]" +
+                " [-conf <configuration file>]" +
+                " [-D <[property=value>]"+
+                " [-ls <path>]"+
+                " [-lsr <path>]"+
+                " [-du <path>]"+
+                " [-mv <src> <dst>]"+
+                " [-cp <src> <dst>]"+
+                " [-rm <src>]" +
+                " [-put <localsrc> <dst>]"+
+                " [-copyFromLocal <localsrc> <dst>]"+
+                " [-moveFromLocal <localsrc> <dst>]" + 
+                " [-get <src> <localdst>]"+
+                " [-getmerge <src> <localdst> [addnl]]"+
+                " [-cat <src>]"+
+                " [-copyToLocal <src> <localdst>]" +
+                " [-moveToLocal <src> <localdst>]"+
+                " [-mkdir <path>]"+
+                " [-report]"+
+                " [-setrep [-R] <rep> <path/file>]" +
+                " [-safemode enter | leave | get]");
             return -1;
         }
 
@@ -377,6 +427,8 @@
                 mkdir(argv[i++]);
             } else if ("-report".equals(cmd)) {
                 report();
+            } else if ("-safemode".equals(cmd)) {
+                setSafeMode(argv,i);
             }
             exitCode = 0;;
         } catch (IOException e ) {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?view=diff&rev=448371&r1=448370&r2=448371
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Wed Sep 20 15:23:08 2006
@@ -155,6 +155,8 @@
         this(InetAddress.getLocalHost().getHostName(), 
              dataDirs,
              createSocketAddr(conf.get("fs.default.name", "local")), conf);
+        // register datanode
+        register();
         int infoServerPort = conf.getInt("dfs.datanode.info.port", 50075);
         String infoServerBindAddress = conf.get("dfs.datanode.info.bindAddress", "0.0.0.0");
         this.infoServer = new StatusHttpServer("datanode", infoServerBindAddress, infoServerPort,
true);
@@ -165,8 +167,6 @@
         } catch (Exception e) {LOG.warn("addServlet threw exception", e);}
         this.infoServer.start();
         this.dnRegistration.infoPort = this.infoServer.getPort();
-        // register datanode
-        register();
         datanodeObject = this;
     }
     

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java?view=diff&rev=448371&r1=448370&r2=448371
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java Wed Sep
20 15:23:08 2006
@@ -253,4 +253,16 @@
     public DatanodeInfo[] getDataNodeStats() throws IOException {
       return dfs.datanodeReport();
     }
+    
+    /**
+     * Enter, leave or get safe mode.
+     * See {@link ClientProtocol#setSafeMode(FSConstants.SafeModeAction)} 
+     * for more details.
+     *  
+     * @see ClientProtocol#setSafeMode(FSConstants.SafeModeAction)
+     */
+    public boolean setSafeMode( FSConstants.SafeModeAction action ) 
+    throws IOException {
+      return dfs.setSafeMode( action );
+    }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?view=diff&rev=448371&r1=448370&r2=448371
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Wed Sep 20 15:23:08
2006
@@ -102,7 +102,6 @@
     public static long HEARTBEAT_INTERVAL = 3 * 1000;
     public static long EXPIRE_INTERVAL = 10 * 60 * 1000;
     public static long BLOCKREPORT_INTERVAL = 60 * 60 * 1000;
-    public static long DATANODE_STARTUP_PERIOD = 2 * 60 * 1000;
     public static long LEASE_PERIOD = 60 * 1000;
     public static int READ_TIMEOUT = 60 * 1000;
 
@@ -113,6 +112,9 @@
     
     //TODO mb@media-style.com: should be conf injected?
     public static final int BUFFER_SIZE = new Configuration().getInt("io.file.buffer.size",
4096);
+
+    // SafeMode actions
+    public enum SafeModeAction{ SAFEMODE_LEAVE, SAFEMODE_ENTER, SAFEMODE_GET; }
 
     // Version is reflected in the dfs image and edit log files.
     // Version is reflected in the data storage file.

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=448371&r1=448370&r2=448371
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Wed Sep 20 15:23:08
2006
@@ -165,9 +165,9 @@
     // Threaded object that checks to see if we have been
     // getting heartbeats from all clients. 
     //
-    HeartbeatMonitor hbmon = null;
-    LeaseMonitor lmon = null;
-    Daemon hbthread = null, lmthread = null;
+    Daemon hbthread = null;   // HeartbeatMonitor thread
+    Daemon lmthread = null;   // LeaseMonitor thread
+    Daemon smmthread = null;  // SafeModeMonitor thread
     boolean fsRunning = true;
     long systemStart = 0;
 
@@ -183,6 +183,7 @@
     public static FSNamesystem fsNamesystemObject;
     private String localMachine;
     private int port;
+    private SafeModeInfo safeMode;  // safe mode information
 
     /**
      * dir is where the filesystem directory state 
@@ -190,33 +191,44 @@
      */
     public FSNamesystem(File dir, Configuration conf) throws IOException {
         fsNamesystemObject = this;
-        this.infoPort = conf.getInt("dfs.info.port", 50070);
-        this.infoBindAddress = conf.get("dfs.info.bindAddress", "0.0.0.0");
-        this.infoServer = new StatusHttpServer("dfs",infoBindAddress, infoPort, false);
-        this.infoServer.start();
         InetSocketAddress addr = DataNode.createSocketAddr(conf.get("fs.default.name", "local"));
-        this.localMachine = addr.getHostName();
-        this.port = addr.getPort();
-        this.dir = new FSDirectory(dir);
-        this.dir.loadFSImage( conf );
-        this.hbthread = new Daemon(new HeartbeatMonitor());
-        this.lmthread = new Daemon(new LeaseMonitor());
-        hbthread.start();
-        lmthread.start();
-        this.systemStart = System.currentTimeMillis();
-        this.startTime = new Date(systemStart); 
-
         this.maxReplication = conf.getInt("dfs.replication.max", 512);
         this.minReplication = conf.getInt("dfs.replication.min", 1);
+        if( minReplication <= 0 )
+          throw new IOException(
+              "Unexpected configuration parameters: dfs.replication.min = " 
+              + minReplication
+              + " must be greater than 0" );
+        if( maxReplication >= (int)Short.MAX_VALUE )
+          throw new IOException(
+              "Unexpected configuration parameters: dfs.replication.max = " 
+              + maxReplication + " must be less than " + (Short.MAX_VALUE) );
         if( maxReplication < minReplication )
           throw new IOException(
               "Unexpected configuration parameters: dfs.replication.min = " 
               + minReplication
               + " must be less than dfs.replication.max = " 
               + maxReplication );
-
         this.maxReplicationStreams = conf.getInt("dfs.max-repl-streams", 2);
         this.heartBeatRecheck= 1000;
+
+        this.localMachine = addr.getHostName();
+        this.port = addr.getPort();
+        this.dir = new FSDirectory(dir);
+        this.dir.loadFSImage( conf );
+        this.safeMode = new SafeModeInfo( conf );
+        setBlockTotal();
+        this.hbthread = new Daemon(new HeartbeatMonitor());
+        this.lmthread = new Daemon(new LeaseMonitor());
+        hbthread.start();
+        lmthread.start();
+        this.systemStart = now();
+        this.startTime = new Date(systemStart); 
+
+        this.infoPort = conf.getInt("dfs.info.port", 50070);
+        this.infoBindAddress = conf.get("dfs.info.bindAddress", "0.0.0.0");
+        this.infoServer = new StatusHttpServer("dfs",infoBindAddress, infoPort, false);
+        this.infoServer.start();
     }
     /** Return the FSNamesystem object
      * 
@@ -308,6 +320,8 @@
     public boolean setReplication(String src, 
                                   short replication
                                 ) throws IOException {
+      if( isInSafeMode() )
+        throw new SafeModeException( "Cannot set replication for " + src, safeMode );
       verifyReplication(src, replication, null );
 
       Vector oldReplication = new Vector();
@@ -382,6 +396,8 @@
                                           ) throws IOException {
       NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: file "
             +src+" for "+holder+" at "+clientMachine);
+      if( isInSafeMode() )
+        throw new SafeModeException( "Cannot create file" + src, safeMode );
       try {
         if (pendingCreates.get(src) != null) {
            throw new AlreadyBeingCreatedException(
@@ -465,6 +481,8 @@
                                                     ) throws IOException {
         NameNode.stateChangeLog.debug("BLOCK* NameSystem.getAdditionalBlock: file "
             +src+" for "+clientName);
+        if( isInSafeMode() )
+          throw new SafeModeException( "Cannot add block to " + src, safeMode );
         FileUnderConstruction pendingFile = 
           (FileUnderConstruction) pendingCreates.get(src);
         // make sure that we still have the lease on this file
@@ -562,8 +580,11 @@
      * Before we return, we make sure that all the file's blocks have 
      * been reported by datanodes and are replicated correctly.
      */
-    public synchronized int completeFile(UTF8 src, UTF8 holder) {
+    public synchronized int completeFile( UTF8 src, 
+                                          UTF8 holder) throws IOException {
         NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " + src + " for " +
holder );
+        if( isInSafeMode() )
+          throw new SafeModeException( "Cannot complete file " + src, safeMode );
         if (dir.getFile(src) != null || pendingCreates.get(src) == null) {
             NameNode.stateChangeLog.warn( "DIR* NameSystem.completeFile: "
                     + "failed to complete " + src
@@ -705,8 +726,10 @@
     /**
      * Change the indicated filename.
      */
-    public boolean renameTo(UTF8 src, UTF8 dst) {
+    public boolean renameTo(UTF8 src, UTF8 dst) throws IOException {
         NameNode.stateChangeLog.debug("DIR* NameSystem.renameTo: " + src + " to " + dst );
+        if( isInSafeMode() )
+          throw new SafeModeException( "Cannot rename " + src, safeMode );
         return dir.renameTo(src, dst);
     }
 
@@ -714,8 +737,10 @@
      * Remove the indicated filename from the namespace.  This may
      * invalidate some blocks that make up the file.
      */
-    public synchronized boolean delete(UTF8 src) {
+    public synchronized boolean delete(UTF8 src) throws IOException {
         NameNode.stateChangeLog.debug("DIR* NameSystem.delete: " + src );
+        if( isInSafeMode() )
+          throw new SafeModeException( "Cannot delete " + src, safeMode );
         Block deletedBlocks[] = (Block[]) dir.delete(src);
         if (deletedBlocks != null) {
             for (int i = 0; i < deletedBlocks.length; i++) {
@@ -762,8 +787,10 @@
     /**
      * Create all the necessary directories
      */
-    public boolean mkdirs(UTF8 src) {
+    public boolean mkdirs( String src ) throws IOException {
         NameNode.stateChangeLog.debug("DIR* NameSystem.mkdirs: " + src );
+        if( isInSafeMode() )
+          throw new SafeModeException( "Cannot create directory " + src, safeMode );
         return dir.mkdirs(src);
     }
 
@@ -847,10 +874,10 @@
             renew();
         }
         public void renew() {
-            this.lastUpdate = System.currentTimeMillis();
+            this.lastUpdate = now();
         }
         public boolean expired() {
-            if (System.currentTimeMillis() - lastUpdate > LEASE_PERIOD) {
+            if (now() - lastUpdate > LEASE_PERIOD) {
                 return true;
             } else {
                 return false;
@@ -943,7 +970,11 @@
     /**
      * Get a lock (perhaps exclusive) on the given file
      */
-    public synchronized int obtainLock(UTF8 src, UTF8 holder, boolean exclusive) {
+    public synchronized int obtainLock( UTF8 src, 
+                                        UTF8 holder, 
+                                        boolean exclusive) throws IOException {
+        if( isInSafeMode() )
+          throw new SafeModeException( "Cannot lock file " + src, safeMode );
         int result = dir.obtainLock(src, holder, exclusive);
         if (result == COMPLETE_SUCCESS) {
             synchronized (leases) {
@@ -1013,8 +1044,10 @@
     /**
      * Renew the lease(s) held by the given client
      */
-    public void renewLease(UTF8 holder) {
+    public void renewLease(UTF8 holder) throws IOException {
         synchronized (leases) {
+            if( isInSafeMode() )
+              throw new SafeModeException( "Cannot renew lease for " + holder, safeMode );
             Lease lease = (Lease) leases.get(holder);
             if (lease != null) {
                 sortedLeases.remove(lease);
@@ -1102,7 +1135,9 @@
             "BLOCK* NameSystem.registerDatanode: "
             + "node " + nodeS.name
             + " is replaced by " + nodeReg.getName() + "." );
+        getEditLog().logRemoveDatanode( nodeS );
         nodeS.name = nodeReg.getName();
+        getEditLog().logAddDatanode( nodeS );
         return;
       }
 
@@ -1391,6 +1426,9 @@
             FSDirectory.INode fileINode = dir.getFileByBlock(block);
             if( fileINode == null )  // block does not belong to any file
                 return;
+            // check whether safe replication is reached for the block
+            // only if it is a part of a files
+            incrementSafeBlockCount( containingNodes.size() );
             short fileReplication = fileINode.getReplication();
             if (containingNodes.size() >= fileReplication ) {
                 neededReplications.remove(block);
@@ -1494,9 +1532,14 @@
                 +block.getBlockName() + " from "+node.getName() );
         TreeSet containingNodes = (TreeSet) blocksMap.get(block);
         if (containingNodes == null || ! containingNodes.contains(node)) {
-            throw new IllegalArgumentException("No machine mapping found for block " + block
+ ", which should be at node " + node);
+          NameNode.stateChangeLog.debug("BLOCK* NameSystem.removeStoredBlock: "
+            +block.getBlockName()+" has already been removed from node "+node );
+          return;
         }
         containingNodes.remove(node);
+        decrementSafeBlockCount( containingNodes.size() );
+        if( containingNodes.size() == 0 )
+          blocksMap.remove(block);
         //
         // It's possible that the block was removed because of a datanode
         // failure.  If the block is still valid, check if replication is
@@ -1631,6 +1674,11 @@
      * Check if there are any recently-deleted blocks a datanode should remove.
      */
     public synchronized Block[] blocksToInvalidate( DatanodeID nodeID ) {
+        // Ask datanodes to perform block delete  
+        // only if safe mode is off.
+        if( isInSafeMode() )
+          return null;
+        
         Vector invalidateSet = (Vector) recentInvalidateSets.remove( 
                                                       nodeID.getStorageID() );
  
@@ -1661,6 +1709,11 @@
      */
     public synchronized Object[] pendingTransfers(DatanodeID srcNode,
                                                   int xmitsInProgress) {
+    // Ask datanodes to perform block replication  
+    // only if safe mode is off.
+    if( isInSafeMode() )
+      return null;
+    
     synchronized (neededReplications) {
       Object results[] = null;
       int scheduledXfers = 0;
@@ -1692,7 +1745,7 @@
                                                       srcNode.getStorageID() );
             // srcNode must contain the block, and the block must
             // not be scheduled for removal on that node
-            if (containingNodes.contains(srcNode)
+            if (containingNodes != null && containingNodes.contains(srcNode)
                 && (excessBlocks == null || ! excessBlocks.contains(block))) {
               DatanodeDescriptor targets[] = chooseTargets(
                   Math.min( fileINode.getReplication() - containingNodes.size(),
@@ -2024,4 +2077,354 @@
       return infoPort;
     }
 
+    /**
+     * SafeModeInfo contains information related to the safe mode.
+     * <p>
+     * An instance of {@link SafeModeInfo} is created when the name node
+     * enters safe mode.
+     * <p>
+     * During name node startup {@link SafeModeInfo} counts the number of
+     * <em>safe blocks</em>, those that have at least the minimal number of
+     * replicas, and calculates the ratio of safe blocks to the total number
+     * of blocks in the system, which is the size of
+     * {@link FSDirectory#activeBlocks}. When the ratio reaches the
+     * {@link #threshold} it starts the {@link SafeModeMonitor} daemon in order
+     * to monitor whether the safe mode extension is passed. Then it leaves safe
+     * mode and destroys itself.
+     * <p>
+     * If safe mode is turned on manually then the number of safe blocks is
+     * not tracked because the name node is not intended to leave safe mode
+     * automatically in the case.
+     *
+     * @see ClientProtocol#setSafeMode(FSConstants.SafeModeAction)
+     * @see SafeModeMonitor
+     * @author Konstantin Shvachko
+     */
+    class SafeModeInfo {
+      // configuration fields
+      /** Safe mode threshold condition %.*/
+      private double threshold;
+      /** Safe mode extension after the threshold. */
+      private int extension;
+      /** Min replication required by safe mode. */
+      private int safeReplication;
+      
+      // internal fields
+      /** Time when threshold was reached.
+       * 
+       * <br>-1 safe mode is off
+       * <br> 0 safe mode is on, but threshold is not reached yet 
+       */
+      private long reached = -1;  
+      /** Total number of blocks. */
+      int blockTotal; 
+      /** Number of safe blocks. */
+      private int blockSafe;
+      
+      /**
+       * Creates SafeModeInfo when the name node enters
+       * automatic safe mode at startup.
+       *  
+       * @param conf configuration
+       */
+      SafeModeInfo( Configuration conf ) {
+        this.threshold = conf.getFloat( "dfs.safemode.threshold.pct", 0.95f );
+        this.extension = conf.getInt( "dfs.safemode.extension", 0 );
+        this.safeReplication = conf.getInt( "dfs.replication.min", 1 );
+        this.blockTotal = 0; 
+        this.blockSafe = 0;
+      }
+
+      /**
+       * Creates SafeModeInfo when safe mode is entered manually.
+       *
+       * The {@link #threshold} is set to 1.5 so that it could never be reached.
+       * {@link #blockTotal} is set to -1 to indicate that safe mode is manual.
+       * 
+       * @see SafeModeInfo
+       */
+      private SafeModeInfo() {
+        this.threshold = 1.5f;  // this threshold can never be riched
+        this.extension = 0;
+        this.safeReplication = Short.MAX_VALUE + 1; // more than maxReplication
+        this.blockTotal = -1;
+        this.blockSafe = -1;
+        this.reached = -1;
+        enter();
+      }
+      
+      /**
+       * Check if safe mode is on.
+       * @return true if in safe mode
+       */
+      synchronized boolean isOn() {
+        try {
+          isConsistent();   // SHV this an assert
+        } catch( IOException e ) {
+          System.err.print( StringUtils.stringifyException( e ));
+        }
+        return this.reached >= 0;
+      }
+      
+      /**
+       * Enter safe mode.
+       */
+      void enter() {
+        if( reached != 0 )
+          NameNode.stateChangeLog.info(
+            "STATE* SafeModeInfo.enter: " + "Safe mode is ON.\n" 
+            + getTurnOffTip() );
+        this.reached = 0;
+      }
+      
+      /**
+       * Leave safe mode.
+       */
+      synchronized void leave() {
+        if( reached >= 0 )
+          NameNode.stateChangeLog.info(
+            "STATE* SafeModeInfo.leave: " + "Safe mode is OFF." ); 
+        reached = -1;
+        safeMode = null;
+      }
+      
+      /** 
+       * Safe mode can be turned off iff 
+       * the threshold is reached and 
+       * the extension time have passed.
+       * @return true if can leave or false otherwise.
+       */
+      synchronized boolean canLeave() {
+        if( reached == 0 )
+          return false;
+        if( now() - reached < extension )
+          return false;
+        return ! needEnter();
+      }
+      
+      /** 
+       * There is no need to enter safe mode 
+       * if DFS is empty or {@link #threshold} == 0
+       */
+      boolean needEnter() {
+        return getSafeBlockRatio() < threshold;
+      }
+      
+      /**
+       * Ratio of the number of safe blocks to the total number of blocks 
+       * to be compared with the threshold.
+       */
+      private float getSafeBlockRatio() {
+        return ( blockTotal == 0 ? 1 : (float)blockSafe/blockTotal );
+      }
+      
+      /**
+       * Check and trigger safe mode if needed. 
+       */
+      private void checkMode() {
+        if( needEnter() ) {
+          enter();
+          return;
+        }
+        // the threshold is reached
+        if( ! isOn() ||                           // safe mode is off
+            extension <= 0 || threshold <= 0 ) {  // don't need to wait
+          this.leave();                           // just leave safe mode
+          return;
+        }
+        if( reached > 0 )  // threshold has already been reached before
+          return;
+        // start monitor
+        reached = now();
+        smmthread = new Daemon(new SafeModeMonitor());
+        smmthread.start();
+      }
+      
+      /**
+       * Set total number of blocks.
+       */
+      synchronized void setBlockTotal( int total) {
+        this.blockTotal = total; 
+        checkMode();
+      }
+      
+      /**
+       * Increment number of safe blocks if current block has 
+       * reached minimal replication.
+       * @param replication current replication 
+       */
+      synchronized void incrementSafeBlockCount( short replication ) {
+        if( (int)replication == safeReplication )
+          this.blockSafe++;
+        checkMode();
+      }
+      
+      /**
+       * Decrement number of safe blocks if current block has 
+       * fallen below minimal replication.
+       * @param replication current replication 
+       */
+      synchronized void decrementSafeBlockCount( short replication ) {
+        if( replication == safeReplication-1 )
+          this.blockSafe--;
+        checkMode();
+      }
+      
+      /**
+       * Check if safe mode was entered manually or at startup.
+       */
+      boolean isManual() {
+        return blockTotal == -1;
+      }
+      
+      /**
+       * A tip on how safe mode is to be turned off: manually or automatically.
+       */
+      String getTurnOffTip() {
+        return ( isManual() ? 
+            "Use \"hadoop dfs -safemode leave\" to turn safe mode off." :
+            "Safe mode will be turned off automatically." );
+      }
+      
+      /**
+       * Returns printable state of the class.
+       */
+      public String toString() {
+        String resText = "Current safe block ratio = " 
+          + getSafeBlockRatio() 
+          + ". Target threshold = " + threshold
+          + ". Minimal replication = " + safeReplication + ".";
+        if( reached > 0 ) 
+          resText += " Threshold was reached " + new Date(reached) + ".";
+        return resText;
+      }
+      
+      /**
+       * Checks consistency of the class state.
+       * @deprecated This is for debugging purposes.
+       */
+      void isConsistent() throws IOException {
+        if( blockTotal == -1 && blockSafe == -1 ) {
+          return; // manual safe mode
+        }
+        int activeBlocks = dir.activeBlocks.size();
+        if( blockTotal != activeBlocks )
+          throw new IOException( "blockTotal " + blockTotal 
+              + " does not match all blocks count. " 
+              + "activeBlocks = " + activeBlocks 
+              + ". safeBlocks = " + blockSafe 
+              + " safeMode is: " 
+              + ((safeMode == null) ? "null" : safeMode.toString()) ); 
+        if( blockSafe < 0 || blockSafe > blockTotal )
+          throw new IOException( "blockSafe " + blockSafe 
+              + " is out of range [0," + blockTotal + "]. " 
+              + "activeBlocks = " + activeBlocks 
+              + " safeMode is: " 
+              + ((safeMode == null) ? "null" : safeMode.toString()) ); 
+      } 
+    }
+    
+    /**
+     * Periodically check whether it is time to leave safe mode.
+     * This thread starts when the threshold level is reached.
+     *
+     * @author Konstantin Shvachko
+     */
+    class SafeModeMonitor implements Runnable {
+      /** interval in msec for checking safe mode: {@value} */
+      private static final long recheckInterval = 1000;
+      
+      /**
+       */
+      public void run() {
+        while( ! safeMode.canLeave() ) {
+          try {
+            Thread.sleep(recheckInterval);
+          } catch (InterruptedException ie) {
+          }
+        }
+        // leave safe mode an stop the monitor
+        safeMode.leave();
+        smmthread = null;
+      }
+    }
+    
+    /**
+     * Current system time.
+     * @return current time in msec.
+     */
+    static long now() {
+      return System.currentTimeMillis();
+    }
+    
+    /**
+     * Check whether the name node is in safe mode.
+     * @return true if safe mode is ON, false otherwise
+     */
+    boolean isInSafeMode() {
+      if( safeMode == null )
+        return false;
+      return safeMode.isOn();
+    }
+    
+    /**
+     * Increment number of blocks that reached minimal replication.
+     * @param replication current replication 
+     */
+    void incrementSafeBlockCount( int replication ) {
+      if( safeMode == null )
+        return;
+      safeMode.incrementSafeBlockCount( (short)replication );
+    }
+
+    /**
+     * Decrement number of blocks that reached minimal replication.
+     * @param replication current replication
+     */
+    void decrementSafeBlockCount( int replication ) {
+      if( safeMode == null )
+        return;
+      safeMode.decrementSafeBlockCount( (short)replication );
+    }
+
+    /**
+     * Set the total number of blocks in the system. 
+     */
+    void setBlockTotal() {
+      if( safeMode == null )
+        return;
+      safeMode.setBlockTotal( dir.activeBlocks.size() );
+    }
+
+    /**
+     * Enter safe mode manually.
+     * @throws IOException
+     */
+    synchronized void enterSafeMode() throws IOException {
+      if( isInSafeMode() ) {
+        NameNode.stateChangeLog.info(
+            "STATE* FSNamesystem.enterSafeMode: " + "Safe mode is already ON."); 
+        return;
+      }
+      safeMode = new SafeModeInfo();
+    }
+    
+    /**
+     * Leave safe mode.
+     * @throws IOException
+     */
+    synchronized void leaveSafeMode() throws IOException {
+      if( ! isInSafeMode() ) {
+        NameNode.stateChangeLog.info(
+            "STATE* FSNamesystem.leaveSafeMode: " + "Safe mode is already OFF."); 
+        return;
+      }
+      safeMode.leave();
+    }
+    
+    String getSafeModeTip() {
+      if( ! isInSafeMode() )
+        return "";
+      return safeMode.getTurnOffTip();
+    }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/JspHelper.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/JspHelper.java?view=diff&rev=448371&r1=448370&r2=448371
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/JspHelper.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/JspHelper.java Wed Sep 20 15:23:08
2006
@@ -165,4 +165,9 @@
       out.print("</tbody></table>");
     }
 
+    public String getSafeModeText() {
+      if( ! fsn.isInSafeMode() )
+        return "";
+      return "Safe mode is ON. <em>" + fsn.getSafeModeTip() + "<em>";
+    }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?view=diff&rev=448371&r1=448370&r2=448371
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Wed Sep 20 15:23:08 2006
@@ -75,8 +75,6 @@
     private FSNamesystem namesystem;
     private Server server;
     private int handlerCount = 2;
-    private long datanodeStartupPeriod;
-    private volatile long firstBlockReportTime;
     
     /** only used for testing purposes  */
     private boolean stopRequested = false;
@@ -134,8 +132,6 @@
         this.namesystem = new FSNamesystem(dir, conf);
         this.handlerCount = conf.getInt("dfs.namenode.handler.count", 10);
         this.server = RPC.getServer(this, bindAddress, port, handlerCount, false, conf);
-        this.datanodeStartupPeriod =
-            conf.getLong("dfs.datanode.startupMsec", DATANODE_STARTUP_PERIOD);
         this.server.start();
         myMetrics = new NameNodeMetrics();
     }
@@ -360,7 +356,7 @@
             throw new IOException("mkdirs: Pathname too long.  Limit " 
                 + MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
         }
-        return namesystem.mkdirs(new UTF8(src));
+        return namesystem.mkdirs( src );
     }
 
     /**
@@ -423,6 +419,22 @@
         }
         return results;
     }
+    
+    /**
+     * @inheritDoc
+     */
+    public boolean setSafeMode( SafeModeAction action ) throws IOException {
+      switch( action ) {
+      case SAFEMODE_LEAVE: // leave safe mode
+        namesystem.leaveSafeMode();
+        break;
+      case SAFEMODE_ENTER: // enter safe mode
+        namesystem.enterSafeMode();
+        break;
+      case SAFEMODE_GET: // get safe mode
+      }
+      return namesystem.isInSafeMode();
+    }
 
     ////////////////////////////////////////////////////////////////
     // DatanodeProtocol
@@ -450,23 +462,6 @@
         namesystem.gotHeartbeat( nodeReg, capacity, remaining, xceiverCount );
         
         //
-        // Only ask datanodes to perform block operations (transfer, delete) 
-        // after a startup quiet period.  The assumption is that all the
-        // datanodes will be started together, but the namenode may
-        // have been started some time before.  (This is esp. true in
-        // the case of network interruptions.)  So, wait for some time
-        // to pass from the time of connection to the first block-transfer.
-        // Otherwise we transfer a lot of blocks unnecessarily.
-        //
-        // Hairong: Ideally in addition we also look at the history. For example,
-        // we should wait until at least 98% of datanodes are connected to the server
-        //
-        if( firstBlockReportTime==0 ||
-            System.currentTimeMillis()-firstBlockReportTime < datanodeStartupPeriod) {
-            return null;
-        }
-        
-        //
         // Ask to perform pending transfers, if any
         //
         Object xferResults[] = namesystem.pendingTransfers( nodeReg,
@@ -493,8 +488,6 @@
         verifyRequest( nodeReg );
         stateChangeLog.debug("*BLOCK* NameNode.blockReport: "
                 +"from "+nodeReg.getName()+" "+blocks.length+" blocks" );
-        if( firstBlockReportTime==0)
-              firstBlockReportTime=System.currentTimeMillis();
 
         return namesystem.processReport( nodeReg, blocks );
      }

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/SafeModeException.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/SafeModeException.java?view=auto&rev=448371
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/SafeModeException.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/SafeModeException.java Wed Sep 20 15:23:08
2006
@@ -0,0 +1,17 @@
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+
+/**
+ * This exception is thrown when the name node is in safe mode.
+ * Client cannot modified namespace until the safe mode is off. 
+ * 
+ * @author Konstantin Shvachko
+ */
+public class SafeModeException extends IOException {
+
+  public SafeModeException( String text, FSNamesystem.SafeModeInfo mode  ) {
+    super( text + ". Name node is in safe mode.\n" + mode.getTurnOffTip());
+  }
+
+}

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/io/ObjectWritable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/io/ObjectWritable.java?view=diff&rev=448371&r1=448370&r2=448371
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/io/ObjectWritable.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/io/ObjectWritable.java Wed Sep 20 15:23:08
2006
@@ -145,7 +145,8 @@
       } else {
         throw new IllegalArgumentException("Not a primitive: "+declaredClass);
       }
-      
+    } else if (declaredClass.isEnum() ) {         // enum
+      UTF8.writeString( out, ((Enum)instance).name() );
     } else if (Writable.class.isAssignableFrom(declaredClass)) { // Writable
       UTF8.writeString(out, instance.getClass().getName());
       ((Writable)instance).write(out);
@@ -212,7 +213,8 @@
       
     } else if (declaredClass == String.class) {        // String
       instance = UTF8.readString(in);
-      
+    } else if( declaredClass.isEnum() ) {         // enum
+      instance = Enum.valueOf( declaredClass, UTF8.readString(in) );
     } else {                                      // Writable
       Class instanceClass = null;
       try {

Modified: lucene/hadoop/trunk/src/webapps/dfs/dfshealth.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/dfs/dfshealth.jsp?view=diff&rev=448371&r1=448370&r2=448371
==============================================================================
--- lucene/hadoop/trunk/src/webapps/dfs/dfshealth.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/dfs/dfshealth.jsp Wed Sep 20 15:23:08 2006
@@ -105,6 +105,8 @@
 <b><a href="/nn_browsedfscontent.jsp">Browse the filesystem</a></b>
 <hr>
 <h2>Cluster Summary</h2>
+<b> <%= jspHelper.getSafeModeText()%> </b>
+<p>
 The capacity of this cluster is <%= totalCapacity()%> and remaining is <%= totalRemaining()%>.
 <br>
 <% 



Mime
View raw message