hadoop-hdfs-commits mailing list archives

From: a..@apache.org
Subject: svn commit: r1202013 [1/3] - in /hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ src/main/java/org/apache/hadoop/hdfs/...
Date: Tue, 15 Nov 2011 02:39:29 GMT
Author: atm
Date: Tue Nov 15 02:39:13 2011
New Revision: 1202013

URL: http://svn.apache.org/viewvc?rev=1202013&view=rev
Log:
Merge trunk into HA branch

Added:
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
      - copied unchanged from r1202009, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/DirectBufferPool.java
      - copied unchanged from r1202009, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/DirectBufferPool.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightHashSet.java
      - copied unchanged from r1202009, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightHashSet.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightLinkedSet.java
      - copied unchanged from r1202009, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightLinkedSet.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/DoAsParam.java
      - copied unchanged from r1202009, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/DoAsParam.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAppendDifferentChecksum.java
      - copied unchanged from r1202009, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAppendDifferentChecksum.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
      - copied unchanged from r1202009, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfo.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestDirectBufferPool.java
      - copied unchanged from r1202009, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestDirectBufferPool.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightHashSet.java
      - copied unchanged from r1202009, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightHashSet.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightLinkedSet.java
      - copied unchanged from r1202009, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/util/TestLightWeightLinkedSet.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestOffsetUrlInputStream.java
      - copied unchanged from r1202009, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestOffsetUrlInputStream.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java
      - copied unchanged from r1202009, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/WebHdfsTestUtil.java
Modified:
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/   (props changed)
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/   (props changed)
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenRenewer.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/JspHelper.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DatanodeJspHelper.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/resources/DatanodeWebHdfsMethods.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSck.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/AuthFilter.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/ParamFilter.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/ExceptionHandler.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/GetOpParam.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/HttpOpParam.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/resources/UserProvider.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/native/   (props changed)
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/   (props changed)
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/   (props changed)
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary/   (props changed)
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs/   (props changed)
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteRangeInputStream.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientBlockVerification.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestConnCache.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDfsOverAvroRpc.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestLocalDFS.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestSeekBug.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestDelegationTokenForProxyUser.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeJsp.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsFileSystemContract.java
    hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestWebHdfsUrl.java

Propchange: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Nov 15 02:39:13 2011
@@ -1,4 +1,4 @@
-/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:1152502-1196451
+/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:1152502-1202009
 /hadoop/core/branches/branch-0.19/hdfs:713112
 /hadoop/hdfs/branches/HDFS-1052:987665-1095512
 /hadoop/hdfs/branches/HDFS-265:796829-820463

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1202013&r1=1202012&r2=1202013&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Nov 15 02:39:13 2011
@@ -52,6 +52,18 @@ Trunk (unreleased changes)
 
     HDFS-2334. Add Closeable to JournalManager. (Ivan Kelly via jitendra)
 
+
+  OPTIMIZATIONS
+    HDFS-2477. Optimize computing the diff between a block report and the
+               namenode state. (Tomasz Nykiel via hairong)
+
+    HDFS-2495. Increase granularity of write operations in ReplicationMonitor
+    thus reducing contention for write lock. (Tomasz Nykiel via hairong)
+
+    HDFS-2476. More CPU efficient data structure for under-replicated,
+               over-replicated, and invalidated blocks.
+               (Tomasz Nykiel via todd)
+
   BUG FIXES
     HDFS-2287. TestParallelRead has a small off-by-one bug. (todd)
 
@@ -88,7 +100,25 @@ Trunk (unreleased changes)
     HDFS-2526. (Client)NamenodeProtocolTranslatorR23 do not need to keep a
                reference to rpcProxyWithoutRetry (atm)
 
-    HDFS-2416. distcp with a webhdfs uri on a secure cluster fails. (jitendra)
+Release 0.23.1 - UNRELEASED
+
+  INCOMPATIBLE CHANGES
+
+  NEW FEATURES
+
+  IMPROVEMENTS
+
+  OPTIMIZATIONS
+
+    HDFS-2130. Switch default checksum to CRC32C. (todd)
+
+    HDFS-2533. Remove needless synchronization on some FSDataSet methods.
+    (todd)
+
+    HDFS-2129. Simplify BlockReader to not inherit from FSInputChecker.
+    (todd)
+
+  BUG FIXES
 
 Release 0.23.0 - 2011-11-01 
 
@@ -416,6 +446,9 @@ Release 0.23.0 - 2011-11-01 
     HDFS-2385. Support renew and cancel delegation tokens in webhdfs.
     (szetszwo)
 
+    HDFS-2539. Support doAs and GETHOMEDIRECTORY in webhdfs.
+    (szetszwo)
+
   IMPROVEMENTS
 
     HDFS-1875. MiniDFSCluster hard-codes dfs.datanode.address to localhost
@@ -1246,6 +1279,22 @@ Release 0.23.0 - 2011-11-01 
     HDFS-2065. Add null checks in DFSClient.getFileChecksum(..).  (Uma
     Maheswara Rao G via szetszwo)
 
+    HDFS-2416. distcp with a webhdfs uri on a secure cluster fails. (jitendra)
+
+    HDFS-2527. WebHdfs: remove the use of "Range" header in Open; use ugi
+    username if renewer parameter is null in GetDelegationToken; response OK
+    when setting replication for non-files; rename GETFILEBLOCKLOCATIONS to
+    GET_BLOCK_LOCATIONS and state that it is a private unstable API; replace
+    isDirectory and isSymlink with enum {FILE, DIRECTORY, SYMLINK} in
+    HdfsFileStatus JSON object.  (szetszwo)
+
+    HDFS-2528. Webhdfs: set delegation kind to WEBHDFS and add a HDFS token
+    when http requests are redirected to datanode.  (szetszwo)
+
+    HDFS-2540. Webhdfs: change "Expect: 100-continue" to two-step write; change 
+    "HdfsFileStatus" and "localName" respectively to "FileStatus" and
+    "pathSuffix" in JSON response.  (szetszwo)
+
   BREAKDOWN OF HDFS-1073 SUBTASKS
 
     HDFS-1521. Persist transaction ID on disk between NN restarts.

Propchange: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue Nov 15 02:39:13 2011
@@ -1,4 +1,4 @@
-/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:1159757-1196451
+/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:1159757-1202009
 /hadoop/core/branches/branch-0.19/hdfs/src/java:713112
 /hadoop/core/branches/branch-0.19/hdfs/src/main/java:713112
 /hadoop/core/trunk/src/hdfs:776175-785643,785929-786278

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1202013&r1=1202012&r2=1202013&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java Tue Nov 15 02:39:13 2011
@@ -20,14 +20,11 @@ package org.apache.hadoop.hdfs;
 import java.io.IOException;
 import java.net.Socket;
 
-import org.apache.hadoop.fs.PositionedReadable;
-import org.apache.hadoop.fs.Seekable;
-
 /**
  * A BlockReader is responsible for reading a single block
  * from a single datanode.
  */
-public interface BlockReader extends Seekable, PositionedReadable {
+public interface BlockReader {
 
   /* same interface as inputStream java.io.InputStream#read()
    * used by DFSInputStream#read()
@@ -43,16 +40,21 @@ public interface BlockReader extends See
    */
   long skip(long n) throws IOException;
 
+  void close() throws IOException;
+
   /**
-   * Read a single byte, returning -1 at enf of stream.
+   * Read exactly the given amount of data, throwing an exception
+   * if EOF is reached before that amount
    */
-  int read() throws IOException;
-
-  void close() throws IOException;
+  void readFully(byte[] buf, int readOffset, int amtToRead) throws IOException;
 
   /**
-   * kind of like readFully(). Only reads as much as possible.
-   * And allows use of protected readFully().
+   * Similar to {@link #readFully(byte[], int, int)} except that it will
+   * not throw an exception on EOF. However, it differs from the simple
+   * {@link #read(byte[], int, int)} call in that it is guaranteed to
+   * read the data if it is available. In other words, if this call
+   * does not throw an exception, then either the buffer has been
+   * filled or the next call will return EOF.
    */
   int readAll(byte[] buf, int offset, int len) throws IOException;
 

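The hunk above narrows BlockReader to a plain interface: Seekable/PositionedReadable and the single-byte read() are gone, and readFully() is added alongside readAll(). A minimal sketch of how a caller distinguishes the two bulk reads, assuming only the interface shown above; the helper class and method names below are hypothetical:

    import java.io.IOException;

    import org.apache.hadoop.hdfs.BlockReader;

    public class BlockReaderContractSketch {
      /** Read exactly len bytes or fail: readFully() throws on premature EOF. */
      static byte[] readExactly(BlockReader reader, int len) throws IOException {
        byte[] buf = new byte[len];
        reader.readFully(buf, 0, len);
        return buf;
      }

      /** Read up to buf.length bytes: readAll() fills the buffer unless EOF comes first,
       *  and a subsequent call signals EOF instead of throwing. */
      static int readAvailable(BlockReader reader, byte[] buf) throws IOException {
        return reader.readAll(buf, 0, buf.length);
      }
    }
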
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java?rev=1202013&r1=1202012&r2=1202013&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java Tue Nov 15 02:39:13 2011
@@ -22,6 +22,8 @@ import java.net.InetSocketAddress;
 import java.net.Socket;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSClient.Conf;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.security.token.Token;
@@ -32,17 +34,26 @@ import org.apache.hadoop.security.token.
  */
 @InterfaceAudience.Private
 public class BlockReaderFactory {
-  public static BlockReader newBlockReader(Socket sock, String file,
+  /**
+   * @see #newBlockReader(Conf, Socket, String, ExtendedBlock, Token, long, long, int, boolean, String)
+   */
+  public static BlockReader newBlockReader(
+      Configuration conf,
+      Socket sock, String file,
       ExtendedBlock block, Token<BlockTokenIdentifier> blockToken, 
-      long startOffset, long len, int bufferSize) throws IOException {
-    return newBlockReader(sock, file, block, blockToken, startOffset,
+      long startOffset, long len) throws IOException {
+    int bufferSize = conf.getInt(DFSConfigKeys.IO_FILE_BUFFER_SIZE_KEY,
+        DFSConfigKeys.IO_FILE_BUFFER_SIZE_DEFAULT);
+    return newBlockReader(new Conf(conf),
+        sock, file, block, blockToken, startOffset,
         len, bufferSize, true, "");
   }
 
   /**
    * Create a new BlockReader specifically to satisfy a read.
    * This method also sends the OP_READ_BLOCK request.
-   *
+   * 
+   * @param conf the DFSClient configuration
    * @param sock  An established Socket to the DN. The BlockReader will not close it normally
    * @param file  File location
    * @param block  The block object
@@ -54,7 +65,9 @@ public class BlockReaderFactory {
    * @param clientName  Client name
    * @return New BlockReader instance, or null on error.
    */
+  @SuppressWarnings("deprecation")
   public static BlockReader newBlockReader(
+                                     Conf conf,
                                      Socket sock, String file,
                                      ExtendedBlock block, 
                                      Token<BlockTokenIdentifier> blockToken,
@@ -62,8 +75,13 @@ public class BlockReaderFactory {
                                      int bufferSize, boolean verifyChecksum,
                                      String clientName)
                                      throws IOException {
-    return RemoteBlockReader.newBlockReader(
-        sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, clientName);
+    if (conf.useLegacyBlockReader) {
+      return RemoteBlockReader.newBlockReader(
+          sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, clientName);
+    } else {
+      return RemoteBlockReader2.newBlockReader(
+          sock, file, block, blockToken, startOffset, len, bufferSize, verifyChecksum, clientName);      
+    }
   }
   
   /**

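The factory now dispatches on DFSClient.Conf.useLegacyBlockReader, which is read from dfs.client.use.legacy.blockreader (see the DFSConfigKeys hunk below) and defaults to false, so RemoteBlockReader2 is used unless a client opts out. A hedged sketch of opting back into the legacy reader; the class and method names are illustrative:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    public class LegacyReaderOptOutSketch {
      /** Returns a Configuration that routes reads through the deprecated RemoteBlockReader. */
      public static Configuration withLegacyBlockReader() {
        Configuration conf = new Configuration();
        // Default is DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT (false), i.e. RemoteBlockReader2.
        conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER, true);
        return conf;
      }
    }
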
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java?rev=1202013&r1=1202012&r2=1202013&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java Tue Nov 15 02:39:13 2011
@@ -18,17 +18,13 @@
 
 package org.apache.hadoop.hdfs;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.HttpURLConnection;
-import java.net.MalformedURLException;
 import java.net.URL;
-import java.util.StringTokenizer;
 
 import org.apache.hadoop.fs.FSInputStream;
 import org.apache.hadoop.hdfs.server.namenode.StreamFile;
-import org.apache.hadoop.hdfs.web.resources.OffsetParam;
 
 /**
  * To support HTTP byte streams, a new connection to an HTTP server needs to be
@@ -37,16 +33,14 @@ import org.apache.hadoop.hdfs.web.resour
  * is made on the successive read(). The normal input stream functions are 
  * connected to the currently active input stream. 
  */
-public class ByteRangeInputStream extends FSInputStream {
+public abstract class ByteRangeInputStream extends FSInputStream {
   
   /**
    * This class wraps a URL and provides method to open connection.
    * It can be overridden to change how a connection is opened.
    */
-  public static class URLOpener {
+  public static abstract class URLOpener {
     protected URL url;
-    /** The url with offset parameter */
-    protected URL offsetUrl;
   
     public URLOpener(URL u) {
       url = u;
@@ -60,52 +54,9 @@ public class ByteRangeInputStream extend
       return url;
     }
 
-    protected HttpURLConnection openConnection() throws IOException {
-      return (HttpURLConnection)offsetUrl.openConnection();
-    }
+    protected abstract HttpURLConnection openConnection() throws IOException;
 
-    private HttpURLConnection openConnection(final long offset) throws IOException {
-      offsetUrl = offset == 0L? url: new URL(url + "&" + new OffsetParam(offset));
-      final HttpURLConnection conn = openConnection();
-      conn.setRequestMethod("GET");
-      if (offset != 0L) {
-        conn.setRequestProperty("Range", "bytes=" + offset + "-");
-      }
-      return conn;
-    }  
-  }
-  
-  static private final String OFFSET_PARAM_PREFIX = OffsetParam.NAME + "=";
-
-  /** Remove offset parameter, if there is any, from the url */
-  static URL removeOffsetParam(final URL url) throws MalformedURLException {
-    String query = url.getQuery();
-    if (query == null) {
-      return url;
-    }
-    final String lower = query.toLowerCase();
-    if (!lower.startsWith(OFFSET_PARAM_PREFIX)
-        && !lower.contains("&" + OFFSET_PARAM_PREFIX)) {
-      return url;
-    }
-
-    //rebuild query
-    StringBuilder b = null;
-    for(final StringTokenizer st = new StringTokenizer(query, "&");
-        st.hasMoreTokens();) {
-      final String token = st.nextToken();
-      if (!token.toLowerCase().startsWith(OFFSET_PARAM_PREFIX)) {
-        if (b == null) {
-          b = new StringBuilder("?").append(token);
-        } else {
-          b.append('&').append(token);
-        }
-      }
-    }
-    query = b == null? "": b.toString();
-
-    final String urlStr = url.toString();
-    return new URL(urlStr.substring(0, urlStr.indexOf('?')) + query);
+    protected abstract HttpURLConnection openConnection(final long offset) throws IOException;
   }
 
   enum StreamStatus {
@@ -120,11 +71,6 @@ public class ByteRangeInputStream extend
 
   StreamStatus status = StreamStatus.SEEK;
 
-  /** Create an input stream with the URL. */
-  public ByteRangeInputStream(final URL url) {
-    this(new URLOpener(url), new URLOpener(null));
-  }
-  
   /**
    * Create with the specified URLOpeners. Original url is used to open the 
    * stream for the first time. Resolved url is used in subsequent requests.
@@ -136,6 +82,12 @@ public class ByteRangeInputStream extend
     this.resolvedURL = r;
   }
   
+  protected abstract void checkResponseCode(final HttpURLConnection connection
+      ) throws IOException;
+  
+  protected abstract URL getResolvedUrl(final HttpURLConnection connection
+      ) throws IOException;
+
   private InputStream getInputStream() throws IOException {
     if (status != StreamStatus.NORMAL) {
       
@@ -150,32 +102,14 @@ public class ByteRangeInputStream extend
         (resolvedURL.getURL() == null) ? originalURL : resolvedURL;
 
       final HttpURLConnection connection = opener.openConnection(startPos);
-      try {
-        connection.connect();
-        final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
-        filelength = (cl == null) ? -1 : Long.parseLong(cl);
-        if (HftpFileSystem.LOG.isDebugEnabled()) {
-          HftpFileSystem.LOG.debug("filelength = " + filelength);
-        }
-        in = connection.getInputStream();
-      } catch (FileNotFoundException fnfe) {
-        throw fnfe;
-      } catch (IOException ioe) {
-        HftpFileSystem.throwIOExceptionFromConnection(connection, ioe);
-      }
-      
-      int respCode = connection.getResponseCode();
-      if (startPos != 0 && respCode != HttpURLConnection.HTTP_PARTIAL) {
-        // We asked for a byte range but did not receive a partial content
-        // response...
-        throw new IOException("HTTP_PARTIAL expected, received " + respCode);
-      } else if (startPos == 0 && respCode != HttpURLConnection.HTTP_OK) {
-        // We asked for all bytes from the beginning but didn't receive a 200
-        // response (none of the other 2xx codes are valid here)
-        throw new IOException("HTTP_OK expected, received " + respCode);
-      }
+      connection.connect();
+      checkResponseCode(connection);
+
+      final String cl = connection.getHeaderField(StreamFile.CONTENT_LENGTH);
+      filelength = (cl == null) ? -1 : Long.parseLong(cl);
+      in = connection.getInputStream();
 
-      resolvedURL.setURL(removeOffsetParam(connection.getURL()));
+      resolvedURL.setURL(getResolvedUrl(connection));
       status = StreamStatus.NORMAL;
     }
     

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1202013&r1=1202012&r2=1202013&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Tue Nov 15 02:39:13 2011
@@ -100,6 +100,7 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenRenewer;
+import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
 
@@ -142,6 +143,7 @@ public class DFSClient implements java.i
     final int maxBlockAcquireFailures;
     final int confTime;
     final int ioBufferSize;
+    final int checksumType;
     final int bytesPerChecksum;
     final int writePacketSize;
     final int socketTimeout;
@@ -156,6 +158,7 @@ public class DFSClient implements java.i
     final short defaultReplication;
     final String taskId;
     final FsPermission uMask;
+    final boolean useLegacyBlockReader;
 
     Conf(Configuration conf) {
       maxBlockAcquireFailures = conf.getInt(
@@ -166,6 +169,7 @@ public class DFSClient implements java.i
       ioBufferSize = conf.getInt(
           CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
           CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
+      checksumType = getChecksumType(conf);
       bytesPerChecksum = conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY,
           DFS_BYTES_PER_CHECKSUM_DEFAULT);
       socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
@@ -192,6 +196,29 @@ public class DFSClient implements java.i
           .getInt(DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_KEY,
               DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
       uMask = FsPermission.getUMask(conf);
+      useLegacyBlockReader = conf.getBoolean(
+          DFS_CLIENT_USE_LEGACY_BLOCKREADER,
+          DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT);
+    }
+
+    private int getChecksumType(Configuration conf) {
+      String checksum = conf.get(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY,
+          DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT);
+      if ("CRC32".equals(checksum)) {
+        return DataChecksum.CHECKSUM_CRC32;
+      } else if ("CRC32C".equals(checksum)) {
+        return DataChecksum.CHECKSUM_CRC32C;
+      } else if ("NULL".equals(checksum)) {
+        return DataChecksum.CHECKSUM_NULL;
+      } else {
+        LOG.warn("Bad checksum type: " + checksum + ". Using default.");
+        return DataChecksum.CHECKSUM_CRC32C;
+      }
+    }
+
+    private DataChecksum createChecksum() {
+      return DataChecksum.newDataChecksum(
+          checksumType, bytesPerChecksum);
     }
   }
  
@@ -813,7 +840,7 @@ public class DFSClient implements java.i
     }
     final DFSOutputStream result = new DFSOutputStream(this, src, masked, flag,
         createParent, replication, blockSize, progress, buffersize,
-        dfsClientConf.bytesPerChecksum);
+        dfsClientConf.createChecksum());
     leaserenewer.put(src, result, this);
     return result;
   }
@@ -857,9 +884,12 @@ public class DFSClient implements java.i
     CreateFlag.validate(flag);
     DFSOutputStream result = primitiveAppend(src, flag, buffersize, progress);
     if (result == null) {
+      DataChecksum checksum = DataChecksum.newDataChecksum(
+          dfsClientConf.checksumType,
+          bytesPerChecksum);
       result = new DFSOutputStream(this, src, absPermission,
           flag, createParent, replication, blockSize, progress, buffersize,
-          bytesPerChecksum);
+          checksum);
     }
     leaserenewer.put(src, result, this);
     return result;
@@ -917,7 +947,7 @@ public class DFSClient implements java.i
                                      UnresolvedPathException.class);
     }
     return new DFSOutputStream(this, src, buffersize, progress,
-        lastBlock, stat, dfsClientConf.bytesPerChecksum);
+        lastBlock, stat, dfsClientConf.createChecksum());
   }
   
   /**

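The new Conf.getChecksumType()/createChecksum() pair turns the dfs.checksum.type string into the DataChecksum handed to the output-stream constructors below. A condensed, standalone restatement of that mapping, kept here only for illustration; the wrapper class is hypothetical:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.util.DataChecksum;

    public class ChecksumSelectionSketch {
      static DataChecksum select(Configuration conf) {
        String name = conf.get(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY,
            DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT);          // "CRC32C"
        final int type;
        if ("CRC32".equals(name)) {
          type = DataChecksum.CHECKSUM_CRC32;
        } else if ("NULL".equals(name)) {
          type = DataChecksum.CHECKSUM_NULL;
        } else {
          type = DataChecksum.CHECKSUM_CRC32C;                 // "CRC32C" and unrecognized values
        }
        int bytesPerChecksum = conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY,
            DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT);     // 512
        return DataChecksum.newDataChecksum(type, bytesPerChecksum);
      }
    }
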
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1202013&r1=1202012&r2=1202013&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Tue Nov 15 02:39:13 2011
@@ -38,6 +38,8 @@ public class DFSConfigKeys extends Commo
   public static final int     DFS_STREAM_BUFFER_SIZE_DEFAULT = 4096;
   public static final String  DFS_BYTES_PER_CHECKSUM_KEY = "dfs.bytes-per-checksum";
   public static final int     DFS_BYTES_PER_CHECKSUM_DEFAULT = 512;
+  public static final String  DFS_CHECKSUM_TYPE_KEY = "dfs.checksum.type";
+  public static final String  DFS_CHECKSUM_TYPE_DEFAULT = "CRC32C";
   public static final String  DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size";
   public static final int     DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
   public static final String  DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY = "dfs.client.block.write.replace-datanode-on-failure.enable";
@@ -180,6 +182,8 @@ public class DFSConfigKeys extends Commo
   public static final int     DFS_CLIENT_BLOCK_WRITE_RETRIES_DEFAULT = 3;
   public static final String  DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_KEY = "dfs.client.max.block.acquire.failures";
   public static final int     DFS_CLIENT_MAX_BLOCK_ACQUIRE_FAILURES_DEFAULT = 3;
+  public static final String  DFS_CLIENT_USE_LEGACY_BLOCKREADER = "dfs.client.use.legacy.blockreader";
+  public static final boolean DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT = false;
   public static final String  DFS_BALANCER_MOVEDWINWIDTH_KEY = "dfs.balancer.movedWinWidth";
   public static final long    DFS_BALANCER_MOVEDWINWIDTH_DEFAULT = 5400*1000L;
   public static final String  DFS_DATANODE_ADDRESS_KEY = "dfs.datanode.address";

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1202013&r1=1202012&r2=1202013&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Tue Nov 15 02:39:13 2011
@@ -780,7 +780,8 @@ public class DFSInputStream extends FSIn
       try {
         // The OP_READ_BLOCK request is sent as we make the BlockReader
         BlockReader reader =
-            BlockReaderFactory.newBlockReader(sock, file, block,
+            BlockReaderFactory.newBlockReader(dfsClient.getConf(),
+                                       sock, file, block,
                                        blockToken,
                                        startOffset, len,
                                        bufferSize, verifyChecksum,

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1202013&r1=1202012&r2=1202013&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Tue Nov 15 02:39:13 2011
@@ -74,7 +74,6 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.PureJavaCrc32;
 
 
 /****************************************************************
@@ -1206,8 +1205,9 @@ class DFSOutputStream extends FSOutputSu
   }
 
   private DFSOutputStream(DFSClient dfsClient, String src, long blockSize, Progressable progress,
-      int bytesPerChecksum, short replication) throws IOException {
-    super(new PureJavaCrc32(), bytesPerChecksum, 4);
+      DataChecksum checksum, short replication) throws IOException {
+    super(checksum, checksum.getBytesPerChecksum(), checksum.getChecksumSize());
+    int bytesPerChecksum = checksum.getBytesPerChecksum();
     this.dfsClient = dfsClient;
     this.src = src;
     this.blockSize = blockSize;
@@ -1225,8 +1225,7 @@ class DFSOutputStream extends FSOutputSu
                             "multiple of io.bytes.per.checksum");
                             
     }
-    checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 
-                                            bytesPerChecksum);
+    this.checksum = checksum;
   }
 
   /**
@@ -1235,11 +1234,12 @@ class DFSOutputStream extends FSOutputSu
    */
   DFSOutputStream(DFSClient dfsClient, String src, FsPermission masked, EnumSet<CreateFlag> flag,
       boolean createParent, short replication, long blockSize, Progressable progress,
-      int buffersize, int bytesPerChecksum) 
+      int buffersize, DataChecksum checksum) 
       throws IOException {
-    this(dfsClient, src, blockSize, progress, bytesPerChecksum, replication);
+    this(dfsClient, src, blockSize, progress, checksum, replication);
 
-    computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
+    computePacketChunkSize(dfsClient.getConf().writePacketSize,
+        checksum.getBytesPerChecksum());
 
     try {
       dfsClient.namenode.create(
@@ -1264,8 +1264,8 @@ class DFSOutputStream extends FSOutputSu
    */
   DFSOutputStream(DFSClient dfsClient, String src, int buffersize, Progressable progress,
       LocatedBlock lastBlock, HdfsFileStatus stat,
-      int bytesPerChecksum) throws IOException {
-    this(dfsClient, src, stat.getBlockSize(), progress, bytesPerChecksum, stat.getReplication());
+      DataChecksum checksum) throws IOException {
+    this(dfsClient, src, stat.getBlockSize(), progress, checksum, stat.getReplication());
     initialFileSize = stat.getLen(); // length of file when opened
 
     //
@@ -1274,9 +1274,10 @@ class DFSOutputStream extends FSOutputSu
     if (lastBlock != null) {
       // indicate that we are appending to an existing block
       bytesCurBlock = lastBlock.getBlockSize();
-      streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum);
+      streamer = new DataStreamer(lastBlock, stat, checksum.getBytesPerChecksum());
     } else {
-      computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
+      computePacketChunkSize(dfsClient.getConf().writePacketSize,
+          checksum.getBytesPerChecksum());
       streamer = new DataStreamer();
     }
     streamer.start();

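The constructors above now take a DataChecksum instead of a raw bytesPerChecksum, and derive both the chunk size and the per-chunk checksum width from it. A small illustration of the two values involved; the class is hypothetical, and 512 is simply the dfs.bytes-per-checksum default:

    import org.apache.hadoop.util.DataChecksum;

    public class ChecksumParamsSketch {
      public static void main(String[] args) {
        DataChecksum checksum =
            DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32C, 512);
        int bytesPerChecksum = checksum.getBytesPerChecksum(); // 512: data bytes covered per checksum
        int checksumSize = checksum.getChecksumSize();         // 4: bytes of CRC32C per chunk
        System.out.println(bytesPerChecksum + " bytes/chunk, " + checksumSize + " checksum bytes");
      }
    }
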
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java?rev=1202013&r1=1202012&r2=1202013&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java Tue Nov 15 02:39:13 2011
@@ -372,13 +372,66 @@ public class HftpFileSystem extends File
     return query;
   }
 
+  static class RangeHeaderUrlOpener extends ByteRangeInputStream.URLOpener {
+    RangeHeaderUrlOpener(final URL url) {
+      super(url);
+    }
+
+    @Override
+    protected HttpURLConnection openConnection() throws IOException {
+      return (HttpURLConnection)url.openConnection();
+    }
+
+    /** Use HTTP Range header for specifying offset. */
+    @Override
+    protected HttpURLConnection openConnection(final long offset) throws IOException {
+      final HttpURLConnection conn = openConnection();
+      conn.setRequestMethod("GET");
+      if (offset != 0L) {
+        conn.setRequestProperty("Range", "bytes=" + offset + "-");
+      }
+      return conn;
+    }  
+  }
+
+  static class RangeHeaderInputStream extends ByteRangeInputStream {
+    RangeHeaderInputStream(RangeHeaderUrlOpener o, RangeHeaderUrlOpener r) {
+      super(o, r);
+    }
+
+    RangeHeaderInputStream(final URL url) {
+      this(new RangeHeaderUrlOpener(url), new RangeHeaderUrlOpener(null));
+    }
+
+    /** Expects HTTP_OK and HTTP_PARTIAL response codes. */
+    @Override
+    protected void checkResponseCode(final HttpURLConnection connection
+        ) throws IOException {
+      final int code = connection.getResponseCode();
+      if (startPos != 0 && code != HttpURLConnection.HTTP_PARTIAL) {
+        // We asked for a byte range but did not receive a partial content
+        // response...
+        throw new IOException("HTTP_PARTIAL expected, received " + code);
+      } else if (startPos == 0 && code != HttpURLConnection.HTTP_OK) {
+        // We asked for all bytes from the beginning but didn't receive a 200
+        // response (none of the other 2xx codes are valid here)
+        throw new IOException("HTTP_OK expected, received " + code);
+      }
+    }
+
+    @Override
+    protected URL getResolvedUrl(final HttpURLConnection connection) {
+      return connection.getURL();
+    }
+  }
+
   @Override
   public FSDataInputStream open(Path f, int buffersize) throws IOException {
     f = f.makeQualified(getUri(), getWorkingDirectory());
     String path = "/data" + ServletUtil.encodePath(f.toUri().getPath());
     String query = addDelegationTokenParam("ugi=" + getEncodedUgiParameter());
     URL u = getNamenodeURL(path, query);    
-    return new FSDataInputStream(new ByteRangeInputStream(u));
+    return new FSDataInputStream(new RangeHeaderInputStream(u));
   }
 
   /** Class to parse and store a listing reply from the server. */

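RangeHeaderUrlOpener keeps Hftp on the HTTP "Range" header for offsets, and RangeHeaderInputStream enforces the matching status codes: 200 for a read from offset 0, 206 for any other offset. A self-contained sketch of that convention using plain java.net; the class and method names are illustrative:

    import java.io.IOException;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class RangeRequestSketch {
      static HttpURLConnection openAt(URL url, long offset) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        conn.setRequestMethod("GET");
        if (offset != 0L) {
          conn.setRequestProperty("Range", "bytes=" + offset + "-");
        }
        conn.connect();
        int code = conn.getResponseCode();
        int expected = (offset == 0L) ? HttpURLConnection.HTTP_OK        // 200
                                      : HttpURLConnection.HTTP_PARTIAL;  // 206
        if (code != expected) {
          throw new IOException("expected " + expected + ", received " + code);
        }
        return conn;
      }
    }
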
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1202013&r1=1202012&r2=1202013&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Tue Nov 15 02:39:13 2011
@@ -50,27 +50,13 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.util.DataChecksum;
 
 
-/** This is a wrapper around connection to datanode
- * and understands checksum, offset etc.
- *
- * Terminology:
- * <dl>
- * <dt>block</dt>
- *   <dd>The hdfs block, typically large (~64MB).
- *   </dd>
- * <dt>chunk</dt>
- *   <dd>A block is divided into chunks, each comes with a checksum.
- *       We want transfers to be chunk-aligned, to be able to
- *       verify checksums.
- *   </dd>
- * <dt>packet</dt>
- *   <dd>A grouping of chunks used for transport. It contains a
- *       header, followed by checksum data, followed by real data.
- *   </dd>
- * </dl>
- * Please see DataNode for the RPC specification.
+/**
+ * @deprecated this is an old implementation that is being left around
+ * in case any issues spring up with the new {@link RemoteBlockReader2} implementation.
+ * It will be removed in the next release.
  */
 @InterfaceAudience.Private
+@Deprecated
 public class RemoteBlockReader extends FSInputChecker implements BlockReader {
 
   Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
@@ -410,7 +396,7 @@ public class RemoteBlockReader extends F
     
     BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
         vintPrefixed(in));
-    checkSuccess(status, sock, block, file);
+    RemoteBlockReader2.checkSuccess(status, sock, block, file);
     ReadOpChecksumInfoProto checksumInfo =
       status.getReadOpChecksumInfo();
     DataChecksum checksum = DataTransferProtoUtil.fromProto(
@@ -431,28 +417,6 @@ public class RemoteBlockReader extends F
         in, checksum, verifyChecksum, startOffset, firstChunkOffset, len, sock);
   }
 
-  private static void checkSuccess(
-      BlockOpResponseProto status, Socket sock,
-      ExtendedBlock block, String file)
-      throws IOException {
-    if (status.getStatus() != Status.SUCCESS) {
-      if (status.getStatus() == Status.ERROR_ACCESS_TOKEN) {
-        throw new InvalidBlockTokenException(
-            "Got access token error for OP_READ_BLOCK, self="
-                + sock.getLocalSocketAddress() + ", remote="
-                + sock.getRemoteSocketAddress() + ", for file " + file
-                + ", for pool " + block.getBlockPoolId() + " block " 
-                + block.getBlockId() + "_" + block.getGenerationStamp());
-      } else {
-        throw new IOException("Got error for OP_READ_BLOCK, self="
-            + sock.getLocalSocketAddress() + ", remote="
-            + sock.getRemoteSocketAddress() + ", for file " + file
-            + ", for pool " + block.getBlockPoolId() + " block " 
-            + block.getBlockId() + "_" + block.getGenerationStamp());
-      }
-    }
-  }
-
   @Override
   public synchronized void close() throws IOException {
     startOffset = -1;
@@ -465,6 +429,12 @@ public class RemoteBlockReader extends F
   }
   
   @Override
+  public void readFully(byte[] buf, int readOffset, int amtToRead)
+      throws IOException {
+    IOUtils.readFully(this, buf, readOffset, amtToRead);
+  }
+
+  @Override
   public int readAll(byte[] buf, int offset, int len) throws IOException {
     return readFully(this, buf, offset, len);
   }
@@ -492,14 +462,7 @@ public class RemoteBlockReader extends F
   void sendReadResult(Socket sock, Status statusCode) {
     assert !sentStatusCode : "already sent status code to " + sock;
     try {
-      OutputStream out = NetUtils.getOutputStream(sock, HdfsServerConstants.WRITE_TIMEOUT);
-      
-      ClientReadStatusProto.newBuilder()
-        .setStatus(statusCode)
-        .build()
-        .writeDelimitedTo(out);
-
-      out.flush();
+      RemoteBlockReader2.writeReadResult(sock, statusCode);
       sentStatusCode = true;
     } catch (IOException e) {
       // It's ok not to be able to send this. But something is probably wrong.
@@ -519,4 +482,5 @@ public class RemoteBlockReader extends F
       final String poolId, final long blockId) {
     return s.toString() + ":" + poolId + ":" + blockId;
   }
+
 }

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java?rev=1202013&r1=1202012&r2=1202013&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/PacketHeader.java Tue Nov 15 02:39:13 2011
@@ -136,7 +136,7 @@ public class PacketHeader {
    */
   public boolean sanityCheck(long lastSeqNo) {
     // We should only have a non-positive data length for the last packet
-    if (proto.getDataLen() <= 0 && proto.getLastPacketInBlock()) return false;
+    if (proto.getDataLen() <= 0 && !proto.getLastPacketInBlock()) return false;
     // The last packet should not contain data
     if (proto.getLastPacketInBlock() && proto.getDataLen() != 0) return false;
     // Seqnos should always increase by 1 with each packet received

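The one-character fix above flips the first check: the comment already says that only the last packet in a block may carry a non-positive data length, so the invalid case is an empty packet that is not the last one. Restated on its own for clarity; the names are illustrative:

    public class PacketLengthCheckSketch {
      static boolean lengthsAreSane(int dataLen, boolean lastPacketInBlock) {
        // Only the last packet may be empty...
        if (dataLen <= 0 && !lastPacketInBlock) return false;
        // ...and the last packet must be empty.
        if (lastPacketInBlock && dataLen != 0) return false;
        return true;
      }
    }
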
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenRenewer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenRenewer.java?rev=1202013&r1=1202012&r2=1202013&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenRenewer.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/security/token/delegation/DelegationTokenRenewer.java Tue Nov 15 02:39:13 2011
@@ -134,7 +134,7 @@ public class DelegationTokenRenewer<T ex
 
   private DelayQueue<RenewAction<T>> queue = new DelayQueue<RenewAction<T>>();
 
-  public DelegationTokenRenewer(final Class<?> clazz) {
+  public DelegationTokenRenewer(final Class<T> clazz) {
     super(clazz.getSimpleName() + "-" + DelegationTokenRenewer.class.getSimpleName());
     setDaemon(true);
   }

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java?rev=1202013&r1=1202012&r2=1202013&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java Tue Nov 15 02:39:13 2011
@@ -122,6 +122,38 @@ public class BlockInfo extends Block imp
     triplets[index*3+2] = to;
   }
 
+  /**
+   * Return the previous block on the block list for the datanode at
+   * position index. Set the previous block on the list to "to".
+   *
+   * @param index - the datanode index
+   * @param to - block to be set to previous on the list of blocks
+   * @return current previous block on the list of blocks
+   */
+  BlockInfo getSetPrevious(int index, BlockInfo to) {
+	assert this.triplets != null : "BlockInfo is not initialized";
+	assert index >= 0 && index*3+1 < triplets.length : "Index is out of bound";
+    BlockInfo info = (BlockInfo)triplets[index*3+1];
+    triplets[index*3+1] = to;
+    return info;
+  }
+
+  /**
+   * Return the next block on the block list for the datanode at
+   * position index. Set the next block on the list to "to".
+   *
+   * @param index - the datanode index
+   * @param to - block to be set to next on the list of blocks
+   * @return current next block on the list of blocks
+   */
+  BlockInfo getSetNext(int index, BlockInfo to) {
+	assert this.triplets != null : "BlockInfo is not initialized";
+	assert index >= 0 && index*3+2 < triplets.length : "Index is out of bound";
+    BlockInfo info = (BlockInfo)triplets[index*3+2];
+    triplets[index*3+2] = to;
+    return info;
+  }
+
   int getCapacity() {
     assert this.triplets != null : "BlockInfo is not initialized";
     assert triplets.length % 3 == 0 : "Malformed BlockInfo";
@@ -260,6 +292,27 @@ public class BlockInfo extends Block imp
   }
 
   /**
+   * Remove this block from the list of blocks related to the specified
+   * DatanodeDescriptor. Insert it into the head of the list of blocks.
+   *
+   * @return the new head of the list.
+   */
+  public BlockInfo moveBlockToHead(BlockInfo head, DatanodeDescriptor dn,
+      int curIndex, int headIndex) {
+    if (head == this) {
+      return this;
+    }
+    BlockInfo next = this.getSetNext(curIndex, head);
+    BlockInfo prev = this.getSetPrevious(curIndex, null);
+
+    head.setPrevious(headIndex, this);
+    prev.setNext(prev.findDatanode(dn), next);
+    if (next != null)
+      next.setPrevious(next.findDatanode(dn), prev);
+    return this;
+  }
+
+  /**
    * BlockInfo represents a block that is not being constructed.
    * In order to start modifying the block, the BlockInfo should be converted
    * to {@link BlockInfoUnderConstruction}.
@@ -317,4 +370,4 @@ public class BlockInfo extends Block imp
   public void setNext(LightWeightGSet.LinkedElement next) {
     this.nextLinkedElement = next;
   }
-}
\ No newline at end of file
+}

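getSetPrevious()/getSetNext() expose slots 3*i+1 and 3*i+2 of the triplets array, which thread each datanode's doubly linked block list, and moveBlockToHead() uses them to splice a block to the front of that list. The same pointer surgery on a plain doubly linked list, stripped of the triplets indexing, offered as a model only:

    final class MoveToHeadSketch {
      static final class Node { Node prev, next; }

      /** Unlink n and reinsert it in front of head; returns the new head. */
      static Node moveToHead(Node head, Node n) {
        if (head == n) {
          return n;
        }
        Node prev = n.prev;
        Node next = n.next;
        prev.next = next;                  // n != head here, so prev is non-null
        if (next != null) {
          next.prev = prev;
        }
        n.prev = null;
        n.next = head;
        head.prev = n;
        return n;
      }
    }
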
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1202013&r1=1202012&r2=1202013&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Tue Nov 15 02:39:13 2011
@@ -65,6 +65,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.util.Daemon;
 
@@ -75,6 +76,7 @@ import com.google.common.annotations.Vis
  */
 @InterfaceAudience.Private
 public class BlockManager {
+
   static final Log LOG = LogFactory.getLog(BlockManager.class);
 
   /** Default load factor of map */
@@ -141,8 +143,8 @@ public class BlockManager {
   // eventually remove these extras.
   // Mapping: StorageID -> TreeSet<Block>
   //
-  public final Map<String, Collection<Block>> excessReplicateMap =
-    new TreeMap<String, Collection<Block>>();
+  public final Map<String, LightWeightLinkedSet<Block>> excessReplicateMap =
+    new TreeMap<String, LightWeightLinkedSet<Block>>();
 
   //
   // Store set of Blocks that need to be replicated 1 or more times.
@@ -928,15 +930,7 @@ public class BlockManager {
       chooseUnderReplicatedBlocks(blocksToProcess);
 
     // replicate blocks
-    int scheduledReplicationCount = 0;
-    for (int i=0; i<blocksToReplicate.size(); i++) {
-      for(Block block : blocksToReplicate.get(i)) {
-        if (computeReplicationWorkForBlock(block, i)) {
-          scheduledReplicationCount++;
-        }
-      }
-    }
-    return scheduledReplicationCount;
+    return computeReplicationWorkForBlocks(blocksToReplicate);
   }
 
   /**
@@ -1001,170 +995,201 @@ public class BlockManager {
     return blocksToReplicate;
   }
 
-  /** Replicate a block
+  /** Replicate a set of blocks
    *
-   * @param block block to be replicated
-   * @param priority a hint of its priority in the neededReplication queue
-   * @return if the block gets replicated or not
+   * @param blocksToReplicate blocks to be replicated, for each priority
+   * @return the number of blocks scheduled for replication
    */
   @VisibleForTesting
-  boolean computeReplicationWorkForBlock(Block block, int priority) {
+  int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
     int requiredReplication, numEffectiveReplicas;
     List<DatanodeDescriptor> containingNodes, liveReplicaNodes;
     DatanodeDescriptor srcNode;
     INodeFile fileINode = null;
     int additionalReplRequired;
 
+    int scheduledWork = 0;
+    List<ReplicationWork> work = new LinkedList<ReplicationWork>();
+
     namesystem.writeLock();
     try {
       synchronized (neededReplications) {
-        // block should belong to a file
-        fileINode = blocksMap.getINode(block);
-        // abandoned block or block reopened for append
-        if(fileINode == null || fileINode.isUnderConstruction()) {
-          neededReplications.remove(block, priority); // remove from neededReplications
-          replIndex--;
-          return false;
-        }
+        for (int priority = 0; priority < blocksToReplicate.size(); priority++) {
+          for (Block block : blocksToReplicate.get(priority)) {
+            // block should belong to a file
+            fileINode = blocksMap.getINode(block);
+            // abandoned block or block reopened for append
+            if(fileINode == null || fileINode.isUnderConstruction()) {
+              neededReplications.remove(block, priority); // remove from neededReplications
+              replIndex--;
+              continue;
+            }
 
-        requiredReplication = fileINode.getReplication();
+            requiredReplication = fileINode.getReplication();
 
-        // get a source data-node
-        containingNodes = new ArrayList<DatanodeDescriptor>();
-        liveReplicaNodes = new ArrayList<DatanodeDescriptor>();
-        NumberReplicas numReplicas = new NumberReplicas();
-        srcNode = chooseSourceDatanode(
-            block, containingNodes, liveReplicaNodes, numReplicas);
-        if(srcNode == null) // block can not be replicated from any node
-          return false;
-
-        assert liveReplicaNodes.size() == numReplicas.liveReplicas();
-        // do not schedule more if enough replicas is already pending
-        numEffectiveReplicas = numReplicas.liveReplicas() +
-                                pendingReplications.getNumReplicas(block);
+            // get a source data-node
+            containingNodes = new ArrayList<DatanodeDescriptor>();
+            liveReplicaNodes = new ArrayList<DatanodeDescriptor>();
+            NumberReplicas numReplicas = new NumberReplicas();
+            srcNode = chooseSourceDatanode(
+                block, containingNodes, liveReplicaNodes, numReplicas);
+            if(srcNode == null) // block can not be replicated from any node
+              continue;
+
+            assert liveReplicaNodes.size() == numReplicas.liveReplicas();
+            // do not schedule more if enough replicas is already pending
+            numEffectiveReplicas = numReplicas.liveReplicas() +
+                                    pendingReplications.getNumReplicas(block);
       
-        if (numEffectiveReplicas >= requiredReplication) {
-          if ( (pendingReplications.getNumReplicas(block) > 0) ||
-               (blockHasEnoughRacks(block)) ) {
-            neededReplications.remove(block, priority); // remove from neededReplications
-            replIndex--;
-            NameNode.stateChangeLog.info("BLOCK* "
-                + "Removing block " + block
-                + " from neededReplications as it has enough replicas.");
-            return false;
-          }
-        }
+            if (numEffectiveReplicas >= requiredReplication) {
+              if ( (pendingReplications.getNumReplicas(block) > 0) ||
+                   (blockHasEnoughRacks(block)) ) {
+                neededReplications.remove(block, priority); // remove from neededReplications
+                replIndex--;
+                NameNode.stateChangeLog.info("BLOCK* "
+                    + "Removing block " + block
+                    + " from neededReplications as it has enough replicas.");
+                continue;
+              }
+            }
 
-        if (numReplicas.liveReplicas() < requiredReplication) {
-          additionalReplRequired = requiredReplication - numEffectiveReplicas;
-        } else {
-          additionalReplRequired = 1; //Needed on a new rack
+            if (numReplicas.liveReplicas() < requiredReplication) {
+              additionalReplRequired = requiredReplication
+                  - numEffectiveReplicas;
+            } else {
+              additionalReplRequired = 1; // Needed on a new rack
+            }
+            work.add(new ReplicationWork(block, fileINode, srcNode,
+                containingNodes, liveReplicaNodes, additionalReplRequired,
+                priority));
+          }
         }
-
       }
     } finally {
       namesystem.writeUnlock();
     }
-    
-    // Exclude all of the containing nodes from being targets.
-    // This list includes decommissioning or corrupt nodes.
-    HashMap<Node, Node> excludedNodes = new HashMap<Node, Node>();
-    for (DatanodeDescriptor dn : containingNodes) {
-      excludedNodes.put(dn, dn);
-    }
 
-    // choose replication targets: NOT HOLDING THE GLOBAL LOCK
-    // It is costly to extract the filename for which chooseTargets is called,
-    // so for now we pass in the Inode itself.
-    DatanodeDescriptor targets[] = 
-                       blockplacement.chooseTarget(fileINode, additionalReplRequired,
-                       srcNode, liveReplicaNodes, excludedNodes, block.getNumBytes());
-    if(targets.length == 0)
-      return false;
+    HashMap<Node, Node> excludedNodes
+        = new HashMap<Node, Node>();
+    for(ReplicationWork rw : work){
+      // Exclude all of the containing nodes from being targets.
+      // This list includes decommissioning or corrupt nodes.
+      excludedNodes.clear();
+      for (DatanodeDescriptor dn : rw.containingNodes) {
+        excludedNodes.put(dn, dn);
+      }
+
+      // choose replication targets: NOT HOLDING THE GLOBAL LOCK
+      // It is costly to extract the filename for which chooseTargets is called,
+      // so for now we pass in the Inode itself.
+      rw.targets = blockplacement.chooseTarget(rw.fileINode,
+          rw.additionalReplRequired, rw.srcNode, rw.liveReplicaNodes,
+          excludedNodes, rw.block.getNumBytes());
+    }
 
     namesystem.writeLock();
     try {
-      synchronized (neededReplications) {
-        // Recheck since global lock was released
-        // block should belong to a file
-        fileINode = blocksMap.getINode(block);
-        // abandoned block or block reopened for append
-        if(fileINode == null || fileINode.isUnderConstruction()) {
-          neededReplications.remove(block, priority); // remove from neededReplications
-          replIndex--;
-          return false;
+      for(ReplicationWork rw : work){
+        DatanodeDescriptor[] targets = rw.targets;
+        if(targets == null || targets.length == 0){
+          rw.targets = null;
+          continue;
         }
-        requiredReplication = fileINode.getReplication();
 
-        // do not schedule more if enough replicas is already pending
-        NumberReplicas numReplicas = countNodes(block);
-        numEffectiveReplicas = numReplicas.liveReplicas() +
-        pendingReplications.getNumReplicas(block);
-
-        if (numEffectiveReplicas >= requiredReplication) {
-          if ( (pendingReplications.getNumReplicas(block) > 0) ||
-               (blockHasEnoughRacks(block)) ) {
+        synchronized (neededReplications) {
+          Block block = rw.block;
+          int priority = rw.priority;
+          // Recheck since global lock was released
+          // block should belong to a file
+          fileINode = blocksMap.getINode(block);
+          // abandoned block or block reopened for append
+          if(fileINode == null || fileINode.isUnderConstruction()) {
             neededReplications.remove(block, priority); // remove from neededReplications
+            rw.targets = null;
             replIndex--;
-            NameNode.stateChangeLog.info("BLOCK* "
-                + "Removing block " + block
-                + " from neededReplications as it has enough replicas.");
-            return false;
+            continue;
           }
-        }
+          requiredReplication = fileINode.getReplication();
 
-        if ( (numReplicas.liveReplicas() >= requiredReplication) &&
-             (!blockHasEnoughRacks(block)) ) {
-          if (srcNode.getNetworkLocation().equals(targets[0].getNetworkLocation())) {
-            //No use continuing, unless a new rack in this case
-            return false;
+          // do not schedule more if enough replicas is already pending
+          NumberReplicas numReplicas = countNodes(block);
+          numEffectiveReplicas = numReplicas.liveReplicas() +
+            pendingReplications.getNumReplicas(block);
+
+          if (numEffectiveReplicas >= requiredReplication) {
+            if ( (pendingReplications.getNumReplicas(block) > 0) ||
+                 (blockHasEnoughRacks(block)) ) {
+              neededReplications.remove(block, priority); // remove from neededReplications
+              replIndex--;
+              rw.targets = null;
+              NameNode.stateChangeLog.info("BLOCK* "
+                  + "Removing block " + block
+                  + " from neededReplications as it has enough replicas.");
+              continue;
+            }
           }
-        }
 
-        // Add block to the to be replicated list
-        srcNode.addBlockToBeReplicated(block, targets);
+          if ( (numReplicas.liveReplicas() >= requiredReplication) &&
+               (!blockHasEnoughRacks(block)) ) {
+            if (rw.srcNode.getNetworkLocation().equals(targets[0].getNetworkLocation())) {
+              //No use continuing, unless a new rack in this case
+              continue;
+            }
+          }
 
-        for (DatanodeDescriptor dn : targets) {
-          dn.incBlocksScheduled();
-        }
+          // Add block to the to be replicated list
+          rw.srcNode.addBlockToBeReplicated(block, targets);
+          scheduledWork++;
 
-        // Move the block-replication into a "pending" state.
-        // The reason we use 'pending' is so we can retry
-        // replications that fail after an appropriate amount of time.
-        pendingReplications.add(block, targets.length);
-        if(NameNode.stateChangeLog.isDebugEnabled()) {
-          NameNode.stateChangeLog.debug(
-              "BLOCK* block " + block
-              + " is moved from neededReplications to pendingReplications");
-        }
+          for (DatanodeDescriptor dn : targets) {
+            dn.incBlocksScheduled();
+          }
 
-        // remove from neededReplications
-        if(numEffectiveReplicas + targets.length >= requiredReplication) {
-          neededReplications.remove(block, priority); // remove from neededReplications
-          replIndex--;
+          // Move the block-replication into a "pending" state.
+          // The reason we use 'pending' is so we can retry
+          // replications that fail after an appropriate amount of time.
+          pendingReplications.add(block, targets.length);
+          if(NameNode.stateChangeLog.isDebugEnabled()) {
+            NameNode.stateChangeLog.debug(
+                "BLOCK* block " + block
+                + " is moved from neededReplications to pendingReplications");
+          }
+
+          // remove from neededReplications
+          if(numEffectiveReplicas + targets.length >= requiredReplication) {
+            neededReplications.remove(block, priority); // remove from neededReplications
+            replIndex--;
+          }
         }
-        if (NameNode.stateChangeLog.isInfoEnabled()) {
+      }
+    } finally {
+      namesystem.writeUnlock();
+    }
+
+    if (NameNode.stateChangeLog.isInfoEnabled()) {
+      // log which blocks have been scheduled for replication
+      for(ReplicationWork rw : work){
+        DatanodeDescriptor[] targets = rw.targets;
+        if (targets != null && targets.length != 0) {
           StringBuilder targetList = new StringBuilder("datanode(s)");
           for (int k = 0; k < targets.length; k++) {
             targetList.append(' ');
             targetList.append(targets[k].getName());
           }
           NameNode.stateChangeLog.info(
-                    "BLOCK* ask "
-                    + srcNode.getName() + " to replicate "
-                    + block + " to " + targetList);
-          if(NameNode.stateChangeLog.isDebugEnabled()) {
-            NameNode.stateChangeLog.debug(
-                "BLOCK* neededReplications = " + neededReplications.size()
-                + " pendingReplications = " + pendingReplications.size());
-          }
+                  "BLOCK* ask "
+                  + rw.srcNode.getName() + " to replicate "
+                  + rw.block + " to " + targetList);
         }
       }
-    } finally {
-      namesystem.writeUnlock();
+    }
+    if(NameNode.stateChangeLog.isDebugEnabled()) {
+        NameNode.stateChangeLog.debug(
+          "BLOCK* neededReplications = " + neededReplications.size()
+          + " pendingReplications = " + pendingReplications.size());
     }
 
-    return true;
+    return scheduledWork;
   }
 
   /**
@@ -1220,7 +1245,7 @@ public class BlockManager {
     Collection<DatanodeDescriptor> nodesCorrupt = corruptReplicas.getNodes(block);
     while(it.hasNext()) {
       DatanodeDescriptor node = it.next();
-      Collection<Block> excessBlocks =
+      LightWeightLinkedSet<Block> excessBlocks =
         excessReplicateMap.get(node.getStorageID());
       if ((nodesCorrupt != null) && (nodesCorrupt.contains(node)))
         corrupt++;
@@ -1443,7 +1468,10 @@ public class BlockManager {
     BlockInfo delimiter = new BlockInfo(new Block(), 1);
     boolean added = dn.addBlock(delimiter);
     assert added : "Delimiting block cannot be present in the node";
-    if(newReport == null)
+    int headIndex = 0; //currently the delimiter is in the head of the list
+    int curIndex;
+
+    if (newReport == null)
       newReport = new BlockListAsLongs();
     // scan the report and process newly reported blocks
     BlockReportIterator itBR = newReport.getBlockReportIterator();
@@ -1453,8 +1481,9 @@ public class BlockManager {
       BlockInfo storedBlock = processReportedBlock(dn, iblk, iState,
                                   toAdd, toInvalidate, toCorrupt, toUC);
       // move block to the head of the list
-      if(storedBlock != null && storedBlock.findDatanode(dn) >= 0)
-        dn.moveBlockToHead(storedBlock);
+      if (storedBlock != null && (curIndex = storedBlock.findDatanode(dn)) >= 0) {
+        headIndex = dn.moveBlockToHead(storedBlock, curIndex, headIndex);
+      }
     }
     // collect blocks that have not been reported
     // all of them are next to the delimiter
@@ -1871,7 +1900,7 @@ public class BlockManager {
     for (Iterator<DatanodeDescriptor> it = blocksMap.nodeIterator(block);
          it.hasNext();) {
       DatanodeDescriptor cur = it.next();
-      Collection<Block> excessBlocks = excessReplicateMap.get(cur
+      LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(cur
           .getStorageID());
       if (excessBlocks == null || !excessBlocks.contains(block)) {
         if (!cur.isDecommissionInProgress() && !cur.isDecommissioned()) {
@@ -1989,9 +2018,9 @@ public class BlockManager {
 
   private void addToExcessReplicate(DatanodeInfo dn, Block block) {
     assert namesystem.hasWriteLock();
-    Collection<Block> excessBlocks = excessReplicateMap.get(dn.getStorageID());
+    LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(dn.getStorageID());
     if (excessBlocks == null) {
-      excessBlocks = new TreeSet<Block>();
+      excessBlocks = new LightWeightLinkedSet<Block>();
       excessReplicateMap.put(dn.getStorageID(), excessBlocks);
     }
     if (excessBlocks.add(block)) {
@@ -2039,7 +2068,7 @@ public class BlockManager {
       // We've removed a block from a node, so it's definitely no longer
       // in "excess" there.
       //
-      Collection<Block> excessBlocks = excessReplicateMap.get(node
+      LightWeightLinkedSet<Block> excessBlocks = excessReplicateMap.get(node
           .getStorageID());
       if (excessBlocks != null) {
         if (excessBlocks.remove(block)) {
@@ -2189,8 +2218,8 @@ public class BlockManager {
       } else if (node.isDecommissionInProgress() || node.isDecommissioned()) {
         count++;
       } else {
-        Collection<Block> blocksExcess =
-          excessReplicateMap.get(node.getStorageID());
+        LightWeightLinkedSet<Block> blocksExcess = excessReplicateMap.get(node
+            .getStorageID());
         if (blocksExcess != null && blocksExcess.contains(b)) {
           excess++;
         } else {
@@ -2591,4 +2620,34 @@ public class BlockManager {
     return workFound;
   }
 
+  private static class ReplicationWork {
+
+    private Block block;
+    private INodeFile fileINode;
+
+    private DatanodeDescriptor srcNode;
+    private List<DatanodeDescriptor> containingNodes;
+    private List<DatanodeDescriptor> liveReplicaNodes;
+    private int additionalReplRequired;
+
+    private DatanodeDescriptor targets[];
+    private int priority;
+
+    public ReplicationWork(Block block,
+        INodeFile fileINode,
+        DatanodeDescriptor srcNode,
+        List<DatanodeDescriptor> containingNodes,
+        List<DatanodeDescriptor> liveReplicaNodes,
+        int additionalReplRequired,
+        int priority) {
+      this.block = block;
+      this.fileINode = fileINode;
+      this.srcNode = srcNode;
+      this.containingNodes = containingNodes;
+      this.liveReplicaNodes = liveReplicaNodes;
+      this.additionalReplRequired = additionalReplRequired;
+      this.priority = priority;
+      this.targets = null;
+    }
+  }
 }
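
The rewritten computeReplicationWorkForBlocks splits replication scheduling into three phases: collect ReplicationWork items while holding the namesystem write lock, run the costly chooseTarget calls with the lock released, then reacquire the lock to recheck each block and commit the schedule. The generic sketch below only illustrates that lock/compute/revalidate shape; every name in it is invented and it uses no Hadoop classes.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class ThreePhaseScheduler<T> {

  interface Planner<T> {
    List<T> collect();           // phase 1: snapshot candidate work (locked)
    void chooseTargets(T work);  // phase 2: expensive placement (unlocked)
    boolean commit(T work);      // phase 3: recheck staleness and apply (locked)
  }

  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  int run(Planner<T> planner) {
    List<T> work;
    lock.writeLock().lock();
    try {
      work = new ArrayList<T>(planner.collect());
    } finally {
      lock.writeLock().unlock();
    }

    // The costly part runs with no global lock held, just like chooseTarget.
    for (T w : work) {
      planner.chooseTargets(w);
    }

    int scheduled = 0;
    lock.writeLock().lock();
    try {
      for (T w : work) {
        // State may have changed while unlocked, so every item is rechecked.
        if (planner.commit(w)) {
          scheduled++;
        }
      }
    } finally {
      lock.writeLock().unlock();
    }
    return scheduled;
  }
}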

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java?rev=1202013&r1=1202012&r2=1202013&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java Tue Nov 15 02:39:13 2011
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.Deprecated
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.util.LightWeightHashSet;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.WritableUtils;
 
@@ -120,11 +121,11 @@ public class DatanodeDescriptor extends 
   private BlockQueue<BlockInfoUnderConstruction> recoverBlocks =
                                 new BlockQueue<BlockInfoUnderConstruction>();
   /** A set of blocks to be invalidated by this datanode */
-  private Set<Block> invalidateBlocks = new TreeSet<Block>();
+  private LightWeightHashSet<Block> invalidateBlocks = new LightWeightHashSet<Block>();
 
   /* Variables for maintaining number of blocks scheduled to be written to
    * this datanode. This count is approximate and might be slightly bigger
-   * in case of errors (e.g. datanode does not report if an error occurs 
+   * in case of errors (e.g. datanode does not report if an error occurs
    * while writing the block).
    */
   private int currApproxBlocksScheduled = 0;
@@ -244,15 +245,24 @@ public class DatanodeDescriptor extends 
 
   /**
    * Move block to the head of the list of blocks belonging to the data-node.
+   * @return the index of the head of the blockList
    */
-  void moveBlockToHead(BlockInfo b) {
-    blockList = b.listRemove(blockList, this);
-    blockList = b.listInsert(blockList, this);
+  int moveBlockToHead(BlockInfo b, int curIndex, int headIndex) {
+    blockList = b.moveBlockToHead(blockList, this, curIndex, headIndex);
+    return curIndex;
+  }
+
+  /**
+   * Used for testing only
+   * @return the head of the blockList
+   */
+  protected BlockInfo getHead(){
+    return blockList;
   }
 
   /**
    * Replace specified old block with a new one in the DataNodeDescriptor.
-   * 
+   *
    * @param oldBlock - block to be replaced
    * @param newBlock - a replacement block
    * @return the new block
@@ -391,45 +401,11 @@ public class DatanodeDescriptor extends 
    * Remove the specified number of blocks to be invalidated
    */
   public Block[] getInvalidateBlocks(int maxblocks) {
-    return getBlockArray(invalidateBlocks, maxblocks); 
-  }
-
-  static private Block[] getBlockArray(Collection<Block> blocks, int max) {
-    Block[] blockarray = null;
-    synchronized(blocks) {
-      int available = blocks.size();
-      int n = available;
-      if (max > 0 && n > 0) {
-        if (max < n) {
-          n = max;
-        }
-        // allocate the properly sized block array ... 
-        blockarray = new Block[n];
-
-        // iterate tree collecting n blocks... 
-        Iterator<Block> e = blocks.iterator();
-        int blockCount = 0;
-
-        while (blockCount < n && e.hasNext()) {
-          // insert into array ... 
-          blockarray[blockCount++] = e.next();
-
-          // remove from tree via iterator, if we are removing 
-          // less than total available blocks
-          if (n < available){
-            e.remove();
-          }
-        }
-        assert(blockarray.length == n);
-        
-        // now if the number of blocks removed equals available blocks,
-        // them remove all blocks in one fell swoop via clear
-        if (n == available) { 
-          blocks.clear();
-        }
-      }
+    synchronized (invalidateBlocks) {
+      Block[] deleteList = invalidateBlocks.pollToArray(new Block[Math.min(
+          invalidateBlocks.size(), maxblocks)]);
+      return deleteList.length == 0 ? null : deleteList;
     }
-    return blockarray;
   }
 
   /** Serialization for FSEditLog */
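
getInvalidateBlocks above replaces the hand-rolled getBlockArray loop with pollToArray, which as used here removes up to maxblocks elements from the set and returns them in the supplied array. The sketch below only mimics that behaviour with a standard LinkedHashSet; it is not the real LightWeightHashSet implementation, and PollingSet is an invented name.

import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.Set;

final class PollingSet<E> {
  private final Set<E> backing = new LinkedHashSet<E>();

  boolean add(E e) { return backing.add(e); }

  int size() { return backing.size(); }

  // Fill 'out' with up to out.length elements, removing each one from the
  // set as it is copied; callers size 'out' with min(size(), max).
  E[] pollToArray(E[] out) {
    int n = 0;
    Iterator<E> it = backing.iterator();
    while (it.hasNext() && n < out.length) {
      out[n++] = it.next();
      it.remove();
    }
    return out;
  }

  public static void main(String[] args) {
    PollingSet<String> pending = new PollingSet<String>();
    pending.add("blk_1"); pending.add("blk_2"); pending.add("blk_3");
    String[] polled = pending.pollToArray(
        new String[Math.min(pending.size(), 2)]);
    System.out.println(polled.length + " polled, " + pending.size() + " left");
  }
}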

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java?rev=1202013&r1=1202012&r2=1202013&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java Tue Nov 15 02:39:13 2011
@@ -30,8 +30,9 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.util.LightWeightHashSet;
 
-/** 
+/**
  * Keeps a Collection for every named machine containing blocks
  * that have recently been invalidated and are thought to live
  * on the machine in question.
@@ -39,8 +40,8 @@ import org.apache.hadoop.hdfs.server.nam
 @InterfaceAudience.Private
 class InvalidateBlocks {
   /** Mapping: StorageID -> Collection of Blocks */
-  private final Map<String, Collection<Block>> node2blocks =
-      new TreeMap<String, Collection<Block>>();
+  private final Map<String, LightWeightHashSet<Block>> node2blocks =
+      new TreeMap<String, LightWeightHashSet<Block>>();
   /** The total number of blocks in the map. */
   private long numBlocks = 0L;
 
@@ -67,9 +68,9 @@ class InvalidateBlocks {
    */
   synchronized void add(final Block block, final DatanodeInfo datanode,
       final boolean log) {
-    Collection<Block> set = node2blocks.get(datanode.getStorageID());
+    LightWeightHashSet<Block> set = node2blocks.get(datanode.getStorageID());
     if (set == null) {
-      set = new HashSet<Block>();
+      set = new LightWeightHashSet<Block>();
       node2blocks.put(datanode.getStorageID(), set);
     }
     if (set.add(block)) {
@@ -83,7 +84,7 @@ class InvalidateBlocks {
 
   /** Remove a storage from the invalidatesSet */
   synchronized void remove(final String storageID) {
-    final Collection<Block> blocks = node2blocks.remove(storageID);
+    final LightWeightHashSet<Block> blocks = node2blocks.remove(storageID);
     if (blocks != null) {
       numBlocks -= blocks.size();
     }
@@ -91,7 +92,7 @@ class InvalidateBlocks {
 
   /** Remove the block from the specified storage. */
   synchronized void remove(final String storageID, final Block block) {
-    final Collection<Block> v = node2blocks.get(storageID);
+    final LightWeightHashSet<Block> v = node2blocks.get(storageID);
     if (v != null && v.remove(block)) {
       numBlocks--;
       if (v.isEmpty()) {
@@ -109,8 +110,8 @@ class InvalidateBlocks {
       return;
     }
 
-    for(Map.Entry<String,Collection<Block>> entry : node2blocks.entrySet()) {
-      final Collection<Block> blocks = entry.getValue();
+    for(Map.Entry<String,LightWeightHashSet<Block>> entry : node2blocks.entrySet()) {
+      final LightWeightHashSet<Block> blocks = entry.getValue();
       if (blocks.size() > 0) {
         out.println(datanodeManager.getDatanode(entry.getKey()).getName() + blocks);
       }
@@ -143,21 +144,17 @@ class InvalidateBlocks {
 
   private synchronized List<Block> invalidateWork(
       final String storageId, final DatanodeDescriptor dn) {
-    final Collection<Block> set = node2blocks.get(storageId);
+    final LightWeightHashSet<Block> set = node2blocks.get(storageId);
     if (set == null) {
       return null;
     }
 
     // # blocks that can be sent in one message is limited
     final int limit = datanodeManager.blockInvalidateLimit;
-    final List<Block> toInvalidate = new ArrayList<Block>(limit);
-    final Iterator<Block> it = set.iterator();
-    for(int count = 0; count < limit && it.hasNext(); count++) {
-      toInvalidate.add(it.next());
-      it.remove();
-    }
+    final List<Block> toInvalidate = set.pollN(limit);
+
     // If we send everything in this message, remove this node entry
-    if (!it.hasNext()) {
+    if (set.isEmpty()) {
       remove(storageId);
     }
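
invalidateWork above follows a drain-and-cleanup pattern: poll at most blockInvalidateLimit blocks for one storage, then drop that node's map entry once its set is empty so the map only tracks nodes with pending invalidations. A small self-contained sketch of that pattern follows; the class and field names are invented and a plain Queue stands in for LightWeightHashSet.

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

final class PendingInvalidations {
  // Mapping: storage ID -> blocks still waiting to be invalidated there.
  private final Map<String, Queue<Long>> node2blocks =
      new HashMap<String, Queue<Long>>();

  void add(String storageId, long blockId) {
    Queue<Long> pending = node2blocks.get(storageId);
    if (pending == null) {
      pending = new ArrayDeque<Long>();
      node2blocks.put(storageId, pending);
    }
    pending.add(blockId);
  }

  List<Long> invalidateWork(String storageId, int limit) {
    Queue<Long> pending = node2blocks.get(storageId);
    if (pending == null) {
      return null;
    }
    List<Long> toInvalidate = new ArrayList<Long>(Math.min(limit, pending.size()));
    while (toInvalidate.size() < limit && !pending.isEmpty()) {
      toInvalidate.add(pending.poll());     // the pollN(limit) step
    }
    if (pending.isEmpty()) {
      node2blocks.remove(storageId);        // drop the emptied node entry
    }
    return toInvalidate;
  }

  public static void main(String[] args) {
    PendingInvalidations inv = new PendingInvalidations();
    inv.add("storage-1", 101L);
    inv.add("storage-1", 102L);
    System.out.println(inv.invalidateWork("storage-1", 10)); // [101, 102]
    System.out.println(inv.invalidateWork("storage-1", 10)); // null, entry removed
  }
}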
 

Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java?rev=1202013&r1=1202012&r2=1202013&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UnderReplicatedBlocks.java Tue Nov 15 02:39:13 2011
@@ -24,6 +24,7 @@ import java.util.NavigableSet;
 import java.util.TreeSet;
 
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 
 /**
@@ -80,13 +81,13 @@ class UnderReplicatedBlocks implements I
   /** The queue for corrupt blocks: {@value} */
   static final int QUEUE_WITH_CORRUPT_BLOCKS = 4;
   /** the queues themselves */
-  private final List<NavigableSet<Block>> priorityQueues
-      = new ArrayList<NavigableSet<Block>>(LEVEL);
+  private List<LightWeightLinkedSet<Block>> priorityQueues
+      = new ArrayList<LightWeightLinkedSet<Block>>();
 
   /** Create an object. */
   UnderReplicatedBlocks() {
     for (int i = 0; i < LEVEL; i++) {
-      priorityQueues.add(new TreeSet<Block>());
+      priorityQueues.add(new LightWeightLinkedSet<Block>());
     }
   }
 
@@ -123,10 +124,10 @@ class UnderReplicatedBlocks implements I
   synchronized int getCorruptBlockSize() {
     return priorityQueues.get(QUEUE_WITH_CORRUPT_BLOCKS).size();
   }
-  
+
   /** Check if a block is in the neededReplication queue */
   synchronized boolean contains(Block block) {
-    for (NavigableSet<Block> set : priorityQueues) {
+    for(LightWeightLinkedSet<Block> set : priorityQueues) {
       if (set.contains(block)) {
         return true;
       }
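
UnderReplicatedBlocks now backs each priority queue with LightWeightLinkedSet instead of TreeSet, a linked hash-style set that preserves insertion order and offers constant-time membership checks. A behavioural analogue using java.util.LinkedHashSet (not the real class; all names below are invented) is sketched here.

import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

final class PriorityBlockQueues {
  static final int LEVEL = 5;

  private final List<Set<Long>> priorityQueues = new ArrayList<Set<Long>>(LEVEL);

  PriorityBlockQueues() {
    for (int i = 0; i < LEVEL; i++) {
      // Insertion-ordered set: iteration returns blocks in arrival order,
      // while contains/remove stay constant time.
      priorityQueues.add(new LinkedHashSet<Long>());
    }
  }

  boolean add(long blockId, int priority) {
    return priorityQueues.get(priority).add(blockId);
  }

  boolean contains(long blockId) {
    for (Set<Long> queue : priorityQueues) {
      if (queue.contains(blockId)) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    PriorityBlockQueues queues = new PriorityBlockQueues();
    queues.add(1001L, 0);
    queues.add(1002L, 4);
    System.out.println(queues.contains(1002L));   // true
  }
}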


