hadoop-hdfs-commits mailing list archives

From szets...@apache.org
Subject svn commit: r1091515 - in /hadoop/hdfs/trunk: ./ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/datanode/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/java/org/apache/...
Date Tue, 12 Apr 2011 17:37:48 GMT
Author: szetszwo
Date: Tue Apr 12 17:37:48 2011
New Revision: 1091515

URL: http://svn.apache.org/viewvc?rev=1091515&view=rev
Log:
HDFS-1606. Provide a stronger data guarantee in the write pipeline by adding a new datanode when an existing datanode failed.

Added:
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReplaceDatanodeOnFailure.java
Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/hdfs-default.xml
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend2.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend4.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=1091515&r1=1091514&r2=1091515&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Apr 12 17:37:48 2011
@@ -27,6 +27,9 @@ Trunk (unreleased changes)
 
     HDFS-1630. Support fsedits checksum. (hairong)
 
+    HDFS-1606. Provide a stronger data guarantee in the write pipeline by
+    adding a new datanode when an existing datanode failed.  (szetszwo)
+
   IMPROVEMENTS
 
     HDFS-1510. Added test-patch.properties required by test-patch.sh (nigel)

Modified: hadoop/hdfs/trunk/src/java/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/hdfs-default.xml?rev=1091515&r1=1091514&r2=1091515&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/hdfs-default.xml (original)
+++ hadoop/hdfs/trunk/src/java/hdfs-default.xml Tue Apr 12 17:37:48 2011
@@ -318,6 +318,42 @@ creations/deletions), or "all".</descrip
 </property>
 
 <property>
+  <name>dfs.client.block.write.replace-datanode-on-failure.enable</name>
+  <value>true</value>
+  <description>
+    If there is a datanode/network failure in the write pipeline,
+    DFSClient will try to remove the failed datanode from the pipeline
+    and then continue writing with the remaining datanodes. As a result,
+    the number of datanodes in the pipeline is decreased.  The feature is
+    to add new datanodes to the pipeline.
+
+    This is a site-wide property to enable/disable the feature.
+
+    See also dfs.client.block.write.replace-datanode-on-failure.policy
+  </description>
+</property>
+
+<property>
+  <name>dfs.client.block.write.replace-datanode-on-failure.policy</name>
+  <value>DEFAULT</value>
+  <description>
+    This property is used only if the value of
+    dfs.client.block.write.replace-datanode-on-failure.enable is true.
+
+    ALWAYS: always add a new datanode when an existing datanode is removed.
+    
+    NEVER: never add a new datanode.
+
+    DEFAULT: 
+      Let r be the replication number.
+      Let n be the number of existing datanodes.
+      Add a new datanode only if r is greater than or equal to 3 and either
+      (1) floor(r/2) is greater than or equal to n; or
+      (2) r is greater than n and the block is hflushed/appended.
+  </description>
+</property>
+
+<property>
   <name>dfs.blockreport.intervalMsec</name>
   <value>21600000</value>
   <description>Determines block reporting interval in milliseconds.</description>
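
For illustration only (not part of this change set): a minimal sketch of setting the two new keys programmatically, using the stock org.apache.hadoop.conf.Configuration API; the class name below is hypothetical.

    import org.apache.hadoop.conf.Configuration;

    public class ReplaceDatanodePolicyConf {
      public static void main(String[] args) {
        final Configuration conf = new Configuration();
        // Site-wide switch for the replace-datanode-on-failure feature.
        conf.setBoolean(
            "dfs.client.block.write.replace-datanode-on-failure.enable", true);
        // Policy: ALWAYS, NEVER or DEFAULT (see the descriptions above).
        conf.set(
            "dfs.client.block.write.replace-datanode-on-failure.policy", "DEFAULT");
        System.out.println(conf.get(
            "dfs.client.block.write.replace-datanode-on-failure.policy"));
      }
    }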

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1091515&r1=1091514&r2=1091515&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java Tue Apr 12 17:37:48 2011
@@ -96,7 +96,6 @@ import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
@@ -136,6 +135,7 @@ public class DFSClient implements FSCons
   SocketFactory socketFactory;
   int socketTimeout;
   final int writePacketSize;
+  final DataTransferProtocol.ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
   final FileSystem.Statistics stats;
   final int hdfsTimeout;    // timeout value for a DFS operation.
 
@@ -249,6 +249,8 @@ public class DFSClient implements FSCons
     this.writePacketSize = 
       conf.getInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 
                   DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
+    this.dtpReplaceDatanodeOnFailure = DataTransferProtocol.ReplaceDatanodeOnFailure.get(conf);
+
     // The hdfsTimeout is currently the same as the ipc timeout 
     this.hdfsTimeout = Client.getTimeout(conf);
 

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1091515&r1=1091514&r2=1091515&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Tue Apr 12 17:37:48 2011
@@ -40,6 +40,10 @@ public class DFSConfigKeys extends Commo
   public static final int     DFS_BYTES_PER_CHECKSUM_DEFAULT = 512;
   public static final String  DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size";
   public static final int     DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
+  public static final String  DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY = "dfs.client.block.write.replace-datanode-on-failure.enable";
+  public static final boolean DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_DEFAULT = true;
+  public static final String  DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY = "dfs.client.block.write.replace-datanode-on-failure.policy";
+  public static final String  DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT = "DEFAULT";
   
   public static final String  DFS_NAMENODE_BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address";
   public static final String  DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT = "localhost:50100";

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1091515&r1=1091514&r2=1091515&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java Tue Apr 12 17:37:48 2011
@@ -31,8 +31,10 @@ import java.net.Socket;
 import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
@@ -96,9 +98,6 @@ import org.apache.hadoop.util.StringUtil
  * starts sending packets from the dataQueue.
 ****************************************************************/
 class DFSOutputStream extends FSOutputSummer implements Syncable {
-  /**
-   * 
-   */
   private final DFSClient dfsClient;
   private Configuration conf;
   private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB
@@ -295,10 +294,18 @@ class DFSOutputStream extends FSOutputSu
     private BlockConstructionStage stage;  // block construction stage
     private long bytesSent = 0; // number of bytes that've been sent
 
+    /** Nodes have been used in the pipeline before and have failed. */
+    private final List<DatanodeInfo> failed = new ArrayList<DatanodeInfo>();
+    /** Has the current block been hflushed? */
+    private boolean isHflushed = false;
+    /** Append on an existing block? */
+    private final boolean isAppend;
+
     /**
      * Default construction for file create
      */
     private DataStreamer() {
+      isAppend = false;
       stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
     }
     
@@ -311,6 +318,7 @@ class DFSOutputStream extends FSOutputSu
      */
     private DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat,
         int bytesPerChecksum) throws IOException {
+      isAppend = true;
       stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
       block = lastBlock.getBlock();
       bytesSent = block.getNumBytes();
@@ -750,6 +758,105 @@ class DFSOutputStream extends FSOutputSu
       return doSleep;
     }
 
+    private void setHflush() {
+      isHflushed = true;
+    }
+
+    private int findNewDatanode(final DatanodeInfo[] original
+        ) throws IOException {
+      if (nodes.length != original.length + 1) {
+        throw new IOException("Failed to add a datanode:"
+            + " nodes.length != original.length + 1, nodes="
+            + Arrays.asList(nodes) + ", original=" + Arrays.asList(original));
+      }
+      for(int i = 0; i < nodes.length; i++) {
+        int j = 0;
+        for(; j < original.length && !nodes[i].equals(original[j]); j++);
+        if (j == original.length) {
+          return i;
+        }
+      }
+      throw new IOException("Failed: new datanode not found: nodes="
+          + Arrays.asList(nodes) + ", original=" + Arrays.asList(original));
+    }
+
+    private void addDatanode2ExistingPipeline() throws IOException {
+      if (DataTransferProtocol.LOG.isDebugEnabled()) {
+        DataTransferProtocol.LOG.debug("lastAckedSeqno = " + lastAckedSeqno);
+      }
+      /*
+       * Is data transfer necessary?  We have the following cases.
+       * 
+       * Case 1: Failure in Pipeline Setup
+       * - Append
+       *    + Transfer the stored replica, which may be a RBW or a finalized.
+       * - Create
+       *    + If no data, then no transfer is required.
+       *    + If there are data written, transfer RBW. This case may happen
+       *      when there was a streaming failure earlier in this pipeline.
+       *
+       * Case 2: Failure in Streaming
+       * - Append/Create:
+       *    + transfer RBW
+       * 
+       * Case 3: Failure in Close
+       * - Append/Create:
+       *    + no transfer, let the NameNode replicate the block.
+       */
+      if (!isAppend && lastAckedSeqno < 0
+          && stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
+        //no data have been written
+        return;
+      } else if (stage == BlockConstructionStage.PIPELINE_CLOSE
+          || stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
+        //pipeline is closing
+        return;
+      }
+
+      //get a new datanode
+      final DatanodeInfo[] original = nodes;
+      final LocatedBlock lb = dfsClient.namenode.getAdditionalDatanode(
+          src, block, nodes, failed.toArray(new DatanodeInfo[failed.size()]),
+          1, dfsClient.clientName);
+      nodes = lb.getLocations();
+
+      //find the new datanode
+      final int d = findNewDatanode(original);
+
+      //transfer replica
+      final DatanodeInfo src = d == 0? nodes[1]: nodes[d - 1];
+      final DatanodeInfo[] targets = {nodes[d]};
+      transfer(src, targets, lb.getBlockToken());
+    }
+
+    private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
+        final Token<BlockTokenIdentifier> blockToken) throws IOException {
+      //transfer replica to the new datanode
+      Socket sock = null;
+      DataOutputStream out = null;
+      DataInputStream in = null;
+      try {
+        sock = createSocketForPipeline(src, 2, dfsClient);
+        final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2);
+        out = new DataOutputStream(new BufferedOutputStream(
+            NetUtils.getOutputStream(sock, writeTimeout),
+            DataNode.SMALL_BUFFER_SIZE));
+
+        //send the TRANSFER_BLOCK request
+        DataTransferProtocol.Sender.opTransferBlock(out, block,
+            dfsClient.clientName, targets, blockToken);
+
+        //ack
+        in = new DataInputStream(NetUtils.getInputStream(sock));
+        if (SUCCESS != DataTransferProtocol.Status.read(in)) {
+          throw new IOException("Failed to add a datanode");
+        }
+      } finally {
+        IOUtils.closeStream(in);
+        IOUtils.closeStream(out);
+        IOUtils.closeSocket(sock);
+      }
+    }
 
     /**
      * Open a DataOutputStream to a DataNode pipeline so that 
@@ -793,6 +900,8 @@ class DFSOutputStream extends FSOutputSu
           DFSClient.LOG.warn("Error Recovery for block " + block +
               " in pipeline " + pipelineMsg + 
               ": bad datanode " + nodes[errorIndex].getName());
+          failed.add(nodes[errorIndex]);
+
           DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
           System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
           System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
@@ -803,6 +912,12 @@ class DFSOutputStream extends FSOutputSu
           errorIndex = -1;
         }
 
+        // Check if replace-datanode policy is satisfied.
+        if (dfsClient.dtpReplaceDatanodeOnFailure.satisfy(blockReplication,
+            nodes, isAppend, isHflushed)) {
+          addDatanode2ExistingPipeline();
+        }
+
         // get a new generation stamp and an access token
         LocatedBlock lb = dfsClient.namenode.updateBlockForPipeline(block, dfsClient.clientName);
         newGS = lb.getBlock().getGenerationStamp();
@@ -888,7 +1003,7 @@ class DFSOutputStream extends FSOutputSu
 
       boolean result = false;
       try {
-        s = createSocketForPipeline(nodes, dfsClient);
+        s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
         long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
 
         //
@@ -1026,18 +1141,19 @@ class DFSOutputStream extends FSOutputSu
 
   /**
    * Create a socket for a write pipeline
-   * @param datanodes the datanodes on the pipeline 
+   * @param first the first datanode 
+   * @param length the pipeline length
    * @param client
    * @return the socket connected to the first datanode
    */
-  static Socket createSocketForPipeline(final DatanodeInfo[] datanodes,
-      final DFSClient client) throws IOException {
+  static Socket createSocketForPipeline(final DatanodeInfo first,
+      final int length, final DFSClient client) throws IOException {
     if(DFSClient.LOG.isDebugEnabled()) {
-      DFSClient.LOG.debug("Connecting to datanode " + datanodes[0].getName());
+      DFSClient.LOG.debug("Connecting to datanode " + first.getName());
     }
-    final InetSocketAddress isa = NetUtils.createSocketAddr(datanodes[0].getName());
+    final InetSocketAddress isa = NetUtils.createSocketAddr(first.getName());
     final Socket sock = client.socketFactory.createSocket();
-    final int timeout = client.getDatanodeReadTimeout(datanodes.length);
+    final int timeout = client.getDatanodeReadTimeout(length);
     NetUtils.connect(sock, isa, timeout);
     sock.setSoTimeout(timeout);
     sock.setSendBufferSize(DFSClient.DEFAULT_DATA_SOCKET_SIZE);
@@ -1363,6 +1479,12 @@ class DFSOutputStream extends FSOutputSu
           throw ioe;
         }
       }
+
+      synchronized(this) {
+        if (streamer != null) {
+          streamer.setHflush();
+        }
+      }
     } catch (InterruptedIOException interrupt) {
       // This kind of error doesn't mean that the stream itself is broken - just the
       // flushing thread got interrupted. So, we shouldn't close down the writer,
@@ -1577,7 +1699,7 @@ class DFSOutputStream extends FSOutputSu
   /**
    * Returns the access token currently used by streamer, for testing only
    */
-  Token<BlockTokenIdentifier> getBlockToken() {
+  synchronized Token<BlockTokenIdentifier> getBlockToken() {
     return streamer.getBlockToken();
   }
 

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1091515&r1=1091514&r2=1091515&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Tue Apr 12 17:37:48 2011
@@ -67,9 +67,9 @@ public interface ClientProtocol extends 
    * Compared to the previous version the following changes have been introduced:
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
-   * 65: Add listCorruptFileBlocks to ClientProtocol
+   * 66: Add getAdditionalDatanode(..)
    */
-  public static final long versionID = 65L;
+  public static final long versionID = 66L;
   
   ///////////////////////////////////////
   // File contents
@@ -298,6 +298,30 @@ public interface ClientProtocol extends 
       NotReplicatedYetException, SafeModeException, UnresolvedLinkException,
       IOException;
 
+  /** 
+   * Get a datanode for an existing pipeline.
+   * 
+   * @param src the file being written
+   * @param blk the block being written
+   * @param existings the existing nodes in the pipeline
+   * @param excludes the excluded nodes
+   * @param numAdditionalNodes number of additional datanodes
+   * @param clientName the name of the client
+   * 
+   * @return the located block.
+   * 
+   * @throws AccessControlException If access is denied
+   * @throws FileNotFoundException If file <code>src</code> is not found
+   * @throws SafeModeException create not allowed in safemode
+   * @throws UnresolvedLinkException If <code>src</code> contains a symlink
+   * @throws IOException If an I/O error occurred
+   */
+  public LocatedBlock getAdditionalDatanode(final String src, final Block blk,
+      final DatanodeInfo[] existings, final DatanodeInfo[] excludes,
+      final int numAdditionalNodes, final String clientName
+      ) throws AccessControlException, FileNotFoundException,
+          SafeModeException, UnresolvedLinkException, IOException;
+
   /**
    * The client is done writing data to the given filename, and would 
    * like to complete it.  
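
For illustration only (not part of this change set): a sketch of calling the new RPC from the client side, mirroring the call DFSOutputStream.addDatanode2ExistingPipeline() makes above; the helper class and method names are hypothetical.

    import java.io.IOException;

    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.protocol.ClientProtocol;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;

    /** Hypothetical helper around ClientProtocol.getAdditionalDatanode(..). */
    class AdditionalDatanodeExample {
      static DatanodeInfo[] askForOneMoreDatanode(ClientProtocol namenode,
          String src, Block blk, DatanodeInfo[] existings,
          DatanodeInfo[] failed, String clientName) throws IOException {
        // Failed datanodes are passed as excludes so they are not chosen again.
        final LocatedBlock lb = namenode.getAdditionalDatanode(
            src, blk, existings, failed, 1, clientName);
        // The returned locations are the existing nodes plus the new one.
        return lb.getLocations();
      }
    }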

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=1091515&r1=1091514&r2=1091515&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Tue Apr 12 17:37:48 2011
@@ -25,8 +25,13 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -39,17 +44,17 @@ import org.apache.hadoop.security.token.
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public interface DataTransferProtocol {
-  
+  public static final Log LOG = LogFactory.getLog(DataTransferProtocol.class);
   
   /** Version for data transfers between clients and datanodes
    * This should change when serialization of DatanodeInfo, not just
    * when protocol changes. It is not very obvious. 
    */
   /*
-   * Version 21:
-   *    Added a new operation Op.TRANSFER_BLOCK.
+   * Version 22:
+   *    Add a new feature to replace datanode on failure.
    */
-  public static final int DATA_TRANSFER_VERSION = 21;
+  public static final int DATA_TRANSFER_VERSION = 22;
 
   /** Operation */
   public enum Op {
@@ -749,4 +754,93 @@ public interface DataTransferProtocol {
     }
   }
 
+  /**
+   * The setting of replace-datanode-on-failure feature.
+   */
+  public enum ReplaceDatanodeOnFailure {
+    /** The feature is disabled in the entire site. */
+    DISABLE,
+    /** Never add a new datanode. */
+    NEVER,
+    /**
+     * DEFAULT policy:
+     *   Let r be the replication number.
+     *   Let n be the number of existing datanodes.
+     *   Add a new datanode only if r >= 3 and either
+     *   (1) floor(r/2) >= n; or
+     *   (2) r > n and the block is hflushed/appended.
+     */
+    DEFAULT,
+    /** Always add a new datanode when an existing datanode is removed. */
+    ALWAYS;
+
+    /** Check if the feature is enabled. */
+    public void checkEnabled() {
+      if (this == DISABLE) {
+        throw new UnsupportedOperationException(
+            "This feature is disabled.  Please refer to "
+            + DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY
+            + " configuration property.");
+      }
+    }
+
+    /** Is the policy satisfied? */
+    public boolean satisfy(
+        final short replication, final DatanodeInfo[] existings,
+        final boolean isAppend, final boolean isHflushed) {
+      final int n = existings == null? 0: existings.length;
+      if (n == 0 || n >= replication) {
+        //don't need to add datanode for any policy.
+        return false;
+      } else if (this == DISABLE || this == NEVER) {
+        return false;
+      } else if (this == ALWAYS) {
+        return true;
+      } else {
+        //DEFAULT
+        if (replication < 3) {
+          return false;
+        } else {
+          if (n <= (replication/2)) {
+            return true;
+          } else {
+            return isAppend || isHflushed;
+          }
+        }
+      }
+    }
+
+    /** Get the setting from configuration. */
+    public static ReplaceDatanodeOnFailure get(final Configuration conf) {
+      final boolean enabled = conf.getBoolean(
+          DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY,
+          DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_DEFAULT);
+      if (!enabled) {
+        return DISABLE;
+      }
+
+      final String policy = conf.get(
+          DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY,
+          DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT);
+      for(int i = 1; i < values().length; i++) {
+        final ReplaceDatanodeOnFailure rdof = values()[i];
+        if (rdof.name().equalsIgnoreCase(policy)) {
+          return rdof;
+        }
+      }
+      throw new HadoopIllegalArgumentException("Illegal configuration value for "
+          + DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY
+          + ": " + policy);
+    }
+
+    /** Write the setting to configuration. */
+    public void write(final Configuration conf) {
+      conf.setBoolean(
+          DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY,
+          this != DISABLE);
+      conf.set(
+          DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY,
+          name());
+    }
+  }
 }
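
For illustration only (not part of this change set): a minimal sketch exercising the DEFAULT policy above; it assumes the DatanodeInfo(DatanodeID) constructor also used by the new TestReplaceDatanodeOnFailure below, and the class name is hypothetical.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.ReplaceDatanodeOnFailure;
    import org.apache.hadoop.hdfs.protocol.DatanodeID;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;

    public class ReplaceDatanodePolicyExample {
      public static void main(String[] args) {
        // Resolves to DEFAULT unless the configuration overrides it.
        final ReplaceDatanodeOnFailure policy =
            ReplaceDatanodeOnFailure.get(new Configuration());

        // Two datanodes remain in a pipeline created with replication 3.
        final DatanodeInfo[] existings = {
            new DatanodeInfo(new DatanodeID("dn0")),
            new DatanodeInfo(new DatanodeID("dn1"))};

        // floor(3/2) = 1 < 2 remaining nodes, so a replacement is requested
        // only if the block has been hflushed or is being appended.
        System.out.println(policy.satisfy((short)3, existings, false, false)); // false
        System.out.println(policy.satisfy((short)3, existings, false, true));  // true
      }
    }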

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1091515&r1=1091514&r2=1091515&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Apr 12 17:37:48 2011
@@ -1358,8 +1358,9 @@ public class DataNode extends Configured
      */
     DataTransfer(DatanodeInfo targets[], Block b, BlockConstructionStage stage,
         final String clientname) throws IOException {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(getClass().getSimpleName() + ": " + b
+      if (DataTransferProtocol.LOG.isDebugEnabled()) {
+        DataTransferProtocol.LOG.debug(getClass().getSimpleName() + ": "
+            + b + " (numBytes=" + b.getNumBytes() + ")"
             + ", stage=" + stage
             + ", clientname=" + clientname
             + ", targests=" + Arrays.asList(targets));
@@ -2016,12 +2017,9 @@ public class DataNode extends Configured
    *          the stored GS and the visible length. 
    * @param targets
    * @param client
-   * @return whether the replica is an RBW
    */
-  boolean transferReplicaForPipelineRecovery(final Block b,
+  void transferReplicaForPipelineRecovery(final Block b,
       final DatanodeInfo[] targets, final String client) throws IOException {
-    checkWriteAccess(b);
-
     final long storedGS;
     final long visible;
     final BlockConstructionStage stage;
@@ -2033,7 +2031,8 @@ public class DataNode extends Configured
       } else if (data.isValidBlock(b)) {
         stage = BlockConstructionStage.TRANSFER_FINALIZED;
       } else {
-        throw new IOException(b + " is not a RBW or a Finalized");
+        final String r = data.getReplicaString(b.getBlockId());
+        throw new IOException(b + " is neither a RBW nor a Finalized, r=" + r);
       }
 
       storedGS = data.getStoredBlock(b.getBlockId()).getGenerationStamp();
@@ -2051,7 +2050,6 @@ public class DataNode extends Configured
     if (targets.length > 0) {
       new DataTransfer(targets, b, stage, client).run();
     }
-    return stage == BlockConstructionStage.TRANSFER_RBW;
   }
 
   // Determine a Datanode's streaming address

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1091515&r1=1091514&r2=1091515&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Tue Apr 12 17:37:48 2011
@@ -154,7 +154,7 @@ class DataXceiver extends DataTransferPr
         datanode.socketWriteTimeout);
     DataOutputStream out = new DataOutputStream(
                  new BufferedOutputStream(baseStream, SMALL_BUFFER_SIZE));
-    checkAccess(out, block, blockToken,
+    checkAccess(out, true, block, blockToken,
         DataTransferProtocol.Op.READ_BLOCK,
         BlockTokenSecretManager.AccessMode.READ);
   
@@ -269,7 +269,7 @@ class DataXceiver extends DataTransferPr
         new BufferedOutputStream(
             NetUtils.getOutputStream(s, datanode.socketWriteTimeout),
             SMALL_BUFFER_SIZE));
-    checkAccess(isClient? replyOut: null, block, blockToken,
+    checkAccess(replyOut, isClient, block, blockToken,
         DataTransferProtocol.Op.WRITE_BLOCK,
         BlockTokenSecretManager.AccessMode.WRITE);
 
@@ -430,13 +430,14 @@ class DataXceiver extends DataTransferPr
       final Block blk, final String client,
       final DatanodeInfo[] targets,
       final Token<BlockTokenIdentifier> blockToken) throws IOException {
-    final DataOutputStream out = new DataOutputStream(
-        NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
-    checkAccess(out, blk, blockToken,
+    checkAccess(null, true, blk, blockToken,
         DataTransferProtocol.Op.TRANSFER_BLOCK,
         BlockTokenSecretManager.AccessMode.COPY);
 
     updateCurrentThreadName(DataTransferProtocol.Op.TRANSFER_BLOCK + " " + blk);
+
+    final DataOutputStream out = new DataOutputStream(
+        NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
     try {
       datanode.transferReplicaForPipelineRecovery(blk, targets, client);
       SUCCESS.write(out);
@@ -453,7 +454,7 @@ class DataXceiver extends DataTransferPr
       Token<BlockTokenIdentifier> blockToken) throws IOException {
     final DataOutputStream out = new DataOutputStream(
         NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
-    checkAccess(out, block, blockToken,
+    checkAccess(out, true, block, blockToken,
         DataTransferProtocol.Op.BLOCK_CHECKSUM,
         BlockTokenSecretManager.AccessMode.READ);
 
@@ -711,7 +712,7 @@ class DataXceiver extends DataTransferPr
     }
   }
 
-  private void checkAccess(final DataOutputStream out, 
+  private void checkAccess(DataOutputStream out, final boolean reply, 
       final Block blk,
       final Token<BlockTokenIdentifier> t,
       final DataTransferProtocol.Op op,
@@ -721,7 +722,11 @@ class DataXceiver extends DataTransferPr
         datanode.blockTokenSecretManager.checkAccess(t, null, blk, mode);
       } catch(InvalidToken e) {
         try {
-          if (out != null) {
+          if (reply) {
+            if (out == null) {
+              out = new DataOutputStream(
+                  NetUtils.getOutputStream(s, datanode.socketWriteTimeout));
+            }
             ERROR_ACCESS_TOKEN.write(out);
             if (mode == BlockTokenSecretManager.AccessMode.WRITE) {
               Text.writeString(out, datanode.dnRegistration.getName());

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1091515&r1=1091514&r2=1091515&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Tue Apr 12 17:37:48 2011
@@ -2033,6 +2033,12 @@ public class FSDataset implements FSCons
     return volumeMap.get(blockId);
   }
 
+  @Override 
+  public synchronized String getReplicaString(long blockId) {
+    final Replica r = volumeMap.get(blockId);
+    return r == null? "null": r.toString();
+  }
+
   @Override // FSDatasetInterface
   public synchronized ReplicaRecoveryInfo initReplicaRecovery(
       RecoveringBlock rBlock) throws IOException {

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=1091515&r1=1091514&r2=1091515&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Tue Apr 12 17:37:48 2011
@@ -105,6 +105,11 @@ public interface FSDatasetInterface exte
   public Replica getReplica(long blockId);
 
   /**
+   * @return replica meta information
+   */
+  public String getReplicaString(long blockId);
+
+  /**
    * @return the generation stamp stored with the block.
    */
   public Block getStoredBlock(long blkid) throws IOException;

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1091515&r1=1091514&r2=1091515&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Apr 12 17:37:48 2011
@@ -251,6 +251,8 @@ public class FSNamesystem implements FSC
   private FsServerDefaults serverDefaults;
   // allow appending to hdfs files
   private boolean supportAppends = true;
+  private DataTransferProtocol.ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure = 
+      DataTransferProtocol.ReplaceDatanodeOnFailure.DEFAULT;
 
   private volatile SafeModeInfo safeMode;  // safe mode information
   private Host2NodesMap host2DataNodeMap = new Host2NodesMap();
@@ -521,6 +523,8 @@ public class FSNamesystem implements FSC
         + " blockKeyUpdateInterval=" + blockKeyUpdateInterval / (60 * 1000)
         + " min(s), blockTokenLifetime=" + blockTokenLifetime / (60 * 1000)
         + " min(s)");
+
+    this.dtpReplaceDatanodeOnFailure = DataTransferProtocol.ReplaceDatanodeOnFailure.get(conf);
   }
 
   /**
@@ -1635,6 +1639,53 @@ public class FSNamesystem implements FSC
     return b;
   }
 
+  /** @see NameNode#getAdditionalDatanode(String, Block, DatanodeInfo[], DatanodeInfo[], int, String) */
+  LocatedBlock getAdditionalDatanode(final String src, final Block blk,
+      final DatanodeInfo[] existings,  final HashMap<Node, Node> excludes,
+      final int numAdditionalNodes, final String clientName
+      ) throws IOException {
+    //check if the feature is enabled
+    dtpReplaceDatanodeOnFailure.checkEnabled();
+
+    final DatanodeDescriptor clientnode;
+    final long preferredblocksize;
+    readLock();
+    try {
+      //check safe mode
+      if (isInSafeMode()) {
+        throw new SafeModeException("Cannot add datanode; src=" + src
+            + ", blk=" + blk, safeMode);
+      }
+
+      //check lease
+      final INodeFileUnderConstruction file = checkLease(src, clientName);
+      clientnode = file.getClientNode();
+      preferredblocksize = file.getPreferredBlockSize();
+    } finally {
+      readUnlock();
+    }
+
+    //find datanode descriptors
+    final List<DatanodeDescriptor> chosen = new ArrayList<DatanodeDescriptor>();
+    for(DatanodeInfo d : existings) {
+      final DatanodeDescriptor descriptor = getDatanode(d);
+      if (descriptor != null) {
+        chosen.add(descriptor);
+      }
+    }
+
+    // choose new datanodes.
+    final DatanodeInfo[] targets = blockManager.replicator.chooseTarget(
+        src, numAdditionalNodes, clientnode, chosen, true,
+        excludes, preferredblocksize);
+    final LocatedBlock lb = new LocatedBlock(blk, targets);
+    if (isBlockTokenEnabled) {
+      lb.setBlockToken(blockTokenSecretManager.generateToken(lb.getBlock(), 
+          EnumSet.of(BlockTokenSecretManager.AccessMode.COPY)));
+    }
+    return lb;
+  }
+
   /**
    * The client would like to let go of the given block
    */

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1091515&r1=1091514&r2=1091515&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Tue Apr 12 17:37:48 2011
@@ -17,13 +17,12 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-
 import java.io.File;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.security.PrivilegedExceptionAction;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -33,9 +32,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsServerDefaults;
 import org.apache.hadoop.fs.Options;
@@ -79,9 +78,6 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
-import org.apache.hadoop.hdfs.server.namenode.GetDelegationTokenServlet;
-import org.apache.hadoop.hdfs.server.namenode.CancelDelegationTokenServlet;
-import org.apache.hadoop.hdfs.server.namenode.RenewDelegationTokenServlet;
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
@@ -841,6 +837,33 @@ public class NameNode implements Namenod
     return locatedBlock;
   }
 
+  @Override
+  public LocatedBlock getAdditionalDatanode(final String src, final Block blk,
+      final DatanodeInfo[] existings, final DatanodeInfo[] excludes,
+      final int numAdditionalNodes, final String clientName
+      ) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("getAdditionalDatanode: src=" + src
+          + ", blk=" + blk
+          + ", existings=" + Arrays.asList(existings)
+          + ", excludes=" + Arrays.asList(excludes)
+          + ", numAdditionalNodes=" + numAdditionalNodes
+          + ", clientName=" + clientName);
+    }
+
+    myMetrics.numGetAdditionalDatanodeOps.inc();
+
+    HashMap<Node, Node> excludeSet = null;
+    if (excludes != null) {
+      excludeSet = new HashMap<Node, Node>(excludes.length);
+      for (Node node : excludes) {
+        excludeSet.put(node, node);
+      }
+    }
+    return namesystem.getAdditionalDatanode(src, blk,
+        existings, excludeSet, numAdditionalNodes, clientName);
+  }
+
   /**
    * The client needs to give up on the block.
    */

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java?rev=1091515&r1=1091514&r2=1091515&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java Tue Apr 12 17:37:48 2011
@@ -72,6 +72,8 @@ public class NameNodeMetrics implements 
                           new MetricsTimeVaryingInt("FileInfoOps", registry);
     public MetricsTimeVaryingInt numAddBlockOps = 
                           new MetricsTimeVaryingInt("AddBlockOps", registry);
+    public final MetricsTimeVaryingInt numGetAdditionalDatanodeOps
+        = new MetricsTimeVaryingInt("GetAdditionalDatanodeOps", registry);
     public MetricsTimeVaryingInt numcreateSymlinkOps = 
                           new MetricsTimeVaryingInt("CreateSymlinkOps", registry);
     public MetricsTimeVaryingInt numgetLinkTargetOps = 

Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java?rev=1091515&r1=1091514&r2=1091515&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java Tue Apr 12 17:37:48 2011
@@ -343,7 +343,7 @@ public class DataTransferTestUtil {
       if (!test.isSuccess() && p.contains(index, id)) {
         FiTestUtil.LOG.info(toString(id));
         if (maxDuration <= 0) {
-          for(; true; FiTestUtil.sleep(1000)); //sleep forever
+          for(; FiTestUtil.sleep(1000); ); //sleep forever until interrupt
         } else {
           FiTestUtil.sleep(minDuration, maxDuration);
         }
@@ -391,7 +391,7 @@ public class DataTransferTestUtil {
         + minDuration + "," + maxDuration + ")";
         FiTestUtil.LOG.info(s);
         if (maxDuration <= 1) {
-          for(; true; FiTestUtil.sleep(1000)); //sleep forever
+          for(; FiTestUtil.sleep(1000); ); //sleep forever until interrupt
         } else {
           FiTestUtil.sleep(minDuration, maxDuration);
         }

Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java?rev=1091515&r1=1091514&r2=1091515&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java Tue Apr 12 17:37:48 2011
@@ -73,14 +73,17 @@ public class FiTestUtil {
 
   /**
    * Sleep.
-   * If there is an InterruptedException, re-throw it as a RuntimeException.
+   * @return true if sleep exits normally; false if InterruptedException.
    */
-  public static void sleep(long ms) {
+  public static boolean sleep(long ms) {
+    LOG.info("Sleep " + ms + " ms");
     try {
       Thread.sleep(ms);
     } catch (InterruptedException e) {
-      throw new RuntimeException(e);
+      LOG.info("Sleep is interrupted", e);
+      return false;
     }
+    return true;
   }
 
   /**

Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java?rev=1091515&r1=1091514&r2=1091515&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java Tue Apr 12 17:37:48 2011
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.da
 
 import java.io.IOException;
 
+import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fi.DataTransferTestUtil;
 import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
@@ -35,7 +36,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.log4j.Level;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -57,6 +60,10 @@ public class TestFiDataTransferProtocol 
         REPLICATION, BLOCKSIZE);
   }
 
+  {
+    ((Log4JLogger)DataTransferProtocol.LOG).getLogger().setLevel(Level.ALL);
+  }
+
   /**
    * 1. create files with dfs
    * 2. write 1 byte
@@ -66,7 +73,7 @@ public class TestFiDataTransferProtocol 
    */
   static void write1byte(String methodName) throws IOException {
     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf
-        ).numDataNodes(REPLICATION).build();
+        ).numDataNodes(REPLICATION + 1).build();
     final FileSystem dfs = cluster.getFileSystem();
     try {
       final Path p = new Path("/" + methodName + "/foo");

Modified: hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java?rev=1091515&r1=1091514&r2=1091515&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java (original)
+++ hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java Tue Apr 12 17:37:48 2011
@@ -23,13 +23,13 @@ import java.util.Random;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fi.DataTransferTestUtil;
-import org.apache.hadoop.fi.FiTestUtil;
 import org.apache.hadoop.fi.DataTransferTestUtil.CountdownDoosAction;
 import org.apache.hadoop.fi.DataTransferTestUtil.CountdownOomAction;
 import org.apache.hadoop.fi.DataTransferTestUtil.CountdownSleepAction;
 import org.apache.hadoop.fi.DataTransferTestUtil.DataTransferTest;
 import org.apache.hadoop.fi.DataTransferTestUtil.SleepAction;
 import org.apache.hadoop.fi.DataTransferTestUtil.VerificationAction;
+import org.apache.hadoop.fi.FiTestUtil;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -37,9 +37,8 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.server.datanode.BlockReceiver;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
 import org.apache.log4j.Level;
-
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -71,6 +70,7 @@ public class TestFiDataTransferProtocol2
   {
     ((Log4JLogger) BlockReceiver.LOG).getLogger().setLevel(Level.ALL);
     ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DataTransferProtocol.LOG).getLogger().setLevel(Level.ALL);
   }
   /**
    * 1. create files with dfs
@@ -88,7 +88,8 @@ public class TestFiDataTransferProtocol2
     FiTestUtil.LOG.info("size=" + size + ", nPackets=" + nPackets
         + ", lastPacketSize=" + lastPacketSize);
 
-    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION).build();
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf
+        ).numDataNodes(REPLICATION + 1).build();
     final FileSystem dfs = cluster.getFileSystem();
     try {
       final Path p = new Path("/" + methodName + "/foo");

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1091515&r1=1091514&r2=1091515&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java Tue Apr 12 17:37:48 2011
@@ -403,7 +403,8 @@ public class DFSTestUtil {
   public static DataTransferProtocol.Status transferRbw(final Block b, 
       final DFSClient dfsClient, final DatanodeInfo... datanodes) throws IOException {
     Assert.assertEquals(2, datanodes.length);
-    final Socket s = DFSOutputStream.createSocketForPipeline(datanodes, dfsClient);
+    final Socket s = DFSOutputStream.createSocketForPipeline(datanodes[0],
+        datanodes.length, dfsClient);
     final long writeTimeout = dfsClient.getDatanodeWriteTimeout(datanodes.length);
     final DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
         NetUtils.getOutputStream(s, writeTimeout),

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend2.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend2.java?rev=1091515&r1=1091514&r2=1091515&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend2.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend2.java Tue Apr 12 17:37:48 2011
@@ -59,7 +59,7 @@ public class TestFileAppend2 extends Tes
 
   private byte[] fileContents = null;
 
-  int numDatanodes = 5;
+  int numDatanodes = 6;
   int numberOfFiles = 50;
   int numThreads = 10;
   int numAppendsPerThread = 20;
@@ -350,7 +350,7 @@ public class TestFileAppend2 extends Tes
       // Insert them into a linked list.
       //
       for (int i = 0; i < numberOfFiles; i++) {
-        short replication = (short)(AppendTestUtil.nextInt(numDatanodes) + 1);
+        final int replication = AppendTestUtil.nextInt(numDatanodes - 2) + 1;
         Path testFile = new Path("/" + i + ".dat");
         FSDataOutputStream stm =
             AppendTestUtil.createFile(fs, testFile, replication);

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend4.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend4.java?rev=1091515&r1=1091514&r2=1091515&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend4.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend4.java Tue Apr 12 17:37:48 2011
@@ -148,7 +148,7 @@ public class TestFileAppend4 {
    */
   @Test(timeout=60000)
   public void testRecoverFinalizedBlock() throws Throwable {
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(5).build();
  
     try {
       cluster.waitActive();
@@ -219,7 +219,7 @@ public class TestFileAppend4 {
    */
   @Test(timeout=60000)
   public void testCompleteOtherLeaseHoldersFile() throws Throwable {
-    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(5).build();
  
     try {
       cluster.waitActive();
@@ -294,8 +294,7 @@ public class TestFileAppend4 {
    * Mockito answer helper that triggers one latch as soon as the
    * method is called, then waits on another before continuing.
    */
-  @SuppressWarnings("unchecked")
-  private static class DelayAnswer implements Answer {
+  private static class DelayAnswer implements Answer<Object> {
     private final CountDownLatch fireLatch = new CountDownLatch(1);
     private final CountDownLatch waitLatch = new CountDownLatch(1);
  

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java?rev=1091515&r1=1091514&r2=1091515&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java Tue Apr 12 17:37:48 2011
@@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs;
 
 import java.io.IOException;
 import java.io.OutputStream;
-import java.security.PrivilegedExceptionAction;
 
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
@@ -71,7 +70,7 @@ public class TestReadWhileWriting {
       final int half = BLOCK_SIZE/2;
 
       //a. On Machine M1, Create file. Write half block of data.
-      //   Invoke (DFSOutputStream).fsync() on the dfs file handle.
+      //   Invoke DFSOutputStream.hflush() on the dfs file handle.
       //   Do not close file yet.
       {
         final FSDataOutputStream out = fs.create(p, true,

Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReplaceDatanodeOnFailure.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReplaceDatanodeOnFailure.java?rev=1091515&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReplaceDatanodeOnFailure.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReplaceDatanodeOnFailure.java Tue Apr 12 17:37:48 2011
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.log4j.Level;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test replacing a datanode on failure in the write pipeline (HDFS-1606).
+ */
+public class TestReplaceDatanodeOnFailure {
+  static final Log LOG = AppendTestUtil.LOG;
+
+  static final String DIR = "/" + TestReplaceDatanodeOnFailure.class.getSimpleName() + "/";
+  static final short REPLICATION = 3;
+  final private static String RACK0 = "/rack0";
+  final private static String RACK1 = "/rack1";
+
+  {
+    ((Log4JLogger)DataTransferProtocol.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  /** Test DEFAULT ReplaceDatanodeOnFailure policy. */
+  @Test
+  public void testDefaultPolicy() throws Exception {
+    final DataTransferProtocol.ReplaceDatanodeOnFailure p
+        = DataTransferProtocol.ReplaceDatanodeOnFailure.DEFAULT;
+
+    final DatanodeInfo[] infos = new DatanodeInfo[5];
+    final DatanodeInfo[][] datanodes = new DatanodeInfo[infos.length + 1][];
+    datanodes[0] = new DatanodeInfo[0];
+    for(int i = 0; i < infos.length; ) {
+      infos[i] = new DatanodeInfo(new DatanodeID("dn" + i));
+      i++;
+      datanodes[i] = new DatanodeInfo[i];
+      System.arraycopy(infos, 0, datanodes[i], 0, datanodes[i].length);
+    }
+
+    final boolean[] isAppend   = {true, true, false, false};
+    final boolean[] isHflushed = {true, false, true, false};
+
+    for(short replication = 1; replication <= infos.length; replication++) {
+      for(int nExistings = 0; nExistings < datanodes.length; nExistings++) {
+        final DatanodeInfo[] existings = datanodes[nExistings];
+        Assert.assertEquals(nExistings, existings.length);
+
+        for(int i = 0; i < isAppend.length; i++) {
+          for(int j = 0; j < isHflushed.length; j++) {
+            final int half = replication/2;
+            final boolean enoughReplica = replication <= nExistings;
+            final boolean noReplica = nExistings == 0;
+            final boolean replicationL3 = replication < 3;
+            final boolean existingsLEhalf = nExistings <= half;
+            final boolean isAH = isAppend[i] || isHflushed[j];
+  
+            final boolean expected;
+            if (enoughReplica || noReplica || replicationL3) {
+              expected = false;
+            } else {
+              expected = isAH || existingsLEhalf;
+            }
+            
+            final boolean computed = p.satisfy(
+                replication, existings, isAppend[i], isHflushed[j]);
+            try {
+              Assert.assertEquals(expected, computed);
+            } catch(AssertionError e) {
+              final String s = "replication=" + replication
+                           + "\nnExistings =" + nExistings
+                           + "\nisAppend   =" + isAppend[i]
+                           + "\nisHflushed =" + isHflushed[j];
+              throw new RuntimeException(s, e);
+            }
+          }
+        }
+      }
+    }
+  }
+
+  /** Test replace datanode on failure. */
+  @Test
+  public void testReplaceDatanodeOnFailure() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    
+    //always replace a datanode
+    DataTransferProtocol.ReplaceDatanodeOnFailure.ALWAYS.write(conf);
+
+    final String[] racks = new String[REPLICATION];
+    Arrays.fill(racks, RACK0);
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf
+        ).racks(racks).numDataNodes(REPLICATION).build();
+
+    try {
+      final DistributedFileSystem fs = (DistributedFileSystem)cluster.getFileSystem();
+      final Path dir = new Path(DIR);
+      
+      final SlowWriter[] slowwriters = new SlowWriter[10];
+      for(int i = 1; i <= slowwriters.length; i++) {
+        //create slow writers in different speed
+        slowwriters[i - 1] = new SlowWriter(fs, new Path(dir, "file" + i), i*200L);
+      }
+
+      for(SlowWriter s : slowwriters) {
+        s.start();
+      }
+
+      // Let slow writers write something.
+      // Some of them are too slow and may not have started yet. 
+      sleepSeconds(1);
+
+      //start new datanodes
+      cluster.startDataNodes(conf, 2, true, null, new String[]{RACK1, RACK1});
+      //stop an old datanode
+      cluster.stopDataNode(AppendTestUtil.nextInt(REPLICATION));
+      
+      //Let the slow writers write a few more seconds.
+      //Everyone should have written something.
+      sleepSeconds(5);
+
+      //check replication and interrupt.
+      for(SlowWriter s : slowwriters) {
+        s.checkReplication();
+        s.interruptRunning();
+      }
+
+      //close files
+      for(SlowWriter s : slowwriters) {
+        s.joinAndClose();
+      }
+
+      //Verify the file
+      LOG.info("Verify the file");
+      for(int i = 0; i < slowwriters.length; i++) {
+        LOG.info(slowwriters[i].filepath + ": length="
+            + fs.getFileStatus(slowwriters[i].filepath).getLen());
+        FSDataInputStream in = null;
+        try {
+          in = fs.open(slowwriters[i].filepath);
+          for(int j = 0, x; (x = in.read()) != -1; j++) {
+            Assert.assertEquals(j, x);
+          }
+        }
+        finally {
+          IOUtils.closeStream(in);
+        }
+      }
+    } finally {
+      if (cluster != null) {cluster.shutdown();}
+    }
+  }
+
+  static void sleepSeconds(final int waittime) throws InterruptedException {
+    LOG.info("Wait " + waittime + " seconds");
+    Thread.sleep(waittime * 1000L);
+  }
+
+  static class SlowWriter extends Thread {
+    final Path filepath;
+    private FSDataOutputStream out = null;
+    final long sleepms;
+    private volatile boolean running = true;
+    
+    SlowWriter(DistributedFileSystem fs, Path filepath, final long sleepms
+        ) throws IOException {
+      super(SlowWriter.class.getSimpleName() + ":" + filepath);
+      this.filepath = filepath;
+      this.out = fs.create(filepath, REPLICATION);
+      this.sleepms = sleepms;
+    }
+
+    @Override
+    public void run() {
+      int i = 0;
+      try {
+        sleep(sleepms);
+        for(; running; i++) {
+          LOG.info(getName() + " writes " + i);
+          out.write(i);
+          out.hflush();
+          sleep(sleepms);
+        }
+      } catch(InterruptedException e) {
+        LOG.info(getName() + " interrupted:" + e);
+      } catch(IOException e) {
+        throw new RuntimeException(getName(), e);
+      } finally {
+        LOG.info(getName() + " terminated: i=" + i);
+      }
+    }
+
+    void interruptRunning() {
+      running = false;
+      interrupt();
+    }
+
+    void joinAndClose() throws InterruptedException {
+      LOG.info(getName() + " join and close");
+      join();
+      IOUtils.closeStream(out);
+    }
+
+    void checkReplication() throws IOException {
+      final DFSOutputStream dfsout = (DFSOutputStream)out.getWrappedStream();
+      Assert.assertEquals(REPLICATION, dfsout.getNumCurrentReplicas());
+    }        
+  }
+}

Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1091515&r1=1091514&r2=1091515&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Tue Apr 12 17:37:48 2011
@@ -406,6 +406,12 @@ public class SimulatedFSDataset  impleme
     return blockMap.get(new Block(blockId));
   }
 
+  @Override 
+  public synchronized String getReplicaString(long blockId) {
+    final Replica r = blockMap.get(new Block(blockId));
+    return r == null? "null": r.toString();
+  }
+
   @Override
   public Block getStoredBlock(long blkid) throws IOException {
     Block b = new Block(blkid);


