hadoop-hdfs-commits mailing list archives

From: t...@apache.org
Subject: svn commit: r1095253 [1/4] - in /hadoop/hdfs/branches/HDFS-1073: ./ bin/ src/c++/libhdfs/ src/contrib/hdfsproxy/ src/java/ src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/had...
Date: Wed, 20 Apr 2011 02:28:21 GMT
Author: todd
Date: Wed Apr 20 02:28:19 2011
New Revision: 1095253

URL: http://svn.apache.org/viewvc?rev=1095253&view=rev
Log:
Merge HDFS trunk r1095235 into HDFS-1073 branch.

Added:
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/cli/CLITestCmdDFS.java
      - copied unchanged from r1095244, hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/cli/CLITestCmdDFS.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/cli/CLITestHelperDFS.java
      - copied unchanged from r1095244, hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/cli/CLITestHelperDFS.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/cli/util/
      - copied from r1095244, hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/cli/util/
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/cli/util/CLICommandDFSAdmin.java
      - copied unchanged from r1095244, hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/cli/util/CLICommandDFSAdmin.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/fs/TestResolveHdfsSymlink.java
      - copied unchanged from r1095244, hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/fs/TestResolveHdfsSymlink.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestReplaceDatanodeOnFailure.java
      - copied unchanged from r1095244, hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReplaceDatanodeOnFailure.java
Removed:
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/cli/CmdFactoryDFS.java
Modified:
    hadoop/hdfs/branches/HDFS-1073/   (props changed)
    hadoop/hdfs/branches/HDFS-1073/CHANGES.txt
    hadoop/hdfs/branches/HDFS-1073/bin/hdfs
    hadoop/hdfs/branches/HDFS-1073/build.xml   (contents, props changed)
    hadoop/hdfs/branches/HDFS-1073/src/c++/libhdfs/   (props changed)
    hadoop/hdfs/branches/HDFS-1073/src/contrib/hdfsproxy/   (props changed)
    hadoop/hdfs/branches/HDFS-1073/src/java/   (props changed)
    hadoop/hdfs/branches/HDFS-1073/src/java/hdfs-default.xml
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/fs/Hdfs.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java   (props changed)
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/BackupImage.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileOutputStream.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSImageSerialization.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsElement.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineEditsViewer/EditsLoaderCurrent.java
    hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
    hadoop/hdfs/branches/HDFS-1073/src/test/aop/org/apache/hadoop/fi/DataTransferTestUtil.java
    hadoop/hdfs/branches/HDFS-1073/src/test/aop/org/apache/hadoop/fi/FiTestUtil.java
    hadoop/hdfs/branches/HDFS-1073/src/test/aop/org/apache/hadoop/hdfs/server/datanode/BlockReceiverAspects.aj
    hadoop/hdfs/branches/HDFS-1073/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol.java
    hadoop/hdfs/branches/HDFS-1073/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiDataTransferProtocol2.java
    hadoop/hdfs/branches/HDFS-1073/src/test/aop/org/apache/hadoop/hdfs/server/datanode/TestFiPipelineClose.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/   (props changed)
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/cli/TestHDFSCLI.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/cli/testHDFSConf.xml
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/fs/TestFcHdfsSymlink.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend2.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend4.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestTransferRbw.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlocksWithNotEnoughRacks.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java
    hadoop/hdfs/branches/HDFS-1073/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestStorageRestore.java
    hadoop/hdfs/branches/HDFS-1073/src/test/unit/org/apache/hadoop/hdfs/server/namenode/TestINodeFile.java
    hadoop/hdfs/branches/HDFS-1073/src/webapps/datanode/   (props changed)
    hadoop/hdfs/branches/HDFS-1073/src/webapps/hdfs/   (props changed)
    hadoop/hdfs/branches/HDFS-1073/src/webapps/secondary/   (props changed)

Propchange: hadoop/hdfs/branches/HDFS-1073/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Apr 20 02:28:19 2011
@@ -1,3 +1,4 @@
 /hadoop/core/branches/branch-0.19/hdfs:713112
 /hadoop/hdfs/branches/HDFS-265:796829-820463
 /hadoop/hdfs/branches/branch-0.21:820487
+/hadoop/hdfs/trunk:1086482-1095244

Modified: hadoop/hdfs/branches/HDFS-1073/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/CHANGES.txt?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-1073/CHANGES.txt Wed Apr 20 02:28:19 2011
@@ -21,9 +21,19 @@ Trunk (unreleased changes)
 
     HDFS-1675. Support transferring RBW between datanodes. (szetszwo)
 
-    HDFS-1785. In BlockReceiver and DataXceiver, clientName.length() is used
-    multiple times for determining whether the source is a client or a
-    datanode.  (szetszwo)
+    HDFS-1761. Add a new DataTransferProtocol operation, Op.TRANSFER_BLOCK,
+    for transferring RBW/Finalized with acknowledgement and without using RPC.
+    (szetszwo)
+
+    HDFS-1630. Support fsedits checksum. (hairong)
+
+    HDFS-1606. Provide a stronger data guarantee in the write pipeline by
+    adding a new datanode when an existing datanode failed.  (szetszwo)
+
+    HDFS-1442. Api to get delegation token in Hdfs class. (jitendra)
+
+    HDFS-1070. Speedup namenode image loading and saving by storing only
+    local file names. (hairong)
 
   IMPROVEMENTS
 
@@ -72,9 +82,6 @@ Trunk (unreleased changes)
     HDFS-1736. Remove the dependency from DatanodeJspHelper to FsShell.
     (Daryn Sharp via szetszwo)
     
-    HDFS-1731. Amend previous commit for this JIRA to fix build on Cygwin.
-    (todd)
-
     HDFS-780. Revive TestFuseDFS. (eli)
 
     HDFS-1445. Batch the calls in DataStorage to FileUtil.createHardLink().
@@ -89,6 +96,31 @@ Trunk (unreleased changes)
     HDFS-1120. Make DataNode's block-to-device placement policy pluggable
     (Harsh J Chouraria via todd)
 
+    HDFS-1785. In BlockReceiver and DataXceiver, clientName.length() is used
+    multiple times for determining whether the source is a client or a
+    datanode.  (szetszwo)
+
+    HDFS-1789. Refactor frequently used codes from DFSOutputStream and
+    DataXceiver.  (szetszwo)
+
+    HDFS-1767. Namenode ignores non-initial block report from datanodes
+    when in safemode during startup. (Matt Foley via suresh)
+
+    HDFS-1817. Move pipeline_Fi_[39-51] from TestFiDataTransferProtocol
+    to TestFiPipelineClose.  (szetszwo)
+
+    HDFS-1760. In FSDirectory.getFullPathName(..), it is better to return "/"
+    for root directory instead of an empty string.  (Daryn Sharp via szetszwo)
+
+    HDFS-1833. Reduce repeated string constructions and unnecessary fields,
+    and fix comments in BlockReceiver.PacketResponder.  (szetszwo)
+
+    HDFS-1486. Generalize CLITest structure and interfaces to facilitate
+    upstream adoption (e.g. for web testing). (cos)
+
+    HDFS-1844. Move "fs -help" shell command tests from HDFS to COMMON; see
+    also HADOOP-7230.  (Daryn Sharp via szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-1458. Improve checkpoint performance by avoiding unnecessary image
@@ -141,6 +173,26 @@ Trunk (unreleased changes)
 
     HDFS-1770. TestFiRename fails due to invalid block size. (eli)
 
+    HDFS-1797. Fix new findbugs warning introduced by HDFS-1120 (todd)
+
+    HDFS-1611. Fix up some log messages in DFSClient and MBean registration
+    (Uma Maheswara Rao G via todd)
+
+    HDFS-1543. Reduce dev. cycle time by moving system testing artifacts from
+    default build and push to maven for HDFS (Luke Lu via cos)
+
+    HDFS-1818. TestHDFSCLI is failing on trunk after HADOOP-7202.
+    (Aaron T. Myers via todd)
+
+    HDFS-1828. TestBlocksWithNotEnoughRacks intermittently fails assert.
+    (Matt Foley via eli)
+
+    HDFS-1824. Delay instantiation of the file system object until it is
+    needed (linked to HADOOP-7207). (boryas)
+
+    HDFS-1831. Fix append bug in FileContext and implement CreateFlag
+    check (related to HADOOP-7223). (suresh)
+
 Release 0.22.0 - Unreleased
 
   NEW FEATURES
@@ -594,6 +646,17 @@ Release 0.22.0 - Unreleased
 
     HDFS-1625. Ignore disk space values in TestDataNodeMXBean.  (szetszwo)
 
+    HDFS-1781. Fix the path for jsvc in bin/hdfs.  (John George via szetszwo)
+
+    HDFS-1782. Fix an NPE in FSNamesystem.startFileInternal(..).
+    (John George via szetszwo)
+
+    HDFS-1821. Fix username resolution in NameNode.createSymlink(..) and
+    FSDirectory.addSymlink(..).  (John George via szetszwo)
+
+    HDFS-1806. TestBlockReport.blockReport_08() and _09() are timing-dependent
+    and likely to fail on fast servers. (Matt Foley via eli)
+
 Release 0.21.1 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/hdfs/branches/HDFS-1073/bin/hdfs
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/bin/hdfs?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/bin/hdfs (original)
+++ hadoop/hdfs/branches/HDFS-1073/bin/hdfs Wed Apr 20 02:28:19 2011
@@ -140,7 +140,7 @@ if [ "$starting_secure_dn" = "true" ]; t
    HADOOP_SECURE_DN_PID="$HADOOP_PID_DIR/hadoop_secure_dn.pid"
   fi
 
-  exec "$HADOOP_HOME/bin/jsvc" \
+  exec "$HADOOP_HDFS_HOME/bin/jsvc" \
            -Dproc_$COMMAND -outfile "$HADOOP_LOG_DIR/jsvc.out" \
            -errfile "$HADOOP_LOG_DIR/jsvc.err" \
            -pidfile "$HADOOP_SECURE_DN_PID" \

Modified: hadoop/hdfs/branches/HDFS-1073/build.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/build.xml?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/build.xml (original)
+++ hadoop/hdfs/branches/HDFS-1073/build.xml Wed Apr 20 02:28:19 2011
@@ -1546,7 +1546,7 @@
          uri="urn:maven-artifact-ant" classpathref="mvn-ant-task.classpath"/>
   </target>   
 
-  <target name="mvn-install" depends="mvn-taskdef,jar,jar-test,set-version,-mvn-system-install"
+  <target name="mvn-install" depends="mvn-taskdef,jar,jar-test,set-version"
      description="To install hadoop hdfs and test jars to local filesystem's m2 cache">
      <artifact:pom file="${hadoop-hdfs.pom}" id="hadoop.hdfs"/>
      <artifact:pom file="${hadoop-hdfs-test.pom}" id="hadoop.hdfs.test"/>
@@ -1559,6 +1559,9 @@
         <attach file="${hadoop-hdfs-test-sources.jar}" classifier="sources" />
      </artifact:install>
    </target>
+
+   <target name="mvn-si-install" depends="mvn-install,-mvn-system-install"
+           description="Install system integration tests jars as well"/>
   
    <target name="mvn-deploy" depends="mvn-taskdef, jar, jar-test,
      jar-system, jar-test-system, set-version, signanddeploy, simpledeploy"

Propchange: hadoop/hdfs/branches/HDFS-1073/build.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Apr 20 02:28:19 2011
@@ -2,3 +2,4 @@
 /hadoop/core/trunk/build.xml:779102
 /hadoop/hdfs/branches/HDFS-265/build.xml:796829-820463
 /hadoop/hdfs/branches/branch-0.21/build.xml:820487
+/hadoop/hdfs/trunk/build.xml:1086482-1095244

Propchange: hadoop/hdfs/branches/HDFS-1073/src/c++/libhdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Apr 20 02:28:19 2011
@@ -1,2 +1,3 @@
 /hadoop/core/branches/branch-0.19/mapred/src/c++/libhdfs:713112
 /hadoop/core/trunk/src/c++/libhdfs:776175-784663
+/hadoop/hdfs/trunk/src/c++/libhdfs:1086482-1095244

Propchange: hadoop/hdfs/branches/HDFS-1073/src/contrib/hdfsproxy/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Apr 20 02:28:19 2011
@@ -2,3 +2,4 @@
 /hadoop/core/trunk/src/contrib/hdfsproxy:776175-784663
 /hadoop/hdfs/branches/HDFS-265/src/contrib/hdfsproxy:796829-820463
 /hadoop/hdfs/branches/branch-0.21/src/contrib/hdfsproxy:820487
+/hadoop/hdfs/trunk/src/contrib/hdfsproxy:1086482-1095244

Propchange: hadoop/hdfs/branches/HDFS-1073/src/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Apr 20 02:28:19 2011
@@ -2,3 +2,4 @@
 /hadoop/core/trunk/src/hdfs:776175-785643,785929-786278
 /hadoop/hdfs/branches/HDFS-265/src/java:796829-820463
 /hadoop/hdfs/branches/branch-0.21/src/java:820487
+/hadoop/hdfs/trunk/src/java:1086482-1095244

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/hdfs-default.xml?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/hdfs-default.xml (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/hdfs-default.xml Wed Apr 20 02:28:19 2011
@@ -318,6 +318,42 @@ creations/deletions), or "all".</descrip
 </property>
 
 <property>
+  <name>dfs.client.block.write.replace-datanode-on-failure.enable</name>
+  <value>true</value>
+  <description>
+    If there is a datanode/network failure in the write pipeline,
+    DFSClient will try to remove the failed datanode from the pipeline
+    and then continue writing with the remaining datanodes. As a result,
+    the number of datanodes in the pipeline is decreased. The purpose of
+    this feature is to add new datanodes to the pipeline.
+
+    This is a site-wide property to enable/disable the feature.
+
+    See also dfs.client.block.write.replace-datanode-on-failure.policy
+  </description>
+</property>
+
+<property>
+  <name>dfs.client.block.write.replace-datanode-on-failure.policy</name>
+  <value>DEFAULT</value>
+  <description>
+    This property is used only if the value of
+    dfs.client.block.write.replace-datanode-on-failure.enable is true.
+
+    ALWAYS: always add a new datanode when an existing datanode is removed.
+    
+    NEVER: never add a new datanode.
+
+    DEFAULT: 
+      Let r be the replication number.
+      Let n be the number of existing datanodes.
+      Add a new datanode only if r is greater than or equal to 3 and either
+      (1) floor(r/2) is greater than or equal to n; or
+      (2) r is greater than n and the block is hflushed/appended.
+  </description>
+</property>
+
+<property>
   <name>dfs.blockreport.intervalMsec</name>
   <value>21600000</value>
   <description>Determines block reporting interval in milliseconds.</description>

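The two properties above drive the new replace-datanode-on-failure feature introduced by HDFS-1606. As a minimal sketch of setting them programmatically (the class name is illustrative; only hadoop-common's Configuration is assumed):

    import org.apache.hadoop.conf.Configuration;

    public class ReplaceDatanodePolicySketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // The feature is on by default; set it explicitly for clarity.
        conf.setBoolean(
            "dfs.client.block.write.replace-datanode-on-failure.enable", true);
        // One of ALWAYS, NEVER or DEFAULT, as described above.
        conf.set(
            "dfs.client.block.write.replace-datanode-on-failure.policy", "DEFAULT");
        System.out.println(conf.get(
            "dfs.client.block.write.replace-datanode-on-failure.policy"));
      }
    }
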
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/fs/Hdfs.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/fs/Hdfs.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/fs/Hdfs.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/fs/Hdfs.java Wed Apr 20 02:28:19 2011
@@ -25,6 +25,8 @@ import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
 import java.util.EnumSet;
+import java.util.List;
+import java.util.NoSuchElementException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -37,8 +39,13 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
 import org.apache.hadoop.util.Progressable;
 
 @InterfaceAudience.Private
@@ -249,7 +256,7 @@ public class Hdfs extends AbstractFileSy
       if (hasNext()) {
         return thisListing.getPartialListing()[i++];
       }
-      throw new java.util.NoSuchElementException("No more entry in " + src);
+      throw new NoSuchElementException("No more entry in " + src);
     }
   }
 
@@ -384,4 +391,43 @@ public class Hdfs extends AbstractFileSy
   public Path getLinkTarget(Path p) throws IOException { 
     return new Path(dfs.getLinkTarget(getUriPath(p)));
   }
+  
+  @Override //AbstractFileSystem
+  public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
+    Token<DelegationTokenIdentifier> result = dfs
+        .getDelegationToken(renewer == null ? null : new Text(renewer));
+    result.setService(new Text(this.getCanonicalServiceName()));
+    List<Token<?>> tokenList = new ArrayList<Token<?>>();
+    tokenList.add(result);
+    return tokenList;
+  }
+
+  /**
+   * Renew an existing delegation token.
+   * 
+   * @param token delegation token obtained earlier
+   * @return the new expiration time
+   * @throws InvalidToken
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  public long renewDelegationToken(
+      Token<? extends AbstractDelegationTokenIdentifier> token)
+      throws InvalidToken, IOException {
+    return dfs.renewDelegationToken((Token<DelegationTokenIdentifier>) token);
+  }
+
+  /**
+   * Cancel an existing delegation token.
+   * 
+   * @param token delegation token
+   * @throws InvalidToken
+   * @throws IOException
+   */
+  @SuppressWarnings("unchecked")
+  public void cancelDelegationToken(
+      Token<? extends AbstractDelegationTokenIdentifier> token)
+      throws InvalidToken, IOException {
+    dfs.cancelDelegationToken((Token<DelegationTokenIdentifier>) token);
+  }
 }

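The three methods added to Hdfs above complete a delegation token lifecycle: fetch, renew, cancel. A sketch of that lifecycle, assuming an already-constructed Hdfs handle on a secure cluster (the class and method names below are illustrative):

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.fs.Hdfs;
    import org.apache.hadoop.security.token.Token;
    import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;

    public class TokenLifecycleSketch {
      @SuppressWarnings("unchecked")
      static void cycle(Hdfs fs, String renewer) throws IOException {
        // Fetch a delegation token naming the given renewer.
        List<Token<?>> tokens = fs.getDelegationTokens(renewer);
        Token<? extends AbstractDelegationTokenIdentifier> token =
            (Token<? extends AbstractDelegationTokenIdentifier>) tokens.get(0);
        long newExpiry = fs.renewDelegationToken(token); // extend its lifetime
        fs.cancelDelegationToken(token);                 // then invalidate it
      }
    }
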
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSClient.java Wed Apr 20 02:28:19 2011
@@ -96,7 +96,6 @@ import org.apache.hadoop.ipc.Client;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
@@ -136,6 +135,7 @@ public class DFSClient implements FSCons
   SocketFactory socketFactory;
   int socketTimeout;
   final int writePacketSize;
+  final DataTransferProtocol.ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
   final FileSystem.Statistics stats;
   final int hdfsTimeout;    // timeout value for a DFS operation.
 
@@ -193,7 +193,7 @@ public class DFSClient implements FSCons
     InetSocketAddress addr = NetUtils.createSocketAddr(
       datanodeid.getHost() + ":" + datanodeid.getIpcPort());
     if (ClientDatanodeProtocol.LOG.isDebugEnabled()) {
-      ClientDatanodeProtocol.LOG.info("ClientDatanodeProtocol addr=" + addr);
+      ClientDatanodeProtocol.LOG.debug("ClientDatanodeProtocol addr=" + addr);
     }
     UserGroupInformation ticket = UserGroupInformation
         .createRemoteUser(locatedBlock.getBlock().toString());
@@ -249,6 +249,8 @@ public class DFSClient implements FSCons
     this.writePacketSize = 
       conf.getInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 
                   DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
+    this.dtpReplaceDatanodeOnFailure = DataTransferProtocol.ReplaceDatanodeOnFailure.get(conf);
+
     // The hdfsTimeout is currently the same as the ipc timeout 
     this.hdfsTimeout = Client.getTimeout(conf);
 
@@ -571,8 +573,9 @@ public class DFSClient implements FSCons
                              int buffersize)
       throws IOException {
     return create(src, FsPermission.getDefault(),
-        overwrite ? EnumSet.of(CreateFlag.OVERWRITE) : EnumSet.of(CreateFlag.CREATE), 
-        replication, blockSize, progress, buffersize);
+        overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
+            : EnumSet.of(CreateFlag.CREATE), replication, blockSize, progress,
+        buffersize);
   }
 
   /**
@@ -638,9 +641,29 @@ public class DFSClient implements FSCons
   }
   
   /**
+   * Append to an existing file if {@link CreateFlag#APPEND} is present
+   */
+  private OutputStream primitiveAppend(String src, EnumSet<CreateFlag> flag,
+      int buffersize, Progressable progress) throws IOException {
+    if (flag.contains(CreateFlag.APPEND)) {
+      HdfsFileStatus stat = getFileInfo(src);
+      if (stat == null) { // No file to append to
+        // New file needs to be created if create option is present
+        if (!flag.contains(CreateFlag.CREATE)) {
+          throw new FileNotFoundException("failed to append to non-existent file "
+              + src + " on client " + clientName);
+        }
+        return null;
+      }
+      return callAppend(stat, src, buffersize, progress);
+    }
+    return null;
+  }
+  
+  /**
   * Same as {@link #create(String, FsPermission, EnumSet, short, long,
    *  Progressable, int)} except that the permission
-   *   is absolute (ie has already been masked with umask.
+   *  is absolute (i.e. has already been masked with umask).
    */
   public OutputStream primitiveCreate(String src, 
                              FsPermission absPermission,
@@ -653,9 +676,13 @@ public class DFSClient implements FSCons
                              int bytesPerChecksum)
       throws IOException, UnresolvedLinkException {
     checkOpen();
-    OutputStream result = new DFSOutputStream(this, src, absPermission,
-        flag, createParent, replication, blockSize, progress, buffersize,
-        bytesPerChecksum);
+    CreateFlag.validate(flag);
+    OutputStream result = primitiveAppend(src, flag, buffersize, progress);
+    if (result == null) {
+      result = new DFSOutputStream(this, src, absPermission,
+          flag, createParent, replication, blockSize, progress, buffersize,
+          bytesPerChecksum);
+    }
     leasechecker.put(src, result);
     return result;
   }
@@ -697,23 +724,11 @@ public class DFSClient implements FSCons
     }
   }
 
-  /**
-   * Append to an existing HDFS file.  
-   * 
-   * @param src file name
-   * @param buffersize buffer size
-   * @param progress for reporting write-progress
-   * @return an output stream for writing into the file
-   * 
-   * @see ClientProtocol#append(String, String) 
-   */
-  OutputStream append(String src, int buffersize, Progressable progress)
-      throws IOException {
-    checkOpen();
-    HdfsFileStatus stat = null;
+  /** Method to get stream returned by append call */
+  private OutputStream callAppend(HdfsFileStatus stat, String src,
+      int buffersize, Progressable progress) throws IOException {
     LocatedBlock lastBlock = null;
     try {
-      stat = getFileInfo(src);
       lastBlock = namenode.append(src, clientName);
     } catch(RemoteException re) {
       throw re.unwrapRemoteException(AccessControlException.class,
@@ -723,9 +738,26 @@ public class DFSClient implements FSCons
                                      UnsupportedOperationException.class,
                                      UnresolvedPathException.class);
     }
-    OutputStream result = new DFSOutputStream(this, src, buffersize, progress,
+    return new DFSOutputStream(this, src, buffersize, progress,
         lastBlock, stat, conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 
                                      DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT));
+  }
+  
+  /**
+   * Append to an existing HDFS file.  
+   * 
+   * @param src file name
+   * @param buffersize buffer size
+   * @param progress for reporting write-progress
+   * @return an output stream for writing into the file
+   * 
+   * @see ClientProtocol#append(String, String) 
+   */
+  OutputStream append(String src, int buffersize, Progressable progress)
+      throws IOException {
+    checkOpen();
+    HdfsFileStatus stat = getFileInfo(src);
+    OutputStream result = callAppend(stat, src, buffersize, progress);
     leasechecker.put(src, result);
     return result;
   }

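The primitiveAppend(..)/primitiveCreate(..) refactoring above makes the CreateFlag combinations explicit: APPEND alone fails on a missing file, while CREATE|APPEND falls through to create. A condensed restatement of that branch (the class and helper names are illustrative; the logic mirrors the diff):

    import java.io.FileNotFoundException;
    import java.util.EnumSet;

    import org.apache.hadoop.fs.CreateFlag;

    public class CreateFlagDecision {
      /** Mirrors the branch added in DFSClient.primitiveAppend(..). */
      static String decide(EnumSet<CreateFlag> flag, boolean fileExists)
          throws FileNotFoundException {
        if (flag.contains(CreateFlag.APPEND)) {
          if (!fileExists) {
            if (!flag.contains(CreateFlag.CREATE)) {
              // APPEND alone on a missing file is an error.
              throw new FileNotFoundException("failed to append to non-existent file");
            }
            return "create new file";  // CREATE|APPEND falls through to create
          }
          return "append to existing file";
        }
        return "create (no append requested)";
      }
    }
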
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Wed Apr 20 02:28:19 2011
@@ -40,6 +40,10 @@ public class DFSConfigKeys extends Commo
   public static final int     DFS_BYTES_PER_CHECKSUM_DEFAULT = 512;
   public static final String  DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size";
   public static final int     DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
+  public static final String  DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY = "dfs.client.block.write.replace-datanode-on-failure.enable";
+  public static final boolean DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_DEFAULT = true;
+  public static final String  DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY = "dfs.client.block.write.replace-datanode-on-failure.policy";
+  public static final String  DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT = "DEFAULT";
   
   public static final String  DFS_NAMENODE_BACKUP_ADDRESS_KEY = "dfs.namenode.backup.address";
   public static final String  DFS_NAMENODE_BACKUP_ADDRESS_DEFAULT = "localhost:50100";

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DFSOutputStream.java Wed Apr 20 02:28:19 2011
@@ -24,15 +24,17 @@ import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
-import java.io.InterruptedIOException;
 import java.io.IOException;
+import java.io.InterruptedIOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.LinkedList;
+import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.conf.Configuration;
@@ -47,21 +49,22 @@ import org.apache.hadoop.hdfs.protocol.B
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.UnresolvedPathException;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PacketHeader;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
-import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
 import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
 import org.apache.hadoop.io.EnumSetWritable;
-import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.AccessControlException;
@@ -71,8 +74,6 @@ import org.apache.hadoop.util.DataChecks
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.PureJavaCrc32;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 
 /****************************************************************
  * DFSOutputStream creates files from a stream of bytes.
@@ -97,9 +98,6 @@ import org.apache.hadoop.hdfs.security.t
  * starts sending packets from the dataQueue.
 ****************************************************************/
 class DFSOutputStream extends FSOutputSummer implements Syncable {
-  /**
-   * 
-   */
   private final DFSClient dfsClient;
   private Configuration conf;
   private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB
@@ -296,10 +294,18 @@ class DFSOutputStream extends FSOutputSu
     private BlockConstructionStage stage;  // block construction stage
     private long bytesSent = 0; // number of bytes that've been sent
 
+    /** Nodes have been used in the pipeline before and have failed. */
+    private final List<DatanodeInfo> failed = new ArrayList<DatanodeInfo>();
+    /** Has the current block been hflushed? */
+    private boolean isHflushed = false;
+    /** Append on an existing block? */
+    private final boolean isAppend;
+
     /**
      * Default construction for file create
      */
     private DataStreamer() {
+      isAppend = false;
       stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
     }
     
@@ -312,6 +318,7 @@ class DFSOutputStream extends FSOutputSu
      */
     private DataStreamer(LocatedBlock lastBlock, HdfsFileStatus stat,
         int bytesPerChecksum) throws IOException {
+      isAppend = true;
       stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
       block = lastBlock.getBlock();
       bytesSent = block.getNumBytes();
@@ -751,6 +758,105 @@ class DFSOutputStream extends FSOutputSu
       return doSleep;
     }
 
+    private void setHflush() {
+      isHflushed = true;
+    }
+
+    private int findNewDatanode(final DatanodeInfo[] original
+        ) throws IOException {
+      if (nodes.length != original.length + 1) {
+        throw new IOException("Failed to add a datanode:"
+            + " nodes.length != original.length + 1, nodes="
+            + Arrays.asList(nodes) + ", original=" + Arrays.asList(original));
+      }
+      for(int i = 0; i < nodes.length; i++) {
+        int j = 0;
+        for(; j < original.length && !nodes[i].equals(original[j]); j++);
+        if (j == original.length) {
+          return i;
+        }
+      }
+      throw new IOException("Failed: new datanode not found: nodes="
+          + Arrays.asList(nodes) + ", original=" + Arrays.asList(original));
+    }
+
+    private void addDatanode2ExistingPipeline() throws IOException {
+      if (DataTransferProtocol.LOG.isDebugEnabled()) {
+        DataTransferProtocol.LOG.debug("lastAckedSeqno = " + lastAckedSeqno);
+      }
+      /*
+       * Is data transfer necessary?  We have the following cases.
+       * 
+       * Case 1: Failure in Pipeline Setup
+       * - Append
+       *    + Transfer the stored replica, which may be a RBW or a finalized.
+       * - Create
+       *    + If no data, then no transfer is required.
+   *    + If data has been written, transfer RBW. This case may happen
+   *      when there was a streaming failure earlier in this pipeline.
+       *
+       * Case 2: Failure in Streaming
+       * - Append/Create:
+       *    + transfer RBW
+       * 
+       * Case 3: Failure in Close
+       * - Append/Create:
+   *    + no transfer; let the NameNode replicate the block.
+       */
+      if (!isAppend && lastAckedSeqno < 0
+          && stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
+        //no data have been written
+        return;
+      } else if (stage == BlockConstructionStage.PIPELINE_CLOSE
+          || stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
+        //pipeline is closing
+        return;
+      }
+
+      //get a new datanode
+      final DatanodeInfo[] original = nodes;
+      final LocatedBlock lb = dfsClient.namenode.getAdditionalDatanode(
+          src, block, nodes, failed.toArray(new DatanodeInfo[failed.size()]),
+          1, dfsClient.clientName);
+      nodes = lb.getLocations();
+
+      //find the new datanode
+      final int d = findNewDatanode(original);
+
+      //transfer replica
+      final DatanodeInfo src = d == 0? nodes[1]: nodes[d - 1];
+      final DatanodeInfo[] targets = {nodes[d]};
+      transfer(src, targets, lb.getBlockToken());
+    }
+
+    private void transfer(final DatanodeInfo src, final DatanodeInfo[] targets,
+        final Token<BlockTokenIdentifier> blockToken) throws IOException {
+      //transfer replica to the new datanode
+      Socket sock = null;
+      DataOutputStream out = null;
+      DataInputStream in = null;
+      try {
+        sock = createSocketForPipeline(src, 2, dfsClient);
+        final long writeTimeout = dfsClient.getDatanodeWriteTimeout(2);
+        out = new DataOutputStream(new BufferedOutputStream(
+            NetUtils.getOutputStream(sock, writeTimeout),
+            DataNode.SMALL_BUFFER_SIZE));
+
+        //send the TRANSFER_BLOCK request
+        DataTransferProtocol.Sender.opTransferBlock(out, block,
+            dfsClient.clientName, targets, blockToken);
+
+        //ack
+        in = new DataInputStream(NetUtils.getInputStream(sock));
+        if (SUCCESS != DataTransferProtocol.Status.read(in)) {
+          throw new IOException("Failed to add a datanode");
+        }
+      } finally {
+        IOUtils.closeStream(in);
+        IOUtils.closeStream(out);
+        IOUtils.closeSocket(sock);
+      }
+    }
 
     /**
      * Open a DataOutputStream to a DataNode pipeline so that 
@@ -794,6 +900,8 @@ class DFSOutputStream extends FSOutputSu
           DFSClient.LOG.warn("Error Recovery for block " + block +
               " in pipeline " + pipelineMsg + 
               ": bad datanode " + nodes[errorIndex].getName());
+          failed.add(nodes[errorIndex]);
+
           DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
           System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
           System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
@@ -804,6 +912,12 @@ class DFSOutputStream extends FSOutputSu
           errorIndex = -1;
         }
 
+        // Check if replace-datanode policy is satisfied.
+        if (dfsClient.dtpReplaceDatanodeOnFailure.satisfy(blockReplication,
+            nodes, isAppend, isHflushed)) {
+          addDatanode2ExistingPipeline();
+        }
+
         // get a new generation stamp and an access token
         LocatedBlock lb = dfsClient.namenode.updateBlockForPipeline(block, dfsClient.clientName);
         newGS = lb.getBlock().getGenerationStamp();
@@ -889,18 +1003,7 @@ class DFSOutputStream extends FSOutputSu
 
       boolean result = false;
       try {
-        if(DFSClient.LOG.isDebugEnabled()) {
-          DFSClient.LOG.debug("Connecting to " + nodes[0].getName());
-        }
-        InetSocketAddress target = NetUtils.createSocketAddr(nodes[0].getName());
-        s = dfsClient.socketFactory.createSocket();
-        int timeoutValue = dfsClient.getDatanodeReadTimeout(nodes.length);
-        NetUtils.connect(s, target, timeoutValue);
-        s.setSoTimeout(timeoutValue);
-        s.setSendBufferSize(DFSClient.DEFAULT_DATA_SOCKET_SIZE);
-        if(DFSClient.LOG.isDebugEnabled()) {
-          DFSClient.LOG.debug("Send buf size " + s.getSendBufferSize());
-        }
+        s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
         long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
 
         //
@@ -1036,6 +1139,30 @@ class DFSOutputStream extends FSOutputSu
     }
   }
 
+  /**
+   * Create a socket for a write pipeline
+   * @param first the first datanode 
+   * @param length the pipeline length
+   * @param client
+   * @return the socket connected to the first datanode
+   */
+  static Socket createSocketForPipeline(final DatanodeInfo first,
+      final int length, final DFSClient client) throws IOException {
+    if(DFSClient.LOG.isDebugEnabled()) {
+      DFSClient.LOG.debug("Connecting to datanode " + first.getName());
+    }
+    final InetSocketAddress isa = NetUtils.createSocketAddr(first.getName());
+    final Socket sock = client.socketFactory.createSocket();
+    final int timeout = client.getDatanodeReadTimeout(length);
+    NetUtils.connect(sock, isa, timeout);
+    sock.setSoTimeout(timeout);
+    sock.setSendBufferSize(DFSClient.DEFAULT_DATA_SOCKET_SIZE);
+    if(DFSClient.LOG.isDebugEnabled()) {
+      DFSClient.LOG.debug("Send buf size " + sock.getSendBufferSize());
+    }
+    return sock;
+  }
+
   private void isClosed() throws IOException {
     if (closed) {
       IOException e = lastException;
@@ -1352,6 +1479,12 @@ class DFSOutputStream extends FSOutputSu
           throw ioe;
         }
       }
+
+      synchronized(this) {
+        if (streamer != null) {
+          streamer.setHflush();
+        }
+      }
     } catch (InterruptedIOException interrupt) {
       // This kind of error doesn't mean that the stream itself is broken - just the
       // flushing thread got interrupted. So, we shouldn't close down the writer,
@@ -1566,7 +1699,7 @@ class DFSOutputStream extends FSOutputSu
   /**
    * Returns the access token currently used by streamer, for testing only
    */
-  Token<BlockTokenIdentifier> getBlockToken() {
+  synchronized Token<BlockTokenIdentifier> getBlockToken() {
     return streamer.getBlockToken();
   }
 

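findNewDatanode(..) above identifies the single entry of the rebuilt pipeline that is absent from the original one, using a nested linear scan. The same scan on plain strings, for illustration:

    import java.util.Arrays;

    public class FindNewNodeSketch {
      /** Same scan as DFSOutputStream.DataStreamer.findNewDatanode(..), on Strings. */
      static int findNew(String[] nodes, String[] original) {
        if (nodes.length != original.length + 1) {
          throw new IllegalStateException("nodes=" + Arrays.asList(nodes)
              + ", original=" + Arrays.asList(original));
        }
        for (int i = 0; i < nodes.length; i++) {
          int j = 0;
          for (; j < original.length && !nodes[i].equals(original[j]); j++);
          if (j == original.length) {
            return i;  // nodes[i] does not appear in original: it is the new node
          }
        }
        throw new IllegalStateException("new node not found");
      }

      public static void main(String[] args) {
        String[] original = {"dn1", "dn3"};
        String[] nodes = {"dn1", "dn2", "dn3"};
        System.out.println(findNew(nodes, original));  // prints 1
      }
    }
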
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Wed Apr 20 02:28:19 2011
@@ -24,6 +24,7 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.ArrayList;
 import java.util.EnumSet;
+import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -242,9 +243,9 @@ public class DistributedFileSystem exten
     Progressable progress) throws IOException {
     statistics.incrementWriteOps(1);
     return new FSDataOutputStream(dfs.create(getPathName(f), permission,
-        overwrite ? EnumSet.of(CreateFlag.OVERWRITE) : EnumSet.of(CreateFlag.CREATE),
-        replication, blockSize, progress, bufferSize),
-        statistics);
+        overwrite ? EnumSet.of(CreateFlag.CREATE, CreateFlag.OVERWRITE)
+            : EnumSet.of(CreateFlag.CREATE), replication, blockSize, progress,
+        bufferSize), statistics);
   }
   
   @SuppressWarnings("deprecation")
@@ -266,6 +267,9 @@ public class DistributedFileSystem exten
       EnumSet<CreateFlag> flag, int bufferSize, short replication,
       long blockSize, Progressable progress) throws IOException {
     statistics.incrementWriteOps(1);
+    if (flag.contains(CreateFlag.OVERWRITE)) {
+      flag.add(CreateFlag.CREATE);
+    }
     return new FSDataOutputStream(dfs.create(getPathName(f), permission, flag,
         false, replication, blockSize, progress, bufferSize), statistics);
   }
@@ -810,6 +814,14 @@ public class DistributedFileSystem exten
       throws IOException {
     return dfs.getDelegationToken(renewer);
   }
+  
+  @Override // FileSystem
+  public List<Token<?>> getDelegationTokens(String renewer) throws IOException {
+    List<Token<?>> tokenList = new ArrayList<Token<?>>();
+    Token<DelegationTokenIdentifier> token = this.getDelegationToken(renewer);
+    tokenList.add(token);
+    return tokenList;
+  }
 
   /**
    * Renew an existing delegation token.

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Wed Apr 20 02:28:19 2011
@@ -67,9 +67,9 @@ public interface ClientProtocol extends 
    * Compared to the previous version the following changes have been introduced:
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
-   * 65: Add listCorruptFileBlocks to ClientProtocol
+   * 66: Add getAdditionalDatanode(..)
    */
-  public static final long versionID = 65L;
+  public static final long versionID = 66L;
   
   ///////////////////////////////////////
   // File contents
@@ -298,6 +298,30 @@ public interface ClientProtocol extends 
       NotReplicatedYetException, SafeModeException, UnresolvedLinkException,
       IOException;
 
+  /** 
+   * Get a datanode for an existing pipeline.
+   * 
+   * @param src the file being written
+   * @param blk the block being written
+   * @param existings the existing nodes in the pipeline
+   * @param excludes the excluded nodes
+   * @param numAdditionalNodes number of additional datanodes
+   * @param clientName the name of the client
+   * 
+   * @return the located block.
+   * 
+   * @throws AccessControlException If access is denied
+   * @throws FileNotFoundException If file <code>src</code> is not found
+   * @throws SafeModeException create not allowed in safemode
+   * @throws UnresolvedLinkException If <code>src</code> contains a symlink
+   * @throws IOException If an I/O error occurred
+   */
+  public LocatedBlock getAdditionalDatanode(final String src, final Block blk,
+      final DatanodeInfo[] existings, final DatanodeInfo[] excludes,
+      final int numAdditionalNodes, final String clientName
+      ) throws AccessControlException, FileNotFoundException,
+          SafeModeException, UnresolvedLinkException, IOException;
+
   /**
    * The client is done writing data to the given filename, and would 
    * like to complete it.  

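On the client side, the DataStreamer hunk earlier in this commit invokes this new RPC during pipeline recovery. The call shape in isolation (the wrapper below is illustrative; the parameter meanings are as documented above):

    import java.io.IOException;

    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.protocol.ClientProtocol;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;

    public class AdditionalDatanodeSketch {
      /** Ask the NameNode for one replacement node; mirrors the DataStreamer hunk. */
      static DatanodeInfo[] askForOneMore(ClientProtocol namenode, String src,
          Block block, DatanodeInfo[] existings, DatanodeInfo[] failed,
          String clientName) throws IOException {
        LocatedBlock lb = namenode.getAdditionalDatanode(
            src, block, existings, failed, 1, clientName);
        return lb.getLocations();  // the old pipeline plus the new node
      }
    }
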
Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Wed Apr 20 02:28:19 2011
@@ -25,8 +25,13 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -39,17 +44,17 @@ import org.apache.hadoop.security.token.
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
 public interface DataTransferProtocol {
-  
+  public static final Log LOG = LogFactory.getLog(DataTransferProtocol.class);
   
   /** Version for data transfers between clients and datanodes
    * This should change when the serialization of DatanodeInfo changes,
    * not just when the protocol changes. It is not very obvious.
    */
   /*
-   * Version 20:
-   *    Added TRANSFER_RBW
+   * Version 22:
+   *    Add a new feature to replace datanode on failure.
    */
-  public static final int DATA_TRANSFER_VERSION = 20;
+  public static final int DATA_TRANSFER_VERSION = 22;
 
   /** Operation */
   public enum Op {
@@ -58,7 +63,8 @@ public interface DataTransferProtocol {
     READ_METADATA((byte)82),
     REPLACE_BLOCK((byte)83),
     COPY_BLOCK((byte)84),
-    BLOCK_CHECKSUM((byte)85);
+    BLOCK_CHECKSUM((byte)85),
+    TRANSFER_BLOCK((byte)86);
 
     /** The code for this operation. */
     public final byte code;
@@ -144,8 +150,10 @@ public interface DataTransferProtocol {
     PIPELINE_CLOSE_RECOVERY,
     // pipeline set up for block creation
     PIPELINE_SETUP_CREATE,
-    // similar to replication but transferring rbw instead of finalized
-    TRANSFER_RBW;
+    // transfer RBW for adding datanodes
+    TRANSFER_RBW,
+    // transfer Finalized for adding datanodes
+    TRANSFER_FINALIZED;
     
     final static private byte RECOVERY_BIT = (byte)1;
     
@@ -264,14 +272,23 @@ public interface DataTransferProtocol {
       if (src != null) {
         src.write(out);
       }
-      out.writeInt(targets.length - 1);
-      for (int i = 1; i < targets.length; i++) {
-        targets[i].write(out);
-      }
+      write(out, 1, targets);
+      blockToken.write(out);
+    }
 
+    /** Send {@link Op#TRANSFER_BLOCK} */
+    public static void opTransferBlock(DataOutputStream out, Block blk,
+        String client, DatanodeInfo[] targets,
+        Token<BlockTokenIdentifier> blockToken) throws IOException {
+      op(out, Op.TRANSFER_BLOCK);
+
+      blk.writeId(out);
+      Text.writeString(out, client);
+      write(out, 0, targets);
       blockToken.write(out);
+      out.flush();
     }
-    
+
     /** Send OP_REPLACE_BLOCK */
     public static void opReplaceBlock(DataOutputStream out,
         Block blk, String storageId, DatanodeInfo src,
@@ -306,6 +323,16 @@ public interface DataTransferProtocol {
       blockToken.write(out);
       out.flush();
     }
+
+    /** Write an array of {@link DatanodeInfo} */
+    private static void write(final DataOutputStream out,
+        final int start, 
+        final DatanodeInfo[] datanodeinfos) throws IOException {
+      out.writeInt(datanodeinfos.length - start);
+      for (int i = start; i < datanodeinfos.length; i++) {
+        datanodeinfos[i].write(out);
+      }
+    }
   }
 
   /** Receiver */
@@ -340,6 +367,9 @@ public interface DataTransferProtocol {
       case BLOCK_CHECKSUM:
         opBlockChecksum(in);
         break;
+      case TRANSFER_BLOCK:
+        opTransferBlock(in);
+        break;
       default:
         throw new IOException("Unknown op " + op + " in data stream");
       }
@@ -377,14 +407,7 @@ public interface DataTransferProtocol {
       final String client = Text.readString(in); // working on behalf of this client
       final DatanodeInfo src = in.readBoolean()? DatanodeInfo.read(in): null;
 
-      final int nTargets = in.readInt();
-      if (nTargets < 0) {
-        throw new IOException("Mislabelled incoming datastream.");
-      }
-      final DatanodeInfo targets[] = new DatanodeInfo[nTargets];
-      for (int i = 0; i < targets.length; i++) {
-        targets[i] = DatanodeInfo.read(in);
-      }
+      final DatanodeInfo targets[] = readDatanodeInfos(in);
       final Token<BlockTokenIdentifier> blockToken = readBlockToken(in);
 
       opWriteBlock(in, blk, pipelineSize, stage,
@@ -401,6 +424,27 @@ public interface DataTransferProtocol {
         DatanodeInfo[] targets, Token<BlockTokenIdentifier> blockToken)
         throws IOException;
 
+    /** Receive {@link Op#TRANSFER_BLOCK} */
+    private void opTransferBlock(DataInputStream in) throws IOException {
+      final Block blk = new Block();
+      blk.readId(in);
+      final String client = Text.readString(in);
+      final DatanodeInfo targets[] = readDatanodeInfos(in);
+      final Token<BlockTokenIdentifier> blockToken = readBlockToken(in);
+
+      opTransferBlock(in, blk, client, targets, blockToken);
+    }
+
+    /**
+     * Abstract {@link Op#TRANSFER_BLOCK} method.
+     * For {@link BlockConstructionStage#TRANSFER_RBW}
+     * or {@link BlockConstructionStage#TRANSFER_FINALIZED}.
+     */
+    protected abstract void opTransferBlock(DataInputStream in, Block blk,
+        String client, DatanodeInfo[] targets,
+        Token<BlockTokenIdentifier> blockToken)
+        throws IOException;
+
     /** Receive OP_REPLACE_BLOCK */
     private void opReplaceBlock(DataInputStream in) throws IOException {
       final Block blk = new Block();
@@ -454,6 +498,21 @@ public interface DataTransferProtocol {
         Block blk, Token<BlockTokenIdentifier> blockToken)
         throws IOException;
 
+    /** Read an array of {@link DatanodeInfo} */
+    private static DatanodeInfo[] readDatanodeInfos(final DataInputStream in
+        ) throws IOException {
+      final int n = in.readInt();
+      if (n < 0) {
+        throw new IOException("Mislabelled incoming datastream: "
+            + n + " = n < 0");
+      }
+      final DatanodeInfo[] datanodeinfos = new DatanodeInfo[n];
+      for (int i = 0; i < datanodeinfos.length; i++) {
+        datanodeinfos[i] = DatanodeInfo.read(in);
+      }
+      return datanodeinfos;
+    }
+
     /** Read an AccessToken */
     static private Token<BlockTokenIdentifier> readBlockToken(DataInputStream in
         ) throws IOException {
@@ -695,4 +754,93 @@ public interface DataTransferProtocol {
     }
   }
 
+  /**
+   * The setting of replace-datanode-on-failure feature.
+   */
+  public enum ReplaceDatanodeOnFailure {
+    /** The feature is disabled in the entire site. */
+    DISABLE,
+    /** Never add a new datanode. */
+    NEVER,
+    /**
+     * DEFAULT policy:
+     *   Let r be the replication number.
+     *   Let n be the number of existing datanodes.
+     *   Add a new datanode only if r >= 3 and either
+     *   (1) floor(r/2) >= n; or
+     *   (2) r > n and the block is hflushed/appended.
+     */
+    DEFAULT,
+    /** Always add a new datanode when an existing datanode is removed. */
+    ALWAYS;
+
+    /** Check if the feature is enabled. */
+    public void checkEnabled() {
+      if (this == DISABLE) {
+        throw new UnsupportedOperationException(
+            "This feature is disabled.  Please refer to "
+            + DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY
+            + " configuration property.");
+      }
+    }
+
+    /** Is the policy satisfied? */
+    public boolean satisfy(
+        final short replication, final DatanodeInfo[] existings,
+        final boolean isAppend, final boolean isHflushed) {
+      final int n = existings == null? 0: existings.length;
+      if (n == 0 || n >= replication) {
+        //don't need to add datanode for any policy.
+        return false;
+      } else if (this == DISABLE || this == NEVER) {
+        return false;
+      } else if (this == ALWAYS) {
+        return true;
+      } else {
+        //DEFAULT
+        if (replication < 3) {
+          return false;
+        } else {
+          if (n <= (replication/2)) {
+            return true;
+          } else {
+            return isAppend || isHflushed;
+          }
+        }
+      }
+    }
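+
+To make the DEFAULT branch concrete (values chosen for illustration, not taken
+from the patch): with r = 3 and only n = 1 datanode left, floor(3/2) = 1 >= 1,
+so a replacement is requested even for a fresh write; with n = 2, a replacement
+is requested only if the block was appended or hflushed.
+
+    // Illustrative calls; the arrays only stand in for pipeline length.
+    ReplaceDatanodeOnFailure p = ReplaceDatanodeOnFailure.DEFAULT;
+    p.satisfy((short)3, new DatanodeInfo[1], false, false); // true:  1 <= 3/2
+    p.satisfy((short)3, new DatanodeInfo[2], false, false); // false: fresh write
+    p.satisfy((short)3, new DatanodeInfo[2], false, true);  // true:  hflushed
+    p.satisfy((short)2, new DatanodeInfo[1], true,  true);  // false: r < 3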
+
+    /** Get the setting from configuration. */
+    public static ReplaceDatanodeOnFailure get(final Configuration conf) {
+      final boolean enabled = conf.getBoolean(
+          DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY,
+          DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_DEFAULT);
+      if (!enabled) {
+        return DISABLE;
+      }
+
+      final String policy = conf.get(
+          DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY,
+          DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT);
+      for(int i = 1; i < values().length; i++) {
+        final ReplaceDatanodeOnFailure rdof = values()[i];
+        if (rdof.name().equalsIgnoreCase(policy)) {
+          return rdof;
+        }
+      }
+      throw new HadoopIllegalArgumentException("Illegal configuration value for "
+          + DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY
+          + ": " + policy);
+    }
+
+    /** Write the setting to configuration. */
+    public void write(final Configuration conf) {
+      conf.setBoolean(
+          DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY,
+          this != DISABLE);
+      conf.set(
+          DFSConfigKeys.DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_KEY,
+          name());
+    }
+  }
 }
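
The get()/write() pair round-trips the setting through the two configuration
keys. A short usage sketch (the key names are defined in DFSConfigKeys
elsewhere in this patch):

    Configuration conf = new Configuration();
    ReplaceDatanodeOnFailure.DEFAULT.write(conf); // enable=true, policy=DEFAULT
    ReplaceDatanodeOnFailure rdof = ReplaceDatanodeOnFailure.get(conf);
    rdof.checkEnabled();                          // throws only for DISABLE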

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java Wed Apr 20 02:28:19 2011
@@ -88,7 +88,7 @@ public interface FSConstants {
   // Version is reflected in the data storage file.
   // Versions are negative.
   // Decrement LAYOUT_VERSION to define a new version.
-  public static final int LAYOUT_VERSION = -28;
+  public static final int LAYOUT_VERSION = -31;
   // Current version: 
-  // -28: add persistent transaction IDs
+  // -31: add persistent transaction IDs
 }
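
Because layout versions are negative and decremented for each new format, an
"older than current" test uses > rather than <. For illustration (storedVersion
is hypothetical, as read from a storage directory's VERSION file):

    // -28 > -31, so an image written at -28 predates the current -31 format.
    boolean upgradeRequired = storedVersion > FSConstants.LAYOUT_VERSION;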

Modified: hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1095253&r1=1095252&r2=1095253&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/branches/HDFS-1073/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Wed Apr 20 02:28:19 2011
@@ -22,6 +22,7 @@ import static org.apache.hadoop.hdfs.pro
 import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
 
 import java.io.BufferedOutputStream;
+import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
@@ -54,7 +55,7 @@ import org.apache.hadoop.util.StringUtil
  * may copy it to another site. If a throttler is provided,
  * streaming throttling is also supported.
  **/
-class BlockReceiver implements java.io.Closeable, FSConstants {
+class BlockReceiver implements Closeable, FSConstants {
   public static final Log LOG = DataNode.LOG;
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
   
@@ -83,14 +84,15 @@ class BlockReceiver implements java.io.C
   /** The client name.  It is empty if a datanode is the client */
   private final String clientname;
   private final boolean isClient; 
-  private final boolean isDatanode; 
+  private final boolean isDatanode;
 
   /** the block to receive */
   private final Block block; 
   /** the replica to write */
   private final ReplicaInPipelineInterface replicaInfo;
   /** pipeline stage */
-  private final BlockConstructionStage initialStage;
+  private final BlockConstructionStage stage;
+  private final boolean isTransfer;
 
   BlockReceiver(final Block block, final DataInputStream in,
       final String inAddr, final String myAddr,
@@ -112,8 +114,19 @@ class BlockReceiver implements java.io.C
 
       //for datanode, we have
       //1: clientName.length() == 0, and
-      //2: stage == null, PIPELINE_SETUP_CREATE or TRANSFER_RBW
-      this.initialStage = stage;
+      //2: stage == null or PIPELINE_SETUP_CREATE
+      this.stage = stage;
+      this.isTransfer = stage == BlockConstructionStage.TRANSFER_RBW
+          || stage == BlockConstructionStage.TRANSFER_FINALIZED;
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(getClass().getSimpleName() + ": " + block
+            + "\n  isClient  =" + isClient + ", clientname=" + clientname
+            + "\n  isDatanode=" + isDatanode + ", srcDataNode=" + srcDataNode
+            + "\n  inAddr=" + inAddr + ", myAddr=" + myAddr
+            );
+      }
+
       //
       // Open local disk out
       //
@@ -143,6 +156,11 @@ class BlockReceiver implements java.io.C
           }
           block.setGenerationStamp(newGs);
           break;
+        case TRANSFER_RBW:
+        case TRANSFER_FINALIZED:
+          // this is a transfer destination
+          replicaInfo = datanode.data.createTemporary(block);
+          break;
         default: throw new IOException("Unsupported stage " + stage + 
               " while receiving block " + block + " from " + inAddr);
         }
@@ -152,7 +170,7 @@ class BlockReceiver implements java.io.C
       this.bytesPerChecksum = checksum.getBytesPerChecksum();
       this.checksumSize = checksum.getChecksumSize();
       
-      final boolean isCreate = isDatanode 
+      final boolean isCreate = isDatanode || isTransfer 
           || stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
       streams = replicaInfo.createStreams(isCreate,
           this.bytesPerChecksum, this.checksumSize);
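
Taken together with the receiveBlock() changes below, a transfer destination's
replica now follows a short lifecycle. A condensed sketch (calls as they appear
in this patch; control flow simplified):

    // TRANSFER_RBW / TRANSFER_FINALIZED destination, end to end:
    replicaInfo = datanode.data.createTemporary(block);  // constructor, above
    // ... receive packets, writing block and checksum files ...
    close();                                             // receiveBlock()
    block.setNumBytes(replicaInfo.getNumBytes());
    if (stage == BlockConstructionStage.TRANSFER_RBW) {
      datanode.data.convertTemporaryToRbw(block);        // keep replica as RBW
    } else {
      datanode.data.finalizeBlock(block);                // TRANSFER_FINALIZED
    }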
@@ -629,7 +647,7 @@ class BlockReceiver implements java.io.C
       DataInputStream mirrIn,   // input from next datanode
       DataOutputStream replyOut,  // output to previous datanode
       String mirrAddr, DataTransferThrottler throttlerArg,
-      int numTargets) throws IOException {
+      DatanodeInfo[] downstreams) throws IOException {
 
       boolean responderClosed = false;
       mirrorOut = mirrOut;
@@ -637,12 +655,10 @@ class BlockReceiver implements java.io.C
       throttler = throttlerArg;
 
     try {
-      if (isClient) {
+      if (isClient && !isTransfer) {
         responder = new Daemon(datanode.threadGroup, 
-                               new PacketResponder(this, block, mirrIn, 
-                                                   replyOut, numTargets,
-                                                   Thread.currentThread()));
-        responder.start(); // start thread to processes reponses
+            new PacketResponder(replyOut, mirrIn, downstreams));
+        responder.start(); // start thread to process responses
       }
 
       /* 
@@ -658,24 +674,27 @@ class BlockReceiver implements java.io.C
         responderClosed = true;
       }
 
-      // if this write is for a replication request (and not
-      // from a client), then finalize block. For client-writes, 
-      // the block is finalized in the PacketResponder.
-      if (isDatanode) {
+      // If this write is for a replication or transfer-RBW/Finalized,
+      // then finalize block or convert temporary to RBW.
+      // For client-writes, the block is finalized in the PacketResponder.
+      if (isDatanode || isTransfer) {
         // close the block/crc files
         close();
+        block.setNumBytes(replicaInfo.getNumBytes());
 
-        if (initialStage != BlockConstructionStage.TRANSFER_RBW) {
+        if (stage == BlockConstructionStage.TRANSFER_RBW) {
+          // for TRANSFER_RBW, convert temporary to RBW
+          datanode.data.convertTemporaryToRbw(block);
+        } else {
+          // for isDatanode or TRANSFER_FINALIZED
           // Finalize the block. Does this fsync()?
-          block.setNumBytes(replicaInfo.getNumBytes());
           datanode.data.finalizeBlock(block);
         }
         datanode.myMetrics.blocksWritten.inc();
       }
 
     } catch (IOException ioe) {
-      LOG.info("Exception in receiveBlock for block " + block + 
-               " " + ioe);
+      LOG.info("Exception in receiveBlock for " + block, ioe);
       throw ioe;
     } finally {
       if (!responderClosed) { // Abnormal termination of the flow above
@@ -700,8 +719,7 @@ class BlockReceiver implements java.io.C
    * if this write is for a replication request (and not from a client)
    */
   private void cleanupBlock() throws IOException {
-    if (isDatanode
-        && initialStage != BlockConstructionStage.TRANSFER_RBW) {
+    if (isDatanode) {
       datanode.data.unfinalizeBlock(block);
     }
   }
@@ -783,51 +801,71 @@ class BlockReceiver implements java.io.C
     }
   }
   
+  private enum PacketResponderType {
+    NON_PIPELINE, LAST_IN_PIPELINE, HAS_DOWNSTREAM_IN_PIPELINE
+  }
   
   /**
   * Processes responses from downstream datanodes in the pipeline
   * and sends replies back to the originator.
    */
-  class PacketResponder implements Runnable, FSConstants {   
+  class PacketResponder implements Runnable, Closeable, FSConstants {   
 
-    //packet waiting for ack
-    private LinkedList<Packet> ackQueue = new LinkedList<Packet>(); 
+    /** queue for packets waiting for ack */
+    private final LinkedList<Packet> ackQueue = new LinkedList<Packet>(); 
+    /** the thread that spawns this responder */
+    private final Thread receiverThread = Thread.currentThread();
+    /** is this responder running? */
     private volatile boolean running = true;
-    private Block block;
-    DataInputStream mirrorIn;   // input from downstream datanode
-    DataOutputStream replyOut;  // output to upstream datanode
-    private int numTargets;     // number of downstream datanodes including myself
-    private BlockReceiver receiver; // The owner of this responder.
-    private Thread receiverThread; // the thread that spawns this responder
 
+    /** input from the next downstream datanode */
+    private final DataInputStream downstreamIn;
+    /** output to upstream datanode/client */
+    private final DataOutputStream upstreamOut;
+
+    /** The type of this responder */
+    private final PacketResponderType type;
+    /** for log and error messages */
+    private final String myString; 
+
+    @Override
     public String toString() {
-      return "PacketResponder " + numTargets + " for Block " + this.block;
+      return myString;
     }
 
-    PacketResponder(BlockReceiver receiver, Block b, DataInputStream in, 
-                    DataOutputStream out, int numTargets,
-                    Thread receiverThread) {
-      this.receiverThread = receiverThread;
-      this.receiver = receiver;
-      this.block = b;
-      mirrorIn = in;
-      replyOut = out;
-      this.numTargets = numTargets;
+    PacketResponder(final DataOutputStream upstreamOut,
+        final DataInputStream downstreamIn,
+        final DatanodeInfo[] downstreams) {
+      this.downstreamIn = downstreamIn;
+      this.upstreamOut = upstreamOut;
+
+      this.type = downstreams == null? PacketResponderType.NON_PIPELINE
+          : downstreams.length == 0? PacketResponderType.LAST_IN_PIPELINE
+              : PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE;
+
+      final StringBuilder b = new StringBuilder(getClass().getSimpleName())
+          .append(": ").append(block).append(", type=").append(type);
+      if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE) {
+        b.append(", downstreams=").append(downstreams.length)
+            .append(":").append(Arrays.asList(downstreams));
+      }
+      this.myString = b.toString();
     }
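
The constructor derives the responder's role entirely from the downstreams
argument; restated as a standalone helper for clarity (illustrative, the patch
implements the rule inline):

    static PacketResponderType typeFor(DatanodeInfo[] downstreams) {
      return downstreams == null ? PacketResponderType.NON_PIPELINE
          : downstreams.length == 0 ? PacketResponderType.LAST_IN_PIPELINE
              : PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE;
    }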
 
     /**
     * enqueue the seqno that is still to be acked by the downstream datanode.
      * @param seqno
      * @param lastPacketInBlock
-     * @param lastByteInPacket
+     * @param offsetInBlock
      */
-    synchronized void enqueue(long seqno, boolean lastPacketInBlock, long lastByteInPacket) {
+    synchronized void enqueue(final long seqno,
+        final boolean lastPacketInBlock, final long offsetInBlock) {
       if (running) {
+        final Packet p = new Packet(seqno, lastPacketInBlock, offsetInBlock);
         if(LOG.isDebugEnabled()) {
-          LOG.debug("PacketResponder " + numTargets + " adding seqno " + seqno +
-                    " to ack queue.");
+          LOG.debug(myString + ": enqueue " + p);
         }
-        ackQueue.addLast(new Packet(seqno, lastPacketInBlock, lastByteInPacket));
+        ackQueue.addLast(p);
         notifyAll();
       }
     }
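
enqueue() runs on the receiver thread after each packet hits disk, while run()
(below) drains the queue on the responder thread; the shared monitor plus
notifyAll() is the only coordination. A self-contained miniature of the same
discipline (generic names, not patch code):

    class AckQueue<T> {
      private final java.util.LinkedList<T> q = new java.util.LinkedList<T>();
      private volatile boolean running = true;

      synchronized void enqueue(T t) {             // receiver thread
        if (running) { q.addLast(t); notifyAll(); }
      }

      synchronized T peekFirst() throws InterruptedException {
        while (running && q.isEmpty()) { wait(); } // responder thread blocks
        return q.isEmpty() ? null : q.getFirst();  // peek, as run() does
      }

      synchronized void removeHead() { q.removeFirst(); notifyAll(); }
    }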
@@ -835,7 +873,8 @@ class BlockReceiver implements java.io.C
     /**
      * wait for all pending packets to be acked. Then shutdown thread.
      */
-    synchronized void close() {
+    @Override
+    public synchronized void close() {
       while (running && ackQueue.size() != 0 && datanode.shouldRun) {
         try {
           wait();
@@ -844,8 +883,7 @@ class BlockReceiver implements java.io.C
         }
       }
       if(LOG.isDebugEnabled()) {
-        LOG.debug("PacketResponder " + numTargets +
-                 " for block " + block + " Closing down.");
+        LOG.debug(myString + ": closing");
       }
       running = false;
       notifyAll();
@@ -867,21 +905,21 @@ class BlockReceiver implements java.io.C
             PipelineAck ack = new PipelineAck();
             long seqno = PipelineAck.UNKOWN_SEQNO;
             try {
-              if (numTargets != 0 && !mirrorError) {// not the last DN & no mirror error
+              if (type != PacketResponderType.LAST_IN_PIPELINE
+                  && !mirrorError) {
                 // read an ack from downstream datanode
-                ack.readFields(mirrorIn);
+                ack.readFields(downstreamIn);
                 if (LOG.isDebugEnabled()) {
-                  LOG.debug("PacketResponder " + numTargets + " got " + ack);
+                  LOG.debug(myString + " got " + ack);
                 }
                 seqno = ack.getSeqno();
               }
-              if (seqno != PipelineAck.UNKOWN_SEQNO || numTargets == 0) {
+              if (seqno != PipelineAck.UNKOWN_SEQNO
+                  || type == PacketResponderType.LAST_IN_PIPELINE) {
                 synchronized (this) {
                   while (running && datanode.shouldRun && ackQueue.size() == 0) {
                     if (LOG.isDebugEnabled()) {
-                      LOG.debug("PacketResponder " + numTargets + 
-                                " seqno = " + seqno +
-                                " for block " + block +
+                      LOG.debug(myString + ": seqno=" + seqno +
                                 " waiting for local datanode to finish write.");
                     }
                     wait();
@@ -891,11 +929,10 @@ class BlockReceiver implements java.io.C
                   }
                   pkt = ackQueue.getFirst();
                   expected = pkt.seqno;
-                  if (numTargets > 0 && seqno != expected) {
-                    throw new IOException("PacketResponder " + numTargets +
-                                          " for block " + block +
-                                          " expected seqno:" + expected +
-                                          " received:" + seqno);
+                  if (type == PacketResponderType.HAS_DOWNSTREAM_IN_PIPELINE
+                      && seqno != expected) {
+                    throw new IOException(myString + ": seqno: expected="
+                        + expected + ", received=" + seqno);
                   }
                   lastPacketInBlock = pkt.lastPacketInBlock;
                 }
@@ -910,8 +947,7 @@ class BlockReceiver implements java.io.C
                 // notify client of the error
                 // and wait for the client to shut down the pipeline
                 mirrorError = true;
-                LOG.info("PacketResponder " + block + " " + numTargets + 
-                      " Exception " + StringUtils.stringifyException(ioe));
+                LOG.info(myString, ioe);
               }
             }
 
@@ -923,8 +959,7 @@ class BlockReceiver implements java.io.C
                * because this datanode has a problem. The upstream datanode
                * will detect that this datanode is bad, and rightly so.
                */
-              LOG.info("PacketResponder " + block +  " " + numTargets +
-                       " : Thread is interrupted.");
+              LOG.info(myString + ": Thread is interrupted.");
               running = false;
               continue;
             }
@@ -932,7 +967,7 @@ class BlockReceiver implements java.io.C
             // If this is the last packet in block, then close block
             // file and finalize the block before responding success
             if (lastPacketInBlock) {
-              receiver.close();
+              BlockReceiver.this.close();
               final long endTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
               block.setNumBytes(replicaInfo.getNumBytes());
               datanode.data.finalizeBlock(block);
@@ -940,13 +975,12 @@ class BlockReceiver implements java.io.C
               if (ClientTraceLog.isInfoEnabled() && isClient) {
                 long offset = 0;
                 ClientTraceLog.info(String.format(DN_CLIENTTRACE_FORMAT,
-                      receiver.inAddr, receiver.myAddr, block.getNumBytes(),
-                      "HDFS_WRITE", receiver.clientname, offset,
+                      inAddr, myAddr, block.getNumBytes(),
+                      "HDFS_WRITE", clientname, offset,
                       datanode.dnRegistration.getStorageID(), block, endTime-startTime));
               } else {
-                LOG.info("Received block " + block + 
-                         " of size " + block.getNumBytes() + 
-                         " from " + receiver.inAddr);
+                LOG.info("Received block " + block + " of size "
+                    + block.getNumBytes() + " from " + inAddr);
               }
             }
 
@@ -957,7 +991,8 @@ class BlockReceiver implements java.io.C
               replies[0] = SUCCESS;
               replies[1] = ERROR;
             } else {
-              short ackLen = numTargets == 0 ? 0 : ack.getNumOfReplies();
+              short ackLen = type == PacketResponderType.LAST_IN_PIPELINE? 0
+                  : ack.getNumOfReplies();
               replies = new Status[1+ackLen];
               replies[0] = SUCCESS;
               for (int i=0; i<ackLen; i++) {
@@ -967,20 +1002,18 @@ class BlockReceiver implements java.io.C
             PipelineAck replyAck = new PipelineAck(expected, replies);
             
             // send my ack back to upstream datanode
-            replyAck.write(replyOut);
-            replyOut.flush();
+            replyAck.write(upstreamOut);
+            upstreamOut.flush();
             if (LOG.isDebugEnabled()) {
-              LOG.debug("PacketResponder " + numTargets + 
-                        " for block " + block +
-                        " responded an ack: " + replyAck);
+              LOG.debug(myString + ", replyAck=" + replyAck);
             }
             if (pkt != null) {
               // remove the packet from the ack queue
               removeAckHead();
               // update bytes acked
               if (replyAck.isSuccess() && 
-                  pkt.lastByteInBlock>replicaInfo.getBytesAcked()) {
-                replicaInfo.setBytesAcked(pkt.lastByteInBlock);
+                  pkt.offsetInBlock > replicaInfo.getBytesAcked()) {
+                replicaInfo.setBytesAcked(pkt.offsetInBlock);
               }
             }
         } catch (IOException e) {
@@ -991,8 +1024,7 @@ class BlockReceiver implements java.io.C
             } catch (IOException ioe) {
               LOG.warn("DataNode.checkDiskError failed in run() with: ", ioe);
             }
-            LOG.info("PacketResponder " + block + " " + numTargets + 
-                     " Exception " + StringUtils.stringifyException(e));
+            LOG.info(myString, e);
             running = false;
             if (!Thread.interrupted()) { // failure not caused by interruption
               receiverThread.interrupt();
@@ -1000,15 +1032,13 @@ class BlockReceiver implements java.io.C
           }
         } catch (Throwable e) {
           if (running) {
-            LOG.info("PacketResponder " + block + " " + numTargets + 
-                     " Exception " + StringUtils.stringifyException(e));
+            LOG.info(myString, e);
             running = false;
             receiverThread.interrupt();
           }
         }
       }
-      LOG.info("PacketResponder " + numTargets + 
-               " for block " + block + " terminating");
+      LOG.info(myString + " terminating");
     }
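
To make the ack flow concrete: in a healthy three-datanode pipeline the last
datanode sends [SUCCESS]; the middle one prepends its own status and forwards
[SUCCESS, SUCCESS]; the first sends [SUCCESS, SUCCESS, SUCCESS] to the client.
The middle-datanode step spelled out (a sketch; PipelineAck.getReply(int) is
assumed as the accessor for a downstream status):

    short ackLen = ack.getNumOfReplies();      // 1 at the middle datanode here
    Status[] replies = new Status[1 + ackLen];
    replies[0] = SUCCESS;                      // this datanode's own status
    for (int i = 0; i < ackLen; i++) {
      replies[i + 1] = ack.getReply(i);        // copy downstream statuses
    }
    new PipelineAck(expected, replies).write(upstreamOut);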
     
     /**
@@ -1025,15 +1055,23 @@ class BlockReceiver implements java.io.C
   /**
    * This information is cached by the Datanode in the ackQueue.
    */
-  static private class Packet {
-    long seqno;
-    boolean lastPacketInBlock;
-    long lastByteInBlock;
+  private static class Packet {
+    final long seqno;
+    final boolean lastPacketInBlock;
+    final long offsetInBlock;
 
-    Packet(long seqno, boolean lastPacketInBlock, long lastByteInPacket) {
+    Packet(long seqno, boolean lastPacketInBlock, long offsetInBlock) {
       this.seqno = seqno;
       this.lastPacketInBlock = lastPacketInBlock;
-      this.lastByteInBlock = lastByteInPacket;
+      this.offsetInBlock = offsetInBlock;
+    }
+
+    @Override
+    public String toString() {
+      return getClass().getSimpleName() + "(seqno=" + seqno
+        + ", lastPacketInBlock=" + lastPacketInBlock
+        + ", offsetInBlock=" + offsetInBlock
+        + ")";
     }
   }
 }


