hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From cutt...@apache.org
Subject svn commit: r556678 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/dfs/
Date Mon, 16 Jul 2007 17:43:35 GMT
Author: cutting
Date: Mon Jul 16 10:43:33 2007
New Revision: 556678

URL: http://svn.apache.org/viewvc?view=rev&rev=556678
Log:
HADOOP-1597.  Add status reports and post-upgrade options to HDFS distributed upgrade.  Contributed
by Konstantin.

Added:
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeStatusReport.java
Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeManagerDatanode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeManagerNamenode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObject.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObjectDatanode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObjectNamenode.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Upgradeable.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=556678&r1=556677&r2=556678
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Jul 16 10:43:33 2007
@@ -348,6 +348,9 @@
 
 108. HADOOP-1433.  Add job priority.  (Johan Oskarsson via tomwhite)
 
+109. HADOOP-1597.  Add status reports and post-upgrade options to HDFS
+     distributed upgrade.  (Konstantin Shvachko via cutting)
+
 
 Release 0.13.0 - 2007-06-08
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java?view=diff&rev=556678&r1=556677&r2=556678
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/ClientProtocol.java Mon Jul 16 10:43:33
2007
@@ -19,6 +19,7 @@
 
 import java.io.*;
 import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.dfs.FSConstants.UpgradeAction;
 
 /**********************************************************************
  * ClientProtocol is used by a piece of DFS user code to communicate 
@@ -30,11 +31,9 @@
 
   /**
    * Compared to the previous version the following changes have been introduced:
-   * 13: getListing returns file creation times and modification times.
-   *     getFileInfo added.
-   *     DatanodeInfo serialization has hostname.
+   * 14: distributedUpgradeProgress() added.
    */
-  public static final long versionID = 13L;
+  public static final long versionID = 14L;
   
   ///////////////////////////////////////
   // File contents
@@ -361,6 +360,16 @@
    * @throws IOException
    */
   public void finalizeUpgrade() throws IOException;
+
+  /**
+   * Report distributed upgrade progress or force current upgrade to proceed.
+   * 
+   * @param action {@link FSConstants.UpgradeAction} to perform
+   * @return upgrade status information or null if no upgrades are in progress
+   * @throws IOException
+   */
+  public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action) 
+  throws IOException;
 
   /**
    * Dumps namenode data structures into specified file. If file

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java?view=diff&rev=556678&r1=556677&r2=556678
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSAdmin.java Mon Jul 16 10:43:33 2007
@@ -23,6 +23,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.dfs.FSConstants.UpgradeAction;
 
 /**
  * This class provides some DFS administrative access.
@@ -46,12 +47,16 @@
       long raw = dfs.getRawCapacity();
       long rawUsed = dfs.getRawUsed();
       long used = dfs.getUsed();
-      boolean mode = dfs.setSafeMode(
-                                     FSConstants.SafeModeAction.SAFEMODE_GET);
+      boolean mode = dfs.setSafeMode(FSConstants.SafeModeAction.SAFEMODE_GET);
+      UpgradeStatusReport status = 
+                      dfs.distributedUpgradeProgress(UpgradeAction.GET_STATUS);
 
       if (mode) {
         System.out.println("Safe mode is ON");
       }
+      if (status != null) {
+        System.out.println(status.getStatusText(false));
+      }
       System.out.println("Total raw bytes: " + raw
                          + " (" + byteDesc(raw) + ")");
       System.out.println("Used raw bytes: " + rawUsed
@@ -85,7 +90,7 @@
    */
   public void setSafeMode(String[] argv, int idx) throws IOException {
     if (!(fs instanceof DistributedFileSystem)) {
-      System.err.println("FileSystem is " + fs.getName());
+      System.err.println("FileSystem is " + fs.getUri());
       return;
     }
     if (idx != argv.length - 1) {
@@ -172,6 +177,9 @@
       "\t\tof datanodes that are allowed to connect to the namenode\n" +
       "\t\tand those that should be decommissioned/recommissioned.\n";
 
+    String upgradeProgress = "-upgradeProgress <status|details|force>: request current\n"
+      + "distributed upgrade status, a detailed status or force the upgrade to proceed.";
+
     String help = "-help [cmd]: \tDisplays help for given command or all commands if none\n"
+
       "\t\tis specified.\n";
 
@@ -181,6 +189,8 @@
       System.out.println(safemode);
     } else if ("refreshNodes".equals(cmd)) {
       System.out.println(refreshNodes);
+    } else if ("upgradeProgress".equals(cmd)) {
+      System.out.println(upgradeProgress);
     } else if ("help".equals(cmd)) {
       System.out.println(help);
     } else {
@@ -188,6 +198,7 @@
       System.out.println(report);
       System.out.println(safemode);
       System.out.println(refreshNodes);
+      System.out.println(upgradeProgress);
       System.out.println(help);
     }
 
@@ -215,6 +226,44 @@
   }
 
   /**
+   * Command to request current distributed upgrade status, 
+   * a detailed status, or to force the upgrade to proceed.
+   * 
+   * Usage: java DFSAdmin -upgradeProgress [status | details | force]
+   * @exception IOException 
+   */
+  public int upgradeProgress(String[] argv, int idx) throws IOException {
+    if (!(fs instanceof DistributedFileSystem)) {
+      System.out.println("FileSystem is " + fs.getUri());
+      return -1;
+    }
+    if (idx != argv.length - 1) {
+      printUsage("-upgradeProgress");
+      return -1;
+    }
+
+    UpgradeAction action;
+    if ("status".equalsIgnoreCase(argv[idx])) {
+      action = UpgradeAction.GET_STATUS;
+    } else if ("details".equalsIgnoreCase(argv[idx])) {
+      action = UpgradeAction.DETAILED_STATUS;
+    } else if ("force".equalsIgnoreCase(argv[idx])) {
+      action = UpgradeAction.FORCE_PROCEED;
+    } else {
+      printUsage("-upgradeProgress");
+      return -1;
+    }
+
+    DistributedFileSystem dfs = (DistributedFileSystem) fs;
+    UpgradeStatusReport status = dfs.distributedUpgradeProgress(action);
+    String statusText = (status == null ? 
+        "There are no distributed upgrades in progress." :
+          status.getStatusText(action == UpgradeAction.DETAILED_STATUS));
+    System.out.println(statusText);
+    return 0;
+  }
+
+  /**
    * Dumps DFS data structures into specified file.
    * Usage: java DFSAdmin -metasave filename
    * @param argv List of of command line parameters.
@@ -248,6 +297,9 @@
     } else if ("-finalizeUpgrade".equals(cmd)) {
       System.err.println("Usage: java DFSAdmin"
                          + " [-finalizeUpgrade]");
+    } else if ("-upgradeProgress".equals(cmd)) {
+      System.err.println("Usage: java DFSAdmin"
+                         + " [-upgradeProgress status | details | force]");
     } else if ("-metasave".equals(cmd)) {
       System.err.println("Usage: java DFSAdmin"
                          + " [-metasave filename]");
@@ -257,6 +309,7 @@
       System.err.println("           [-safemode enter | leave | get | wait]");
       System.err.println("           [-refreshNodes]");
       System.err.println("           [-finalizeUpgrade]");
+      System.err.println("           [-upgradeProgress status | details | force]");
       System.err.println("           [-metasave filename]");
       System.err.println("           [-help [cmd]]");
     }
@@ -301,6 +354,11 @@
         printUsage(cmd);
         return exitCode;
       }
+    } else if ("-upgradeProgress".equals(cmd)) {
+        if (argv.length != 2) {
+          printUsage(cmd);
+          return exitCode;
+        }
     } else if ("-metasave".equals(cmd)) {
       if (argv.length != 2) {
         printUsage(cmd);
@@ -331,6 +389,8 @@
         exitCode = refreshNodes();
       } else if ("-finalizeUpgrade".equals(cmd)) {
         exitCode = finalizeUpgrade();
+      } else if ("-upgradeProgress".equals(cmd)) {
+        exitCode = upgradeProgress(argv, i);
       } else if ("-metasave".equals(cmd)) {
         exitCode = metaSave(argv, i);
       } else if ("-help".equals(cmd)) {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java?view=diff&rev=556678&r1=556677&r2=556678
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DFSClient.java Mon Jul 16 10:43:33
2007
@@ -491,6 +491,14 @@
   }
 
   /**
+   * @see ClientProtocol#distributedUpgradeProgress(FSConstants.UpgradeAction)
+   */
+  public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action
+                                                        ) throws IOException {
+    return namenode.distributedUpgradeProgress(action);
+  }
+
+  /**
    */
   public boolean mkdirs(UTF8 src) throws IOException {
     checkOpen();
@@ -670,7 +678,7 @@
      * Fetch it from the namenode if not cached.
      * 
      * @param offset
-     * @return
+     * @return located block
      * @throws IOException
      */
     private LocatedBlock getBlockAt(long offset) throws IOException {
@@ -699,7 +707,7 @@
      * 
      * @param offset
      * @param length
-     * @return
+     * @return consequent segment of located blocks
      * @throws IOException
      */
     private List<LocatedBlock> getBlockRange(long offset, long length) 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java?view=diff&rev=556678&r1=556677&r2=556678
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DataNode.java Mon Jul 16 10:43:33 2007
@@ -615,6 +615,19 @@
     upgradeManager.processUpgradeCommand(comm);
   }
 
+
+  /**
+   * Start distributed upgrade if it should be initiated by the data-node.
+   */
+  private void startDistributedUpgradeIfNeeded() throws IOException {
+    UpgradeManagerDatanode um = DataNode.getDataNode().upgradeManager;
+    assert um != null : "DataNode.upgradeManager is null.";
+    if(!um.getUpgradeState())
+      return;
+    um.setUpgradeState(false, um.getUpgradeVersion());
+    um.startUpgrade();
+    return;
+  }
   private void transferBlocks( Block blocks[], 
                                DatanodeInfo xferTargets[][] 
                                ) throws IOException {
@@ -1152,6 +1165,7 @@
         
     while (shouldRun) {
       try {
+        startDistributedUpgradeIfNeeded();
         offerService();
       } catch (Exception ex) {
         LOG.error("Exception: " + StringUtils.stringifyException(ex));

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java?view=diff&rev=556678&r1=556677&r2=556678
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DatanodeProtocol.java Mon Jul 16 10:43:33
2007
@@ -31,11 +31,9 @@
  **********************************************************************/
 interface DatanodeProtocol extends VersionedProtocol {
   /*
-   * 6: versionRequest() added;
-   * sendHeartbeat() and blockReport() return DatanodeCommand;
-   * DatanodeRegistration contains StorageInfo
+   * 7: processUpgradeCommand() added;
    */
-  public static final long versionID = 6L;
+  public static final long versionID = 7L;
   
   // error code
   final static int NOTIFY = 0;
@@ -106,6 +104,16 @@
                           String msg) throws IOException;
     
   public NamespaceInfo versionRequest() throws IOException;
-  
+
+  /**
+   * This is a very general way to send a command to the name-node during
+   * distributed upgrade process.
+   * 
+   * The generosity is because the variety of upgrade commands is unpredictable.
+   * The reply from the name-node is also received in the form of an upgrade 
+   * command. 
+   * 
+   * @return a reply in the form of an upgrade command
+   */
   UpgradeCommand processUpgradeCommand(UpgradeCommand comm) throws IOException;
 }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java?view=diff&rev=556678&r1=556677&r2=556678
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/DistributedFileSystem.java Mon Jul
16 10:43:33 2007
@@ -24,6 +24,7 @@
 import org.apache.hadoop.io.*;
 import org.apache.hadoop.fs.*;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.dfs.FSConstants.UpgradeAction;
 import org.apache.hadoop.util.*;
 
 /****************************************************************
@@ -278,6 +279,11 @@
       dfs.finalizeUpgrade();
     }
 
+    public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action
+                                                          ) throws IOException {
+      return dfs.distributedUpgradeProgress(action);
+    }
+
     /*
      * Requests the namenode to dump data strcutures into specified 
      * file.
@@ -403,6 +409,11 @@
    */
   public void finalizeUpgrade() throws IOException {
     ((RawDistributedFileSystem)fs).finalizeUpgrade();
+  }
+
+  public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action
+                                                        ) throws IOException {
+    return ((RawDistributedFileSystem)fs).distributedUpgradeProgress(action);
   }
 
   /*

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java?view=diff&rev=556678&r1=556677&r2=556678
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSConstants.java Mon Jul 16 10:43:33
2007
@@ -132,6 +132,19 @@
     DATA_NODE;
   }
 
+  /**
+   * Distributed upgrade actions:
+   * 
+   * 1. Get upgrade status.
+   * 2. Get detailed upgrade status.
+   * 3. Proceed with the upgrade if it is stuck, no matter what the status is.
+   */
+  public static enum UpgradeAction {
+    GET_STATUS,
+    DETAILED_STATUS,
+    FORCE_PROCEED;
+  }
+
   // Version is reflected in the dfs image and edit log files.
   // Version is reflected in the data storage file.
   // Versions are negative.

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java?view=diff&rev=556678&r1=556677&r2=556678
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSImage.java Mon Jul 16 10:43:33 2007
@@ -638,7 +638,7 @@
     // Load in bits
     //
     boolean needToSave = true;
-    int imgVersion = this.getLayoutVersion();
+    int imgVersion;
     DataInputStream in = new DataInputStream(
                                              new BufferedInputStream(
                                                                      new FileInputStream(curFile)));
@@ -1064,7 +1064,8 @@
 
   private void initializeDistributedUpgrade() throws IOException {
     UpgradeManagerNamenode um = FSNamesystem.getFSNamesystem().upgradeManager;
-    um.initializeUpgrade();
+    if(! um.initializeUpgrade())
+      return;
     // write new upgrade state into disk
     FSNamesystem.getFSNamesystem().getFSImage().writeAll();
     NameNode.LOG.info("\n   Distributed upgrade for NameNode version " 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java?view=diff&rev=556678&r1=556677&r2=556678
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/FSNamesystem.java Mon Jul 16 10:43:33
2007
@@ -3409,6 +3409,11 @@
   // Distributed upgrade manager
   UpgradeManagerNamenode upgradeManager = new UpgradeManagerNamenode();
 
+  UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action 
+                                                 ) throws IOException {
+    return upgradeManager.distributedUpgradeProgress(action);
+  }
+
   UpgradeCommand processDistributedUpgradeCommand(UpgradeCommand comm) throws IOException
{
     return upgradeManager.processUpgradeCommand(comm);
   }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java?view=diff&rev=556678&r1=556677&r2=556678
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/NameNode.java Mon Jul 16 10:43:33 2007
@@ -555,6 +555,11 @@
     getFSImage().finalizeUpgrade();
   }
 
+  public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action
+                                                        ) throws IOException {
+    return namesystem.distributedUpgradeProgress(action);
+  }
+
   /**
    * Dumps namenode state into specified file
    */

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeManagerDatanode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeManagerDatanode.java?view=diff&rev=556678&r1=556677&r2=556678
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeManagerDatanode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeManagerDatanode.java Mon Jul
16 10:43:33 2007
@@ -50,23 +50,10 @@
     DataNode.LOG.info("\n   Distributed upgrade for DataNode version " 
         + getUpgradeVersion() + " to current LV " 
         + FSConstants.LAYOUT_VERSION + " is initialized.");
-    upgradeState = false;
-    int nsUpgradeVersion = nsInfo.getDistributedUpgradeVersion();
-    if(nsUpgradeVersion >= getUpgradeVersion())
-      return;
-    String errorMsg = 
-        "\n   Datanode missed a distributed upgrade and will shutdown."
-      + "\n   namenode distributed upgrade version = " + nsUpgradeVersion
-      + "\n   expected version = " + getUpgradeVersion();
-    DataNode.LOG.fatal( errorMsg );
-    try {
-      dataNode.namenode.errorReport(dataNode.dnRegistration,
-                                    DatanodeProtocol.NOTIFY, errorMsg);
-    } catch( SocketTimeoutException e ) {  // namenode is busy
-      DataNode.LOG.info("Problem connecting to server: " 
-          + dataNode.getNameNodeAddr());
-    }
-    throw new IOException( errorMsg );
+    UpgradeObjectDatanode curUO = (UpgradeObjectDatanode)currentUpgrades.first();
+    curUO.setDatanode(dataNode);
+    upgradeState = curUO.preUpgradeAction(nsInfo);
+    // upgradeState is true if the data-node should start the upgrade itself
   }
 
   /**
@@ -95,7 +82,8 @@
       dataNode.namenode.processUpgradeCommand(broadcastCommand);
       return true;
     }
-    currentUpgrades = getDistributedUpgrades();
+    if(currentUpgrades == null)
+      currentUpgrades = getDistributedUpgrades();
     if(currentUpgrades == null) {
       DataNode.LOG.info("\n   Distributed upgrade for DataNode version " 
           + getUpgradeVersion() + " to current LV " 
@@ -103,11 +91,11 @@
           + "The upgrade object is not defined.");
       return false;
     }
-    upgradeState = true;
     if(currentUpgrades.size() > 1)
       throw new IOException(
           "More than one distributed upgrade objects registered for version " 
           + getUpgradeVersion());
+    upgradeState = true;
     UpgradeObjectDatanode curUO = (UpgradeObjectDatanode)currentUpgrades.first();
     curUO.setDatanode(dataNode);
     curUO.startUpgrade();

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeManagerNamenode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeManagerNamenode.java?view=diff&rev=556678&r1=556677&r2=556678
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeManagerNamenode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeManagerNamenode.java Mon Jul
16 10:43:33 2007
@@ -20,6 +20,8 @@
 import java.util.SortedSet;
 import java.io.IOException;
 
+import org.apache.hadoop.dfs.FSConstants.UpgradeAction;
+
 /**
  * Upgrade manager for name-nodes.
  *
@@ -100,6 +102,24 @@
     currentUpgrades = null;
     broadcastCommand = null;
     FSNamesystem.getFSNamesystem().leaveSafeMode(false);
+  }
+
+  UpgradeStatusReport distributedUpgradeProgress(FSConstants.UpgradeAction action 
+                                                ) throws IOException {
+    if(currentUpgrades == null)
+      return null;  // no upgrades are in progress
+    UpgradeObjectNamenode curUO = (UpgradeObjectNamenode)currentUpgrades.first();
+    boolean details = false;
+    switch(action) {
+    case GET_STATUS:
+      break;
+    case DETAILED_STATUS:
+      details = true;
+      break;
+    case FORCE_PROCEED:
+      curUO.forceProceed();
+    }
+    return curUO.getUpgradeStatusReport(details);
   }
 
   public static void main(String[] args) throws IOException {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObject.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObject.java?view=diff&rev=556678&r1=556677&r2=556678
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObject.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObject.java Mon Jul 16 10:43:33
2007
@@ -17,10 +17,15 @@
  */
 package org.apache.hadoop.dfs;
 
+import java.io.IOException;
+
 import org.apache.hadoop.dfs.UpgradeObjectCollection.UOSignature;
 
 /**
  * Abstract upgrade object.
+ * 
+ * Contains default implementation of common methods of {@link Upgradeable}
+ * interface.
  */
 abstract class UpgradeObject implements Upgradeable {
   protected short status;
@@ -31,6 +36,11 @@
 
   public String getDescription() {
     return "Upgrade object for " + getType() + " layout version " + getVersion();
+  }
+
+  public UpgradeStatusReport getUpgradeStatusReport(boolean details) 
+                                                    throws IOException {
+    return new UpgradeStatusReport(getVersion(), getUpgradeStatus());
   }
 
   public int compareTo(Upgradeable o) {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObjectDatanode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObjectDatanode.java?view=diff&rev=556678&r1=556677&r2=556678
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObjectDatanode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObjectDatanode.java Mon Jul
16 10:43:33 2007
@@ -19,6 +19,7 @@
 
 import org.apache.hadoop.util.StringUtils;
 import java.io.IOException;
+import java.net.SocketTimeoutException;
 
 /**
  * Base class for data-node upgrade objects.
@@ -44,6 +45,48 @@
    * @throws IOException
    */
   abstract void doUpgrade() throws IOException;
+
+  /**
+   * Specifies what to do before the upgrade is started.
+   * 
+   * The default implementation checks whether the data-node missed the upgrade
+   * and throws an exception if it did. This leads to the data-node shutdown.
+   * 
+   * Data-nodes usually start distributed upgrade when the name-node replies
+   * to its heartbeat with a start upgrade command.
+   * Sometimes though, e.g. when a data-node missed the upgrade and wants to
+   * catchup with the rest of the cluster, it is necessary to initiate the 
+   * upgrade directly on the data-node, since the name-node might not ever 
+   * start it. An override of this method should then return true.
+   * And the upgrade will start after data-ndoe registration but before sending
+   * its first heartbeat.
+   * 
+   * @param nsInfo name-node versions, verify that the upgrade
+   * object can talk to this name-node version if necessary.
+   * 
+   * @throws IOException
+   * @return true if data-node itself should start the upgrade or 
+   * false if it should wait until the name-node starts the upgrade.
+   */
+  boolean preUpgradeAction(NamespaceInfo nsInfo) throws IOException {
+    int nsUpgradeVersion = nsInfo.getDistributedUpgradeVersion();
+    if(nsUpgradeVersion >= getVersion())
+      return false; // name-node will perform the upgrade
+    // Missed the upgrade. Report problem to the name-node and throw exception
+    String errorMsg = 
+              "\n   Data-node missed a distributed upgrade and will shutdown."
+            + "\n   " + getDescription() + "."
+            + " Name-node version = " + nsInfo.getLayoutVersion() + ".";
+    DataNode.LOG.fatal( errorMsg );
+    try {
+      dataNode.namenode.errorReport(dataNode.dnRegistration,
+                                    DatanodeProtocol.NOTIFY, errorMsg);
+    } catch(SocketTimeoutException e) {  // namenode is busy
+      DataNode.LOG.info("Problem connecting to server: " 
+                        + dataNode.getNameNodeAddr());
+    }
+    throw new IOException(errorMsg);
+  }
 
   public void run() {
     assert dataNode != null : "UpgradeObjectDatanode.dataNode is null";

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObjectNamenode.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObjectNamenode.java?view=diff&rev=556678&r1=556677&r2=556678
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObjectNamenode.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeObjectNamenode.java Mon Jul
16 10:43:33 2007
@@ -53,4 +53,10 @@
   FSNamesystem getFSNamesystem() {
     return FSNamesystem.getFSNamesystem();
   }
+
+  public void forceProceed() throws IOException {
+    // do nothing by default
+    NameNode.LOG.info("forceProceed() is not defined for the upgrade. " 
+        + getDescription());
+  }
 }

Added: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeStatusReport.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeStatusReport.java?view=auto&rev=556678
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeStatusReport.java (added)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/UpgradeStatusReport.java Mon Jul 16
10:43:33 2007
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.dfs;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+
+/**
+ * Base upgrade upgradeStatus class.
+ * Overload this class if specific status fields need to be reported.
+ * 
+ * Describes status of current upgrade.
+ */
+public class UpgradeStatusReport implements Writable {
+  protected int version;
+  protected short upgradeStatus;
+
+  public UpgradeStatusReport() {
+    this.version = 0;
+    this.upgradeStatus = 0;
+  }
+
+  public UpgradeStatusReport( int version, short status) {
+    this.version = version;
+    this.upgradeStatus = status;
+  }
+
+  /**
+   * Get the layout version of the currently running upgrade.
+   * @return layout version
+   */
+  public int getVersion() {
+    return this.version;
+  }
+
+  /**
+   * Get upgrade upgradeStatus as a percentage of the total upgrade done.
+   * 
+   * @see Upgradeable#getUpgradeStatus() 
+   */ 
+  public short getUpgradeStatus() {
+    return upgradeStatus;
+  }
+
+  /**
+   * Get upgradeStatus data as a text for reporting.
+   * Should be overloaded for a particular upgrade specific upgradeStatus data.
+   * 
+   * @param details true if upgradeStatus details need to be included, 
+   *                false otherwise
+   * @return text
+   */
+  public String getStatusText(boolean details) {
+    return "Distributed upgrade for version " + getVersion() 
+    + " is in progress. Status = " + getUpgradeStatus() + "%";
+  }
+
+  /**
+   * Print basic upgradeStatus details.
+   */
+  public String toString() {
+    return getStatusText(false);
+  }
+
+  /////////////////////////////////////////////////
+  // Writable
+  /////////////////////////////////////////////////
+  static {                                      // register a ctor
+    WritableFactories.setFactory
+      (UpgradeStatusReport.class,
+       new WritableFactory() {
+         public Writable newInstance() { return new UpgradeStatusReport(); }
+       });
+  }
+
+  /**
+   */
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(this.version);
+    out.writeShort(this.upgradeStatus);
+  }
+
+  /**
+   */
+  public void readFields(DataInput in) throws IOException {
+    this.version = in.readInt();
+    this.upgradeStatus = in.readShort();
+  }
+}

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Upgradeable.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Upgradeable.java?view=diff&rev=556678&r1=556677&r2=556678
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Upgradeable.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/dfs/Upgradeable.java Mon Jul 16 10:43:33
2007
@@ -83,4 +83,14 @@
    * @throws IOException
    */
   UpgradeCommand completeUpgrade() throws IOException;
+
+  /**
+   * Get status report for the upgrade.
+   * 
+   * @param details true if upgradeStatus details need to be included, 
+   *                false otherwise
+   * @return {@link UpgradeStatusReport}
+   * @throws IOException
+   */
+  UpgradeStatusReport getUpgradeStatusReport(boolean details) throws IOException;
 }



Mime
View raw message