hadoop-hdfs-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hair...@apache.org
Subject svn commit: r820497 [1/7] - in /hadoop/hdfs/trunk: ./ .eclipse.templates/.launches/ src/contrib/hdfsproxy/ src/docs/src/documentation/content/xdocs/ src/java/ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apach...
Date Wed, 30 Sep 2009 23:39:33 GMT
Author: hairong
Date: Wed Sep 30 23:39:30 2009
New Revision: 820497

URL: http://svn.apache.org/viewvc?rev=820497&view=rev
Log:
HDFS-265. Merge -c 820487 https://svn.apache.org/repos/asf/hadoop/hdfs/branches/branch-0.21

Added:
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/RecoveryInProgressException.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FinalizedReplica.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/Replica.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaAlreadyExistsException.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBeingWritten.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaUnderRecovery.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaWaitingToBeRecovered.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicasMap.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfoUnderConstruction.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryCommand.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/ReplicaRecoveryInfo.java
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/fi/FiHFlushTestUtil.java
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/HFlushAspects.aj
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/TestFiHFlush.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestHFlush.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReadWhileWriting.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDatanodeRestart.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestWriteToReplica.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestBlockUnderConstruction.java
Removed:
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockAlreadyExistsException.java
Modified:
    hadoop/hdfs/trunk/   (props changed)
    hadoop/hdfs/trunk/.eclipse.templates/.launches/AllTests.launch   (props changed)
    hadoop/hdfs/trunk/.eclipse.templates/.launches/DataNode.launch   (props changed)
    hadoop/hdfs/trunk/.eclipse.templates/.launches/NameNode.launch   (props changed)
    hadoop/hdfs/trunk/.eclipse.templates/.launches/SpecificTestTemplate.launch   (props changed)
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/build.xml   (props changed)
    hadoop/hdfs/trunk/src/contrib/hdfsproxy/   (props changed)
    hadoop/hdfs/trunk/src/docs/src/documentation/content/xdocs/hdfs_imageviewer.xml
    hadoop/hdfs/trunk/src/java/   (props changed)
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockMissingException.java   (props changed)
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/HdfsConstants.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/common/Storage.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java   (contents, props changed)
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockInfo.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockManager.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlocksMap.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INode.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/INodeFileUnderConstruction.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/LeaseManager.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/DFSClientAspects.aj
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/protocol/ClientProtocolAspects.aj
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/DataTransferProtocolAspects.aj
    hadoop/hdfs/trunk/src/test/aop/org/apache/hadoop/hdfs/server/datanode/FSDatasetAspects.aj
    hadoop/hdfs/trunk/src/test/findbugsExcludeFile.xml
    hadoop/hdfs/trunk/src/test/hdfs/   (props changed)
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/fs/TestHDFSFileContextMainOperations.java   (props changed)
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/AppendTestUtil.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DFSTestUtil.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/DataNodeCluster.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/MiniDFSCluster.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestBlockMissingException.java   (contents, props changed)
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestBlockReport.java   (contents, props changed)
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestBlocksScheduledCounter.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSShell.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileAppend3.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCorruption.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestFileCreation.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestInjectionForSimulatedStorage.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestLeaseRecovery2.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestRenameWhileOpen.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestReplication.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java   (props changed)
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestAccessTokenWithDFS.java   (props changed)
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestFsck.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestLargeDirectoryDelete.java   (props changed)
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/namenode/TestOverReplicatedBlocks.java
    hadoop/hdfs/trunk/src/webapps/datanode/   (props changed)
    hadoop/hdfs/trunk/src/webapps/hdfs/   (props changed)
    hadoop/hdfs/trunk/src/webapps/secondary/   (props changed)

Propchange: hadoop/hdfs/trunk/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Wed Sep 30 23:39:30 2009
@@ -3,6 +3,5 @@
 logs
 .classpath
 .externalToolBuilders
-.launches
 .project
 .settings

Propchange: hadoop/hdfs/trunk/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep 30 23:39:30 2009
@@ -1 +1,3 @@
 /hadoop/core/branches/branch-0.19/hdfs:713112
+/hadoop/hdfs/branches/HDFS-265:796829-820463
+/hadoop/hdfs/branches/branch-0.21:820487

Propchange: hadoop/hdfs/trunk/.eclipse.templates/.launches/AllTests.launch
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Propchange: hadoop/hdfs/trunk/.eclipse.templates/.launches/DataNode.launch
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Propchange: hadoop/hdfs/trunk/.eclipse.templates/.launches/NameNode.launch
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Propchange: hadoop/hdfs/trunk/.eclipse.templates/.launches/SpecificTestTemplate.launch
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Wed Sep 30 23:39:30 2009
@@ -26,6 +26,21 @@
     HDFS-602. DistributedFileSystem mkdirs throws FileAlreadyExistsException
     instead of FileNotFoundException. (Boris Shkolnik via suresh)
 
+    HDFS-544. Add a "rbw" subdir to DataNode data directory. (hairong)
+
+    HDFS-576. Block report includes under-construction replicas. (shv)
+
+    HDFS-636. SafeMode counts complete blocks only. (shv)
+
+    HDFS-644. Lease recovery, concurrency support. (shv)
+
+    HDFS-570. Get last block length from a data-node when opening a file
+    being written to. (Tsz Wo (Nicholas), SZE via shv)
+
+    HDFS-657. Remove unused legacy data-node protocol methods. (shv)
+
+    HDFS-658. Block recovery for primary data-node. (shv)
+
   NEW FEATURES
 
     HDFS-436. Introduce AspectJ framework for HDFS code and tests.
@@ -64,6 +79,37 @@
 
     HDFS-610. Support o.a.h.fs.FileContext.  (Sanjay Radia via szetszwo)
 
+    HDFS-536. Support hflush at DFSClient. (hairong)
+
+    HDFS-517. Introduce BlockInfoUnderConstruction to reflect block replica
+    states while writing. (shv)
+
+    HDFS-565. Introduce block committing logic during new block allocation
+    and file close. (shv)
+
+    HDFS-537. DataNode exposes a replica's meta info to BlockReceiver for the
+    support of dfs writes/hflush. It also updates a replica's bytes received,
+    bytes on disk, and bytes acked after receiving a packet. (hairong)
+
+    HDFS-585. Datanode should serve up to visible length of a replica for read
+    requests.  (szetszwo)
+
+    HDFS-604. Block report processing for append. (shv)
+
+    HDFS-619. Support replica recovery initialization in datanode for the new
+    append design.  (szetszwo)
+
+    HDFS-592. Allow clients to fetch a new generation stamp from NameNode for
+    pipeline recovery. (hairong)
+
+    HDFS-624. Support a new algorithm for pipeline recovery and pipeline setup
+    for append. (hairong)
+
+    HDFS-627. Support replica update in data-node.
+    (Tsz Wo (Nicholas), SZE and Hairong Kuang via shv)
+
+    HDFS-642. Support pipeline close and close error recovery. (hairong)
+
   IMPROVEMENTS
 
     HDFS-381. Remove blocks from DataNode maps when corresponding file
@@ -191,6 +237,31 @@
     HDFS-641. Move all of the components that depend on map/reduce to 
     map/reduce. (omalley)
 
+    HDFS-509. Redesign DataNode volumeMap to include all types of Replicas.
+    (hairong)
+
+    HDFS-562. Add a test for NameNode.getBlockLocations(..) to check read from
+    un-closed file.  (szetszwo)
+
+    HDFS-543. Break FSDatasetInterface#writeToBlock() into writeToTemporary,
+    writeToRBW, and append. (hairong)
+
+    HDFS-603. Add a new interface, Replica, which is going to replace the use
+    of Block in datanode.  (szetszwo)
+
+    HDFS-589. Change block write protocol to support pipeline recovery.
+    (hairong)
+
+    HDFS-652. Replace BlockInfo.isUnderConstruction() with isComplete() (shv)
+
+    HDFS-648. Change some methods in AppendTestUtil to public.  (Konstantin
+    Boudnik via szetszwo)
+
+    HDFS-662. Unnecessary info message from DFSClient. (hairong)
+
+    HDFS-518. Create new tests for Append's hflush. (Konstantin Boudnik
+    via szetszwo)
+
   BUG FIXES
 
     HDFS-76. Better error message to users when commands fail because of 
@@ -288,6 +359,28 @@
 
     HDFS-640. Fixed TestHDFSFileContextMainOperations.java build failure. (suresh)
 
+    HDFS-547. TestHDFSFileSystemContract#testOutputStreamClosedTwice
+    sometimes fails with CloseByInterruptException. (hairong)
+
+    HDFS-588. Fix TestFiDataTransferProtocol and TestAppend2 failures. (shv)
+
+    HDFS-550. DataNode restarts may introduce corrupt/duplicated/lost replicas
+    when handling detached replicas. (hairong)
+
+    HDFS-659. If the last block is not complete, update its length with
+    one of its replica's length stored in datanode.  (szetszwo)
+
+    HDFS-649. Check null pointers for DataTransferTest.  (Konstantin Boudnik
+    via szetszwo)
+
+    HDFS-661. DataNode upgrade fails on non-existent current directory.
+    (hairong)
+
+    HDFS-597. Modification introduced by HDFS-537 breaks an advice binding in
+    FSDatasetAspects.  (Konstantin Boudnik via szetszwo)
+
+    HDFS-665. TestFileAppend2 sometimes hangs. (hairong)
+
 Release 0.20.1 - 2009-09-01
 
   IMPROVEMENTS

Propchange: hadoop/hdfs/trunk/build.xml
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep 30 23:39:30 2009
@@ -1,2 +1,4 @@
 /hadoop/core/branches/branch-0.19/hdfs/build.xml:713112
 /hadoop/core/trunk/build.xml:779102
+/hadoop/hdfs/branches/HDFS-265/build.xml:796829-820463
+/hadoop/hdfs/branches/branch-0.21/build.xml:820487

Propchange: hadoop/hdfs/trunk/src/contrib/hdfsproxy/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep 30 23:39:30 2009
@@ -1,2 +1,4 @@
 /hadoop/core/branches/branch-0.19/hdfs/src/contrib/hdfsproxy:713112
 /hadoop/core/trunk/src/contrib/hdfsproxy:776175-784663
+/hadoop/hdfs/branches/HDFS-265/src/contrib/hdfsproxy:796829-820463
+/hadoop/hdfs/branches/branch-0.21/src/contrib/hdfsproxy:820487

Modified: hadoop/hdfs/trunk/src/docs/src/documentation/content/xdocs/hdfs_imageviewer.xml
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/docs/src/documentation/content/xdocs/hdfs_imageviewer.xml?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/docs/src/documentation/content/xdocs/hdfs_imageviewer.xml (original)
+++ hadoop/hdfs/trunk/src/docs/src/documentation/content/xdocs/hdfs_imageviewer.xml Wed Sep 30 23:39:30 2009
@@ -219,7 +219,7 @@
 
         PermString = rwxr-xr-x 
 
-…remaining output omitted…
+…remaining output omitted…
 </source>          
           
       </section> <!-- example-->

Propchange: hadoop/hdfs/trunk/src/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Sep 30 23:39:30 2009
@@ -1,2 +1,4 @@
 /hadoop/core/branches/branch-0.19/hdfs/src/java:713112
 /hadoop/core/trunk/src/hdfs:776175-785643,785929-786278
+/hadoop/hdfs/branches/HDFS-265/src/java:796829-820463
+/hadoop/hdfs/branches/branch-0.21/src/java:820487

Propchange: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/BlockMissingException.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java Wed Sep 30 23:39:30 2009
@@ -36,8 +36,6 @@
 import java.nio.ByteBuffer;
 import java.util.AbstractMap;
 import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -86,6 +84,7 @@
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
+import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -1432,8 +1431,8 @@
         int dataLen = in.readInt();
       
         // Sanity check the lengths
-        if ( dataLen < 0 || 
-             ( (dataLen % bytesPerChecksum) != 0 && !lastPacketInBlock ) ||
+        if ( ( dataLen <= 0 && !lastPacketInBlock ) ||
+             ( dataLen != 0 && lastPacketInBlock) ||
              (seqno != (lastSeqNo + 1)) ) {
              throw new IOException("BlockReader: error in packet header" +
                                    "(chunkOffset : " + chunkOffset + 
@@ -1612,6 +1611,7 @@
     private BlockReader blockReader = null;
     private boolean verifyChecksum;
     private LocatedBlocks locatedBlocks = null;
+    private long lastBlockBeingWrittenLength = 0;
     private DatanodeInfo currentNode = null;
     private Block currentBlock = null;
     private long pos = 0;
@@ -1644,6 +1644,9 @@
      */
     synchronized void openInfo() throws IOException {
       LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("newInfo = " + newInfo);
+      }
       if (newInfo == null) {
         throw new IOException("Cannot open filename " + src);
       }
@@ -1658,11 +1661,46 @@
         }
       }
       this.locatedBlocks = newInfo;
+      this.lastBlockBeingWrittenLength = 0;
+      if (!locatedBlocks.isLastBlockComplete()) {
+        final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
+        if (last != null) {
+          final long len = readBlockLength(last);
+          last.getBlock().setNumBytes(len);
+          this.lastBlockBeingWrittenLength = len; 
+        }
+      }
+
       this.currentNode = null;
     }
+
+    /** Read the block length from one of the datanodes. */
+    private long readBlockLength(LocatedBlock locatedblock) throws IOException {
+      if (locatedblock == null || locatedblock.getLocations().length == 0) {
+        return 0;
+      }
+      for(DatanodeInfo datanode : locatedblock.getLocations()) {
+        try {
+          final ClientDatanodeProtocol cdp = createClientDatanodeProtocolProxy(
+              datanode, conf);
+          final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
+          if (n >= 0) {
+            return n;
+          }
+        }
+        catch(IOException ioe) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Faild to getReplicaVisibleLength from datanode "
+                + datanode + " for block " + locatedblock.getBlock(), ioe);
+          }
+        }
+      }
+      throw new IOException("Cannot obtain block length for " + locatedblock);
+    }
     
     public synchronized long getFileLength() {
-      return (locatedBlocks == null) ? 0 : locatedBlocks.getFileLength();
+      return locatedBlocks == null? 0:
+          locatedBlocks.getFileLength() + lastBlockBeingWrittenLength;
     }
 
     /**
@@ -1698,17 +1736,36 @@
     private synchronized LocatedBlock getBlockAt(long offset,
         boolean updatePosition) throws IOException {
       assert (locatedBlocks != null) : "locatedBlocks is null";
-      // search cached blocks first
-      int targetBlockIdx = locatedBlocks.findBlock(offset);
-      if (targetBlockIdx < 0) { // block is not cached
-        targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
-        // fetch more blocks
-        LocatedBlocks newBlocks;
-        newBlocks = callGetBlockLocations(namenode, src, offset, prefetchSize);
-        assert (newBlocks != null) : "Could not find target position " + offset;
-        locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
+
+      final LocatedBlock blk;
+
+      //check offset
+      if (offset < 0 || offset >= getFileLength()) {
+        throw new IOException("offset < 0 || offset > getFileLength(), offset="
+            + offset
+            + ", updatePosition=" + updatePosition
+            + ", locatedBlocks=" + locatedBlocks);
+      }
+      else if (offset >= locatedBlocks.getFileLength()) {
+        // offset to the portion of the last block,
+        // which is not known to the name-node yet;
+        // getting the last block 
+        blk = locatedBlocks.getLastLocatedBlock();
+      }
+      else {
+        // search cached blocks first
+        int targetBlockIdx = locatedBlocks.findBlock(offset);
+        if (targetBlockIdx < 0) { // block is not cached
+          targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
+          // fetch more blocks
+          LocatedBlocks newBlocks;
+          newBlocks = callGetBlockLocations(namenode, src, offset, prefetchSize);
+          assert (newBlocks != null) : "Could not find target position " + offset;
+          locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
+        }
+        blk = locatedBlocks.get(targetBlockIdx);
       }
-      LocatedBlock blk = locatedBlocks.get(targetBlockIdx);
+
       // update current position
       if (updatePosition) {
         this.pos = offset;
@@ -1745,6 +1802,27 @@
     private synchronized List<LocatedBlock> getBlockRange(long offset, 
                                                           long length) 
                                                         throws IOException {
+      final List<LocatedBlock> blocks;
+      if (locatedBlocks.isLastBlockComplete()) {
+        blocks = getFinalizedBlockRange(offset, length);
+      }
+      else {
+        if (length + offset > locatedBlocks.getFileLength()) {
+          length = locatedBlocks.getFileLength() - offset;
+        }
+        blocks = getFinalizedBlockRange(offset, length);
+        blocks.add(locatedBlocks.getLastLocatedBlock());
+      }
+      return blocks;
+    }
+
+    /**
+     * Get blocks in the specified range.
+     * Includes only the complete blocks.
+     * Fetch them from the namenode if not cached.
+     */
+    private synchronized List<LocatedBlock> getFinalizedBlockRange(
+        long offset, long length) throws IOException {
       assert (locatedBlocks != null) : "locatedBlocks is null";
       List<LocatedBlock> blockRange = new ArrayList<LocatedBlock>();
       // search cached blocks first
@@ -2312,7 +2390,7 @@
     private final LinkedList<Packet> dataQueue = new LinkedList<Packet>();
     private final LinkedList<Packet> ackQueue = new LinkedList<Packet>();
     private Packet currentPacket = null;
-    private DataStreamer streamer = new DataStreamer();
+    private DataStreamer streamer;
     private long currentSeqno = 0;
     private long bytesCurBlock = 0; // bytes writen in current block
     private int packetSize = 0; // write packet size, including the header.
@@ -2419,6 +2497,18 @@
         buffer.reset();
         return buffer;
       }
+      
+      // get the packet's last byte's offset in the block
+      long getLastByteOffsetBlock() {
+        return offsetInBlock + dataPos - dataStart;
+      }
+      
+      public String toString() {
+        return "packet seqno:" + this.seqno +
+        " offsetInBlock:" + this.offsetInBlock + 
+        " lastPacketInBlock:" + this.lastPacketInBlock +
+        " lastByteOffsetInBlock: " + this.getLastByteOffsetBlock();
+      }
     }
   
     //
@@ -2430,18 +2520,101 @@
     // if them are received, the DataStreamer closes the current block.
     //
     class DataStreamer extends Daemon {
-      private static final int MAX_RECOVERY_ERROR_COUNT = 5; // try block recovery 5 times
-      private int recoveryErrorCount = 0; // number of times block recovery failed
       private volatile boolean streamerClosed = false;
-      private Block block;
+      private Block block; // its length is number of bytes acked
       private AccessToken accessToken;
       private DataOutputStream blockStream;
       private DataInputStream blockReplyStream;
       private ResponseProcessor response = null;
       private volatile DatanodeInfo[] nodes = null; // list of targets for current block
       volatile boolean hasError = false;
-      volatile int errorIndex = 0;
-  
+      volatile int errorIndex = -1;
+      private BlockConstructionStage stage;  // block construction stage
+      private long bytesSent = 0; // number of bytes that've been sent
+
+      /**
+       * Default construction for file create
+       */
+      private DataStreamer() {
+        stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
+      }
+      
+      /**
+       * Construct a data streamer for append
+       * @param lastBlock last block of the file to be appended
+       * @param stat status of the file to be appended
+       * @param bytesPerChecksum number of bytes per checksum
+       * @throws IOException if error occurs
+       */
+      private DataStreamer(LocatedBlock lastBlock, FileStatus stat,
+          int bytesPerChecksum) throws IOException {
+        stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
+        block = lastBlock.getBlock();
+        bytesSent = block.getNumBytes();
+        accessToken = lastBlock.getAccessToken();
+        long usedInLastBlock = stat.getLen() % blockSize;
+        int freeInLastBlock = (int)(blockSize - usedInLastBlock);
+
+        // calculate the amount of free space in the pre-existing 
+        // last crc chunk
+        int usedInCksum = (int)(stat.getLen() % bytesPerChecksum);
+        int freeInCksum = bytesPerChecksum - usedInCksum;
+
+        // if there is space in the last block, then we have to 
+        // append to that block
+        if (freeInLastBlock == blockSize) {
+          throw new IOException("The last block for file " + 
+              src + " is full.");
+        }
+
+        if (usedInCksum > 0 && freeInCksum > 0) {
+          // if there is space in the last partial chunk, then 
+          // setup in such a way that the next packet will have only 
+          // one chunk that fills up the partial chunk.
+          //
+          computePacketChunkSize(0, freeInCksum);
+          resetChecksumChunk(freeInCksum);
+          appendChunk = true;
+        } else {
+          // if the remaining space in the block is smaller than 
+          // that expected size of of a packet, then create 
+          // smaller size packet.
+          //
+          computePacketChunkSize(Math.min(writePacketSize, freeInLastBlock), 
+              bytesPerChecksum);
+        }
+
+        // setup pipeline to append to the last block XXX retries??
+        nodes = lastBlock.getLocations();
+        errorIndex = -1;   // no errors yet.
+        if (nodes.length < 1) {
+          throw new IOException("Unable to retrieve blocks locations " +
+              " for last block " + block +
+              "of file " + src);
+
+        }
+      }
+      
+      /**
+       * Initialize for data streaming
+       */
+      private void initDataStreaming() {
+        this.setName("DataStreamer for file " + src +
+            " block " + block);
+        response = new ResponseProcessor(nodes);
+        response.start();
+        stage = BlockConstructionStage.DATA_STREAMING;
+      }
+      
+      private void endBlock() {
+        LOG.debug("Closing old block " + block);
+        this.setName("DataStreamer for file " + src);
+        closeResponder();
+        closeStream();
+        nodes = null;
+        stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
+      }
+      
       /*
        * streamer thread is the only thread that opens streams to datanode, 
        * and closes them. Any error recovery is also done by this thread.
@@ -2461,47 +2634,69 @@
 
           Packet one = null;
 
-          // process IO errors if any
-          boolean doSleep = processDatanodeError(hasError, false);
+          try {
+            // process datanode IO errors if any
+            boolean doSleep = false;
+            if (hasError && errorIndex>=0) {
+              doSleep = processDatanodeError();
+            }
 
-          synchronized (dataQueue) {
-            // wait for a packet to be sent.
-            while ((!streamerClosed && !hasError && clientRunning 
-                && dataQueue.size() == 0) || doSleep) {
-              try {
-                dataQueue.wait(1000);
-              } catch (InterruptedException  e) {
+            synchronized (dataQueue) {
+              // wait for a packet to be sent.
+              while ((!streamerClosed && !hasError && clientRunning 
+                  && dataQueue.size() == 0) || doSleep) {
+                try {
+                  dataQueue.wait(1000);
+                } catch (InterruptedException  e) {
+                }
+                doSleep = false;
               }
-              doSleep = false;
-            }
-            if (streamerClosed || hasError || dataQueue.size() == 0 || !clientRunning) {
-              continue;
+              if (streamerClosed || hasError || dataQueue.size() == 0 || !clientRunning) {
+                continue;
+              }
+              // get packet to be sent.
+              one = dataQueue.getFirst();
             }
-            // get packet to be sent.
-            one = dataQueue.getFirst();
-          }
-
-          try {
-            long offsetInBlock = one.offsetInBlock;
 
             // get new block from namenode.
-            if (blockStream == null) {
+            if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
               LOG.debug("Allocating new block");
-              nodes = nextBlockOutputStream(src); 
-              this.setName("DataStreamer for file " + src +
-                  " block " + block);
-              response = new ResponseProcessor(nodes);
-              response.start();
+              nodes = nextBlockOutputStream(src);
+              initDataStreaming();
+            } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
+              LOG.debug("Append to block " + block);
+              setupPipelineForAppendOrRecovery();
+              initDataStreaming();
             }
 
-            if (offsetInBlock >= blockSize) {
+            long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
+            if (lastByteOffsetInBlock > blockSize) {
               throw new IOException("BlockSize " + blockSize +
                   " is smaller than data size. " +
                   " Offset of packet in block " + 
-                  offsetInBlock +
+                  lastByteOffsetInBlock +
                   " Aborting file " + src);
             }
 
+            if (one.lastPacketInBlock) {
+              // wait until all data packets have been successfully acked
+              synchronized (dataQueue) {
+                while (!streamerClosed && !hasError && 
+                    ackQueue.size() != 0 && clientRunning) {
+                  try {
+                    // wait for acks to arrive from datanodes
+                    dataQueue.wait(1000);
+                  } catch (InterruptedException  e) {
+                  }
+                }
+              }
+              if (streamerClosed || hasError || !clientRunning) {
+                continue;
+              }
+              stage = BlockConstructionStage.PIPELINE_CLOSE;
+            }
+            
+            // send the packet
             ByteBuffer buf = one.getBuffer();
 
             synchronized (dataQueue) {
@@ -2511,19 +2706,45 @@
               dataQueue.notifyAll();
             }
 
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("DataStreamer block " + block +
+                  " sending packet " + one);
+            }
+
             // write out data to remote datanode
             blockStream.write(buf.array(), buf.position(), buf.remaining());
+            blockStream.flush();
+            
+            // update bytesSent
+            long tmpBytesSent = one.getLastByteOffsetBlock();
+            if (bytesSent < tmpBytesSent) {
+              bytesSent = tmpBytesSent;
+            }
 
+            if (streamerClosed || hasError || !clientRunning) {
+              continue;
+            }
+
+            // Is this block full?
             if (one.lastPacketInBlock) {
-              blockStream.writeInt(0); // indicate end-of-block 
+              // wait until the close packet has been acked
+              synchronized (dataQueue) {
+                while (!streamerClosed && !hasError && 
+                    ackQueue.size() != 0 && clientRunning) {
+                  dataQueue.wait(1000);// wait for acks to arrive from datanodes
+                }
+              }
+              if (streamerClosed || hasError || !clientRunning) {
+                continue;
+              }
+
+              endBlock();
             }
-            blockStream.flush();
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("DataStreamer block " + block +
-                  " wrote packet seqno:" + one.seqno +
-                  " size:" + buf.remaining() +
-                  " offsetInBlock:" + one.offsetInBlock + 
-                  " lastPacketInBlock:" + one.lastPacketInBlock);
+            if (progress != null) { progress.progress(); }
+
+            // This is used by unit test to trigger race conditions.
+            if (artificialSlowdown != 0 && clientRunning) {
+              Thread.sleep(artificialSlowdown); 
             }
           } catch (Throwable e) {
             LOG.warn("DataStreamer Exception: " + 
@@ -2532,47 +2753,16 @@
               setLastException((IOException)e);
             }
             hasError = true;
-          }
-
-
-          if (streamerClosed || hasError || !clientRunning) {
-            continue;
-          }
-
-          // Is this block full?
-          if (one.lastPacketInBlock) {
-            synchronized (dataQueue) {
-              while (!streamerClosed && !hasError && ackQueue.size() != 0 && clientRunning) {
-                try {
-                  dataQueue.wait(1000);   // wait for acks to arrive from datanodes
-                } catch (InterruptedException  e) {
-                }
-              }
-            }
-            if (streamerClosed || hasError || !clientRunning) {
-              continue;
+            if (errorIndex == -1) { // not a datanode error
+              streamerClosed = true;
             }
-
-            LOG.debug("Closing old block " + block);
-            this.setName("DataStreamer for file " + src);
-            closeResponder();
-            closeStream();
-            nodes = null;
-          }
-          if (progress != null) { progress.progress(); }
-
-          // This is used by unit test to trigger race conditions.
-          if (artificialSlowdown != 0 && clientRunning) {
-            try { 
-              Thread.sleep(artificialSlowdown); 
-            } catch (InterruptedException e) {}
           }
         }
         closeInternal();
       }
 
       private void closeInternal() {
-        closeResponder();
+        closeResponder();       // close and join
         closeStream();
         streamerClosed = true;
         closed = true;
@@ -2585,10 +2775,16 @@
        * close both streamer and DFSOutputStream, should be called only 
        * by an external thread and only after all data to be sent has 
        * been flushed to datanode.
+       * 
+       * Interrupt this data streamer if force is true
+       * 
+       * @param force if this data stream is forced to be closed 
        */
-      void close() {
+      void close(boolean force) {
         streamerClosed = true;
-        this.interrupt();
+        if (force) {
+          this.interrupt();
+        }
       }
 
       private void closeResponder() {
@@ -2646,12 +2842,12 @@
               // verify seqno from datanode
               long seqno = blockReplyStream.readLong();
               LOG.debug("DFSClient received ack for seqno " + seqno);
+              Packet one = null;
               if (seqno == -1) {
                 continue;
               } else if (seqno == -2) {
                 // no nothing
               } else {
-                Packet one = null;
                 synchronized (dataQueue) {
                   one = ackQueue.getFirst();
                 }
@@ -2664,10 +2860,20 @@
               }
 
               // processes response status from all datanodes.
+              String replies = null;
+              if (LOG.isDebugEnabled()) {
+                replies = "DFSClient Replies for seqno " + seqno + " are";
+              }
               for (int i = 0; i < targets.length && clientRunning; i++) {
                 final DataTransferProtocol.Status reply
                     = DataTransferProtocol.Status.read(blockReplyStream);
+                if (LOG.isDebugEnabled()) {
+                  replies += " " + reply;
+                }
                 if (reply != SUCCESS) {
+                  if (LOG.isDebugEnabled()) {
+                    LOG.debug(replies);
+                  }
                   errorIndex = i; // first bad datanode
                   throw new IOException("Bad response " + reply +
                       " for block " + block +
@@ -2676,6 +2882,18 @@
                 }
               }
 
+              if (LOG.isDebugEnabled()) {
+                LOG.debug(replies);
+              }
+              
+              if (one == null) {
+                throw new IOException("Panic: responder did not receive " +
+                    "an ack for a packet: " + seqno);
+              }
+              
+              // update bytesAcked
+              block.setNumBytes(one.getLastByteOffsetBlock());
+
               synchronized (dataQueue) {
                 ackQueue.removeFirst();
                 dataQueue.notifyAll();
@@ -2686,6 +2904,7 @@
                   setLastException((IOException)e);
                 }
                 hasError = true;
+                errorIndex = errorIndex==-1 ? 0 : errorIndex;
                 synchronized (dataQueue) {
                   dataQueue.notifyAll();
                 }
@@ -2708,21 +2927,12 @@
       // threads and mark stream as closed. Returns true if we should
       // sleep for a while after returning from this call.
       //
-      private boolean processDatanodeError(boolean error, boolean isAppend) {
-        if (!error) {
-          return false;
-        }
+      private boolean processDatanodeError() throws IOException {
         if (response != null) {
           LOG.info("Error Recovery for block " + block +
           " waiting for responder to exit. ");
           return true;
         }
-        if (errorIndex >= 0) {
-          LOG.warn("Error Recovery for block " + block
-              + " bad datanode[" + errorIndex + "] "
-              + (nodes == null? "nodes == null": nodes[errorIndex].getName()));
-        }
-
         closeStream();
 
         // move packets from ack queue to front of the data queue
@@ -2731,31 +2941,57 @@
           ackQueue.clear();
         }
 
-        boolean success = false;
-        while (!success && !streamerClosed && clientRunning) {
-          DatanodeInfo[] newnodes = null;
-          if (nodes == null) {
-            String msg = "Could not get block locations. " + "Source file \""
-                + src + "\" - Aborting...";
-            LOG.warn(msg);
-            setLastException(new IOException(msg));
-            streamerClosed = true;
-            return false;
-          }
-          StringBuilder pipelineMsg = new StringBuilder();
-          for (int j = 0; j < nodes.length; j++) {
-            pipelineMsg.append(nodes[j].getName());
-            if (j < nodes.length - 1) {
-              pipelineMsg.append(", ");
+        boolean doSleep = setupPipelineForAppendOrRecovery();
+        
+        if (!streamerClosed && clientRunning) {
+          if (stage == BlockConstructionStage.PIPELINE_CLOSE) {
+            synchronized (dataQueue) {
+              dataQueue.remove();  // remove the end of block packet
+              dataQueue.notifyAll();
             }
+            endBlock();
+          } else {
+            initDataStreaming();
           }
+        }
+        
+        return doSleep;
+      }
+
+
+      /**
+       * Open a DataOutputStream to a DataNode pipeline so that 
+       * it can be written to.
+       * This happens when a file is appended or data streaming fails.
+       * It keeps on trying until a pipeline is set up.
+       */
+      private boolean setupPipelineForAppendOrRecovery() throws IOException {
+        // check number of datanodes
+        if (nodes == null || nodes.length == 0) {
+          String msg = "Could not get block locations. " + "Source file \""
+              + src + "\" - Aborting...";
+          LOG.warn(msg);
+          setLastException(new IOException(msg));
+          streamerClosed = true;
+          return false;
+        }
+        
+        boolean success = false;
+        long newGS = 0L;
+        while (!success && !streamerClosed && clientRunning) {
+          boolean isRecovery = hasError;
           // remove bad datanode from list of datanodes.
           // If errorIndex was not set (i.e. appends), then do not remove 
           // any datanodes
           // 
-          if (errorIndex < 0) {
-            newnodes = nodes;
-          } else {
+          if (errorIndex >= 0) {
+            StringBuilder pipelineMsg = new StringBuilder();
+            for (int j = 0; j < nodes.length; j++) {
+              pipelineMsg.append(nodes[j].getName());
+              if (j < nodes.length - 1) {
+                pipelineMsg.append(", ");
+              }
+            }
             if (nodes.length <= 1) {
               lastException = new IOException("All datanodes " + pipelineMsg
                   + " are bad. Aborting...");
@@ -2765,86 +3001,32 @@
             LOG.warn("Error Recovery for block " + block +
                 " in pipeline " + pipelineMsg + 
                 ": bad datanode " + nodes[errorIndex].getName());
-            newnodes =  new DatanodeInfo[nodes.length-1];
+            DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
             System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
             System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
                 newnodes.length-errorIndex);
+            nodes = newnodes;
+            this.hasError = false;
+            lastException = null;
+            errorIndex = -1;
           }
 
-          // Tell the primary datanode to do error recovery 
-          // by stamping appropriate generation stamps.
-          //
-          LocatedBlock newBlock = null;
-          ClientDatanodeProtocol primary =  null;
-          DatanodeInfo primaryNode = null;
-          try {
-            // Pick the "least" datanode as the primary datanode to avoid deadlock.
-            primaryNode = Collections.min(Arrays.asList(newnodes));
-            primary = createClientDatanodeProtocolProxy(primaryNode, conf);
-            newBlock = primary.recoverBlock(block, isAppend, newnodes);
-          } catch (IOException e) {
-            recoveryErrorCount++;
-            if (recoveryErrorCount > MAX_RECOVERY_ERROR_COUNT) {
-              if (nodes.length > 1) {
-                // if the primary datanode failed, remove it from the list.
-                // The original bad datanode is left in the list because it is
-                // conservative to remove only one datanode in one iteration.
-                for (int j = 0; j < nodes.length; j++) {
-                  if (nodes[j].equals(primaryNode)) {
-                    errorIndex = j; // forget original bad node.
-                  }
-                }
-                // remove primary node from list
-                newnodes =  new DatanodeInfo[nodes.length-1];
-                System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
-                System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
-                    newnodes.length-errorIndex);
-                nodes = newnodes;
-                LOG.warn("Error Recovery for block " + block + " failed "
-                    + " because recovery from primary datanode " + primaryNode
-                    + " failed " + recoveryErrorCount + " times. "
-                    + " Pipeline was " + pipelineMsg
-                    + ". Marking primary datanode as bad.");
-                recoveryErrorCount = 0; 
-                errorIndex = -1;
-                return true;          // sleep when we return from here
-              }
-              String emsg = "Error Recovery for block " + block + " failed "
-                  + " because recovery from primary datanode " + primaryNode
-                  + " failed " + recoveryErrorCount + " times. "
-                  + " Pipeline was " + pipelineMsg + ". Aborting...";
-              LOG.warn(emsg);
-              lastException = new IOException(emsg);
-              streamerClosed = true;
-              return false;       // abort with IOexception
-            } 
-            LOG.warn("Error Recovery for block " + block + " failed "
-                + " because recovery from primary datanode " + primaryNode
-                + " failed " + recoveryErrorCount + " times. "
-                + " Pipeline was " + pipelineMsg + ". Will retry...");
-            return true;          // sleep when we return from here
-          } finally {
-            RPC.stopProxy(primary);
-          }
-          recoveryErrorCount = 0; // block recovery successful
-
-          // If the block recovery generated a new generation stamp, use that
-          // from now on.  Also, setup new pipeline
-          // newBlock should never be null and it should contain a newly 
-          // generated access token.
-          block = newBlock.getBlock();
-          accessToken = newBlock.getAccessToken();
-          nodes = newBlock.getLocations();
-
-          this.hasError = false;
-          lastException = null;
-          errorIndex = 0;
-          success = createBlockOutputStream(nodes, clientName, true);
+          // get a new generation stamp and an access token
+          LocatedBlock lb = namenode.updateBlockForPipeline(block, clientName);
+          newGS = lb.getBlock().getGenerationStamp();
+          accessToken = lb.getAccessToken();
+          
+          // set up the pipeline again with the remaining nodes
+          success = createBlockOutputStream(nodes, newGS, isRecovery);
         }
 
-        if (!streamerClosed && clientRunning) {
-          response = new ResponseProcessor(nodes);
-          response.start();
+        if (success) {
+          // update pipeline at the namenode
+          Block newBlock = new Block(
+              block.getBlockId(), block.getNumBytes(), newGS);
+          namenode.updatePipeline(clientName, block, newBlock, nodes);
+          // update client side generation stamp
+          block = newBlock;
         }
         return false; // do not sleep, continue processing
       }
@@ -2864,24 +3046,26 @@
         do {
           hasError = false;
           lastException = null;
-          errorIndex = 0;
+          errorIndex = -1;
           retry = false;
           success = false;
 
           long startTime = System.currentTimeMillis();
           lb = locateFollowingBlock(startTime);
           block = lb.getBlock();
+          block.setNumBytes(0);
           accessToken = lb.getAccessToken();
           nodes = lb.getLocations();
 
           //
           // Connect to first DataNode in the list.
           //
-          success = createBlockOutputStream(nodes, clientName, false);
+          success = createBlockOutputStream(nodes, 0L, false);
 
           if (!success) {
             LOG.info("Abandoning block " + block);
             namenode.abandonBlock(block, src, clientName);
+            block = null;
 
             // Connection failed.  Let's wait a little bit and retry
             retry = true;
@@ -2904,7 +3088,7 @@
       // connects to the first datanode in the pipeline
       // Returns true if success, otherwise return failure.
       //
-      private boolean createBlockOutputStream(DatanodeInfo[] nodes, String client,
+      private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
           boolean recoveryFlag) {
         DataTransferProtocol.Status pipelineStatus = SUCCESS;
         String firstBadLink = "";
@@ -2939,9 +3123,11 @@
               DataNode.SMALL_BUFFER_SIZE));
           blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
 
+          // send the request
           DataTransferProtocol.Sender.opWriteBlock(out,
-              block.getBlockId(), block.getGenerationStamp(), nodes.length,
-              recoveryFlag, client, null, nodes, accessToken);
+              block.getBlockId(), block.getGenerationStamp(),
+              nodes.length, recoveryFlag?stage.getRecoveryStage():stage, newGS,
+              block.getNumBytes(), bytesSent, clientName, null, nodes, accessToken);
           checksum.writeHeader(out);
           out.flush();
 
@@ -2974,6 +3160,8 @@
                 break;
               }
             }
+          } else {
+            errorIndex = 0;
           }
           hasError = true;
           setLastException(ie);
@@ -2989,7 +3177,7 @@
           long localstart = System.currentTimeMillis();
           while (true) {
             try {
-              return namenode.addBlock(src, clientName);
+              return namenode.addBlock(src, clientName, block);
             } catch (RemoteException e) {
               IOException ue = 
                 e.unwrapRemoteException(FileNotFoundException.class,
@@ -3030,52 +3218,8 @@
         } 
       }
 
-      void initAppend(LocatedBlock lastBlock, FileStatus stat,
-          int bytesPerChecksum) throws IOException {
-        block = lastBlock.getBlock();
-        accessToken = lastBlock.getAccessToken();
-        long usedInLastBlock = stat.getLen() % blockSize;
-        int freeInLastBlock = (int)(blockSize - usedInLastBlock);
-
-        // calculate the amount of free space in the pre-existing 
-        // last crc chunk
-        int usedInCksum = (int)(stat.getLen() % bytesPerChecksum);
-        int freeInCksum = bytesPerChecksum - usedInCksum;
-
-        // if there is space in the last block, then we have to 
-        // append to that block
-        if (freeInLastBlock > blockSize) {
-          throw new IOException("The last block for file " + 
-              src + " is full.");
-        }
-
-        if (usedInCksum > 0 && freeInCksum > 0) {
-          // if there is space in the last partial chunk, then 
-          // setup in such a way that the next packet will have only 
-          // one chunk that fills up the partial chunk.
-          //
-          computePacketChunkSize(0, freeInCksum);
-          resetChecksumChunk(freeInCksum);
-          appendChunk = true;
-        } else {
-          // if the remaining space in the block is smaller than 
-          // that expected size of of a packet, then create 
-          // smaller size packet.
-          //
-          computePacketChunkSize(Math.min(writePacketSize, freeInLastBlock), 
-              bytesPerChecksum);
-        }
-
-        // setup pipeline to append to the last block XXX retries??
-        nodes = lastBlock.getLocations();
-        errorIndex = -1;   // no errors yet.
-        if (nodes.length < 1) {
-          throw new IOException("Unable to retrieve blocks locations " +
-              " for last block " + block +
-              "of file " + src);
-
-        }
-        processDatanodeError(true, true);
+      Block getBlock() {
+        return block;
       }
 
       DatanodeInfo[] getNodes() {
@@ -3160,6 +3304,7 @@
                                        NSQuotaExceededException.class,
                                        DSQuotaExceededException.class);
       }
+      streamer = new DataStreamer();
       streamer.start();
     }
   
@@ -3179,9 +3324,10 @@
       if (lastBlock != null) {
         // indicate that we are appending to an existing block
         bytesCurBlock = lastBlock.getBlockSize();
-        streamer.initAppend(lastBlock, stat, bytesPerChecksum);
+        streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum);
       } else {
         computePacketChunkSize(writePacketSize, bytesPerChecksum);
+        streamer = new DataStreamer();
       }
       streamer.start();
     }
@@ -3270,15 +3416,6 @@
               ", blockSize=" + blockSize +
               ", appendChunk=" + appendChunk);
         }
-        //
-        // if we allocated a new packet because we encountered a block
-        // boundary, reset bytesCurBlock.
-        //
-        if (bytesCurBlock == blockSize) {
-          currentPacket.lastPacketInBlock = true;
-          bytesCurBlock = 0;
-          lastFlushOffset = -1;
-        }
         waitAndQueuePacket(currentPacket);
         currentPacket = null;
 
@@ -3291,15 +3428,41 @@
         }
         int psize = Math.min((int)(blockSize-bytesCurBlock), writePacketSize);
         computePacketChunkSize(psize, bytesPerChecksum);
+        
+        //
+        // if encountering a block boundary, send an empty packet to 
+        // indicate the end of block and reset bytesCurBlock.
+        //
+        if (bytesCurBlock == blockSize) {
+          currentPacket = new Packet(DataNode.PKT_HEADER_LEN+4, 0, 
+              bytesCurBlock);
+          currentPacket.lastPacketInBlock = true;
+          waitAndQueuePacket(currentPacket);
+          currentPacket = null;
+          bytesCurBlock = 0;
+          lastFlushOffset = -1;
+        }
       }
     }
   
     /**
-     * All data is written out to datanodes. It is not guaranteed 
-     * that data has been flushed to persistent store on the 
-     * datanode. Block allocations are persisted on namenode.
+     * @deprecated As of HDFS 0.21.0, replaced by hflush
+     * @see #hflush()
      */
+    @Deprecated
     public synchronized void sync() throws IOException {
+      hflush();
+    }
+    
+    /**
+     * All data is flushed out to datanodes.
+     * It is a synchronous operation. When it returns,
+     * it guarantees that flushed data become visible to new readers. 
+     * It is not guaranteed that data has been flushed to 
+     * persistent store on the datanode. 
+     * Block allocations are persisted on namenode.
+     */
+    public synchronized void hflush() throws IOException {
       checkOpen();
       isClosed();
       try {
@@ -3343,7 +3506,7 @@
         }
       } catch (IOException e) {
           lastException = new IOException("IOException flush:" + e);
-          closeThreads();
+          closeThreads(true);
           throw e;
       }
     }
@@ -3384,13 +3547,14 @@
       }
       streamer.setLastException(new IOException("Lease timeout of " +
                                (hdfsTimeout/1000) + " seconds expired."));
-      closeThreads();
+      closeThreads(true);
     }
  
     // shutdown datastreamer and responseprocessor threads.
-    private void closeThreads() throws IOException {
+    // interrupt datastreamer if force is true
+    private void closeThreads(boolean force) throws IOException {
       try {
-        streamer.close();
+        streamer.close(force);
         streamer.join();
         if (s != null) {
           s.close();
@@ -3421,21 +3585,22 @@
       try {
         flushBuffer();       // flush from all upper layers
 
-        // Mark that this packet is the last packet in block.
-        // If there are no outstanding packets and the last packet
-        // was not the last one in the current block, then create a
-        // packet with empty payload.
-        if (currentPacket == null && bytesCurBlock != 0) {
-          currentPacket = new Packet(packetSize, chunksPerPacket,
-              bytesCurBlock);
-        }
         if (currentPacket != null) { 
+          waitAndQueuePacket(currentPacket);
+        }
+
+        if (bytesCurBlock != 0) {
+          // send an empty packet to mark the end of the block
+          currentPacket = new Packet(DataNode.PKT_HEADER_LEN+4, 0, 
+              bytesCurBlock);
           currentPacket.lastPacketInBlock = true;
         }
 
         flushInternal();             // flush all data to Datanodes
-        closeThreads();
-        completeFile();
+        // get last block before destroying the streamer
+        Block lastBlock = streamer.getBlock();
+        closeThreads(false);
+        completeFile(lastBlock);
         leasechecker.remove(src);
       } finally {
         closed = true;
@@ -3444,11 +3609,11 @@
 
     // should be called holding (this) lock since setTestFilename() may 
     // be called during unit tests
-    private void completeFile() throws IOException {
+    private void completeFile(Block last) throws IOException {
       long localstart = System.currentTimeMillis();
       boolean fileComplete = false;
       while (!fileComplete) {
-        fileComplete = namenode.complete(src, clientName);
+        fileComplete = namenode.complete(src, clientName, last);
         if (!fileComplete) {
           if (!clientRunning ||
                 (hdfsTimeout > 0 &&

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java Wed Sep 30 23:39:30 2009
@@ -18,6 +18,10 @@
 package org.apache.hadoop.hdfs.protocol;
 
 import java.util.Iterator;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.server.common.HdfsConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 
 /**
  * This class provides an interface for accessing list of blocks that
@@ -25,41 +29,82 @@
  * This class is useful for block report. Rather than send block reports
  * as a Block[] we can send it as a long[].
  *
+ * The structure of the array is as follows:
+ * 0: the length of the finalized replica list;
+ * 1: the length of the under-construction replica list;
+ * - followed by finalized replica list where each replica is represented by
+ *   3 longs: one for the blockId, one for the block length, and one for
+ *   the generation stamp;
+ * - followed by the invalid replica represented with three -1s;
+ * - followed by the under-construction replica list where each replica is
+ *   represented by 4 longs: three for the block id, length, generation 
+ *   stamp, and the fourth for the replica state.
  */
-public class BlockListAsLongs implements Iterable<Block>{
+public class BlockListAsLongs implements Iterable<Block> {
   /**
-   * A block as 3 longs
+   * A finalized block as 3 longs
    *   block-id and block length and generation stamp
    */
-  private static final int LONGS_PER_BLOCK = 3;
-  
-  private static int index2BlockId(int index) {
-    return index*LONGS_PER_BLOCK;
-  }
-  private static int index2BlockLen(int index) {
-    return (index*LONGS_PER_BLOCK) + 1;
-  }
-  private static int index2BlockGenStamp(int index) {
-    return (index*LONGS_PER_BLOCK) + 2;
+  private static final int LONGS_PER_FINALIZED_BLOCK = 3;
+
+  /**
+   * An under-construction block as 4 longs
+   *   block-id and block length, generation stamp and replica state
+   */
+  private static final int LONGS_PER_UC_BLOCK = 4;
+
+  /** Number of longs in the header */
+  private static final int HEADER_SIZE = 2;
+
+  /**
+   * Returns the index of the first long in blockList
+   * belonging to the specified block.
+   * The first long contains the block id.
+   */
+  private int index2BlockId(int blockIndex) {
+    if(blockIndex < 0 || blockIndex > getNumberOfBlocks())
+      return -1;
+    int finalizedSize = getNumberOfFinalizedReplicas();
+    if(blockIndex < finalizedSize)
+      return HEADER_SIZE + blockIndex * LONGS_PER_FINALIZED_BLOCK;
+    return HEADER_SIZE + (finalizedSize + 1) * LONGS_PER_FINALIZED_BLOCK
+            + (blockIndex - finalizedSize) * LONGS_PER_UC_BLOCK;
   }
-  
+
   private long[] blockList;
   
   /**
-   * Converting a block[] to a long[]
-   * @param blockArray - the input array block[]
-   * @return the output array of long[]
+   * Create block report from finalized and under construction lists of blocks.
+   * 
+   * @param finalized - list of finalized blocks
+   * @param uc - list of under construction blocks
    */
-  public static long[] convertToArrayLongs(final Block[] blockArray) {
-    long[] blocksAsLongs = new long[blockArray.length * LONGS_PER_BLOCK];
+  public BlockListAsLongs(final List<? extends Block> finalized,
+                          final List<ReplicaInfo> uc) {
+    int finalizedSize = finalized == null ? 0 : finalized.size();
+    int ucSize = uc == null ? 0 : uc.size();
+    int len = HEADER_SIZE
+              + (finalizedSize + 1) * LONGS_PER_FINALIZED_BLOCK
+              + ucSize * LONGS_PER_UC_BLOCK;
+
+    blockList = new long[len];
+
+    // set the header
+    blockList[0] = finalizedSize;
+    blockList[1] = ucSize;
+
+    // set finalized blocks
+    for (int i = 0; i < finalizedSize; i++) {
+      setBlock(i, finalized.get(i));
+    }
 
-    BlockListAsLongs bl = new BlockListAsLongs(blocksAsLongs);
-    assert bl.getNumberOfBlocks() == blockArray.length;
+    // set invalid delimiting block
+    setDelimitingBlock(finalizedSize);
 
-    for (int i = 0; i < blockArray.length; i++) {
-      bl.setBlock(i, blockArray[i]);
+    // set under construction blocks
+    for (int i = 0; i < ucSize; i++) {
+      setBlock(finalizedSize + i, uc.get(i));
     }
-    return blocksAsLongs;
   }
 
   public BlockListAsLongs() {
@@ -72,27 +117,29 @@
    */
   public BlockListAsLongs(final long[] iBlockList) {
     if (iBlockList == null) {
-      blockList = new long[0];
-    } else {
-      if (iBlockList.length%LONGS_PER_BLOCK != 0) {
-        // must be multiple of LONGS_PER_BLOCK
-        throw new IllegalArgumentException();
-      }
-      blockList = iBlockList;
+      blockList = new long[HEADER_SIZE];
+      return;
     }
+    blockList = iBlockList;
+  }
+
+  public long[] getBlockListAsLongs() {
+    return blockList;
   }
 
   /**
    * Iterates over blocks in the block report.
    * Avoids object allocation on each iteration.
    */
-  private class BlockReportIterator implements Iterator<Block> {
+  public class BlockReportIterator implements Iterator<Block> {
     private int currentBlockIndex;
     private Block block;
+    private ReplicaState currentReplicaState;
 
     BlockReportIterator() {
       this.currentBlockIndex = 0;
       this.block = new Block();
+      this.currentReplicaState = null;
     }
 
     public boolean hasNext() {
@@ -100,22 +147,39 @@
     }
 
     public Block next() {
-      block.set(blockList[index2BlockId(currentBlockIndex)],
-                blockList[index2BlockLen(currentBlockIndex)],
-                blockList[index2BlockGenStamp(currentBlockIndex)]);
+      block.set(blockId(currentBlockIndex),
+                blockLength(currentBlockIndex),
+                blockGenerationStamp(currentBlockIndex));
+      currentReplicaState = blockReplicaState(currentBlockIndex);
       currentBlockIndex++;
       return block;
     }
 
-    public void remove()  {
+    public void remove() {
       throw new UnsupportedOperationException("Sorry. can't remove.");
     }
+
+    /**
+     * Get the state of the current replica.
+     * The state corresponds to the replica returned
+     * by the latest {@link #next()}. 
+     */
+    public ReplicaState getCurrentReplicaState() {
+      return currentReplicaState;
+    }
   }
 
   /**
    * Returns an iterator over blocks in the block report. 
    */
   public Iterator<Block> iterator() {
+    return getBlockReportIterator();
+  }
+
+  /**
+   * Returns {@link BlockReportIterator}. 
+   */
+  public BlockReportIterator getBlockReportIterator() {
     return new BlockReportIterator();
   }
 
@@ -124,7 +188,55 @@
    * @return - the number of blocks
    */
   public int getNumberOfBlocks() {
-    return blockList.length/LONGS_PER_BLOCK;
+    assert blockList.length == HEADER_SIZE + 
+            (blockList[0] + 1) * LONGS_PER_FINALIZED_BLOCK +
+            blockList[1] * LONGS_PER_UC_BLOCK :
+              "Number of blocks is inconcistent with the array length";
+    return getNumberOfFinalizedReplicas() + getNumberOfUCReplicas();
+  }
+
+  /**
+   * Returns the number of finalized replicas in the block report.
+   */
+  private int getNumberOfFinalizedReplicas() {
+    return (int)blockList[0];
+  }
+
+  /**
+   * Returns the number of under construction replicas in the block report.
+   */
+  private int getNumberOfUCReplicas() {
+    return (int)blockList[1];
+  }
+
+  /**
+   * Returns the id of the specified replica of the block report.
+   */
+  private long blockId(int index) {
+    return blockList[index2BlockId(index)];
+  }
+
+  /**
+   * Returns the length of the specified replica of the block report.
+   */
+  private long blockLength(int index) {
+    return blockList[index2BlockId(index) + 1];
+  }
+
+  /**
+   * Returns the generation stamp of the specified replica of the block report.
+   */
+  private long blockGenerationStamp(int index) {
+    return blockList[index2BlockId(index) + 2];
+  }
+
+  /**
+   * Returns the state of the specified replica of the block report.
+   */
+  private ReplicaState blockReplicaState(int index) {
+    if(index < getNumberOfFinalizedReplicas())
+      return ReplicaState.FINALIZED;
+    return ReplicaState.getState((int)blockList[index2BlockId(index) + 3]);
   }
 
   /**
@@ -134,7 +246,7 @@
    */
   @Deprecated
   public long getBlockId(final int index)  {
-    return blockList[index2BlockId(index)];
+    return blockId(index);
   }
   
   /**
@@ -144,7 +256,7 @@
    */
   @Deprecated
   public long getBlockLen(final int index)  {
-    return blockList[index2BlockLen(index)];
+    return blockLength(index);
   }
 
   /**
@@ -154,7 +266,7 @@
    */
   @Deprecated
   public long getBlockGenStamp(final int index)  {
-    return blockList[index2BlockGenStamp(index)];
+    return blockGenerationStamp(index);
   }
   
   /**
@@ -162,9 +274,28 @@
    * @param index - the index of the block to set
    * @param b - the block is set to the value of the this block
    */
-  private void setBlock(final int index, final Block b) {
-    blockList[index2BlockId(index)] = b.getBlockId();
-    blockList[index2BlockLen(index)] = b.getNumBytes();
-    blockList[index2BlockGenStamp(index)] = b.getGenerationStamp();
+  private <T extends Block> void setBlock(final int index, final T b) {
+    int pos = index2BlockId(index);
+    blockList[pos] = b.getBlockId();
+    blockList[pos + 1] = b.getNumBytes();
+    blockList[pos + 2] = b.getGenerationStamp();
+    if(index < getNumberOfFinalizedReplicas())
+      return;
+    assert ((ReplicaInfo)b).getState() != ReplicaState.FINALIZED :
+      "Must be under-construction replica.";
+    blockList[pos + 3] = ((ReplicaInfo)b).getState().getValue();
+  }
+
+  /**
+   * Set the invalid delimiting block between the finalized and
+   * the under-construction lists.
+   * The invalid block has all three fields set to -1.
+   * @param finalizedSzie - the size of the finalized list
+   */
+  private void setDelimitingBlock(final int finalizedSzie) {
+    int idx = HEADER_SIZE + finalizedSzie * LONGS_PER_FINALIZED_BLOCK;
+    blockList[idx] = -1;
+    blockList[idx+1] = -1;
+    blockList[idx+2] = -1;
   }
 }

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java Wed Sep 30 23:39:30 2009
@@ -29,19 +29,10 @@
   public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class);
 
   /**
-   * 4: never return null and always return a newly generated access token
+   * 6: recoverBlock() removed.
    */
-  public static final long versionID = 4L;
+  public static final long versionID = 6L;
 
-  /** Start generation-stamp recovery for specified block
-   * @param block the specified block
-   * @param keepLength keep the block length
-   * @param targets the list of possible locations of specified block
-   * @return either a new generation stamp, or the original generation stamp. 
-   * Regardless of whether a new generation stamp is returned, a newly 
-   * generated access token is returned as part of the return value.
-   * @throws IOException
-   */
-  LocatedBlock recoverBlock(Block block, boolean keepLength,
-      DatanodeInfo[] targets) throws IOException;
+  /** Return the visible length of a replica. */
+  long getReplicaVisibleLength(Block b) throws IOException;
 }

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Wed Sep 30 23:39:30 2009
@@ -44,9 +44,9 @@
    * Compared to the previous version the following changes have been introduced:
    * (Only the latest change is reflected.
    * The log of historical changes can be retrieved from the svn).
-   * 48: modified mkdirs() to take an additional boolean parameter
+   * 50: change LocatedBlocks to include last block information.
    */
-  public static final long versionID = 48L;
+  public static final long versionID = 50L;
   
   ///////////////////////////////////////
   // File contents
@@ -93,8 +93,8 @@
    * {@link #rename(String, String)} it until the file is completed
    * or explicitly as a result of lease expiration.
    * <p>
-   * Blocks have a maximum size.  Clients that intend to
-   * create multi-block files must also use {@link #addBlock(String, String)}.
+   * Blocks have a maximum size.  Clients that intend to create
+   * multi-block files must also use {@link #addBlock(String, String, Block)}.
    *
    * @param src path of the file being created.
    * @param masked masked permission.
@@ -187,9 +187,14 @@
    * addBlock() allocates a new block and datanodes the block data
    * should be replicated to.
    * 
+   * addBlock() also commits the previous block by reporting
+   * to the name-node the actual generation stamp and the length
+   * of the block that the client has transmitted to data-nodes.
+   * 
    * @return LocatedBlock allocated block information.
    */
-  public LocatedBlock addBlock(String src, String clientName) throws IOException;
+  public LocatedBlock addBlock(String src, String clientName,
+                               Block previous) throws IOException;
 
   /**
    * The client is done writing data to the given filename, and would 
@@ -197,13 +202,18 @@
    *
    * The function returns whether the file has been closed successfully.
    * If the function returns false, the caller should try again.
+   * 
+   * complete() also commits the last block of the file by reporting
+   * to the name-node the actual generation stamp and the length
+   * of the block that the client has transmitted to data-nodes.
    *
    * A call to complete() will not return true until all the file's
    * blocks have been replicated the minimum number of times.  Thus,
    * DataNode failures may cause a client to call complete() several
    * times before succeeding.
    */
-  public boolean complete(String src, String clientName) throws IOException;
+  public boolean complete(String src, String clientName,
+                          Block last) throws IOException;
 
   /**
    * The client wants to report corrupted blocks (blocks with specified
@@ -500,4 +510,32 @@
    *              by this call.
    */
   public void setTimes(String src, long mtime, long atime) throws IOException;
+  
+  /**
+   * Get a new generation stamp together with an access token for 
+   * a block under construction
+   * 
+   * This method is called only when a client needs to recover a failed
+   * pipeline or set up a pipeline for appending to a block.
+   * 
+   * @param block a block
+   * @param clientName the name of the client
+   * @return a located block with a new generation stamp and an access token
+   * @throws IOException if any error occurs
+   */
+  public LocatedBlock updateBlockForPipeline(Block block, String clientName) 
+  throws IOException;
+
+  /**
+   * Update a pipeline for a block under construction
+   * 
+   * @param clientName the name of the client
+   * @param oldBlock the old block
+   * @param newBlock the new block containing new generation stamp and length
+   * @param newNodes datanodes in the pipeline
+   * @throws IOException if any error occurs
+   */
+  public void updatePipeline(String clientName, Block oldBlock, 
+      Block newBlock, DatanodeID[] newNodes)
+  throws IOException;
 }

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/DataTransferProtocol.java Wed Sep 30 23:39:30 2009
@@ -25,6 +25,7 @@
 import java.io.OutputStream;
 
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.security.AccessToken;
 
 /**
@@ -38,12 +39,12 @@
    * when protocol changes. It is not very obvious. 
    */
   /*
-   * Version 16:
-   *    Datanode now needs to send back a status code together 
-   *    with firstBadLink during pipeline setup for dfs write
-   *    (only for DFSClients, not for other datanodes).
+   * Version 17:
+   *    Change the block write protocol to support pipeline recovery.
+   *    Additional fields, like recovery flags, new GS, minBytesRcvd, 
+   *    and maxBytesRcvd are included.
    */
-  public static final int DATA_TRANSFER_VERSION = 16;
+  public static final int DATA_TRANSFER_VERSION = 17;
 
   /** Operation */
   public enum Op {
@@ -119,6 +120,55 @@
     }
   };
   
+  public enum BlockConstructionStage {
+    /** The enum constants are always listed as a regular stage followed by
+     * its recovery stage.
+     * Changing this order will break getRecoveryStage().
+     */
+    // pipeline set up for block append
+    PIPELINE_SETUP_APPEND,
+    // pipeline set up for failed PIPELINE_SETUP_APPEND recovery
+    PIPELINE_SETUP_APPEND_RECOVERY,
+    // data streaming
+    DATA_STREAMING,
+    // pipeline setup for failed data streaming recovery
+    PIPELINE_SETUP_STREAMING_RECOVERY,
+    // close the block and pipeline
+    PIPELINE_CLOSE,
+    // Recover a failed PIPELINE_CLOSE
+    PIPELINE_CLOSE_RECOVERY,
+    // pipeline set up for block creation
+    PIPELINE_SETUP_CREATE;
+    
+    final static private byte RECOVERY_BIT = (byte)1;
+    
+    /**
+     * get the recovery stage of this stage
+     */
+    public BlockConstructionStage getRecoveryStage() {
+      if (this == PIPELINE_SETUP_CREATE) {
+        throw new IllegalArgumentException( "Unexpected blockStage " + this);
+      } else {
+        return values()[ordinal()|RECOVERY_BIT];
+      }
+    }
+    
+    private static BlockConstructionStage valueOf(byte code) {
+      return code < 0 || code >= values().length? null: values()[code];
+    }
+    
+    /** Read from in */
+    private static BlockConstructionStage readFields(DataInput in)
+    throws IOException {
+      return valueOf(in.readByte());
+    }
+
+    /** write to out */
+    private void write(DataOutput out) throws IOException {
+      out.writeByte(ordinal());
+    }
+  }    
+
   /** @deprecated Deprecated at 0.21.  Use Op.WRITE_BLOCK instead. */
   @Deprecated
   public static final byte OP_WRITE_BLOCK = Op.WRITE_BLOCK.code;
@@ -187,15 +237,19 @@
     
     /** Send OP_WRITE_BLOCK */
     public static void opWriteBlock(DataOutputStream out,
-        long blockId, long blockGs, int pipelineSize, boolean isRecovery,
-        String client, DatanodeInfo src, DatanodeInfo[] targets,
-        AccessToken accesstoken) throws IOException {
+        long blockId, long blockGs, int pipelineSize, 
+        BlockConstructionStage stage, long newGs, long minBytesRcvd,
+        long maxBytesRcvd, String client, DatanodeInfo src, 
+        DatanodeInfo[] targets, AccessToken accesstoken) throws IOException {
       op(out, Op.WRITE_BLOCK);
 
       out.writeLong(blockId);
       out.writeLong(blockGs);
       out.writeInt(pipelineSize);
-      out.writeBoolean(isRecovery);
+      stage.write(out);
+      WritableUtils.writeVLong(out, newGs);
+      WritableUtils.writeVLong(out, minBytesRcvd);
+      WritableUtils.writeVLong(out, maxBytesRcvd);
       Text.writeString(out, client);
 
       out.writeBoolean(src != null);
@@ -307,7 +361,11 @@
       final long blockId = in.readLong();          
       final long blockGs = in.readLong();
       final int pipelineSize = in.readInt(); // num of datanodes in entire pipeline
-      final boolean isRecovery = in.readBoolean(); // is this part of recovery?
+      final BlockConstructionStage stage = 
+        BlockConstructionStage.readFields(in);
+      final long newGs = WritableUtils.readVLong(in);
+      final long minBytesRcvd = WritableUtils.readVLong(in);
+      final long maxBytesRcvd = WritableUtils.readVLong(in);
       final String client = Text.readString(in); // working on behalf of this client
       final DatanodeInfo src = in.readBoolean()? DatanodeInfo.read(in): null;
 
@@ -321,8 +379,8 @@
       }
       final AccessToken accesstoken = readAccessToken(in);
 
-      opWriteBlock(in, blockId, blockGs, pipelineSize, isRecovery,
-          client, src, targets, accesstoken);
+      opWriteBlock(in, blockId, blockGs, pipelineSize, stage,
+          newGs, minBytesRcvd, maxBytesRcvd, client, src, targets, accesstoken);
     }
 
     /**
@@ -330,7 +388,9 @@
      * Write a block.
      */
     protected abstract void opWriteBlock(DataInputStream in,
-        long blockId, long blockGs, int pipelineSize, boolean isRecovery,
+        long blockId, long blockGs,
+        int pipelineSize, BlockConstructionStage stage,
+        long newGs, long minBytesRcvd, long maxBytesRcvd,
         String client, DatanodeInfo src, DatanodeInfo[] targets,
         AccessToken accesstoken) throws IOException;
 

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java Wed Sep 30 23:39:30 2009
@@ -90,7 +90,9 @@
   // Version is reflected in the data storage file.
   // Versions are negative.
   // Decrement LAYOUT_VERSION to define a new version.
-  public static final int LAYOUT_VERSION = -19;
+  public static final int LAYOUT_VERSION = -20;
   // Current version: 
-  // -19: Sticky bit
+  // -20: DataNode adds a "rbw" sub directory to data directory
+  //      current dir contains "finalized" subdir for finalized replicas
+  //      and "rbw" subdir for replicas being written to.
 }

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java Wed Sep 30 23:39:30 2009
@@ -145,4 +145,21 @@
       locs[i].readFields(in);
     }
   }
+
+  /** Read LocatedBlock from in. */
+  public static LocatedBlock read(DataInput in) throws IOException {
+    final LocatedBlock lb = new LocatedBlock();
+    lb.readFields(in);
+    return lb;
+  }
+
+  /** {@inheritDoc} */
+  public String toString() {
+    return getClass().getSimpleName() + "{" + b
+        + "; getBlockSize()=" + getBlockSize()
+        + "; corrupt=" + corrupt
+        + "; offset=" + offset
+        + "; locs=" + java.util.Arrays.asList(locs)
+        + "}";
+  }
 }

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java?rev=820497&r1=820496&r2=820497&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java Wed Sep 30 23:39:30 2009
@@ -36,6 +36,8 @@
   private long fileLength;
   private List<LocatedBlock> blocks; // array of blocks with prioritized locations
   private boolean underConstruction;
+  private LocatedBlock lastLocatedBlock = null;
+  private boolean isLastBlockComplete = false;
 
   LocatedBlocks() {
     fileLength = 0;
@@ -43,11 +45,15 @@
     underConstruction = false;
   }
   
-  public LocatedBlocks(long flength, List<LocatedBlock> blks, boolean isUnderConstuction) {
-
+  /** public Constructor */
+  public LocatedBlocks(long flength, boolean isUnderConstuction,
+      List<LocatedBlock> blks, 
+      LocatedBlock lastBlock, boolean isLastBlockCompleted) {
     fileLength = flength;
     blocks = blks;
     underConstruction = isUnderConstuction;
+    this.lastLocatedBlock = lastBlock;
+    this.isLastBlockComplete = isLastBlockCompleted;
   }
   
   /**
@@ -57,6 +63,16 @@
     return blocks;
   }
   
+  /** Get the last located block. */
+  public LocatedBlock getLastLocatedBlock() {
+    return lastLocatedBlock;
+  }
+  
+  /** Is the last block completed? */
+  public boolean isLastBlockComplete() {
+    return isLastBlockComplete;
+  }
+
   /**
    * Get located block.
    */
@@ -161,6 +177,15 @@
   public void write(DataOutput out) throws IOException {
     out.writeLong(this.fileLength);
     out.writeBoolean(underConstruction);
+
+    //write the last located block
+    final boolean isNull = lastLocatedBlock == null;
+    out.writeBoolean(isNull);
+    if (!isNull) {
+      lastLocatedBlock.write(out);
+    }
+    out.writeBoolean(isLastBlockComplete);
+
     // write located blocks
     int nrBlocks = locatedBlockCount();
     out.writeInt(nrBlocks);
@@ -175,6 +200,14 @@
   public void readFields(DataInput in) throws IOException {
     this.fileLength = in.readLong();
     underConstruction = in.readBoolean();
+
+    //read the last located block
+    final boolean isNull = in.readBoolean();
+    if (!isNull) {
+      lastLocatedBlock = LocatedBlock.read(in);
+    }
+    isLastBlockComplete = in.readBoolean();
+
     // read located blocks
     int nrBlocks = in.readInt();
     this.blocks = new ArrayList<LocatedBlock>(nrBlocks);
@@ -184,4 +217,18 @@
       this.blocks.add(blk);
     }
   }
+
+  /** {@inheritDoc} */
+  @Override
+  public String toString() {
+    final StringBuilder b = new StringBuilder(getClass().getSimpleName());
+    b.append("{")
+     .append("\n  fileLength=").append(fileLength)
+     .append("\n  underConstruction=").append(underConstruction)
+     .append("\n  blocks=").append(blocks)
+     .append("\n  lastLocatedBlock=").append(lastLocatedBlock)
+     .append("\n  isLastBlockComplete=").append(isLastBlockComplete)
+     .append("}");
+    return b.toString();
+  }
 }

Added: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/RecoveryInProgressException.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/RecoveryInProgressException.java?rev=820497&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/RecoveryInProgressException.java (added)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/RecoveryInProgressException.java Wed Sep 30 23:39:30 2009
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.protocol;
+
+import java.io.IOException;
+
+/**
+ * Exception indicating that a replica is already being recovered.
+ */
+public class RecoveryInProgressException extends IOException {
+  private static final long serialVersionUID = 1L;
+
+  public RecoveryInProgressException(String msg) {
+    super(msg);
+  }
+}
\ No newline at end of file



Mime
View raw message