hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r555368 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/dfs/ src/test/org/apache/hadoop/dfs/
Date Wed, 11 Jul 2007 18:53:02 GMT
Author: cutting
Date: Wed Jul 11 11:52:59 2007
New Revision: 555368

URL: http://svn.apache.org/viewvc?view=rev&rev=555368
Log:
HADOOP-1286.  Add support to HDFS for distributed upgrades.  Contributed by Konstantin.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeCommand.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeManager.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeManagerDatanode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeManagerNamenode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObject.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObjectCollection.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObjectDatanode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObjectNamenode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Upgradeable.java
    lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDistributedUpgrade.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCommand.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataStorage.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NamespaceInfo.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Storage.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=555368&r1=555367&r2=555368
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Jul 11 11:52:59 2007
@@ -289,6 +289,11 @@
  89. HADOOP-1533.  Add persistent error logging for distcp. The logs are stored
     into a specified hdfs directory. (Senthil Subramanian via omalley)
 
+ 90. HADOOP-1286.  Add support to HDFS for distributed upgrades, which
+     permits coordinated upgrade of datanode data.
+     (Konstantin Shvachko via cutting)
+
+
 Release 0.13.0 - 2007-06-08
 
  1. HADOOP-1047.  Fix TestReplication to succeed more reliably.

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCommand.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCommand.java?view=diff&rev=555368&r1=555367&r2=555368
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCommand.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/BlockCommand.java Wed Jul 11 11:52:59 2007
@@ -21,16 +21,20 @@
 import org.apache.hadoop.io.*;
 
 class DatanodeCommand implements Writable {
-  DatanodeProtocol.DataNodeAction action;
+  protected int action;
   
   public DatanodeCommand() {
-    this(DatanodeProtocol.DataNodeAction.DNA_UNKNOWN);
+    this(DatanodeProtocol.DNA_UNKNOWN);
   }
   
-  public DatanodeCommand(DatanodeProtocol.DataNodeAction action) {
+  public DatanodeCommand(int action) {
     this.action = action;
   }
 
+  int getAction() {
+    return this.action;
+  }
+  
   ///////////////////////////////////////////
   // Writable
   ///////////////////////////////////////////
@@ -43,12 +47,11 @@
   }
 
   public void write(DataOutput out) throws IOException {
-    WritableUtils.writeEnum(out, action);
+    out.writeInt(this.action);
   }
   
   public void readFields(DataInput in) throws IOException {
-    this.action = (DatanodeProtocol.DataNodeAction)
-      WritableUtils.readEnum(in, DatanodeProtocol.DataNodeAction.class);
+    this.action = in.readInt();
   }
 }
 
@@ -72,7 +75,7 @@
    * @param targets   nodes to transfer
    */
   public BlockCommand(Block blocks[], DatanodeInfo targets[][]) {
-    super( DatanodeProtocol.DataNodeAction.DNA_TRANSFER);
+    super( DatanodeProtocol.DNA_TRANSFER);
     this.blocks = blocks;
     this.targets = targets;
   }
@@ -82,7 +85,7 @@
    * @param blocks  blocks to invalidate
    */
   public BlockCommand(Block blocks[]) {
-    super(DatanodeProtocol.DataNodeAction.DNA_INVALIDATE);
+    super(DatanodeProtocol.DNA_INVALIDATE);
     this.blocks = blocks;
     this.targets = new DatanodeInfo[0][];
   }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?view=diff&rev=555368&r1=555367&r2=555368
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Wed Jul 11 11:52:59 2007
@@ -194,6 +194,7 @@
    */
   DataNode(Configuration conf, 
            AbstractList<File> dataDirs) throws IOException {
+    datanodeObject = this;
     try {
       startDataNode(conf, dataDirs);
     } catch (IOException ie) {
@@ -273,7 +274,6 @@
       networkLoc = getNetworkLoc(conf);
     // register datanode
     register();
-    datanodeObject = this;
   }
 
   private NamespaceInfo handshake() throws IOException {
@@ -289,6 +289,23 @@
         } catch (InterruptedException ie) {}
       }
     }
+    String errorMsg = null;
+    // verify build version
+    if( ! nsInfo.getBuildVersion().equals( Storage.getBuildVersion() )) {
+      errorMsg = "Incompatible build versions: namenode BV = " 
+        + nsInfo.getBuildVersion() + "; datanode BV = "
+        + Storage.getBuildVersion();
+      LOG.fatal( errorMsg );
+      try {
+        namenode.errorReport( dnRegistration,
+                              DatanodeProtocol.NOTIFY, errorMsg );
+      } catch( SocketTimeoutException e ) {  // namenode is busy
+        LOG.info("Problem connecting to server: " + getNameNodeAddr());
+      }
+      throw new IOException( errorMsg );
+    }
+    assert FSConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion() :
+      "Data-node and name-node layout versions must be the same.";
     return nsInfo;
   }
 
@@ -358,6 +375,8 @@
       ((DataXceiveServer) this.dataXceiveServer.getRunnable()).kill();
       this.dataXceiveServer.interrupt();
     }
+    if(upgradeManager != null)
+      upgradeManager.shutdownUpgrade();
     if (storage != null) {
       try {
         this.storage.unlockAll();
@@ -531,15 +550,15 @@
   private boolean processCommand(DatanodeCommand cmd) throws IOException {
     if (cmd == null)
       return true;
-    switch(cmd.action) {
-    case DNA_TRANSFER:
+    switch(cmd.getAction()) {
+    case DatanodeProtocol.DNA_TRANSFER:
       //
       // Send a copy of a block to another datanode
       //
       BlockCommand bcmd = (BlockCommand)cmd;
       transferBlocks(bcmd.getBlocks(), bcmd.getTargets());
       break;
-    case DNA_INVALIDATE:
+    case DatanodeProtocol.DNA_INVALIDATE:
       //
       // Some local block(s) are obsolete and can be 
       // safely garbage-collected.
@@ -553,25 +572,38 @@
       }
       myMetrics.removedBlocks(toDelete.length);
       break;
-    case DNA_SHUTDOWN:
+    case DatanodeProtocol.DNA_SHUTDOWN:
       // shut down the data node
       this.shutdown();
       return false;
-    case DNA_REGISTER:
+    case DatanodeProtocol.DNA_REGISTER:
       // namenode requested a registration
       register();
       lastHeartbeat=0;
       lastBlockReport=0;
       break;
-    case DNA_FINALIZE:
+    case DatanodeProtocol.DNA_FINALIZE:
       storage.finalizeUpgrade();
       break;
+    case UpgradeCommand.UC_ACTION_START_UPGRADE:
+      // start distributed upgrade here
+      processDistributedUpgradeCommand((UpgradeCommand)cmd);
+      break;
     default:
-      LOG.warn("Unknown DatanodeCommand action: " + cmd.action);
+      LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
     }
     return true;
   }
-    
+
+  // Distributed upgrade manager
+  UpgradeManagerDatanode upgradeManager = new UpgradeManagerDatanode(this);
+
+  private void processDistributedUpgradeCommand(UpgradeCommand comm
+                                               ) throws IOException {
+    assert upgradeManager != null : "DataNode.upgradeManager is null.";
+    upgradeManager.processUpgradeCommand(comm);
+  }
+
   private void transferBlocks( Block blocks[], 
                                DatanodeInfo xferTargets[][] 
                                ) throws IOException {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataStorage.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataStorage.java?view=diff&rev=555368&r1=555367&r2=555368
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataStorage.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataStorage.java Wed Jul 11 11:52:59 2007
@@ -165,8 +165,6 @@
     // check the layout version inside the storage file
     // Lock and Read old storage file
     RandomAccessFile oldFile = new RandomAccessFile(oldF, "rws");
-    if (oldFile == null)
-      throw new IOException("Cannot read file: " + oldF);
     FileLock oldLock = oldFile.getChannel().tryLock();
     try {
       oldFile.seek(0);
@@ -214,8 +212,6 @@
     
     // Lock and Read old storage file
     RandomAccessFile oldFile = new RandomAccessFile(oldF, "rws");
-    if (oldFile == null)
-      throw new IOException("Cannot read file: " + oldF);
     FileLock oldLock = oldFile.getChannel().tryLock();
     if (oldLock == null)
       throw new IOException("Cannot lock file: " + oldF);
@@ -284,6 +280,8 @@
     if (this.layoutVersion == FSConstants.LAYOUT_VERSION 
         && this.cTime == nsInfo.getCTime())
       return; // regular startup
+    // verify necessity of a distributed upgrade
+    verifyDistributedUpgradeProgress(nsInfo);
     if (this.layoutVersion > FSConstants.LAYOUT_VERSION
         || this.cTime < nsInfo.getCTime()) {
       doUpgrade(sd, nsInfo);  // upgrade
@@ -433,13 +431,20 @@
     if (!oldF.createNewFile())
       throw new IOException("Cannot create file " + oldF);
     RandomAccessFile oldFile = new RandomAccessFile(oldF, "rws");
-    if (oldFile == null)
-      throw new IOException("Cannot read file: " + oldF);
     // write new version into old storage file
     try {
       writeCorruptedData(oldFile);
     } finally {
       oldFile.close();
     }
+  }
+
+  private void verifyDistributedUpgradeProgress(
+                  NamespaceInfo nsInfo
+                ) throws IOException {
+    UpgradeManagerDatanode um = DataNode.getDataNode().upgradeManager;
+    assert um != null : "DataNode.upgradeManager is null.";
+    um.setUpgradeState(false, getLayoutVersion());
+    um.initializeUpgrade(nsInfo);
   }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java?view=diff&rev=555368&r1=555367&r2=555368
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java Wed Jul 11 11:52:59 2007
@@ -44,14 +44,14 @@
 
   /**
    * Determines actions that data node should perform 
-   * when receiving a block command. 
+   * when receiving a datanode command. 
    */
-  public enum DataNodeAction{ DNA_UNKNOWN,    // unknown action   
-                              DNA_TRANSFER,   // transfer blocks to another datanode
-                              DNA_INVALIDATE, // invalidate blocks
-                              DNA_SHUTDOWN,   // shutdown node
-                              DNA_REGISTER,   // re-register
-                              DNA_FINALIZE; } // finalize previous upgrade
+  final static int DNA_UNKNOWN = 0;    // unknown action   
+  final static int DNA_TRANSFER = 1;   // transfer blocks to another datanode
+  final static int DNA_INVALIDATE = 2; // invalidate blocks
+  final static int DNA_SHUTDOWN = 3;   // shutdown node
+  final static int DNA_REGISTER = 4;   // re-register
+  final static int DNA_FINALIZE = 5;   // finalize previous upgrade
 
   /** 
    * Register Datanode.
@@ -106,4 +106,6 @@
                           String msg) throws IOException;
     
   public NamespaceInfo versionRequest() throws IOException;
+  
+  UpgradeCommand processUpgradeCommand(UpgradeCommand comm) throws IOException;
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?view=diff&rev=555368&r1=555367&r2=555368
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Wed Jul 11 11:52:59 2007
@@ -136,7 +136,7 @@
   // Version is reflected in the data storage file.
   // Versions are negative.
   // Decrement LAYOUT_VERSION to define a new version.
-  public static final int LAYOUT_VERSION = -5;
+  public static final int LAYOUT_VERSION = -6;
   // Current version: 
-  // File modification times added.
+  // Distributed upgrade is introduced.
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java?view=diff&rev=555368&r1=555367&r2=555368
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSEditLog.java Wed Jul 11 11:52:59 2007
@@ -92,6 +92,10 @@
     return editStreams == null ? 0 : editStreams.size();
   }
 
+  boolean isOpen() {
+    return getNumEditStreams() > 0;
+  }
+
   /**
    * Create empty edit log files.
    * Initialize the output stream for logging.
@@ -228,7 +232,6 @@
       try {
         while (true) {
           long timestamp = 0;
-          long ctime = 0; 
           long mtime = 0;
           byte opcode = -1;
           try {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java?view=diff&rev=555368&r1=555367&r2=555368
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java Wed Jul 11 11:52:59 2007
@@ -183,12 +183,14 @@
     if (!isFormatted && startOpt != StartupOption.ROLLBACK)
       throw new IOException("NameNode is not formatted.");
     if (startOpt != StartupOption.UPGRADE
-        && layoutVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION
-        && layoutVersion != FSConstants.LAYOUT_VERSION)
-      throw new IOException(
-                            "\nFile system image contains an old layout version " + layoutVersion
-                            + ".\nAn upgrade to version " + FSConstants.LAYOUT_VERSION
-                            + " is required.\nPlease restart NameNode with -upgrade option.");
+          && layoutVersion < LAST_PRE_UPGRADE_LAYOUT_VERSION
+          && layoutVersion != FSConstants.LAYOUT_VERSION)
+        throw new IOException(
+                          "\nFile system image contains an old layout version " + layoutVersion
+                          + ".\nAn upgrade to version " + FSConstants.LAYOUT_VERSION
+                          + " is required.\nPlease restart NameNode with -upgrade option.");
+    // check whether distributed upgrade is required and/or should be continued
+    verifyDistributedUpgradeProgress(startOpt);
 
     // 2. Format unformatted dirs.
     this.checkpointTime = 0L;
@@ -219,14 +221,20 @@
     case REGULAR:
       if (loadFSImage())
         saveFSImage();
-      else
-        editLog.open();
     }
     assert editLog != null : "editLog must be initialized";
-    assert editLog.getNumEditStreams() > 0 : "editLog should be opened";
+    if(!editLog.isOpen())
+      editLog.open();
   }
 
   private void doUpgrade() throws IOException {
+    if(getDistributedUpgradeState()) {
+      // only distributed upgrade needs to continue
+      // don't do version upgrade
+      this.loadFSImage();
+      initializeDistributedUpgrade();
+      return;
+    }
     // Upgrade is allowed only if there are 
     // no previous fs states in any of the directories
     for(int idx = 0; idx < getNumStorageDirs(); idx++) {
@@ -273,6 +281,7 @@
       isUpgradeFinalized = false;
       LOG.info("Upgrade of " + sd.root + " is complete.");
     }
+    initializeDistributedUpgrade();
     editLog.open();
   }
 
@@ -325,6 +334,8 @@
       LOG.info("Rollback of " + sd.root + " is complete.");
     }
     isUpgradeFinalized = true;
+    // check whether name-node can start in regular mode
+    verifyDistributedUpgradeProgress(StartupOption.REGULAR);
   }
 
   private void doFinalize(StorageDirectory sd) throws IOException {
@@ -360,6 +371,12 @@
     if (layoutVersion == 0)
       throw new IOException("NameNode directory " 
                             + sd.root + " is not formatted.");
+    String sDUS, sDUV;
+    sDUS = props.getProperty("distributedUpgradeState"); 
+    sDUV = props.getProperty("distributedUpgradeVersion");
+    setDistributedUpgradeState(
+        sDUS == null? false : Boolean.parseBoolean(sDUS),
+        sDUV == null? getLayoutVersion() : Integer.parseInt(sDUV));
     this.checkpointTime = readCheckpointTime(sd);
   }
 
@@ -391,6 +408,12 @@
                            StorageDirectory sd 
                            ) throws IOException {
     super.setFields(props, sd);
+    boolean uState = getDistributedUpgradeState();
+    int uVersion = getDistributedUpgradeVersion();
+    if(uState && uVersion != getLayoutVersion()) {
+      props.setProperty("distributedUpgradeState", Boolean.toString(uState));
+      props.setProperty("distributedUpgradeVersion", Integer.toString(uVersion)); 
+    }
     writeCheckpointTime(sd);
   }
 
@@ -440,8 +463,6 @@
     // check the layout version inside the image file
     File oldF = new File(oldImageDir, "fsimage");
     RandomAccessFile oldFile = new RandomAccessFile(oldF, "rws");
-    if (oldFile == null)
-      throw new IOException("Cannot read file: " + oldF);
     try {
       oldFile.seek(0);
       int odlVersion = oldFile.readInt();
@@ -597,7 +618,7 @@
     // which we read in the image
     //
     needToSave |= (loadFSEdits(latestSD) > 0);
-
+    
     return needToSave;
   }
 
@@ -610,7 +631,9 @@
     assert this.getLayoutVersion() < 0 : "Negative layout version is expected.";
     assert curFile != null : "curFile is null";
 
-    FSDirectory fsDir = FSNamesystem.getFSNamesystem().dir;
+    FSNamesystem fsNamesys = FSNamesystem.getFSNamesystem();
+    FSDirectory fsDir = fsNamesys.dir;
+
     //
     // Load in bits
     //
@@ -699,7 +722,8 @@
    * Save the contents of the FS image to the file.
    */
   void saveFSImage(File newFile ) throws IOException {
-    FSDirectory fsDir = FSNamesystem.getFSNamesystem().dir;
+    FSNamesystem fsNamesys = FSNamesystem.getFSNamesystem();
+    FSDirectory fsDir = fsNamesys.dir;
     //
     // Write out data
     //
@@ -1000,13 +1024,51 @@
       if (!oldImage.createNewFile())
         throw new IOException("Cannot create file " + oldImage);
     RandomAccessFile oldFile = new RandomAccessFile(oldImage, "rws");
-    if (oldFile == null)
-      throw new IOException("Cannot read file: " + oldImage);
     // write new version into old image file
     try {
       writeCorruptedData(oldFile);
     } finally {
       oldFile.close();
     }
+  }
+
+  private boolean getDistributedUpgradeState() {
+    return FSNamesystem.getFSNamesystem().getDistributedUpgradeState();
+  }
+
+  private int getDistributedUpgradeVersion() {
+    return FSNamesystem.getFSNamesystem().getDistributedUpgradeVersion();
+  }
+
+  private void setDistributedUpgradeState(boolean uState, int uVersion) {
+    FSNamesystem.getFSNamesystem().upgradeManager.setUpgradeState(uState, uVersion);
+  }
+
+  private void verifyDistributedUpgradeProgress(StartupOption startOpt
+                                                ) throws IOException {
+    if(startOpt == StartupOption.ROLLBACK)
+      return;
+    UpgradeManager um = FSNamesystem.getFSNamesystem().upgradeManager;
+    assert um != null : "FSNameSystem.upgradeManager is null.";
+    if(startOpt != StartupOption.UPGRADE) {
+      if(um.getUpgradeState())
+        throw new IOException(
+                    "\n   Previous distributed upgrade was not completed. "
+                  + "\n   Please restart NameNode with -upgrade option.");
+      if(um.getDistributedUpgrades() != null)
+        throw new IOException("\n   Distributed upgrade for NameNode version " 
+          + um.getUpgradeVersion() + " to current LV " + FSConstants.LAYOUT_VERSION
+          + " is required.\n   Please restart NameNode with -upgrade option.");
+    }
+  }
+
+  private void initializeDistributedUpgrade() throws IOException {
+    UpgradeManagerNamenode um = FSNamesystem.getFSNamesystem().upgradeManager;
+    um.initializeUpgrade();
+    // write new upgrade state into disk
+    FSNamesystem.getFSNamesystem().getFSImage().writeAll();
+    NameNode.LOG.info("\n   Distributed upgrade for NameNode version " 
+        + um.getUpgradeVersion() + " to current LV " 
+        + FSConstants.LAYOUT_VERSION + " is initialized.");
   }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=555368&r1=555367&r2=555368
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Wed Jul 11 11:52:59 2007
@@ -308,7 +308,8 @@
     
   NamespaceInfo getNamespaceInfo() {
     return new NamespaceInfo(dir.fsImage.getNamespaceID(),
-                             dir.fsImage.getCTime());
+                             dir.fsImage.getCTime(),
+                             getDistributedUpgradeVersion());
   }
 
   /** Close down this filesystem manager.
@@ -1644,7 +1645,7 @@
    * If a substantial amount of time passed since the last datanode 
    * heartbeat then request an immediate block report.  
    * 
-   * @return true if block report is required or false otherwise.
+   * @return true if registration is required or false otherwise.
    * @throws IOException
    */
   public boolean gotHeartbeat(DatanodeID nodeID,
@@ -1940,9 +1941,13 @@
                                   + nodeID.getName() + " storage " + key 
                                   + " is removed from datanodeMap.");
   }
-    
-  private FSEditLog getEditLog() {
-    return dir.fsImage.getEditLog();
+
+  FSImage getFSImage() {
+    return dir.fsImage;
+  }
+
+  FSEditLog getEditLog() {
+    return getFSImage().getEditLog();
   }
 
   /**
@@ -3097,8 +3102,25 @@
       
     /**
      * Leave safe mode.
+     * Switch to manual safe mode if distributed upgrade is required.
      */
-    synchronized void leave() {
+    synchronized void leave(boolean checkForUpgrades) {
+      if(checkForUpgrades) {
+        // verify whether a distributed upgrade needs to be started
+        boolean needUpgrade = false;
+        try {
+          needUpgrade = startDistributedUpgradeIfNeeded();
+        } catch(IOException e) {
+          FSNamesystem.LOG.error(StringUtils.stringifyException(e));
+        }
+        if(needUpgrade) {
+          // switch to manual safe mode
+          safeMode = new SafeModeInfo();
+          NameNode.stateChangeLog.info("STATE* SafeModeInfo.leave: " 
+                                      + "Safe mode is ON.\n" + getTurnOffTip()); 
+          return;
+        }
+      }
       if (reached >= 0)
         NameNode.stateChangeLog.info(
                                      "STATE* SafeModeInfo.leave: " + "Safe mode is OFF."); 
@@ -3152,7 +3174,7 @@
       // the threshold is reached
       if (!isOn() ||                           // safe mode is off
           extension <= 0 || threshold <= 0) {  // don't need to wait
-        this.leave();                           // just leave safe mode
+        this.leave(true); // leave safe mode
         return;
       }
       if (reached > 0)  // threshold has already been reached before
@@ -3204,9 +3226,11 @@
      * A tip on how safe mode is to be turned off: manually or automatically.
      */
     String getTurnOffTip() {
-      return (isManual() ? 
-              "Use \"hadoop dfs -safemode leave\" to turn safe mode off." :
-              "Safe mode will be turned off automatically.");
+      return (isManual() ?  getDistributedUpgradeState() ?
+        "Safe mode will be turned off automatically upon completion of " + 
+        "the distributed upgrade: status = " + getDistributedUpgradeStatus() + "%" :
+        "Use \"hadoop dfs -safemode leave\" to turn safe mode off." :
+        "Safe mode will be turned off automatically.");
     }
       
     /**
@@ -3259,7 +3283,7 @@
         }
       }
      // leave safe mode and stop the monitor
-      safeMode.leave();
+      safeMode.leave(true);
       smmthread = null;
     }
   }
@@ -3294,7 +3318,6 @@
 
   /**
    * Decrement number of blocks that reached minimal replication.
-   * @param replication current replication
    */
   void decrementSafeBlockCount(Block b) {
     if (safeMode == null) // mostly true
@@ -3328,13 +3351,16 @@
    * Leave safe mode.
    * @throws IOException
    */
-  synchronized void leaveSafeMode() throws IOException {
+  synchronized void leaveSafeMode(boolean checkForUpgrades) throws IOException {
     if (!isInSafeMode()) {
       NameNode.stateChangeLog.info(
                                    "STATE* FSNamesystem.leaveSafeMode: " + "Safe mode is already OFF."); 
       return;
     }
-    safeMode.leave();
+    if(getDistributedUpgradeState())
+      throw new SafeModeException("Distributed upgrade is in progress",
+                                  safeMode);
+    safeMode.leave(checkForUpgrades);
   }
     
   String getSafeModeTip() {
@@ -3375,5 +3401,32 @@
   private boolean isValidBlock(Block b) {
     return (blocksMap.getINode(b) != null ||
             pendingCreates.contains(b));
+  }
+
+  // Distributed upgrade manager
+  UpgradeManagerNamenode upgradeManager = new UpgradeManagerNamenode();
+
+  UpgradeCommand processDistributedUpgradeCommand(UpgradeCommand comm) throws IOException {
+    return upgradeManager.processUpgradeCommand(comm);
+  }
+
+  int getDistributedUpgradeVersion() {
+    return upgradeManager.getUpgradeVersion();
+  }
+
+  UpgradeCommand getDistributedUpgradeCommand() throws IOException {
+    return upgradeManager.getBroadcastCommand();
+  }
+
+  boolean getDistributedUpgradeState() {
+    return upgradeManager.getUpgradeState();
+  }
+
+  short getDistributedUpgradeStatus() {
+    return upgradeManager.getUpgradeStatus();
+  }
+
+  boolean startDistributedUpgradeIfNeeded() throws IOException {
+    return upgradeManager.startUpgrade();
   }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?view=diff&rev=555368&r1=555367&r2=555368
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Wed Jul 11 11:52:59 2007
@@ -505,7 +505,7 @@
   public boolean setSafeMode(SafeModeAction action) throws IOException {
     switch(action) {
     case SAFEMODE_LEAVE: // leave safe mode
-      namesystem.leaveSafeMode();
+      namesystem.leaveSafeMode(false);
       break;
     case SAFEMODE_ENTER: // enter safe mode
       namesystem.enterSafeMode();
@@ -599,7 +599,7 @@
                                 deleteList)) {
       // request block report from the datanode
       assert(xferResults[0] == null && deleteList[0] == null);
-      return new DatanodeCommand(DataNodeAction.DNA_REGISTER);
+      return new DatanodeCommand(DatanodeProtocol.DNA_REGISTER);
     }
         
     //
@@ -619,7 +619,10 @@
     if (deleteList[0] != null) {
       return new BlockCommand((Block[]) deleteList[0]);
     }
-    return null;
+    
+    // check whether a distributed upgrade needs to be done
+    // and send a request to start one if required
+    return namesystem.getDistributedUpgradeCommand();
   }
 
   public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
@@ -632,7 +635,7 @@
     if (blocksToDelete != null && blocksToDelete.length > 0)
       return new BlockCommand(blocksToDelete);
     if (getFSImage().isUpgradeFinalized())
-      return new DatanodeCommand(DataNodeAction.DNA_FINALIZE);
+      return new DatanodeCommand(DatanodeProtocol.DNA_FINALIZE);
     return null;
   }
 
@@ -665,6 +668,10 @@
     
   public NamespaceInfo versionRequest() throws IOException {
     return namesystem.getNamespaceInfo();
+  }
+
+  public UpgradeCommand processUpgradeCommand(UpgradeCommand comm) throws IOException {
+    return namesystem.processDistributedUpgradeCommand(comm);
   }
 
   /** 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NamespaceInfo.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NamespaceInfo.java?view=diff&rev=555368&r1=555367&r2=555368
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NamespaceInfo.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NamespaceInfo.java Wed Jul 11 11:52:59 2007
@@ -34,18 +34,26 @@
  */
 class NamespaceInfo extends StorageInfo implements Writable {
   String  buildVersion;
+  int distributedUpgradeVersion;
 
   public NamespaceInfo() {
     super();
     buildVersion = null;
   }
   
-  public NamespaceInfo(int nsID, long cT) {
+  public NamespaceInfo(int nsID, long cT, int duVersion) {
     super(FSConstants.LAYOUT_VERSION, nsID, cT);
     buildVersion = Storage.getBuildVersion();
+    this.distributedUpgradeVersion = duVersion;
   }
   
-  public String getBuildVersion() { return buildVersion; }
+  public String getBuildVersion() {
+    return buildVersion;
+  }
+
+  public int getDistributedUpgradeVersion() {
+    return distributedUpgradeVersion;
+  }
   
   /////////////////////////////////////////////////
   // Writable
@@ -63,6 +71,7 @@
     out.writeInt(getLayoutVersion());
     out.writeInt(getNamespaceID());
     out.writeLong(getCTime());
+    out.writeInt(getDistributedUpgradeVersion());
   }
 
   public void readFields(DataInput in) throws IOException {
@@ -70,5 +79,6 @@
     layoutVersion = in.readInt();
     namespaceID = in.readInt();
     cTime = in.readLong();
+    distributedUpgradeVersion = in.readInt();
   }
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Storage.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Storage.java?view=diff&rev=555368&r1=555367&r2=555368
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Storage.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Storage.java Wed Jul 11 11:52:59 2007
@@ -170,6 +170,7 @@
       RandomAccessFile file = new RandomAccessFile(to, "rws");
       FileOutputStream out = null;
       try {
+        file.setLength(0);
         file.seek(0);
         out = new FileOutputStream(file.getFD());
         props.store(out, null);

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeCommand.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeCommand.java?view=auto&rev=555368
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeCommand.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeCommand.java Wed Jul 11 11:52:59 2007
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+/**
+ * This is a generic distributed upgrade command.
+ * 
+ * During the upgrade cluster components send upgrade commands to each other
+ * in order to obtain or share information with them.
+ * Each upgrade is expected to define its own specific upgrade commands by
+ * deriving them from this class.
+ * The upgrade command contains the version of the upgrade, which is verified 
+ * on the receiving side, and the current status of the upgrade.
+ */
+class UpgradeCommand extends DatanodeCommand {
+  final static int UC_ACTION_UNKNOWN = DatanodeProtocol.DNA_UNKNOWN;
+  final static int UC_ACTION_REPORT_STATUS = 100; // report upgrade status
+  final static int UC_ACTION_START_UPGRADE = 101; // start upgrade
+
+  private int version;
+  private short upgradeStatus;
+
+  UpgradeCommand() {
+    super(UC_ACTION_UNKNOWN);
+    this.version = 0;
+    this.upgradeStatus = 0;
+  }
+
+  UpgradeCommand(int action, int version, short status) {
+    super(action);
+    this.version = version;
+    this.upgradeStatus = status;
+  }
+
+  int getVersion() {
+    return this.version;
+  }
+
+  short getCurrentStatus() {
+    return this.upgradeStatus;
+  }
+
+  /////////////////////////////////////////////////
+  // Writable
+  /////////////////////////////////////////////////
+  static {                                      // register a ctor
+    WritableFactories.setFactory
+      (UpgradeCommand.class,
+       new WritableFactory() {
+         public Writable newInstance() { return new UpgradeCommand(); }
+       });
+  }
+
+  /**
+   */
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    out.writeInt(this.version);
+    out.writeShort(this.upgradeStatus);
+  }
+
+  /**
+   */
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    this.version = in.readInt();
+    this.upgradeStatus = in.readShort();
+  }
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeManager.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeManager.java?view=auto&rev=555368
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeManager.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeManager.java Wed Jul 11 11:52:59 2007
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+import java.util.SortedSet;
+
+/**
+ * Generic upgrade manager.
+ * 
+ * {@link #broadcastCommand} is the command that should be sent to other
+ * cluster components participating in the upgrade.
+ */
+abstract class UpgradeManager {
+  protected SortedSet<Upgradeable> currentUpgrades = null;
+  protected boolean upgradeState = false; // true if upgrade is in progress
+  protected int upgradeVersion = 0;
+  protected UpgradeCommand broadcastCommand = null;
+
+  synchronized UpgradeCommand getBroadcastCommand() {
+    return this.broadcastCommand;
+  }
+
+  boolean getUpgradeState() {
+    return this.upgradeState;
+  }
+
+  int getUpgradeVersion(){
+    return this.upgradeVersion;
+  }
+
+  void setUpgradeState(boolean uState, int uVersion) {
+    this.upgradeState = uState;
+    this.upgradeVersion = uVersion;
+  }
+
+  SortedSet<Upgradeable> getDistributedUpgrades() throws IOException {
+    return UpgradeObjectCollection.getDistributedUpgrades(
+                                            getUpgradeVersion(), getType());
+  }
+
+  short getUpgradeStatus() {
+    if(currentUpgrades == null)
+      return 100;
+    return currentUpgrades.first().getUpgradeStatus();
+  }
+
+  boolean initializeUpgrade() throws IOException {
+    currentUpgrades = getDistributedUpgrades();
+    if(currentUpgrades == null) {
+      // set new upgrade state
+      setUpgradeState(false, FSConstants.LAYOUT_VERSION);
+      return false;
+    }
+    Upgradeable curUO = currentUpgrades.first();
+    // set and write new upgrade state into disk
+    setUpgradeState(true, curUO.getVersion());
+    return true;
+  }
+
+  abstract FSConstants.NodeType getType();
+  abstract boolean startUpgrade() throws IOException;
+  abstract void completeUpgrade() throws IOException;
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeManagerDatanode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeManagerDatanode.java?view=auto&rev=555368
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeManagerDatanode.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeManagerDatanode.java Wed Jul 11 11:52:59 2007
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+import java.net.SocketTimeoutException;
+
+import org.apache.hadoop.util.Daemon;
+
+/**
+ * Upgrade manager for data-nodes.
+ *
+ * Distributed upgrades for a data-node are performed in a separate thread.
+ * The upgrade starts when the data-node receives the start upgrade command
+ * from the namenode. At that point the manager finds a respective upgrade
+ * object and starts a daemon in order to perform the upgrade defined by the 
+ * object.
+ */
+class UpgradeManagerDatanode extends UpgradeManager {
+  DataNode dataNode = null;
+  Daemon upgradeDaemon = null;
+
+  UpgradeManagerDatanode(DataNode dataNode) {
+    super();
+    this.dataNode = dataNode;
+  }
+
+  public FSConstants.NodeType getType() {
+    return FSConstants.NodeType.DATA_NODE;
+  }
+
+  void initializeUpgrade(NamespaceInfo nsInfo) throws IOException {
+    if( ! super.initializeUpgrade())
+      return; // distr upgrade is not needed
+    DataNode.LOG.info("\n   Distributed upgrade for DataNode version " 
+        + getUpgradeVersion() + " to current LV " 
+        + FSConstants.LAYOUT_VERSION + " is initialized.");
+    upgradeState = false;
+    int nsUpgradeVersion = nsInfo.getDistributedUpgradeVersion();
+    if(nsUpgradeVersion >= getUpgradeVersion())
+      return;
+    String errorMsg = 
+        "\n   Datanode missed a distributed upgrade and will shutdown."
+      + "\n   namenode distributed upgrade version = " + nsUpgradeVersion
+      + "\n   expected version = " + getUpgradeVersion();
+    DataNode.LOG.fatal( errorMsg );
+    try {
+      dataNode.namenode.errorReport(dataNode.dnRegistration,
+                                    DatanodeProtocol.NOTIFY, errorMsg);
+    } catch( SocketTimeoutException e ) {  // namenode is busy
+      DataNode.LOG.info("Problem connecting to server: " 
+          + dataNode.getNameNodeAddr());
+    }
+    throw new IOException( errorMsg );
+  }
+
+  /**
+   * Start distributed upgrade.
+   * Instantiates distributed upgrade objects.
+   * 
+   * @return true if distributed upgrade is required or false otherwise
+   * @throws IOException
+   */
+  synchronized boolean startUpgrade() throws IOException {
+    if(upgradeState) {  // upgrade is already in progress
+      assert currentUpgrades != null : 
+        "UpgradeManagerDatanode.currentUpgrades is null.";
+      UpgradeObjectDatanode curUO = (UpgradeObjectDatanode)currentUpgrades.first();
+      curUO.startUpgrade();
+      return true;
+    }
+    if(broadcastCommand != null) {
+      // the upgrade has been finished by this data-node,
+      // but the cluster is still running it, 
+      // reply with the broadcast command
+      assert currentUpgrades == null : 
+        "UpgradeManagerDatanode.currentUpgrades is not null.";
+      assert upgradeDaemon == null : 
+        "UpgradeManagerDatanode.upgradeDaemon is not null.";
+      dataNode.namenode.processUpgradeCommand(broadcastCommand);
+      return true;
+    }
+    currentUpgrades = getDistributedUpgrades();
+    if(currentUpgrades == null) {
+      DataNode.LOG.info("\n   Distributed upgrade for DataNode version " 
+          + getUpgradeVersion() + " to current LV " 
+          + FSConstants.LAYOUT_VERSION + " cannot be started. "
+          + "The upgrade object is not defined.");
+      return false;
+    }
+    upgradeState = true;
+    if(currentUpgrades.size() > 1)
+      throw new IOException(
+          "More than one distributed upgrade objects registered for version " 
+          + getUpgradeVersion());
+    UpgradeObjectDatanode curUO = (UpgradeObjectDatanode)currentUpgrades.first();
+    curUO.setDatanode(dataNode);
+    curUO.startUpgrade();
+    upgradeDaemon = new Daemon(curUO);
+    upgradeDaemon.start();
+    DataNode.LOG.info("\n   Distributed upgrade for DataNode version " 
+        + getUpgradeVersion() + " to current LV " 
+        + FSConstants.LAYOUT_VERSION + " is started.");
+    return true;
+  }
+
+  synchronized void processUpgradeCommand(UpgradeCommand command
+                                          ) throws IOException {
+    assert command.getAction() == UpgradeCommand.UC_ACTION_START_UPGRADE :
+      "Only start upgrade action can be processed at this time.";
+    this.upgradeVersion = command.getVersion();
+    // Start distributed upgrade
+    if(startUpgrade()) // upgrade started
+      return;
+    throw new IOException(
+        "Distributed upgrade for DataNode version " 
+        + getUpgradeVersion() + " to current LV " 
+        + FSConstants.LAYOUT_VERSION + " cannot be started. "
+        + "The upgrade object is not defined.");
+  }
+
+  synchronized void completeUpgrade() throws IOException {
+    assert currentUpgrades != null : 
+      "UpgradeManagerDatanode.currentUpgrades is null.";
+    UpgradeObjectDatanode curUO = (UpgradeObjectDatanode)currentUpgrades.first();
+    broadcastCommand = curUO.completeUpgrade();
+    upgradeState = false;
+    currentUpgrades = null;
+    upgradeDaemon = null;
+    DataNode.LOG.info("\n   Distributed upgrade for DataNode version " 
+        + getUpgradeVersion() + " to current LV " 
+        + FSConstants.LAYOUT_VERSION + " is complete.");
+  }
+
+  synchronized void shutdownUpgrade() {
+    if(upgradeDaemon != null)
+      upgradeDaemon.interrupt();
+  }
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeManagerNamenode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeManagerNamenode.java?view=auto&rev=555368
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeManagerNamenode.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeManagerNamenode.java Wed Jul 11 11:52:59 2007
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.util.SortedSet;
+import java.io.IOException;
+
+/**
+ * Upgrade manager for name-nodes.
+ *
+ * Distributed upgrades for a name-node start when the safe mode conditions 
+ * are met and the name-node is about to exit it.
+ * At this point the name-node enters manual safe mode which will remain
+ * on until the upgrade is completed.
+ * After that the name-node processes upgrade commands from data-nodes
+ * and updates its status.
+ */
+class UpgradeManagerNamenode extends UpgradeManager {
+  public FSConstants.NodeType getType() {
+    return FSConstants.NodeType.NAME_NODE;
+  }
+
+  /**
+   * Start distributed upgrade.
+   * Instantiates distributed upgrade objects.
+   * 
+   * @return true if distributed upgrade is required or false otherwise
+   * @throws IOException
+   */
+  synchronized boolean startUpgrade() throws IOException {
+    if(!upgradeState) {
+      initializeUpgrade();
+      if(!upgradeState) return false;
+      // write new upgrade state into disk
+      FSNamesystem.getFSNamesystem().getFSImage().writeAll();
+    }
+    assert currentUpgrades != null : "currentUpgrades is null";
+    this.broadcastCommand = currentUpgrades.first().startUpgrade();
+    NameNode.LOG.info("\n   Distributed upgrade for NameNode version " 
+        + getUpgradeVersion() + " to current LV " 
+        + FSConstants.LAYOUT_VERSION + " is started.");
+    return true;
+  }
+
+  synchronized UpgradeCommand processUpgradeCommand(UpgradeCommand command
+                                                    ) throws IOException {
+    NameNode.LOG.debug("\n   Distributed upgrade for NameNode version " 
+        + getUpgradeVersion() + " to current LV " 
+        + FSConstants.LAYOUT_VERSION + " is processing upgrade command: "
+        + command.getAction() + " status = " + getUpgradeStatus() + "%");
+    if(currentUpgrades == null) {
+      NameNode.LOG.info("Ignoring upgrade command: " 
+          + command.getAction() + " version " + command.getVersion()
+          + ". No distributed upgrades are currently running on the NameNode");
+      return null;
+    }
+    UpgradeObjectNamenode curUO = (UpgradeObjectNamenode)currentUpgrades.first();
+    if(command.getVersion() != curUO.getVersion())
+      throw new IncorrectVersionException(command.getVersion(), 
+          "UpgradeCommand", curUO.getVersion());
+    UpgradeCommand reply = curUO.processUpgradeCommand(command);
+    if(curUO.getUpgradeStatus() < 100) {
+      return reply;
+    }
+    // current upgrade is done
+    curUO.completeUpgrade();
+    // proceed with the next one
+    currentUpgrades.remove(curUO);
+    if(currentUpgrades.isEmpty()) { // all upgrades are done
+      completeUpgrade();
+    } else {  // start next upgrade
+      curUO = (UpgradeObjectNamenode)currentUpgrades.first();
+      this.broadcastCommand = curUO.startUpgrade();
+    }
+    return reply;
+  }
+
+  synchronized void completeUpgrade() throws IOException {
+    NameNode.LOG.info("\n   Distributed upgrade for NameNode version " 
+        + getUpgradeVersion() + " to current LV " 
+        + FSConstants.LAYOUT_VERSION + " is complete.");
+    // set and write new upgrade state into disk
+    setUpgradeState(false, FSConstants.LAYOUT_VERSION);
+    FSNamesystem.getFSNamesystem().getFSImage().writeAll();
+    currentUpgrades = null;
+    broadcastCommand = null;
+    FSNamesystem.getFSNamesystem().leaveSafeMode(false);
+  }
+
+  public static void main(String[] args) throws IOException {
+    UpgradeManagerNamenode um = new UpgradeManagerNamenode();
+    SortedSet<Upgradeable> uos;
+    uos = UpgradeObjectCollection.getDistributedUpgrades(-4, 
+        FSConstants.NodeType.NAME_NODE);
+    System.out.println(uos.size());
+    um.startUpgrade();
+  }
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObject.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObject.java?view=auto&rev=555368
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObject.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObject.java Wed Jul 11 11:52:59 2007
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import org.apache.hadoop.dfs.UpgradeObjectCollection.UOSignature;
+
+/**
+ * Abstract upgrade object.
+ */
+abstract class UpgradeObject implements Upgradeable {
+  protected short status;
+  
+  public short getUpgradeStatus() {
+    return status;
+  }
+
+  public String getDescription() {
+    return "Upgrade object for " + getType() + " layout version " + getVersion();
+  }
+
+  public int compareTo(Upgradeable o) {
+    if(this.getVersion() != o.getVersion())
+      return (getVersion() < o.getVersion() ? -1 : 1);
+    int res = this.getType().toString().compareTo(o.getType().toString());
+    if(res != 0)
+      return res;
+    return getClass().getCanonicalName().compareTo(
+                    o.getClass().getCanonicalName());
+  }
+
+  public boolean equals(Object o) {
+    if (!(o instanceof UpgradeObject)) {
+      return false;
+    }
+    return this.compareTo((UpgradeObject)o) == 0;
+  }
+
+  public int hashCode() {
+    return new UOSignature(this).hashCode(); 
+  }
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObjectCollection.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObjectCollection.java?view=auto&rev=555368
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObjectCollection.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObjectCollection.java Wed Jul 11 11:52:59 2007
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+import java.util.SortedSet;
+import java.util.TreeSet;
+
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Collection of upgrade objects.
+ *
+ * Upgrade objects should be registered here before they can be used. 
+ */
+class UpgradeObjectCollection {
+  static {
+    initialize();
+    // Register distributed upgrade objects here
+    // registerUpgrade(new UpgradeObject());
+  }
+
+  static class UOSignature implements Comparable<UOSignature> {
+    int version;
+    FSConstants.NodeType type;
+    String className;
+
+    UOSignature(Upgradeable uo) {
+      this.version = uo.getVersion();
+      this.type = uo.getType();
+      this.className = uo.getClass().getCanonicalName();
+    }
+
+    int getVersion() {
+      return version;
+    }
+
+    FSConstants.NodeType getType() {
+      return type;
+    }
+
+    String getClassName() {
+      return className;
+    }
+
+    Upgradeable instantiate() throws IOException {
+      try {
+        return (Upgradeable)Class.forName(getClassName()).newInstance();
+      } catch(ClassNotFoundException e) {
+        throw new IOException(StringUtils.stringifyException(e));
+      } catch(InstantiationException e) {
+        throw new IOException(StringUtils.stringifyException(e));
+      } catch(IllegalAccessException e) {
+        throw new IOException(StringUtils.stringifyException(e));
+      }
+    }
+
+    public int compareTo(UOSignature o) {
+      if(this.version != o.version)
+        return (version < o.version ? -1 : 1);
+      int res = this.getType().toString().compareTo(o.getType().toString());
+      if(res != 0)
+        return res;
+      return className.compareTo(o.className);
+    }
+
+    public boolean equals(Object o) {
+        if (!(o instanceof UOSignature)) {
+          return false;
+        }
+        return this.compareTo((UOSignature)o) == 0;
+      }
+
+      public int hashCode() {
+        return version ^ ((type==null)?0:type.hashCode()) 
+                       ^ ((className==null)?0:className.hashCode());
+      }
+  }
+
+  /**
+   * Static collection of upgrade objects sorted by version.
+   * Layout versions are negative therefore newer versions will go first.
+   */
+  static SortedSet<UOSignature> upgradeTable;
+
+  static final void initialize() {
+    upgradeTable = new TreeSet<UOSignature>();
+  }
+
+  static void registerUpgrade(Upgradeable uo) {
+    // Register distributed upgrade objects here
+    upgradeTable.add(new UOSignature(uo));
+  }
+
+  static SortedSet<Upgradeable> getDistributedUpgrades(int versionFrom, 
+                                                       FSConstants.NodeType type
+                                                       ) throws IOException {
+    assert FSConstants.LAYOUT_VERSION <= versionFrom : "Incorrect version " 
+      + versionFrom + ". Expected to be <= " + FSConstants.LAYOUT_VERSION;
+    SortedSet<Upgradeable> upgradeObjects = new TreeSet<Upgradeable>();
+    for(UOSignature sig : upgradeTable) {
+      if(sig.getVersion() < FSConstants.LAYOUT_VERSION)
+        continue;
+      if(sig.getVersion() > versionFrom)
+        break;
+      if(sig.getType() != type )
+        continue;
+      upgradeObjects.add(sig.instantiate());
+    }
+    if(upgradeObjects.size() == 0)
+      return null;
+    return upgradeObjects;
+  }
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObjectDatanode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObjectDatanode.java?view=auto&rev=555368
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObjectDatanode.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObjectDatanode.java Wed Jul 11 11:52:59 2007
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import org.apache.hadoop.util.StringUtils;
+import java.io.IOException;
+
+/**
+ * Base class for data-node upgrade objects.
+ * Data-node upgrades are run in separate threads.
+ */
+abstract class UpgradeObjectDatanode extends UpgradeObject implements Runnable {
+  private DataNode dataNode = null;
+
+  public FSConstants.NodeType getType() {
+    return FSConstants.NodeType.DATA_NODE;
+  }
+
+  protected DataNode getDatanode() {
+    return dataNode;
+  }
+
+  void setDatanode(DataNode dataNode) {
+    this.dataNode = dataNode;
+  }
+
+  /**
+   * Specifies how the upgrade is performed. 
+   * @throws IOException
+   */
+  abstract void doUpgrade() throws IOException;
+
+  public void run() {
+    assert dataNode != null : "UpgradeObjectDatanode.dataNode is null";
+    while(dataNode.shouldRun) {
+      try {
+        doUpgrade();
+      } catch(Exception e) {
+        DataNode.LOG.error(StringUtils.stringifyException(e));
+      }
+      break;
+    }
+
+    // report results
+    if(getUpgradeStatus() < 100) {
+      DataNode.LOG.info("\n   Distributed upgrade for DataNode version " 
+          + getVersion() + " to current LV " 
+          + FSConstants.LAYOUT_VERSION + " cannot be completed.");
+    }
+
+    // Complete the upgrade by calling the manager method
+    try {
+      dataNode.upgradeManager.completeUpgrade();
+    } catch(IOException e) {
+      DataNode.LOG.error(StringUtils.stringifyException(e));
+    }
+  }
+
+  /**
+   * Complete upgrade and return a status complete command for broadcasting.
+   * 
+   * Data-nodes finish upgrade at different times.
+   * The data-node needs to re-confirm with the name-node that the upgrade
+   * is complete while other nodes are still upgrading.
+   */
+  public UpgradeCommand completeUpgrade() throws IOException {
+    return new UpgradeCommand(UpgradeCommand.UC_ACTION_REPORT_STATUS,
+                              getVersion(), (short)100);
+  }
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObjectNamenode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObjectNamenode.java?view=auto&rev=555368
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObjectNamenode.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObjectNamenode.java Wed Jul 11 11:52:59 2007
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+
+/**
+ * Base class for name-node upgrade objects.
+ * Name-node upgrades are driven by upgrade commands received from data-nodes.
+ */
+abstract class UpgradeObjectNamenode extends UpgradeObject {
+
+  /**
+   * Process an upgrade command.
+   * RPC has only one very generic command for all upgrade related inter 
+   * component communications. 
+   * The actual command recognition and execution should be handled here.
+   * The reply is sent back also as an UpgradeCommand.
+   * 
+   * @param command
+   * @return the reply command which is analyzed on the client side.
+   */
+  abstract UpgradeCommand processUpgradeCommand(UpgradeCommand command
+                                               ) throws IOException;
+
+  public FSConstants.NodeType getType() {
+    return FSConstants.NodeType.NAME_NODE;
+  }
+
+  /**
+   */
+  public UpgradeCommand startUpgrade() throws IOException {
+    // broadcast that data-nodes must start the upgrade
+    return new UpgradeCommand(UpgradeCommand.UC_ACTION_START_UPGRADE,
+                              getVersion(), (short)0);
+  }
+
+  FSNamesystem getFSNamesystem() {
+    return FSNamesystem.getFSNamesystem();
+  }
+}

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Upgradeable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Upgradeable.java?view=auto&rev=555368
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Upgradeable.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Upgradeable.java Wed Jul 11 11:52:59 2007
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.IOException;
+
+/**
+ * Common interface for distributed upgrade objects.
+ * 
+ * Each upgrade object corresponds to a layout version,
+ * which is the latest version that should be upgraded using this object.
+ * That is all components whose layout version is greater or equal to the
+ * one returned by {@link #getVersion()} must be upgraded with this object.
+ */
+public interface Upgradeable extends Comparable<Upgradeable> {
+  /**
+   * Get the layout version of the upgrade object.
+   * @return layout version
+   */
+  int getVersion();
+
+  /**
+   * Get the type of the software component, which this object is upgrading.
+   * @return type
+   */
+  FSConstants.NodeType getType();
+
+  /**
+   * Description of the upgrade object for displaying.
+   * @return description
+   */
+  String getDescription();
+
+  /**
+   * Upgrade status determines a percentage of the work done out of the total 
+   * amount required by the upgrade.
+   * 
+   * 100% means that the upgrade is completed.
+   * Any value &lt; 100 means it is not complete.
+   * 
+   * The return value should provide at least 2 values, e.g. 0 and 100.
+   * @return integer value in the range [0, 100].
+   */
+  short getUpgradeStatus();
+
+  /**
+   * Prepare for the upgrade.
+   * E.g. initialize upgrade data structures and set status to 0.
+   * 
+   * Returns an upgrade command that is used for broadcasting to other cluster
+   * components. 
+   * E.g. name-node informs data-nodes that they must perform a distributed upgrade.
+   * 
+   * @return an UpgradeCommand for broadcasting.
+   * @throws IOException
+   */
+  UpgradeCommand startUpgrade() throws IOException;
+
+  /**
+   * Complete upgrade.
+   * E.g. cleanup upgrade data structures or write metadata to disk.
+   * 
+   * Returns an upgrade command that is used for broadcasting to other cluster
+   * components. 
+   * E.g. data-nodes inform the name-node that they completed the upgrade
+   * while other data-nodes are still upgrading.
+   * 
+   * @return an UpgradeCommand for broadcasting.
+   * @throws IOException
+   */
+  UpgradeCommand completeUpgrade() throws IOException;
+}

Added: lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDistributedUpgrade.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDistributedUpgrade.java?view=auto&rev=555368
==============================================================================
--- lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDistributedUpgrade.java (added)
+++ lucene/hadoop/trunk/src/test/org/apache/hadoop/dfs/TestDistributedUpgrade.java Wed Jul 11 11:52:59 2007
@@ -0,0 +1,182 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.dfs;
+
+import java.io.File;
+import java.io.IOException;
+import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+
+import static org.apache.hadoop.dfs.FSConstants.NodeType.DATA_NODE;
+import static org.apache.hadoop.dfs.FSConstants.NodeType.NAME_NODE;
+import static org.apache.hadoop.dfs.FSConstants.LAYOUT_VERSION;
+import org.apache.hadoop.dfs.FSConstants.StartupOption;
+
+/**
+ */
+public class TestDistributedUpgrade extends TestCase {
+  private static final Log LOG = LogFactory.getLog(
+                             "org.apache.hadoop.dfs.TestDistributedUpgrade");
+  private Configuration conf;
+  private int testCounter = 0;
+  private MiniDFSCluster cluster = null;
+    
+  /**
+   * Writes an INFO log message containing the parameters.
+   */
+  void log(String label, int numDirs) {
+    LOG.info("============================================================");
+    LOG.info("***TEST " + (testCounter++) + "*** " 
+             + label + ":"
+             + " numDirs="+numDirs);
+  }
+  
+  /**
+   * Attempts to start a NameNode with the given operation.  Starting
+   * the NameNode should throw an exception.
+   */
+  void startNameNodeShouldFail(StartupOption operation) {
+    try {
+      cluster = new MiniDFSCluster(conf, 0, operation); // should fail
+      throw new AssertionError("NameNode should have failed to start");
+    } catch (Exception expected) {
+      expected = null;
+      // expected
+    }
+  }
+  
+  /**
+   * Attempts to start a DataNode with the given operation.  Starting
+   * the DataNode should throw an exception.
+   */
+  void startDataNodeShouldFail(StartupOption operation) {
+    try {
+      cluster.startDataNodes(conf, 1, false, operation, null); // should fail
+      throw new AssertionError("DataNode should have failed to start");
+    } catch (Exception expected) {
+      // expected
+      assertFalse(cluster.isDataNodeUp());
+    }
+  }
+ 
+  /**
+   */
+  public void testDistributedUpgrade() throws Exception {
+    File[] baseDirs;
+    int numDirs = 1;
+    UpgradeUtilities.initialize();
+
+    // register new upgrade objects (ignore all existing)
+    UpgradeObjectCollection.initialize();
+    UpgradeObjectCollection.registerUpgrade(new UpgradeObject_Test_Datanode());
+    UpgradeObjectCollection.registerUpgrade(new UpgradeObject_Test_Namenode());
+
+    conf = UpgradeUtilities.initializeStorageStateConf(numDirs);
+    String[] nameNodeDirs = conf.getStrings("dfs.name.dir");
+    String[] dataNodeDirs = conf.getStrings("dfs.data.dir");
+    DFSAdmin dfsAdmin = new DFSAdmin();
+    dfsAdmin.setConf(conf);
+    String[] pars = {"-safemode", "wait"};
+
+    log("NameNode start in regular mode when dustributed upgrade is required", numDirs);
+    baseDirs = UpgradeUtilities.createStorageDirs(NAME_NODE, nameNodeDirs, "current");
+    UpgradeUtilities.createVersionFile(NAME_NODE, baseDirs,
+        new StorageInfo(LAYOUT_VERSION+2,
+                        UpgradeUtilities.getCurrentNamespaceID(cluster),
+                        UpgradeUtilities.getCurrentFsscTime(cluster)));
+    startNameNodeShouldFail(StartupOption.REGULAR);
+
+    log("Start NameNode only distributed upgrade", numDirs);
+    cluster = new MiniDFSCluster(conf, 0, StartupOption.UPGRADE);
+    cluster.shutdown();
+
+    log("NameNode start in regular mode when dustributed upgrade has been started", numDirs);
+    startNameNodeShouldFail(StartupOption.REGULAR);
+
+    log("NameNode rollback to the old version that require a dustributed upgrade", numDirs);
+    startNameNodeShouldFail(StartupOption.ROLLBACK);
+
+    log("Normal distributed upgrade for the cluster", numDirs);
+    cluster = new MiniDFSCluster(conf, 0, StartupOption.UPGRADE);
+    UpgradeUtilities.createStorageDirs(DATA_NODE, dataNodeDirs, "current");
+    cluster.startDataNodes(conf, 1, false, StartupOption.REGULAR, null);
+    dfsAdmin.run(pars);
+    cluster.shutdown();
+
+    // it should be ok to start in regular mode
+    log("NameCluster regular startup after the upgrade", numDirs);
+    cluster = new MiniDFSCluster(conf, 0, StartupOption.REGULAR);
+    cluster.startDataNodes(conf, 1, false, StartupOption.REGULAR, null);
+    cluster.shutdown();
+    UpgradeUtilities.createEmptyDirs(nameNodeDirs);
+    UpgradeUtilities.createEmptyDirs(dataNodeDirs);
+  }
+
+  public static void main(String[] args) throws Exception {
+    new TestDistributedUpgrade().testDistributedUpgrade();
+    LOG.info("=== DONE ===");
+  }
+}
+
+/**
+ * Upgrade object for data-node
+ */
+class UpgradeObject_Test_Datanode extends UpgradeObjectDatanode {
+  // Per Upgradeable#getVersion, this object upgrades all components whose
+  // layout version is greater or equal to LAYOUT_VERSION+1.
+  public int getVersion() {
+    return LAYOUT_VERSION+1;
+  }
+
+  /**
+   * Trivial upgrade: completes instantly and reports 100% progress
+   * to the name-node via the generic upgrade command RPC.
+   */
+  public void doUpgrade() throws IOException {
+    this.status = (short)100;
+    getDatanode().namenode.processUpgradeCommand(
+        new UpgradeCommand(UpgradeCommand.UC_ACTION_REPORT_STATUS, 
+            getVersion(), getUpgradeStatus()));
+  }
+
+  /** Reset progress to 0; no command needs to be broadcast by a data-node. */
+  public UpgradeCommand startUpgrade() throws IOException {
+    this.status = (short)0;
+    return null;
+  }
+}
+
+/**
+ * Upgrade object for name-node
+ */
+class UpgradeObject_Test_Namenode extends UpgradeObjectNamenode {
+  public int getVersion() {
+    return LAYOUT_VERSION+1;
+  }
+
+  /**
+   * Accumulates the progress reported by the data-nodes.
+   * Each of the two expected status reports contributes half of its value,
+   * so the status reaches 100 when both data-node reports arrive.
+   */
+  public synchronized UpgradeCommand processUpgradeCommand(
+                                  UpgradeCommand command) throws IOException {
+    if (command.action == UpgradeCommand.UC_ACTION_REPORT_STATUS) {
+      this.status += command.getCurrentStatus()/2;  // 2 reports needed
+    } else {
+      this.status++;
+    }
+    return null;
+  }
+
+  /** Nothing needs to be broadcast when the name-node upgrade completes. */
+  public UpgradeCommand completeUpgrade() throws IOException {
+    return null;
+  }
+}



Mime
View raw message