hadoop-hdfs-commits mailing list archives

From szets...@apache.org
Subject svn commit: r1574259 [3/7] - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/ src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/ src/main/java/ sr...
Date Tue, 04 Mar 2014 23:53:29 GMT
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java?rev=1574259&r1=1574258&r2=1574259&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java Tue Mar  4 23:53:26 2014
@@ -18,29 +18,29 @@
 
 package org.apache.hadoop.hdfs.server.datanode;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Properties;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.HardLink;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
-import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
-import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.util.Daemon;
 
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Properties;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 /**
  * Manages storage for the set of BlockPoolSlices which share a particular 
  * block pool id, on this DataNode.
@@ -58,14 +58,24 @@ import org.apache.hadoop.util.Daemon;
  */
 @InterfaceAudience.Private
 public class BlockPoolSliceStorage extends Storage {
-  private static final Pattern BLOCK_POOL_PATH_PATTERN = Pattern
-      .compile("^(.*)"
-          + "(\\/BP-[0-9]+\\-\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\-[0-9]+\\/.*)$");
+  static final String TRASH_ROOT_DIR = "trash";
+
+  private static final String BLOCK_POOL_ID_PATTERN_BASE =
+      "/BP-\\d+-\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}-\\d+/";
+
+  private static final Pattern BLOCK_POOL_PATH_PATTERN = Pattern.compile(
+      "^(.*)(" + BLOCK_POOL_ID_PATTERN_BASE + ")(.*)$");
+
+  private static final Pattern BLOCK_POOL_CURRENT_PATH_PATTERN = Pattern.compile(
+      "^(.*)(" + BLOCK_POOL_ID_PATTERN_BASE + ")(" + STORAGE_DIR_CURRENT + ")(.*)$");
+
+  private static final Pattern BLOCK_POOL_TRASH_PATH_PATTERN = Pattern.compile(
+      "^(.*)(" + BLOCK_POOL_ID_PATTERN_BASE + ")(" + TRASH_ROOT_DIR + ")(.*)$");
 
   private String blockpoolID = ""; // id of the blockpool
 
   public BlockPoolSliceStorage(StorageInfo storageInfo, String bpid) {
-    super(NodeType.DATA_NODE, storageInfo);
+    super(storageInfo);
     blockpoolID = bpid;
   }
 
@@ -93,9 +103,7 @@ public class BlockPoolSliceStorage exten
    */
   void recoverTransitionRead(DataNode datanode, NamespaceInfo nsInfo,
       Collection<File> dataDirs, StartupOption startOpt) throws IOException {
-    assert HdfsConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion() 
-        : "Block-pool and name-node layout versions must be the same.";
-
+    LOG.info("Analyzing storage directories for bpid " + nsInfo.getBlockPoolID());
     // 1. For each BP data directory analyze the state and
     // check whether all is consistent before transitioning.
     this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
@@ -143,8 +151,6 @@ public class BlockPoolSliceStorage exten
     // while others could be up-to-date for the regular startup.
     for (int idx = 0; idx < getNumStorageDirs(); idx++) {
       doTransition(getStorageDir(idx), nsInfo, startOpt);
-      assert getLayoutVersion() == nsInfo.getLayoutVersion() 
-          : "Data-node and name-node layout versions must be the same.";
       assert getCTime() == nsInfo.getCTime() 
           : "Data-node and name-node CTimes must be the same.";
     }
@@ -167,7 +173,7 @@ public class BlockPoolSliceStorage exten
 
   /**
    * Format a block pool slice storage. 
-   * @param sd the block pool storage
+   * @param bpSdir the block pool storage
    * @param nsInfo the name space info
    * @throws IOException Signals that an I/O exception has occurred.
    */
@@ -175,11 +181,10 @@ public class BlockPoolSliceStorage exten
     LOG.info("Formatting block pool " + blockpoolID + " directory "
         + bpSdir.getCurrentDir());
     bpSdir.clearDirectory(); // create directory
-    this.layoutVersion = HdfsConstants.LAYOUT_VERSION;
+    this.layoutVersion = HdfsConstants.DATANODE_LAYOUT_VERSION;
     this.cTime = nsInfo.getCTime();
     this.namespaceID = nsInfo.getNamespaceID();
     this.blockpoolID = nsInfo.getBlockPoolID();
-    this.storageType = NodeType.DATA_NODE;
     writeProperties(bpSdir);
   }
 
@@ -206,7 +211,7 @@ public class BlockPoolSliceStorage exten
     
     if (!blockpoolID.equals("") && !blockpoolID.equals(bpid)) {
       throw new InconsistentFSStateException(storage,
-          "Unexepcted blockpoolID " + bpid + " . Expected " + blockpoolID);
+          "Unexpected blockpoolID " + bpid + ". Expected " + blockpoolID);
     }
     blockpoolID = bpid;
   }
@@ -230,7 +235,6 @@ public class BlockPoolSliceStorage exten
    * Upgrade if this.LV > LAYOUT_VERSION || this.cTime < namenode.cTime Regular
    * startup if this.LV = LAYOUT_VERSION && this.cTime = namenode.cTime
    * 
-   * @param dn DataNode to which this storage belongs to
    * @param sd storage directory <SD>/current/<bpid>
    * @param nsInfo namespace info
    * @param startOpt startup option
@@ -238,12 +242,18 @@ public class BlockPoolSliceStorage exten
    */
   private void doTransition(StorageDirectory sd,
       NamespaceInfo nsInfo, StartupOption startOpt) throws IOException {
-    if (startOpt == StartupOption.ROLLBACK)
+    if (startOpt == StartupOption.ROLLBACK) {
       doRollback(sd, nsInfo); // rollback if applicable
-    
+    } else {
+      // Restore all the files in the trash. The restored files are retained
+      // during rolling upgrade rollback. They are deleted during rolling
+      // upgrade downgrade.
+      int restored = restoreBlockFilesFromTrash(getTrashRootDir(sd));
+      LOG.info("Restored " + restored + " block files from trash.");
+    }
     readProperties(sd);
     checkVersionUpgradable(this.layoutVersion);
-    assert this.layoutVersion >= HdfsConstants.LAYOUT_VERSION 
+    assert this.layoutVersion >= HdfsConstants.DATANODE_LAYOUT_VERSION 
        : "Future version is not allowed";
     if (getNamespaceID() != nsInfo.getNamespaceID()) {
       throw new IOException("Incompatible namespaceIDs in "
@@ -257,11 +267,11 @@ public class BlockPoolSliceStorage exten
           + nsInfo.getBlockPoolID() + "; datanode blockpoolID = "
           + blockpoolID);
     }
-    if (this.layoutVersion == HdfsConstants.LAYOUT_VERSION
+    if (this.layoutVersion == HdfsConstants.DATANODE_LAYOUT_VERSION
         && this.cTime == nsInfo.getCTime()) {
       return; // regular startup
     }
-    if (this.layoutVersion > HdfsConstants.LAYOUT_VERSION
+    if (this.layoutVersion > HdfsConstants.DATANODE_LAYOUT_VERSION
         || this.cTime < nsInfo.getCTime()) {
       doUpgrade(sd, nsInfo); // upgrade
       return;
@@ -294,7 +304,8 @@ public class BlockPoolSliceStorage exten
    */
   void doUpgrade(StorageDirectory bpSd, NamespaceInfo nsInfo) throws IOException {
     // Upgrading is applicable only to release with federation or after
-    if (!LayoutVersion.supports(Feature.FEDERATION, layoutVersion)) {
+    if (!DataNodeLayoutVersion.supports(
+        LayoutVersion.Feature.FEDERATION, layoutVersion)) {
       return;
     }
     LOG.info("Upgrading block pool storage directory " + bpSd.getRoot()
@@ -322,18 +333,20 @@ public class BlockPoolSliceStorage exten
     File bpTmpDir = bpSd.getPreviousTmp();
     assert !bpTmpDir.exists() : "previous.tmp directory must not exist.";
     
-    // 2. Rename <SD>/curernt/<bpid>/current to <SD>/curernt/<bpid>/previous.tmp
+    // 2. Rename <SD>/current/<bpid>/current to
+    //    <SD>/current/<bpid>/previous.tmp
     rename(bpCurDir, bpTmpDir);
     
     // 3. Create new <SD>/current with block files hardlinks and VERSION
     linkAllBlocks(bpTmpDir, bpCurDir);
-    this.layoutVersion = HdfsConstants.LAYOUT_VERSION;
+    this.layoutVersion = HdfsConstants.DATANODE_LAYOUT_VERSION;
     assert this.namespaceID == nsInfo.getNamespaceID() 
         : "Data-node and name-node layout versions must be the same.";
     this.cTime = nsInfo.getCTime();
     writeProperties(bpSd);
     
-    // 4.rename <SD>/curernt/<bpid>/previous.tmp to <SD>/curernt/<bpid>/previous
+    // 4.rename <SD>/current/<bpid>/previous.tmp to
+    // <SD>/current/<bpid>/previous
     rename(bpTmpDir, bpPrevDir);
     LOG.info("Upgrade of block pool " + blockpoolID + " at " + bpSd.getRoot()
         + " is complete");
@@ -349,7 +362,8 @@ public class BlockPoolSliceStorage exten
    * @throws IOException if the directory is not empty or it can not be removed
    */
   private void cleanupDetachDir(File detachDir) throws IOException {
-    if (!LayoutVersion.supports(Feature.APPEND_RBW_DIR, layoutVersion)
+    if (!DataNodeLayoutVersion.supports(
+        LayoutVersion.Feature.APPEND_RBW_DIR, layoutVersion)
         && detachDir.exists() && detachDir.isDirectory()) {
 
       if (FileUtil.list(detachDir).length != 0) {
@@ -363,6 +377,43 @@ public class BlockPoolSliceStorage exten
     }
   }
 
+  /**
+   * Restore all files from the trash directory to their corresponding
+   * locations under current/.
+   */
+  private int restoreBlockFilesFromTrash(File trashRoot)
+      throws IOException {
+    int filesRestored = 0;
+    File[] children = trashRoot.exists() ? trashRoot.listFiles() : null;
+    if (children == null) {
+      return 0;
+    }
+
+    File restoreDirectory = null;
+    for (File child : children) {
+      if (child.isDirectory()) {
+        // Recurse to process subdirectories.
+        filesRestored += restoreBlockFilesFromTrash(child);
+        continue;
+      }
+
+      if (restoreDirectory == null) {
+        restoreDirectory = new File(getRestoreDirectory(child));
+        if (!restoreDirectory.exists() && !restoreDirectory.mkdirs()) {
+          throw new IOException("Failed to create directory " + restoreDirectory);
+        }
+      }
+
+      final File newChild = new File(restoreDirectory, child.getName());
+      if (!child.renameTo(newChild)) {
+        throw new IOException("Failed to rename " + child + " to " + newChild);
+      }
+      ++filesRestored;
+    }
+    FileUtil.fullyDelete(trashRoot);
+    return filesRestored;
+  }
+
   /*
    * Roll back to old snapshot at the block pool level
    * If previous directory exists: 
@@ -389,13 +440,13 @@ public class BlockPoolSliceStorage exten
     // the namespace state or can be further upgraded to it.
    // In other words, we can only roll back when ( storedLV >= software LV)
     // && ( DN.previousCTime <= NN.ctime)
-    if (!(prevInfo.getLayoutVersion() >= HdfsConstants.LAYOUT_VERSION && 
+    if (!(prevInfo.getLayoutVersion() >= HdfsConstants.DATANODE_LAYOUT_VERSION && 
         prevInfo.getCTime() <= nsInfo.getCTime())) { // cannot rollback
       throw new InconsistentFSStateException(bpSd.getRoot(),
           "Cannot rollback to a newer state.\nDatanode previous state: LV = "
               + prevInfo.getLayoutVersion() + " CTime = " + prevInfo.getCTime()
               + " is newer than the namespace state: LV = "
-              + nsInfo.getLayoutVersion() + " CTime = " + nsInfo.getCTime());
+              + HdfsConstants.DATANODE_LAYOUT_VERSION + " CTime = " + nsInfo.getCTime());
     }
     
     LOG.info("Rolling back storage directory " + bpSd.getRoot()
@@ -478,9 +529,6 @@ public class BlockPoolSliceStorage exten
 
   /**
    * gets the data node storage directory based on block pool storage
-   * 
-   * @param bpRoot
-   * @return
    */
   private static String getDataNodeStorageRoot(String bpRoot) {
     Matcher matcher = BLOCK_POOL_PATH_PATTERN.matcher(bpRoot);
@@ -510,4 +558,66 @@ public class BlockPoolSliceStorage exten
   public boolean isPreUpgradableLayout(StorageDirectory sd) throws IOException {
     return false;
   }
+
+  private File getTrashRootDir(StorageDirectory sd) {
+    return new File(sd.getRoot(), TRASH_ROOT_DIR);
+  }
+
+  /**
+   * Get a target subdirectory under trash/ for a given block file that is being
+   * deleted.
+   *
+   * The subdirectory structure under trash/ mirrors that under current/ to keep
+   * implicit memory of where the files are to be restored (if necessary).
+   *
+   * @return the trash directory for a given block file that is being deleted.
+   */
+  public String getTrashDirectory(File blockFile) {
+    Matcher matcher = BLOCK_POOL_CURRENT_PATH_PATTERN.matcher(blockFile.getParent());
+    String trashDirectory = matcher.replaceFirst("$1$2" + TRASH_ROOT_DIR + "$4");
+    return trashDirectory;
+  }
+
+  /**
+   * Get a target subdirectory under current/ for a given block file that is being
+   * restored from trash.
+   *
+   * The subdirectory structure under trash/ mirrors that under current/ to keep
+   * implicit memory of where the files are to be restored.
+   *
+   * @return the target directory to restore a previously deleted block file.
+   */
+  @VisibleForTesting
+  String getRestoreDirectory(File blockFile) {
+    Matcher matcher = BLOCK_POOL_TRASH_PATH_PATTERN.matcher(blockFile.getParent());
+    String restoreDirectory = matcher.replaceFirst("$1$2" + STORAGE_DIR_CURRENT + "$4");
+    LOG.info("Restoring " + blockFile + " to " + restoreDirectory);
+    return restoreDirectory;
+  }
+
+  /**
+   * Restore all files from the trash directories, then delete the trash
+   * directories themselves.
+   */
+  public void restoreTrash() {
+    for (StorageDirectory sd : storageDirs) {
+      File trashRoot = getTrashRootDir(sd);
+      try {
+        restoreBlockFilesFromTrash(trashRoot);
+        FileUtil.fullyDelete(getTrashRootDir(sd));
+      } catch (IOException ioe) {
+        LOG.warn("Restoring trash failed for storage directory " + sd);
+      }
+    }
+  }
+
+  /** Trash is enabled if at least one storage directory contains a trash root. */
+  @VisibleForTesting
+  public boolean trashEnabled() {
+    for (StorageDirectory sd : storageDirs) {
+      if (getTrashRootDir(sd).exists()) {
+        return true;
+      }
+    }
+    return false;
+  }
 }
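
For context: the trash layout introduced above mirrors the layout under
current/, so a deleted block file's path can be rewritten mechanically in
either direction. A minimal sketch of that rewrite, using the same pattern
text as BLOCK_POOL_CURRENT_PATH_PATTERN (the paths and class below are
hypothetical, for illustration only):

    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    public class TrashPathDemo {
      // Same block pool id pattern as BLOCK_POOL_ID_PATTERN_BASE above.
      static final String BASE =
          "/BP-\\d+-\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}-\\d+/";
      static final Pattern CURRENT_PATTERN =
          Pattern.compile("^(.*)(" + BASE + ")(current)(.*)$");

      public static void main(String[] args) {
        String parent =
            "/data/dn1/current/BP-1234-192.168.1.1-5678/current/finalized";
        Matcher m = CURRENT_PATTERN.matcher(parent);
        // $1 = storage root, $2 = block pool dir, $4 = tail under current/
        System.out.println(m.replaceFirst("$1$2" + "trash" + "$4"));
        // -> /data/dn1/current/BP-1234-192.168.1.1-5678/trash/finalized
      }
    }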

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1574259&r1=1574258&r2=1574259&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Mar  4 23:53:26 2014
@@ -23,8 +23,10 @@ import java.io.BufferedOutputStream;
 import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.File;
 import java.io.FileDescriptor;
 import java.io.FileOutputStream;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
@@ -45,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.d
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
@@ -52,6 +55,7 @@ import org.apache.hadoop.io.nativeio.Nat
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Time;
 
 import com.google.common.annotations.VisibleForTesting;
 
@@ -116,6 +120,7 @@ class BlockReceiver implements Closeable
   private final boolean isTransfer;
 
   private boolean syncOnClose;
+  private long restartBudget;
 
   BlockReceiver(final ExtendedBlock block, final DataInputStream in,
       final String inAddr, final String myAddr,
@@ -135,6 +140,7 @@ class BlockReceiver implements Closeable
       this.clientname = clientname;
       this.isDatanode = clientname.length() == 0;
       this.isClient = !this.isDatanode;
+      this.restartBudget = datanode.getDnConf().restartReplicaExpiry;
 
       //for datanode, we have
       //1: clientName.length() == 0, and
@@ -723,19 +729,71 @@ class BlockReceiver implements Closeable
       }
 
     } catch (IOException ioe) {
-      LOG.info("Exception for " + block, ioe);
-      throw ioe;
+      if (datanode.isRestarting()) {
+        // Do not throw if shutting down for restart. Otherwise, it will cause
+        // premature termination of responder.
+        LOG.info("Shutting down for restart (" + block + ").");
+      } else {
+        LOG.info("Exception for " + block, ioe);
+        throw ioe;
+      }
     } finally {
-      if (!responderClosed) { // Abnormal termination of the flow above
-        IOUtils.closeStream(this);
+      // Clear the previous interrupt state of this thread.
+      Thread.interrupted();
+
+      // If a shutdown for restart was initiated, upstream needs to be notified.
+      // There is no need to do anything special if the responder was closed
+      // normally.
+      if (!responderClosed) { // Data transfer was not complete.
         if (responder != null) {
+          // In case this datanode is shutting down for quick restart,
+          // send a special ack upstream.
+          if (datanode.isRestarting() && isClient && !isTransfer) {
+            File blockFile = ((ReplicaInPipeline)replicaInfo).getBlockFile();
+            File restartMeta = new File(blockFile.getParent() +
+                File.separator + "." + blockFile.getName() + ".restart");
+            if (restartMeta.exists() && !restartMeta.delete()) {
+              LOG.warn("Failed to delete restart meta file: " +
+                  restartMeta.getPath());
+            }
+            try {
+              FileWriter out = new FileWriter(restartMeta);
+              // write out the current time.
+              out.write(Long.toString(Time.now() + restartBudget));
+              out.flush();
+              out.close();
+            } catch (IOException ioe) {
+              // The worst case is not recovering this RBW replica. 
+              // Client will fall back to regular pipeline recovery.
+            }
+            try {
+              ((PacketResponder) responder.getRunnable()).
+                  sendOOBResponse(PipelineAck.getRestartOOBStatus());
+              // Even if the connection is closed after the ack packet is
+              // flushed, the client can react to the connection closure 
+              // first. Insert a delay to lower the chance of client 
+              // missing the OOB ack.
+              Thread.sleep(1000);
+            } catch (InterruptedException ie) {
+              // It is already going down. Ignore this.
+            } catch (IOException ioe) {
+              LOG.info("Error sending OOB Ack.", ioe);
+            }
+          }
           responder.interrupt();
         }
+        IOUtils.closeStream(this);
         cleanupBlock();
       }
       if (responder != null) {
         try {
-          responder.join(datanode.getDnConf().getXceiverStopTimeout());
+          responder.interrupt();
+          // join() on the responder should timeout a bit earlier than the
+          // configured deadline. Otherwise, the join() on this thread will
+          // likely timeout as well.
+          long joinTimeout = datanode.getDnConf().getXceiverStopTimeout();
+          joinTimeout = joinTimeout > 1 ? joinTimeout*8/10 : joinTimeout;
+          responder.join(joinTimeout);
           if (responder.isAlive()) {
             String msg = "Join on responder thread " + responder
                 + " timed out";
@@ -744,7 +802,10 @@ class BlockReceiver implements Closeable
           }
         } catch (InterruptedException e) {
           responder.interrupt();
-          throw new IOException("Interrupted receiveBlock");
+          // do not throw if shutting down for restart.
+          if (!datanode.isRestarting()) {
+            throw new IOException("Interrupted receiveBlock");
+          }
         }
         responder = null;
       }
@@ -862,6 +923,7 @@ class BlockReceiver implements Closeable
     private final PacketResponderType type;
     /** for log and error messages */
     private final String myString; 
+    private boolean sending = false;
 
     @Override
     public String toString() {
@@ -887,7 +949,9 @@ class BlockReceiver implements Closeable
     }
 
     private boolean isRunning() {
-      return running && datanode.shouldRun;
+      // When preparing for a restart, it should continue to run until
+      // interrupted by the receiver thread.
+      return running && (datanode.shouldRun || datanode.isRestarting());
     }
     
     /**
@@ -903,44 +967,97 @@ class BlockReceiver implements Closeable
       if(LOG.isDebugEnabled()) {
         LOG.debug(myString + ": enqueue " + p);
       }
-      synchronized(this) {
+      synchronized(ackQueue) {
         if (running) {
           ackQueue.addLast(p);
-          notifyAll();
+          ackQueue.notifyAll();
+        }
+      }
+    }
+
+    /**
+     * Send an OOB response. If all acks have been sent already for the block
+     * and the responder is about to close, the delivery is not guaranteed.
+     * This is because the other end can close the connection independently.
+     * An OOB coming from downstream will be automatically relayed upstream
+     * by the responder. This method is used only by the originating datanode.
+     *
+     * @param ackStatus the type of ack to be sent
+     */
+    void sendOOBResponse(final Status ackStatus) throws IOException,
+        InterruptedException {
+      if (!running) {
+        LOG.info("Cannot send OOB response " + ackStatus + 
+            ". Responder not running.");
+        return;
+      }
+
+      synchronized(this) {
+        if (sending) {
+          wait(PipelineAck.getOOBTimeout(ackStatus));
+          // Didn't get my turn in time. Give up.
+          if (sending) {
+                throw new IOException("Could not send OOB response in time: "
+                + ackStatus);
+          }
+        }
+        sending = true;
+      }
+
+      LOG.info("Sending an out of band ack of type " + ackStatus);
+      try {
+        sendAckUpstreamUnprotected(null, PipelineAck.UNKOWN_SEQNO, 0L, 0L,
+            ackStatus);
+      } finally {
+        // Let others send ack. Unless there are multiple OOB send
+        // calls, there can be only one waiter, the responder thread.
+        // In any case, only one needs to be notified.
+        synchronized(this) {
+          sending = false;
+          notify();
         }
       }
     }
     
     /** Wait for a packet with given {@code seqno} to be enqueued to ackQueue */
-    synchronized Packet waitForAckHead(long seqno) throws InterruptedException {
-      while (isRunning() && ackQueue.size() == 0) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(myString + ": seqno=" + seqno +
-                    " waiting for local datanode to finish write.");
+    Packet waitForAckHead(long seqno) throws InterruptedException {
+      synchronized(ackQueue) {
+        while (isRunning() && ackQueue.size() == 0) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(myString + ": seqno=" + seqno +
+                      " waiting for local datanode to finish write.");
+          }
+          ackQueue.wait();
         }
-        wait();
+        return isRunning() ? ackQueue.getFirst() : null;
       }
-      return isRunning() ? ackQueue.getFirst() : null;
     }
 
     /**
      * wait for all pending packets to be acked. Then shutdown thread.
      */
     @Override
-    public synchronized void close() {
-      while (isRunning() && ackQueue.size() != 0) {
-        try {
-          wait();
-        } catch (InterruptedException e) {
-          running = false;
-          Thread.currentThread().interrupt();
+    public void close() {
+      synchronized(ackQueue) {
+        while (isRunning() && ackQueue.size() != 0) {
+          try {
+            ackQueue.wait();
+          } catch (InterruptedException e) {
+            running = false;
+            Thread.currentThread().interrupt();
+          }
         }
+        if(LOG.isDebugEnabled()) {
+          LOG.debug(myString + ": closing");
+        }
+        running = false;
+        ackQueue.notifyAll();
       }
-      if(LOG.isDebugEnabled()) {
-        LOG.debug(myString + ": closing");
+
+      synchronized(this) {
+        running = false;
+        notifyAll();
       }
-      running = false;
-      notifyAll();
     }
 
     /**
@@ -968,6 +1085,14 @@ class BlockReceiver implements Closeable
               if (LOG.isDebugEnabled()) {
                 LOG.debug(myString + " got " + ack);
               }
+              // Process an OOB ACK.
+              Status oobStatus = ack.getOOBStatus();
+              if (oobStatus != null) {
+                LOG.info("Relaying an out of band ack of type " + oobStatus);
+                sendAckUpstream(ack, PipelineAck.UNKOWN_SEQNO, 0L, 0L,
+                    Status.SUCCESS);
+                continue;
+              }
               seqno = ack.getSeqno();
             }
             if (seqno != PipelineAck.UNKOWN_SEQNO
@@ -1025,6 +1150,9 @@ class BlockReceiver implements Closeable
              * status back to the client because this datanode has a problem.
              * The upstream datanode will detect that this datanode is bad, and
              * rightly so.
+             *
+             * The receiver thread can also interrupt this thread for sending
+             * an out-of-band response upstream.
              */
             LOG.info(myString + ": Thread is interrupted.");
             running = false;
@@ -1094,17 +1222,64 @@ class BlockReceiver implements Closeable
     }
     
     /**
+     * The wrapper for the unprotected version. This is only called by
+     * the responder's run() method.
+     *
      * @param ack Ack received from downstream
      * @param seqno sequence number of ack to be sent upstream
      * @param totalAckTimeNanos total ack time including all the downstream
      *          nodes
      * @param offsetInBlock offset in block for the data in packet
+     * @param myStatus the local ack status
      */
     private void sendAckUpstream(PipelineAck ack, long seqno,
         long totalAckTimeNanos, long offsetInBlock,
         Status myStatus) throws IOException {
+      try {
+        // Wait for other sender to finish. Unless there is an OOB being sent,
+        // the responder won't have to wait.
+        synchronized(this) {
+          while(sending) {
+            wait();
+          }
+          sending = true;
+        }
+
+        try {
+          if (!running) return;
+          sendAckUpstreamUnprotected(ack, seqno, totalAckTimeNanos,
+              offsetInBlock, myStatus);
+        } finally {
+          synchronized(this) {
+            sending = false;
+            notify();
+          }
+        }
+      } catch (InterruptedException ie) {
+        // The responder was interrupted. Make it go down without
+        // interrupting the receiver(writer) thread.  
+        running = false;
+      }
+    }
+
+    /**
+     * @param ack Ack received from downstream
+     * @param seqno sequence number of ack to be sent upstream
+     * @param totalAckTimeNanos total ack time including all the downstream
+     *          nodes
+     * @param offsetInBlock offset in block for the data in packet
+     * @param myStatus the local ack status
+     */
+    private void sendAckUpstreamUnprotected(PipelineAck ack, long seqno,
+        long totalAckTimeNanos, long offsetInBlock, Status myStatus)
+        throws IOException {
       Status[] replies = null;
-      if (mirrorError) { // ack read error
+      if (ack == null) {
+        // A new OOB response is being sent from this node. Regardless of
+        // any downstream nodes, the ack should contain exactly one reply.
+        replies = new Status[1];
+        replies[0] = myStatus;
+      } else if (mirrorError) { // ack read error
         replies = MIRROR_ERROR_STATUS;
       } else {
         short ackLen = type == PacketResponderType.LAST_IN_PIPELINE ? 0 : ack
@@ -1130,7 +1305,6 @@ class BlockReceiver implements Closeable
           && offsetInBlock > replicaInfo.getBytesAcked()) {
         replicaInfo.setBytesAcked(offsetInBlock);
       }
-
       // send my ack back to upstream datanode
       replyAck.write(upstreamOut);
       upstreamOut.flush();
@@ -1152,9 +1326,11 @@ class BlockReceiver implements Closeable
      * 
      * This should be called only when the ack queue is not empty
      */
-    private synchronized void removeAckHead() {
-      ackQueue.removeFirst();
-      notifyAll();
+    private void removeAckHead() {
+      synchronized(ackQueue) {
+        ackQueue.removeFirst();
+        ackQueue.notifyAll();
+      }
     }
   }
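
For context: when shutting down for restart, the receiver above writes a
one-line ".restart" meta file containing an absolute expiry timestamp
(Time.now() + restartBudget). A minimal sketch of how such a file could be
read back and checked; this helper is hypothetical, not part of the patch:

    import java.io.BufferedReader;
    import java.io.File;
    import java.io.FileReader;
    import java.io.IOException;

    class RestartMetaDemo {
      /** True if the replica's restart window has not yet expired. */
      static boolean withinRestartWindow(File restartMeta) throws IOException {
        BufferedReader in = new BufferedReader(new FileReader(restartMeta));
        try {
          // The meta file holds a single line: the expiry time in millis.
          String line = in.readLine();
          return line != null
              && System.currentTimeMillis() < Long.parseLong(line.trim());
        } finally {
          in.close();
        }
      }
    }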
   

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java?rev=1574259&r1=1574258&r2=1574259&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java Tue Mar  4 23:53:26 2014
@@ -46,6 +46,8 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -82,6 +84,7 @@ public class DNConf {
   final String encryptionAlgorithm;
   
   final long xceiverStopTimeout;
+  final long restartReplicaExpiry;
 
   final long maxLockedMemory;
 
@@ -157,6 +160,10 @@ public class DNConf {
     this.maxLockedMemory = conf.getLong(
         DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
         DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT);
+
+    this.restartReplicaExpiry = conf.getLong(
+        DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY,
+        DFS_DATANODE_RESTART_REPLICA_EXPIRY_DEFAULT) * 1000L;
   }
   
   // We get minimumNameNodeVersion via a method so it can be mocked out in tests.
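
A minimal sketch of setting the new knob programmatically, assuming the
DFS_DATANODE_RESTART_REPLICA_EXPIRY_* constants added elsewhere in this
commit are on the classpath. The value is given in seconds; DNConf converts
it to milliseconds as shown above:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class RestartExpiryConfigDemo {
      public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();
        // Give a restarting datanode 60 seconds to come back before
        // clients fall back to regular pipeline recovery.
        conf.setLong(DFSConfigKeys.DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY, 60);
        System.out.println(
            conf.get(DFSConfigKeys.DFS_DATANODE_RESTART_REPLICA_EXPIRY_KEY));
      }
    }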

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1574259&r1=1574258&r2=1574259&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Mar  4 23:53:26 2014
@@ -17,41 +17,10 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
-import static org.apache.hadoop.util.ExitUtil.terminate;
-
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.FileInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.io.PrintStream;
-import java.net.InetSocketAddress;
-import java.net.Socket;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
-import java.net.URI;
-import java.net.UnknownHostException;
-import java.nio.channels.ClosedByInterruptException;
-import java.nio.channels.ClosedChannelException;
-import java.nio.channels.SocketChannel;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.management.ObjectName;
-
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Joiner;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.BlockingService;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -68,37 +37,15 @@ import org.apache.hadoop.hdfs.HDFSPolicy
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.net.DomainPeerServer;
 import org.apache.hadoop.hdfs.net.TcpPeerServer;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
-import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
-import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
-import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
-import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
-import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.protocol.datatransfer.*;
 import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto;
 import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
 import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InterDatanodeProtocolService;
-import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
-import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolServerSideTranslatorPB;
-import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
-import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB;
-import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB;
-import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB;
-import org.apache.hadoop.hdfs.protocolPB.PBHelper;
-import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
+import org.apache.hadoop.hdfs.protocolPB.*;
+import org.apache.hadoop.hdfs.security.token.block.*;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
-import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
-import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
@@ -112,11 +59,7 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.hdfs.server.namenode.FileChecksumServlets;
 import org.apache.hadoop.hdfs.server.namenode.StreamFile;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
-import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
-import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
-import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.apache.hadoop.hdfs.server.protocol.*;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.hdfs.web.resources.Param;
 import org.apache.hadoop.http.HttpConfig;
@@ -139,21 +82,24 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.*;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
-import org.apache.hadoop.util.GenericOptionsParser;
-import org.apache.hadoop.util.JvmPauseMonitor;
-import org.apache.hadoop.util.ServicePlugin;
-import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.util.VersionInfo;
 import org.mortbay.util.ajax.JSON;
 
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Joiner;
-import com.google.common.base.Preconditions;
-import com.google.protobuf.BlockingService;
+import javax.management.ObjectName;
+import java.io.*;
+import java.lang.management.ManagementFactory;
+import java.net.*;
+import java.nio.channels.ClosedByInterruptException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.channels.SocketChannel;
+import java.security.PrivilegedExceptionAction;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import static org.apache.hadoop.util.ExitUtil.terminate;
 
 /**********************************************************
  * DataNode is a class (and program) that stores a set of
@@ -210,7 +156,14 @@ public class DataNode extends Configured
   static final Log ClientTraceLog =
     LogFactory.getLog(DataNode.class.getName() + ".clienttrace");
   
-  private static final String USAGE = "Usage: java DataNode [-rollback | -regular]";
+  private static final String USAGE =
+      "Usage: java DataNode [-regular | -rollback | -rollingupgrade rollback]\n" +
+      "    -regular                 : Normal DataNode startup (default).\n" +
+      "    -rollback                : Rollback a standard upgrade.\n" +
+      "    -rollingupgrade rollback : Rollback a rolling upgrade operation.\n" +
+      "  Refer to HDFS documentation for the difference between standard\n" +
+      "  and rolling upgrades.";
+
   static final int CURRENT_BLOCK_FORMAT_VERSION = 1;
 
   /**
@@ -222,6 +175,8 @@ public class DataNode extends Configured
   }
   
   volatile boolean shouldRun = true;
+  volatile boolean shutdownForUpgrade = false;
+  private boolean shutdownInProgress = false;
   private BlockPoolManager blockPoolManager;
   volatile FsDatasetSpi<? extends FsVolumeSpi> data = null;
   private String clusterId = null;
@@ -265,6 +220,7 @@ public class DataNode extends Configured
   private SecureResources secureResources = null;
   private List<StorageLocation> dataDirs;
   private Configuration conf;
+  private String confVersion;
   private final long maxNumberOfBlocksToLog;
 
   private final List<String> usersWithLocalPathAccess;
@@ -293,6 +249,11 @@ public class DataNode extends Configured
         DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED, 
         DFSConfigKeys.DFS_HDFS_BLOCKS_METADATA_ENABLED_DEFAULT);
 
+    confVersion = "core-" +
+        conf.get("hadoop.common.configuration.version", "UNSPECIFIED") +
+        ",hdfs-" +
+        conf.get("hadoop.hdfs.configuration.version", "UNSPECIFIED");
+
     // Determine whether we should try to pass file descriptors to clients.
     if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,
               DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT)) {
@@ -1234,9 +1195,31 @@ public class DataNode extends Configured
     // offerServices may be modified.
     BPOfferService[] bposArray = this.blockPoolManager == null ? null
         : this.blockPoolManager.getAllNamenodeThreads();
-    this.shouldRun = false;
+    // If shutdown is not for restart, set shouldRun to false early. 
+    if (!shutdownForUpgrade) {
+      shouldRun = false;
+    }
+
+    // When shutting down for restart, DataXceiverServer is interrupted
+    // in order to avoid any further acceptance of requests, but the peers
+    // for block writes are not closed until the clients are notified.
+    if (dataXceiverServer != null) {
+      ((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill();
+      this.dataXceiverServer.interrupt();
+    }
+
+    // Record the time of initial notification
+    long timeNotified = Time.now();
+
+    if (localDataXceiverServer != null) {
+      ((DataXceiverServer) this.localDataXceiverServer.getRunnable()).kill();
+      this.localDataXceiverServer.interrupt();
+    }
+
+    // Terminate directory scanner and block scanner
     shutdownPeriodicScanners();
-    
+
+    // Stop the web server
     if (infoServer != null) {
       try {
         infoServer.stop();
@@ -1244,26 +1227,24 @@ public class DataNode extends Configured
         LOG.warn("Exception shutting down DataNode", e);
       }
     }
-    if (ipcServer != null) {
-      ipcServer.stop();
-    }
     if (pauseMonitor != null) {
       pauseMonitor.stop();
     }
+
+    // shouldRun is set to false here to prevent certain threads from exiting
+    // before the restart prep is done.
+    this.shouldRun = false;
     
-    if (dataXceiverServer != null) {
-      ((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill();
-      this.dataXceiverServer.interrupt();
-    }
-    if (localDataXceiverServer != null) {
-      ((DataXceiverServer) this.localDataXceiverServer.getRunnable()).kill();
-      this.localDataXceiverServer.interrupt();
-    }
     // wait for all data receiver threads to exit
     if (this.threadGroup != null) {
       int sleepMs = 2;
       while (true) {
-        this.threadGroup.interrupt();
+        // When shutting down for restart, wait 2.5 seconds before forcing
+        // termination of receiver threads.
+        if (!this.shutdownForUpgrade || 
+            (this.shutdownForUpgrade && (Time.now() - timeNotified > 2500))) {
+          this.threadGroup.interrupt();
+        }
         LOG.info("Waiting for threadgroup to exit, active threads is " +
                  this.threadGroup.activeCount());
         if (this.threadGroup.activeCount() == 0) {
@@ -1293,7 +1274,13 @@ public class DataNode extends Configured
       } catch (InterruptedException ie) {
       }
     }
-    
+
+    // The IPC server needs to be shut down late in the process; otherwise
+    // the shutdown command response won't get sent.
+    if (ipcServer != null) {
+      ipcServer.stop();
+    }
+
     if(blockPoolManager != null) {
       try {
         this.blockPoolManager.shutDownAll(bposArray);
@@ -1320,6 +1307,13 @@ public class DataNode extends Configured
       dataNodeInfoBeanName = null;
     }
     if (shortCircuitRegistry != null) shortCircuitRegistry.shutdown();
+    LOG.info("Shutdown complete.");
+    synchronized(this) {
+      // It is already false, but set it again to avoid a FindBugs warning.
+      this.shouldRun = false;
+      // Notify the main thread.
+      notifyAll();
+    }
   }
   
   
@@ -1792,6 +1786,7 @@ public class DataNode extends Configured
   /** Instantiate & Start a single datanode daemon and wait for it to finish.
    *  If this thread is specifically interrupted, it will stop waiting.
    */
+  @VisibleForTesting
   public static DataNode createDataNode(String args[],
                                  Configuration conf) throws IOException {
     return createDataNode(args, conf, null);
@@ -1800,6 +1795,7 @@ public class DataNode extends Configured
   /** Instantiate & Start a single datanode daemon and wait for it to finish.
    *  If this thread is specifically interrupted, it will stop waiting.
    */
+  @VisibleForTesting
   @InterfaceAudience.Private
   public static DataNode createDataNode(String args[], Configuration conf,
       SecureResources resources) throws IOException {
@@ -1818,7 +1814,11 @@ public class DataNode extends Configured
             && blockPoolManager.getAllNamenodeThreads().length == 0) {
           shouldRun = false;
         }
-        Thread.sleep(2000);
+        // Terminate if shutdown is complete or 2 seconds after all BPs
+        // are shut down.
+        synchronized(this) {
+          wait(2000);
+        }
       } catch (InterruptedException ex) {
         LOG.warn("Received exception in Datanode#join: " + ex);
       }
@@ -1910,25 +1910,28 @@ public class DataNode extends Configured
    *
   * @return false if the passed arguments are incorrect
    */
-  private static boolean parseArguments(String args[], 
-                                        Configuration conf) {
-    int argsLen = (args == null) ? 0 : args.length;
+  @VisibleForTesting
+  static boolean parseArguments(String args[], Configuration conf) {
     StartupOption startOpt = StartupOption.REGULAR;
-    for(int i=0; i < argsLen; i++) {
-      String cmd = args[i];
+    int i = 0;
+
+    if (args != null && args.length != 0) {
+      String cmd = args[i++];
       if ("-r".equalsIgnoreCase(cmd) || "--rack".equalsIgnoreCase(cmd)) {
         LOG.error("-r, --rack arguments are not supported anymore. RackID " +
             "resolution is handled by the NameNode.");
-        terminate(1);
-      } else if ("-rollback".equalsIgnoreCase(cmd)) {
+        return false;
+      } else if (StartupOption.ROLLBACK.getName().equalsIgnoreCase(cmd)) {
         startOpt = StartupOption.ROLLBACK;
-      } else if ("-regular".equalsIgnoreCase(cmd)) {
+      } else if (StartupOption.REGULAR.getName().equalsIgnoreCase(cmd)) {
         startOpt = StartupOption.REGULAR;
-      } else
+      } else {
         return false;
+      }
     }
+
     setStartupOption(conf, startOpt);
-    return true;
+    return (args == null || i == args.length);    // Fail if more than one cmd specified!
   }
 
   private static void setStartupOption(Configuration conf, StartupOption opt) {
@@ -1936,8 +1939,9 @@ public class DataNode extends Configured
   }
 
   static StartupOption getStartupOption(Configuration conf) {
-    return StartupOption.valueOf(conf.get(DFS_DATANODE_STARTUP_KEY,
-                                          StartupOption.REGULAR.toString()));
+    String value = conf.get(DFS_DATANODE_STARTUP_KEY,
+                            StartupOption.REGULAR.toString());
+    return StartupOption.getEnum(value);
   }
 
   /**
@@ -1968,11 +1972,15 @@ public class DataNode extends Configured
 
 
   public static void secureMain(String args[], SecureResources resources) {
+    int errorCode = 0;
     try {
       StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
       DataNode datanode = createDataNode(args, null, resources);
-      if (datanode != null)
+      if (datanode != null) {
         datanode.join();
+      } else {
+        errorCode = 1;
+      }
     } catch (Throwable e) {
       LOG.fatal("Exception in secureMain", e);
       terminate(1, e);
@@ -1982,7 +1990,7 @@ public class DataNode extends Configured
       // condition was not met. Also, In secure mode, control will go to Jsvc
       // and Datanode process hangs if it does not exit.
       LOG.warn("Exiting Datanode");
-      terminate(0);
+      terminate(errorCode);
     }
   }
   
@@ -2446,6 +2454,43 @@ public class DataNode extends Configured
     data.deleteBlockPool(blockPoolId, force);
   }
 
+  @Override // ClientDatanodeProtocol
+  public synchronized void shutdownDatanode(boolean forUpgrade) throws IOException {
+    LOG.info("shutdownDatanode command received (upgrade=" + forUpgrade +
+        "). Shutting down Datanode...");
+
+    // Shutdown can be called only once.
+    if (shutdownInProgress) {
+      throw new IOException("Shutdown already in progress.");
+    }
+    shutdownInProgress = true;
+    shutdownForUpgrade = forUpgrade;
+
+    // Asynchronously start the shutdown process so that the rpc response can be
+    // sent back.
+    Thread shutdownThread = new Thread() {
+      @Override public void run() {
+        if (!shutdownForUpgrade) {
+          // Delay the shutdown a bit if this is not a shutdown for restart.
+          try {
+            Thread.sleep(1000);
+          } catch (InterruptedException ie) { }
+        }
+        shutdown();
+      }
+    };
+
+    shutdownThread.setDaemon(true);
+    shutdownThread.start();
+  }
+
+  @Override //ClientDatanodeProtocol
+  public DatanodeLocalInfo getDatanodeInfo() {
+    long uptime = ManagementFactory.getRuntimeMXBean().getUptime()/1000;
+    return new DatanodeLocalInfo(VersionInfo.getVersion(),
+        confVersion, uptime);
+  }
+
   /**
    * @param addr rpc address of the namenode
    * @return true if the datanode is connected to a NameNode at the
@@ -2471,6 +2516,10 @@ public class DataNode extends Configured
     return bp != null ? bp.isAlive() : false;
   }
 
+  boolean isRestarting() {
+    return shutdownForUpgrade;
+  }
+
   /**
    * A datanode is considered to be fully started if all the BP threads are
    * alive and all the block pools are initialized.
@@ -2519,6 +2568,11 @@ public class DataNode extends Configured
     return shouldRun;
   }
 
+  @VisibleForTesting
+  DataStorage getStorage() {
+    return storage;
+  }
+
   public ShortCircuitRegistry getShortCircuitRegistry() {
     return shortCircuitRegistry;
   }
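
For context: parseArguments is now exposed for testing and rejects trailing
arguments after a recognized option. A minimal sketch of exercising it; the
demo class is hypothetical and must live in the same package, since the
method is package-private:

    package org.apache.hadoop.hdfs.server.datanode;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class ParseArgsDemo {
      public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();
        // A single recognized option parses successfully.
        System.out.println(DataNode.parseArguments(
            new String[] { "-rollback" }, conf));           // true
        // Extra trailing arguments are rejected (i != args.length).
        System.out.println(DataNode.parseArguments(
            new String[] { "-rollback", "extra" }, conf));  // false
      }
    }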

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java?rev=1574259&r1=1574258&r2=1574259&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java Tue Mar  4 23:53:26 2014
@@ -18,27 +18,15 @@
 
 package org.apache.hadoop.hdfs.server.datanode;
 
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.channels.FileLock;
-import java.util.*;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.HardLink;
-import org.apache.hadoop.fs.LocalFileSystem;
-import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
-import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
@@ -50,6 +38,11 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DiskChecker;
 
+import java.io.*;
+import java.nio.channels.FileLock;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
 /** 
  * Data storage information file.
  * <p>
@@ -66,6 +59,13 @@ public class DataStorage extends Storage
   public final static String STORAGE_DIR_FINALIZED = "finalized";
   public final static String STORAGE_DIR_TMP = "tmp";
 
+  // Set of bpids for which 'trash' is currently enabled.
+  // When trash is enabled block files are moved under a separate
+  // 'trash' folder instead of being deleted right away. This can
+  // be useful during rolling upgrades, for example.
+  // The set is backed by a concurrent HashMap.
+  private Set<String> trashEnabledBpids;
+
   /**
    * Datanode UUID that this storage is currently attached to. This
    *  is the same as the legacy StorageID for datanodes that were
@@ -84,14 +84,16 @@ public class DataStorage extends Storage
 
   DataStorage() {
     super(NodeType.DATA_NODE);
+    trashEnabledBpids = Collections.newSetFromMap(
+        new ConcurrentHashMap<String, Boolean>());
   }
   
-  public StorageInfo getBPStorage(String bpid) {
+  public BlockPoolSliceStorage getBPStorage(String bpid) {
     return bpStorageMap.get(bpid);
   }
   
   public DataStorage(StorageInfo storageInfo) {
-    super(NodeType.DATA_NODE, storageInfo);
+    super(storageInfo);
   }
 
   public synchronized String getDatanodeUuid() {
@@ -108,6 +110,43 @@ public class DataStorage extends Storage
       sd.setStorageUuid(DatanodeStorage.generateUuid());
     }
   }
+
+  /**
+   * Enable trash for the specified block pool storage.
+   */
+  public void enableTrash(String bpid) {
+    if (trashEnabledBpids.add(bpid)) {
+      LOG.info("Enabled trash for bpid " + bpid);
+    }
+  }
+
+  public void restoreTrash(String bpid) {
+    if (trashEnabledBpids.contains(bpid)) {
+      getBPStorage(bpid).restoreTrash();
+      trashEnabledBpids.remove(bpid);
+      LOG.info("Restored trash for bpid " + bpid);
+    }
+  }
+
+  public boolean trashEnabled(String bpid) {
+    return trashEnabledBpids.contains(bpid);
+  }
+
+  /**
+   * If rolling upgrades are in progress then do not delete block files
+   * immediately. Instead we move the block files to an intermediate
+   * 'trash' directory. If there is a subsequent rollback, then the block
+   * files will be restored from trash.
+   *
+   * @return trash directory if rolling upgrade is in progress, null
+   *         otherwise.
+   */
+  public String getTrashDirectoryForBlockFile(String bpid, File blockFile) {
+    if (trashEnabledBpids.contains(bpid)) {
+      return ((BlockPoolSliceStorage) getBPStorage(bpid)).getTrashDirectory(blockFile);
+    }
+    return null;
+  }
   
   /**
    * Analyze storage directories.
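
For context: a minimal sketch of the intended lifecycle of the trash API
added above during a rolling upgrade. The DataStorage instance, bpid, and
block file are assumed to come from a live datanode; the demo method is
hypothetical:

    import java.io.File;
    import org.apache.hadoop.hdfs.server.datanode.DataStorage;

    class TrashLifecycleDemo {
      static void demo(DataStorage storage, String bpid, File blockFile) {
        storage.enableTrash(bpid);   // upgrade started: defer block deletions
        // While trash is enabled, deletions resolve to a trash path, not null.
        String trashDir =
            storage.getTrashDirectoryForBlockFile(bpid, blockFile);
        System.out.println("would move " + blockFile + " under " + trashDir);
        storage.restoreTrash(bpid);  // rollback: move files back under current/
        assert !storage.trashEnabled(bpid);
      }
    }
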
@@ -131,10 +170,8 @@ public class DataStorage extends Storage
       // DN storage has been initialized, no need to do anything
       return;
     }
-    assert HdfsConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion() :
-      "Data-node version " + HdfsConstants.LAYOUT_VERSION + 
-      " and name-node layout version " + nsInfo.getLayoutVersion() + 
-      " must be the same.";
+    LOG.info("Data-node version: " + HdfsConstants.DATANODE_LAYOUT_VERSION
+        + " and name-node layout version: " + nsInfo.getLayoutVersion());
     
     // 1. For each data directory calculate its state and 
     // check whether all is consistent before transitioning.
@@ -186,15 +223,13 @@ public class DataStorage extends Storage
     // while others could be uptodate for the regular startup.
     for(int idx = 0; idx < getNumStorageDirs(); idx++) {
       doTransition(datanode, getStorageDir(idx), nsInfo, startOpt);
-      assert this.getLayoutVersion() == nsInfo.getLayoutVersion() :
-        "Data-node and name-node layout versions must be the same.";
       createStorageID(getStorageDir(idx));
     }
     
     // 3. Update all storages. Some of them might have just been formatted.
     this.writeAll();
     
-    // 4. mark DN storage is initilized
+    // 4. mark DN storage as initialized
     this.initialized = true;
   }
 
@@ -261,7 +296,7 @@ public class DataStorage extends Storage
   void format(StorageDirectory sd, NamespaceInfo nsInfo,
               String datanodeUuid) throws IOException {
     sd.clearDirectory(); // create directory
-    this.layoutVersion = HdfsConstants.LAYOUT_VERSION;
+    this.layoutVersion = HdfsConstants.DATANODE_LAYOUT_VERSION;
     this.clusterID = nsInfo.getClusterID();
     this.namespaceID = nsInfo.getNamespaceID();
     this.cTime = 0;
@@ -297,7 +332,8 @@ public class DataStorage extends Storage
     }
 
     // Set NamespaceID in version before federation
-    if (!LayoutVersion.supports(Feature.FEDERATION, layoutVersion)) {
+    if (!DataNodeLayoutVersion.supports(
+        LayoutVersion.Feature.FEDERATION, layoutVersion)) {
       props.setProperty("namespaceID", String.valueOf(namespaceID));
     }
   }
@@ -321,11 +357,12 @@ public class DataStorage extends Storage
       setLayoutVersion(props, sd);
     }
     setcTime(props, sd);
-    setStorageType(props, sd);
+    checkStorageType(props, sd);
     setClusterId(props, layoutVersion, sd);
     
     // Read NamespaceID in version before federation
-    if (!LayoutVersion.supports(Feature.FEDERATION, layoutVersion)) {
+    if (!DataNodeLayoutVersion.supports(
+        LayoutVersion.Feature.FEDERATION, layoutVersion)) {
       setNamespaceID(props, sd);
     }
     
@@ -414,11 +451,12 @@ public class DataStorage extends Storage
     }
     readProperties(sd);
     checkVersionUpgradable(this.layoutVersion);
-    assert this.layoutVersion >= HdfsConstants.LAYOUT_VERSION :
+    assert this.layoutVersion >= HdfsConstants.DATANODE_LAYOUT_VERSION :
       "Future version is not allowed";
     
     boolean federationSupported = 
-      LayoutVersion.supports(Feature.FEDERATION, layoutVersion);
+      DataNodeLayoutVersion.supports(
+          LayoutVersion.Feature.FEDERATION, layoutVersion);
     // For pre-federation version - validate the namespaceID
     if (!federationSupported &&
         getNamespaceID() != nsInfo.getNamespaceID()) {
@@ -440,24 +478,22 @@ public class DataStorage extends Storage
     // meaningful at BlockPoolSliceStorage level. 
 
     // regular start up. 
-    if (this.layoutVersion == HdfsConstants.LAYOUT_VERSION)
+    if (this.layoutVersion == HdfsConstants.DATANODE_LAYOUT_VERSION)
       return; // regular startup
     
     // do upgrade
-    if (this.layoutVersion > HdfsConstants.LAYOUT_VERSION) {
+    if (this.layoutVersion > HdfsConstants.DATANODE_LAYOUT_VERSION) {
       doUpgrade(sd, nsInfo);  // upgrade
       return;
     }
     
-    // layoutVersion < LAYOUT_VERSION. I.e. stored layout version is newer
+    // layoutVersion < DATANODE_LAYOUT_VERSION. I.e. stored layout version is newer
     // than the version supported by datanode. This should have been caught
     // in readProperties(), even if rollback was not carried out or somehow
     // failed.
     throw new IOException("BUG: The stored LV = " + this.getLayoutVersion()
-                          + " is newer than the supported LV = "
-                          + HdfsConstants.LAYOUT_VERSION
-                          + " or name node LV = "
-                          + nsInfo.getLayoutVersion());
+        + " is newer than the supported LV = "
+        + HdfsConstants.DATANODE_LAYOUT_VERSION);
   }
 
   /**
@@ -485,12 +521,14 @@ public class DataStorage extends Storage
   void doUpgrade(StorageDirectory sd, NamespaceInfo nsInfo) throws IOException {
     // If the existing on-disk layout version supports federation, simply
     // update its layout version.
-    if (LayoutVersion.supports(Feature.FEDERATION, layoutVersion)) {
+    if (DataNodeLayoutVersion.supports(
+        LayoutVersion.Feature.FEDERATION, layoutVersion)) {
       // The VERSION file is already read in. Override the layoutVersion 
       // field and overwrite the file.
       LOG.info("Updating layout version from " + layoutVersion + " to "
-          + nsInfo.getLayoutVersion() + " for storage " + sd.getRoot());
-      layoutVersion = nsInfo.getLayoutVersion();
+          + HdfsConstants.DATANODE_LAYOUT_VERSION + " for storage "
+          + sd.getRoot());
+      layoutVersion = HdfsConstants.DATANODE_LAYOUT_VERSION;
       writeProperties(sd);
       return;
     }
@@ -498,7 +536,7 @@ public class DataStorage extends Storage
     LOG.info("Upgrading storage directory " + sd.getRoot()
              + ".\n   old LV = " + this.getLayoutVersion()
              + "; old CTime = " + this.getCTime()
-             + ".\n   new LV = " + nsInfo.getLayoutVersion()
+             + ".\n   new LV = " + HdfsConstants.DATANODE_LAYOUT_VERSION
              + "; new CTime = " + nsInfo.getCTime());
     
     File curDir = sd.getCurrentDir();
@@ -528,7 +566,7 @@ public class DataStorage extends Storage
     linkAllBlocks(tmpDir, bbwDir, new File(curBpDir, STORAGE_DIR_CURRENT));
     
     // 4. Write version file under <SD>/current
-    layoutVersion = HdfsConstants.LAYOUT_VERSION;
+    layoutVersion = HdfsConstants.DATANODE_LAYOUT_VERSION;
     clusterID = nsInfo.getClusterID();
     writeProperties(sd);
     
@@ -548,7 +586,8 @@ public class DataStorage extends Storage
    * @throws IOException if the directory is not empty or it can not be removed
    */
   private void cleanupDetachDir(File detachDir) throws IOException {
-    if (!LayoutVersion.supports(Feature.APPEND_RBW_DIR, layoutVersion) &&
+    if (!DataNodeLayoutVersion.supports(
+        LayoutVersion.Feature.APPEND_RBW_DIR, layoutVersion) &&
         detachDir.exists() && detachDir.isDirectory() ) {
       
         if (FileUtil.list(detachDir).length != 0 ) {
@@ -584,19 +623,13 @@ public class DataStorage extends Storage
     File prevDir = sd.getPreviousDir();
     // This is a regular startup or a post-federation rollback
     if (!prevDir.exists()) {
-      // The current datanode version supports federation and the layout
-      // version from namenode matches what the datanode supports. An invalid
-      // rollback may happen if namenode didn't rollback and datanode is
-      // running a wrong version.  But this will be detected in block pool
-      // level and the invalid VERSION content will be overwritten when
-      // the error is corrected and rollback is retried.
-      if (LayoutVersion.supports(Feature.FEDERATION,
-          HdfsConstants.LAYOUT_VERSION) && 
-          HdfsConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion()) {
-        readProperties(sd, nsInfo.getLayoutVersion());
+      if (DataNodeLayoutVersion.supports(LayoutVersion.Feature.FEDERATION,
+          HdfsConstants.DATANODE_LAYOUT_VERSION)) {
+        readProperties(sd, HdfsConstants.DATANODE_LAYOUT_VERSION);
         writeProperties(sd);
-        LOG.info("Layout version rolled back to " +
-            nsInfo.getLayoutVersion() + " for storage " + sd.getRoot());
+        LOG.info("Layout version rolled back to "
+            + HdfsConstants.DATANODE_LAYOUT_VERSION + " for storage "
+            + sd.getRoot());
       }
       return;
     }
@@ -605,16 +638,17 @@ public class DataStorage extends Storage
 
     // We allow rollback to a state, which is either consistent with
     // the namespace state or can be further upgraded to it.
-    if (!(prevInfo.getLayoutVersion() >= HdfsConstants.LAYOUT_VERSION
+    if (!(prevInfo.getLayoutVersion() >= HdfsConstants.DATANODE_LAYOUT_VERSION
           && prevInfo.getCTime() <= nsInfo.getCTime()))  // cannot rollback
       throw new InconsistentFSStateException(sd.getRoot(),
           "Cannot rollback to a newer state.\nDatanode previous state: LV = "
               + prevInfo.getLayoutVersion() + " CTime = " + prevInfo.getCTime()
               + " is newer than the namespace state: LV = "
-              + nsInfo.getLayoutVersion() + " CTime = " + nsInfo.getCTime());
+              + HdfsConstants.DATANODE_LAYOUT_VERSION + " CTime = "
+              + nsInfo.getCTime());
     LOG.info("Rolling back storage directory " + sd.getRoot()
-             + ".\n   target LV = " + nsInfo.getLayoutVersion()
-             + "; target CTime = " + nsInfo.getCTime());
+        + ".\n   target LV = " + HdfsConstants.DATANODE_LAYOUT_VERSION
+        + "; target CTime = " + nsInfo.getCTime());
     File tmpDir = sd.getRemovedTmp();
     assert !tmpDir.exists() : "removed.tmp directory must not exist.";
     // rename current to tmp
@@ -677,9 +711,11 @@ public class DataStorage extends Storage
   
   /*
    * Finalize the upgrade for a block pool
+   * This also empties trash created during rolling upgrade and disables
+   * trash functionality.
    */
   void finalizeUpgrade(String bpID) throws IOException {
-    // To handle finalizing a snapshot taken at datanode level while 
+    // To handle finalizing a snapshot taken at the datanode level while
     // upgrading to federation: if a previous datanode-level snapshot
     // exists, finalize it; otherwise finalize the corresponding BP.
     for (StorageDirectory sd : storageDirs) {
@@ -710,7 +746,8 @@ public class DataStorage extends Storage
     HardLink hardLink = new HardLink();
     // do the link
     int diskLayoutVersion = this.getLayoutVersion();
-    if (LayoutVersion.supports(Feature.APPEND_RBW_DIR, diskLayoutVersion)) {
+    if (DataNodeLayoutVersion.supports(
+        LayoutVersion.Feature.APPEND_RBW_DIR, diskLayoutVersion)) {
       // hardlink finalized blocks in tmpDir/finalized
       linkBlocks(new File(fromDir, STORAGE_DIR_FINALIZED), 
           new File(toDir, STORAGE_DIR_FINALIZED), diskLayoutVersion, hardLink);
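
For readers tracing the new trash mechanics in this file: the lifecycle is driven entirely by the per-block-pool methods added above. A minimal usage sketch follows (variable and method names of the caller are illustrative, not part of this commit):

    // Sketch only: exercises the per-block-pool trash switch added above.
    // 'storage' is an initialized DataStorage; 'bpid' a registered pool id.
    void demoTrashLifecycle(DataStorage storage, String bpid, File blockFile) {
      storage.enableTrash(bpid);              // rolling upgrade starts
      assert storage.trashEnabled(bpid);

      // While trash is enabled, this resolves to a trash path instead of
      // null, so deletions become moves into that directory.
      String trashDir = storage.getTrashDirectoryForBlockFile(bpid, blockFile);
      assert trashDir != null;

      storage.restoreTrash(bpid);             // rollback: blocks restored
      assert !storage.trashEnabled(bpid);     // and trash disabled again
    }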

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1574259&r1=1574258&r2=1574259&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Tue Mar  4 23:53:26 2014
@@ -166,8 +166,8 @@ class DataXceiver extends Receiver imple
     int opsProcessed = 0;
     Op op = null;
 
-    dataXceiverServer.addPeer(peer);
     try {
+      dataXceiverServer.addPeer(peer, Thread.currentThread());
       peer.setWriteTimeout(datanode.getDnConf().socketWriteTimeout);
       InputStream input = socketIn;
       if ((!peer.hasSecureChannel()) && dnConf.encryptDataTransfer) {
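
Registering the worker thread along with the peer is what enables the interrupt-based restart notification added to DataXceiverServer below. A hedged sketch of the register-then-serve pattern (only addPeer and releasePeer are from this commit; the surrounding shape is illustrative):

    try {
      // May now throw IOException if the server has already been closed,
      // which is why registration moved inside the try block.
      server.addPeer(peer, Thread.currentThread());
      // ... serve ops on this peer; an interrupt from the server is the
      // signal that a shutdown-for-upgrade is pending ...
    } catch (IOException e) {
      // log and fall through to cleanup
    } finally {
      server.releasePeer(peer);  // deregister; the peer is closed elsewhere
    }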

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=1574259&r1=1574258&r2=1574259&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Tue Mar  4 23:53:26 2014
@@ -20,8 +20,7 @@ package org.apache.hadoop.hdfs.server.da
 import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.nio.channels.AsynchronousCloseException;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.HashMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.conf.Configuration;
@@ -45,7 +44,8 @@ class DataXceiverServer implements Runna
   
   private final PeerServer peerServer;
   private final DataNode datanode;
-  private final Set<Peer> peers = new HashSet<Peer>();
+  private final HashMap<Peer, Thread> peers = new HashMap<Peer, Thread>();
+  private boolean closed = false;
   
   /**
    * Maximal number of concurrent xceivers per node.
@@ -127,7 +127,7 @@ class DataXceiverServer implements Runna
   @Override
   public void run() {
     Peer peer = null;
-    while (datanode.shouldRun) {
+    while (datanode.shouldRun && !datanode.shutdownForUpgrade) {
       try {
         peer = peerServer.accept();
 
@@ -147,7 +147,7 @@ class DataXceiverServer implements Runna
       } catch (AsynchronousCloseException ace) {
         // another thread closed our listener socket - that's expected during shutdown,
         // but not in other circumstances
-        if (datanode.shouldRun) {
+        if (datanode.shouldRun && !datanode.shutdownForUpgrade) {
           LOG.warn(datanode.getDisplayName() + ":DataXceiverServer: ", ace);
         }
       } catch (IOException ie) {
@@ -170,31 +170,53 @@ class DataXceiverServer implements Runna
         datanode.shouldRun = false;
       }
     }
-    synchronized (this) {
-      for (Peer p : peers) {
-        IOUtils.cleanup(LOG, p);
-      }
-    }
+
+    // Close the server to stop reception of more requests.
     try {
       peerServer.close();
+      closed = true;
     } catch (IOException ie) {
       LOG.warn(datanode.getDisplayName()
           + " :DataXceiverServer: close exception", ie);
     }
+
+    // if in restart prep stage, notify peers before closing them.
+    if (datanode.shutdownForUpgrade) {
+      restartNotifyPeers();
+      // Each thread needs some time to process the notification. If a
+      // thread needs to send an OOB message to the client but is blocked
+      // on the network for a long time, its termination must be forced.
+      LOG.info("Shutting down DataXceiverServer before restart");
+      // Allow roughly up to 2 seconds.
+      for (int i = 0; getNumPeers() > 0 && i < 10; i++) {
+        try {
+          Thread.sleep(200);
+        } catch (InterruptedException e) {
+          // ignore
+        }
+      }
+    }
+    // Close all peers.
+    closeAllPeers();
   }
 
   void kill() {
-    assert datanode.shouldRun == false :
-      "shoudRun should be set to false before killing";
+    assert (datanode.shouldRun == false || datanode.shutdownForUpgrade) :
+      "shouldRun should be set to false or shutdownForUpgrade should be"
+      + " true before killing";
     try {
       this.peerServer.close();
+      this.closed = true;
     } catch (IOException ie) {
       LOG.warn(datanode.getDisplayName() + ":DataXceiverServer.kill(): ", ie);
     }
   }
   
-  synchronized void addPeer(Peer peer) {
-    peers.add(peer);
+  synchronized void addPeer(Peer peer, Thread t) throws IOException {
+    if (closed) {
+      throw new IOException("Server closed.");
+    }
+    peers.put(peer, t);
   }
 
   synchronized void closePeer(Peer peer) {
@@ -202,6 +224,31 @@ class DataXceiverServer implements Runna
     IOUtils.cleanup(null, peer);
   }
 
+  // Notify all peers of the shutdown and restart.
+  // datanode.shouldRun should still be true and datanode.shutdownForUpgrade
+  // should be set to true before calling this method.
+  synchronized void restartNotifyPeers() {
+    assert (datanode.shouldRun == true && datanode.shutdownForUpgrade);
+    for (Peer p : peers.keySet()) {
+      // interrupt each and every DataXceiver thread.
+      peers.get(p).interrupt();
+    }
+  }
+
+  // Close all peers and clear the map.
+  synchronized void closeAllPeers() {
+    LOG.info("Closing all peers.");
+    for (Peer p : peers.keySet()) {
+      IOUtils.cleanup(LOG, p);
+    }
+    peers.clear();
+  }
+
+  // Return the number of peers.
+  synchronized int getNumPeers() {
+    return peers.size();
+  }
+
   synchronized void releasePeer(Peer peer) {
     peers.remove(peer);
   }
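
Taken together, the restart path is: notify, wait briefly, then force. A condensed sketch of that handshake using only the methods added above (the caller shown here is illustrative):

    void shutdownForRestart(DataXceiverServer server)
        throws InterruptedException {
      server.restartNotifyPeers();       // interrupt every xceiver thread
      // Give threads roughly 2 seconds to send OOB acks and deregister.
      for (int i = 0; server.getNumPeers() > 0 && i < 10; i++) {
        Thread.sleep(200);
      }
      server.closeAllPeers();            // force-close any stragglers
    }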

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java?rev=1574259&r1=1574258&r2=1574259&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java Tue Mar  4 23:53:26 2014
@@ -412,5 +412,22 @@ public interface FsDatasetSpi<V extends 
    */
   public HdfsBlocksMetadata getHdfsBlocksMetadata(String bpid,
       long[] blockIds) throws IOException;
+
+  /**
+   * Enable 'trash' for the given block pool. When trash is enabled, files
+   * are moved to a separate trash directory instead of being deleted
+   * immediately. This can be useful, for example, during rolling upgrades.
+   */
+  public void enableTrash(String bpid);
+
+  /**
+   * Restore files from the trash directory for the given block pool.
+   */
+  public void restoreTrash(String bpid);
+
+  /**
+   * @return true if trash is enabled for the given block pool
+   */
+  public boolean trashEnabled(String bpid);
 }
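
A dataset implementation can satisfy these three methods by delegating to the DataStorage that backs it, since DataStorage already tracks the enabled pools. A possible delegation sketch (the 'dataStorage' field name is an assumption, not taken from this commit):

    // Hypothetical wiring inside an FsDatasetSpi implementation;
    // 'dataStorage' is assumed to be the DataStorage backing this dataset.
    @Override
    public void enableTrash(String bpid) {
      dataStorage.enableTrash(bpid);
    }

    @Override
    public void restoreTrash(String bpid) {
      dataStorage.restoreTrash(bpid);
    }

    @Override
    public boolean trashEnabled(String bpid) {
      return dataStorage.trashEnabled(bpid);
    }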
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java?rev=1574259&r1=1574258&r2=1574259&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java Tue Mar  4 23:53:26 2014
@@ -21,9 +21,12 @@ import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.RandomAccessFile;
+import java.util.Scanner;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.DU;
@@ -36,11 +39,14 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.util.Time;
 
 /**
  * A block pool slice represents a portion of a block pool stored on a volume.  
@@ -56,6 +62,9 @@ class BlockPoolSlice {
   private final LDir finalizedDir; // directory storing finalized replicas
   private final File rbwDir; // directory storing RBW replicas
   private final File tmpDir; // directory storing temporary replicas
+  private static final String DU_CACHE_FILE = "dfsUsed";
+  private volatile boolean dfsUsedSaved = false;
+  private static final int SHUTDOWN_HOOK_PRIORITY = 30;
   
   // TODO:FEDERATION scalability issue - a thread per DU is needed
   private final DU dfsUsage;
@@ -106,8 +115,21 @@ class BlockPoolSlice {
         throw new IOException("Mkdirs failed to create " + tmpDir.toString());
       }
     }
-    this.dfsUsage = new DU(bpDir, conf);
+    // Use the cached value initially if available. Otherwise the following
+    // call will block until the initial 'du' command completes.
+    this.dfsUsage = new DU(bpDir, conf, loadDfsUsed());
     this.dfsUsage.start();
+
+    // Ensure the dfs usage is saved during shutdown.
+    ShutdownHookManager.get().addShutdownHook(
+      new Runnable() {
+        @Override
+        public void run() {
+          if (!dfsUsedSaved) {
+            saveDfsUsed();
+          }
+        }
+      }, SHUTDOWN_HOOK_PRIORITY);
   }
 
   File getDirectory() {
@@ -131,6 +153,79 @@ class BlockPoolSlice {
     return dfsUsage.getUsed();
   }
   
+   /**
+   * Read in the cached DU value and return it if it is less than 600 seconds
+   * old (DU update interval). Slight imprecision of dfsUsed is not critical
+   * and skipping DU can significantly shorten the startup time.
+   * If the cached value is not available or too old, -1 is returned.
+   */
+  long loadDfsUsed() {
+    long cachedDfsUsed;
+    long mtime;
+    Scanner sc;
+
+    try {
+      sc = new Scanner(new File(currentDir, DU_CACHE_FILE));
+    } catch (FileNotFoundException fnfe) {
+      return -1;
+    }
+
+    try {
+      // Get the recorded dfsUsed from the file.
+      if (sc.hasNextLong()) {
+        cachedDfsUsed = sc.nextLong();
+      } else {
+        return -1;
+      }
+      // Get the recorded mtime from the file.
+      if (sc.hasNextLong()) {
+        mtime = sc.nextLong();
+      } else {
+        return -1;
+      }
+
+      // Return the cached value if mtime is okay.
+      if (mtime > 0 && (Time.now() - mtime < 600000L)) {
+        FsDatasetImpl.LOG.info("Cached dfsUsed found for " + currentDir + ": " +
+            cachedDfsUsed);
+        return cachedDfsUsed;
+      }
+      return -1;
+    } finally {
+      sc.close();
+    }
+  }
+
+  /**
+   * Write the current dfsUsed to the cache file.
+   */
+  void saveDfsUsed() {
+    File outFile = new File(currentDir, DU_CACHE_FILE);
+    if (outFile.exists() && !outFile.delete()) {
+      FsDatasetImpl.LOG.warn("Failed to delete old dfsUsed file in " +
+        outFile.getParent());
+    }
+
+    FileWriter out = null;
+    try {
+      long used = getDfsUsed();
+      if (used > 0) {
+        out = new FileWriter(outFile);
+        // mtime is written last, so that truncated writes won't be valid.
+        out.write(Long.toString(used) + " " + Long.toString(Time.now()));
+        out.flush();
+        out.close();
+        out = null;
+      }
+    } catch (IOException ioe) {
+      // If write failed, the volume might be bad. Since the cache file is
+      // not critical, log the error and continue.
+      FsDatasetImpl.LOG.warn("Failed to write dfsUsed to " + outFile, ioe);
+    } finally {
+      IOUtils.cleanup(null, out);
+    }
+  }
+
   /**
    * Temporary files. They get moved to the finalized block directory when
    * the block is finalized.
@@ -191,9 +286,39 @@ class BlockPoolSlice {
         newReplica = new FinalizedReplica(blockId, 
             blockFile.length(), genStamp, volume, blockFile.getParentFile());
       } else {
-        newReplica = new ReplicaWaitingToBeRecovered(blockId,
-            validateIntegrityAndSetLength(blockFile, genStamp), 
-            genStamp, volume, blockFile.getParentFile());
+
+        boolean loadRwr = true;
+        File restartMeta = new File(blockFile.getParent() +
+            File.separator + "." + blockFile.getName() + ".restart");
+        Scanner sc = null;
+        try {
+          sc = new Scanner(restartMeta);
+          // The restart meta file exists
+          if (sc.hasNextLong() && (sc.nextLong() > Time.now())) {
+            // It didn't expire. Load the replica as a RBW.
+            newReplica = new ReplicaBeingWritten(blockId,
+                validateIntegrityAndSetLength(blockFile, genStamp), 
+                genStamp, volume, blockFile.getParentFile(), null);
+            loadRwr = false;
+          }
+          sc.close();
+          if (!restartMeta.delete()) {
+            FsDatasetImpl.LOG.warn("Failed to delete restart meta file: " +
+              restartMeta.getPath());
+          }
+        } catch (FileNotFoundException fnfe) {
+          // nothing to do here
+        } finally {
+          if (sc != null) {
+            sc.close();
+          }
+        }
+        // The restart meta file doesn't exist or has expired.
+        if (loadRwr) {
+          newReplica = new ReplicaWaitingToBeRecovered(blockId,
+              validateIntegrityAndSetLength(blockFile, genStamp), 
+              genStamp, volume, blockFile.getParentFile());
+        }
       }
 
       ReplicaInfo oldReplica = volumeMap.add(bpid, newReplica);
@@ -296,6 +421,8 @@ class BlockPoolSlice {
   }
   
   void shutdown() {
+    saveDfsUsed();
+    dfsUsedSaved = true;
     dfsUsage.shutdown();
   }
-}
\ No newline at end of file
+}
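
The cache file written by saveDfsUsed() is a single line of the form "<usedBytes> <mtimeMillis>", with the mtime last so a truncated write fails to parse. A self-contained sketch of a reader for that format, mirroring the 600-second freshness cutoff in loadDfsUsed() (class and method names are illustrative):

    import java.io.File;
    import java.io.FileNotFoundException;
    import java.util.Scanner;

    class DfsUsedCacheReader {
      // Returns -1 when the file is missing, malformed, or older than
      // 600 seconds, matching loadDfsUsed() above.
      static long read(File dir) {
        try (Scanner sc = new Scanner(new File(dir, "dfsUsed"))) {
          if (!sc.hasNextLong()) return -1;
          long used = sc.nextLong();
          if (!sc.hasNextLong()) return -1;
          long mtime = sc.nextLong();
          boolean fresh = mtime > 0
              && System.currentTimeMillis() - mtime < 600000L;
          return fresh ? used : -1;
        } catch (FileNotFoundException e) {
          return -1;
        }
      }
    }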

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java?rev=1574259&r1=1574258&r2=1574259&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java Tue Mar  4 23:53:26 2014
@@ -153,29 +153,34 @@ class FsDatasetAsyncDiskService {
    * dfsUsed statistics accordingly.
    */
   void deleteAsync(FsVolumeImpl volume, File blockFile, File metaFile,
-      ExtendedBlock block) {
+      ExtendedBlock block, String trashDirectory) {
     LOG.info("Scheduling " + block.getLocalBlock()
         + " file " + blockFile + " for deletion");
     ReplicaFileDeleteTask deletionTask = new ReplicaFileDeleteTask(
-        volume, blockFile, metaFile, block);
+        volume, blockFile, metaFile, block, trashDirectory);
     execute(volume.getCurrentDir(), deletionTask);
   }
   
   /** A task for deleting a block file and its associated meta file, as well
-   *  as decrement the dfs usage of the volume. 
+   *  as decrement the dfs usage of the volume.
+   *  Optionally accepts a trash directory. If one is specified, the files
+   *  are moved to trash instead of being deleted; otherwise they are
+   *  deleted immediately.
    */
   class ReplicaFileDeleteTask implements Runnable {
     final FsVolumeImpl volume;
     final File blockFile;
     final File metaFile;
     final ExtendedBlock block;
+    final String trashDirectory;
     
     ReplicaFileDeleteTask(FsVolumeImpl volume, File blockFile,
-        File metaFile, ExtendedBlock block) {
+        File metaFile, ExtendedBlock block, String trashDirectory) {
       this.volume = volume;
       this.blockFile = blockFile;
       this.metaFile = metaFile;
       this.block = block;
+      this.trashDirectory = trashDirectory;
     }
 
     @Override
@@ -186,12 +191,39 @@ class FsDatasetAsyncDiskService {
           + " and meta file " + metaFile + " from volume " + volume;
     }
 
+    private boolean deleteFiles() {
+      return blockFile.delete() && (metaFile.delete() || !metaFile.exists());
+    }
+
+    private boolean moveFiles() {
+      File trashDirFile = new File(trashDirectory);
+      if (!trashDirFile.exists() && !trashDirFile.mkdirs()) {
+        LOG.error("Failed to create trash directory " + trashDirectory);
+        return false;
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Moving files " + blockFile.getName() + " and " +
+            metaFile.getName() + " to trash.");
+      }
+
+      File newBlockFile = new File(trashDirectory, blockFile.getName());
+      File newMetaFile = new File(trashDirectory, metaFile.getName());
+      return (blockFile.renameTo(newBlockFile) &&
+              metaFile.renameTo(newMetaFile));
+    }
+
     @Override
     public void run() {
       long dfsBytes = blockFile.length() + metaFile.length();
-      if (!blockFile.delete() || (!metaFile.delete() && metaFile.exists())) {
-        LOG.warn("Unexpected error trying to delete block "
-            + block.getBlockPoolId() + " " + block.getLocalBlock()
+      boolean result;
+
+      result = (trashDirectory == null) ? deleteFiles() : moveFiles();
+
+      if (!result) {
+        LOG.warn("Unexpected error trying to "
+            + (trashDirectory == null ? "delete" : "move")
+            + " block " + block.getBlockPoolId() + " " + block.getLocalBlock()
             + " at file " + blockFile + ". Ignored.");
       } else {
         if(block.getLocalBlock().getNumBytes() != BlockCommand.NO_ACK){


