hadoop-hdfs-commits mailing list archives

From: tomwh...@apache.org
Subject: svn commit: r906690 [2/4] - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/server/common/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/aop/org/apache/hadoop/hdfs/ src/test/hdfs/org/apache/hadoop/...
Date: Thu, 04 Feb 2010 22:20:03 GMT
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=906690&r1=906689&r2=906690&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java Thu Feb  4 22:20:02 2010
@@ -19,11 +19,9 @@
 package org.apache.hadoop.hdfs;
 
 import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Op.BLOCK_CHECKSUM;
-import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.CHECKSUM_OK;
 import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.ERROR_ACCESS_TOKEN;
 import static org.apache.hadoop.hdfs.protocol.DataTransferProtocol.Status.SUCCESS;
 
-import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
@@ -33,37 +31,24 @@
 import java.net.InetSocketAddress;
 import java.net.Socket;
 import java.net.SocketTimeoutException;
-import java.nio.BufferOverflowException;
-import java.nio.ByteBuffer;
-import java.util.AbstractMap;
-import java.util.ArrayList;
 import java.util.EnumSet;
 import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.SortedMap;
 import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.net.SocketFactory;
-import javax.security.auth.login.LoginException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.ChecksumException;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSInputChecker;
-import org.apache.hadoop.fs.FSInputStream;
-import org.apache.hadoop.fs.FSOutputSummer;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -71,7 +56,6 @@
 import org.apache.hadoop.fs.FsStatus;
 import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.Syncable;
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
@@ -86,16 +70,11 @@
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.BlockConstructionStage;
-import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
-import org.apache.hadoop.hdfs.security.BlockAccessToken;
-import org.apache.hadoop.hdfs.security.InvalidAccessTokenException;
 import org.apache.hadoop.hdfs.security.token.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.common.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.IOUtils;
@@ -114,9 +93,7 @@
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.util.Daemon;
-import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.PureJavaCrc32;
 import org.apache.hadoop.util.StringUtils;
 
 /********************************************************
@@ -134,8 +111,8 @@
   public static final Log LOG = LogFactory.getLog(DFSClient.class);
   public static final long SERVER_DEFAULTS_VALIDITY_PERIOD = 60 * 60 * 1000L; // 1 hour
   public static final int MAX_BLOCK_ACQUIRE_FAILURES = 3;
-  private static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
-  private final ClientProtocol namenode;
+  static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB
+  final ClientProtocol namenode;
   private final ClientProtocol rpcNamenode;
   final UserGroupInformation ugi;
   volatile boolean clientRunning = true;
@@ -144,16 +121,14 @@
   Random r = new Random();
   final String clientName;
   final LeaseChecker leasechecker = new LeaseChecker();
-  private Configuration conf;
-  private long defaultBlockSize;
+  Configuration conf;
+  long defaultBlockSize;
   private short defaultReplication;
-  private SocketFactory socketFactory;
-  private int socketTimeout;
-  private int datanodeWriteTimeout;
+  SocketFactory socketFactory;
+  int socketTimeout;
   final int writePacketSize;
-  private final FileSystem.Statistics stats;
-  private int maxBlockAcquireFailures;
-  private final int hdfsTimeout;    // timeout value for a DFS operation.
+  final FileSystem.Statistics stats;
+  final int hdfsTimeout;    // timeout value for a DFS operation.
 
   /**
    * The locking hierarchy is to first acquire lock on DFSClient object, followed by 
@@ -253,13 +228,10 @@
     this.stats = stats;
     this.socketTimeout = conf.getInt(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 
                                      HdfsConstants.READ_TIMEOUT);
-    this.datanodeWriteTimeout = conf.getInt("dfs.datanode.socket.write.timeout",
-                                            HdfsConstants.WRITE_TIMEOUT);
     this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
     // dfs.write.packet.size is an internal config variable
     this.writePacketSize = conf.getInt(DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY, 
 		                       DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
-    this.maxBlockAcquireFailures = getMaxBlockAcquireFailures(conf);
     // The hdfsTimeout is currently the same as the ipc timeout 
     this.hdfsTimeout = Client.getTimeout(conf);
 
@@ -287,13 +259,41 @@
     }
   }
 
-  private void checkOpen() throws IOException {
+  /**
+   * Return the number of times the client should go back to the namenode
+   * to retrieve block locations when reading.
+   */
+  int getMaxBlockAcquireFailures() {
+    return conf.getInt("dfs.client.max.block.acquire.failures",
+                       MAX_BLOCK_ACQUIRE_FAILURES);
+  }
+
+  /**
+   * Return the timeout that clients should use when writing to datanodes.
+   * @param numNodes the number of nodes in the pipeline.
+   */
+  int getDatanodeWriteTimeout(int numNodes) {
+    int confTime =
+        conf.getInt("dfs.datanode.socket.write.timeout",
+                    HdfsConstants.WRITE_TIMEOUT);
+
+    return (confTime > 0) ?
+      (confTime + HdfsConstants.WRITE_TIMEOUT_EXTENSION * numNodes) : 0;
+  }
+
+  int getDatanodeReadTimeout(int numNodes) {
+    return socketTimeout > 0 ?
+        (HdfsConstants.READ_TIMEOUT_EXTENSION * numNodes +
+        socketTimeout) : 0;
+  }
+
+  void checkOpen() throws IOException {
     if (!clientRunning) {
       IOException result = new IOException("Filesystem closed");
       throw result;
     }
   }
-    
+
   /**
    * Close the file system, abandoning all of the leases and files being
    * created and close connections to the namenode.
@@ -330,11 +330,6 @@
     }
   }
 
-  static int getMaxBlockAcquireFailures(Configuration conf) {
-    return conf.getInt("dfs.client.max.block.acquire.failures",
-                       MAX_BLOCK_ACQUIRE_FAILURES);
-  }
-
   /**
    * Get server default values for a number of configuration params.
    */
@@ -377,7 +372,7 @@
     return defaultReplication;
   }
 
-  private static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
+  static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
       String src, long start, long length) throws IOException {
     try {
       return namenode.getBlockLocations(src, start, length);
@@ -458,7 +453,7 @@
       ) throws IOException {
     checkOpen();
     //    Get block info from namenode
-    return new DFSInputStream(src, buffersize, verifyChecksum);
+    return new DFSInputStream(this, src, buffersize, verifyChecksum);
   }
 
   /**
@@ -603,7 +598,7 @@
     }
     FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf));
     LOG.debug(src + ": masked=" + masked);
-    OutputStream result = new DFSOutputStream(src, masked,
+    OutputStream result = new DFSOutputStream(this, src, masked,
         flag, createParent, replication, blockSize, progress, buffersize,
         conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 
                     DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT));
@@ -628,7 +623,7 @@
                              int bytesPerChecksum)
     throws IOException {
     checkOpen();
-    OutputStream result = new DFSOutputStream(src, absPermission,
+    OutputStream result = new DFSOutputStream(this, src, absPermission,
         flag, createParent, replication, blockSize, progress, buffersize,
         bytesPerChecksum);
     leasechecker.put(src, result);
@@ -659,7 +654,7 @@
                                      NSQuotaExceededException.class,
                                      DSQuotaExceededException.class);
     }
-    OutputStream result = new DFSOutputStream(src, buffersize, progress,
+    OutputStream result = new DFSOutputStream(this, src, buffersize, progress,
         lastBlock, stat, conf.getInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 
                                      DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_DEFAULT));
     leasechecker.put(src, result);
@@ -1165,23 +1160,6 @@
     }
   }
 
-  /**
-   * Pick the best node from which to stream the data.
-   * Entries in <i>nodes</i> are already in the priority order
-   */
-  private DatanodeInfo bestNode(DatanodeInfo nodes[], 
-                                AbstractMap<DatanodeInfo, DatanodeInfo> deadNodes)
-                                throws IOException {
-    if (nodes != null) { 
-      for (int i = 0; i < nodes.length; i++) {
-        if (!deadNodes.containsKey(nodes[i])) {
-          return nodes[i];
-        }
-      }
-    }
-    throw new IOException("No live nodes contain current block");
-  }
-
   boolean isLeaseCheckerStarted() {
     return leasechecker.daemon != null;
   }
@@ -1324,2600 +1302,42 @@
     }
   }
 
-  /** Utility class to encapsulate data node info and its ip address. */
-  private static class DNAddrPair {
-    DatanodeInfo info;
-    InetSocketAddress addr;
-    DNAddrPair(DatanodeInfo info, InetSocketAddress addr) {
-      this.info = info;
-      this.addr = addr;
-    }
-  }
-
-  /** This is a wrapper around connection to datadone
-   * and understands checksum, offset etc
+  /**
+   * The Hdfs implementation of {@link FSDataInputStream}
    */
-  public static class BlockReader extends FSInputChecker {
-
-    Socket dnSock; //for now just sending checksumOk.
-    private DataInputStream in;
-    private DataChecksum checksum;
-
-    /** offset in block of the last chunk received */
-    private long lastChunkOffset = -1;
-    private long lastChunkLen = -1;
-    private long lastSeqNo = -1;
-
-    /** offset in block where reader wants to actually read */
-    private long startOffset;
-
-    /** offset in block of of first chunk - may be less than startOffset
-        if startOffset is not chunk-aligned */
-    private final long firstChunkOffset;
-
-    private int bytesPerChecksum;
-    private int checksumSize;
-
-    /**
-     * The total number of bytes we need to transfer from the DN.
-     * This is the amount that the user has requested plus some padding
-     * at the beginning so that the read can begin on a chunk boundary.
-     */
-    private final long bytesNeededToFinish;
-
-    private boolean gotEOS = false;
-    
-    byte[] skipBuf = null;
-    ByteBuffer checksumBytes = null;
-    int dataLeft = 0;
-    
-    /* FSInputChecker interface */
-    
-    /* same interface as inputStream java.io.InputStream#read()
-     * used by DFSInputStream#read()
-     * This violates one rule when there is a checksum error:
-     * "Read should not modify user buffer before successful read"
-     * because it first reads the data to user buffer and then checks
-     * the checksum.
-     */
-    @Override
-    public synchronized int read(byte[] buf, int off, int len) 
-                                 throws IOException {
-      
-      // This has to be set here, *before* the skip, since we can
-      // hit EOS during the skip, in the case that our entire read
-      // is smaller than the checksum chunk.
-      boolean eosBefore = gotEOS;
-
-      //for the first read, skip the extra bytes at the front.
-      if (lastChunkLen < 0 && startOffset > firstChunkOffset && len > 0) {
-        // Skip these bytes. But don't call this.skip()!
-        int toSkip = (int)(startOffset - firstChunkOffset);
-        if ( skipBuf == null ) {
-          skipBuf = new byte[bytesPerChecksum];
-        }
-        if ( super.read(skipBuf, 0, toSkip) != toSkip ) {
-          // should never happen
-          throw new IOException("Could not skip required number of bytes");
-        }
-      }
-      
-      int nRead = super.read(buf, off, len);
-      
-      // if gotEOS was set in the previous read and checksum is enabled :
-      if (gotEOS && !eosBefore && nRead >= 0 && needChecksum()) {
-        //checksum is verified and there are no errors.
-        checksumOk(dnSock);
-      }
-      return nRead;
-    }
-
-    @Override
-    public synchronized long skip(long n) throws IOException {
-      /* How can we make sure we don't throw a ChecksumException, at least
-       * in majority of the cases?. This one throws. */  
-      if ( skipBuf == null ) {
-        skipBuf = new byte[bytesPerChecksum]; 
-      }
-
-      long nSkipped = 0;
-      while ( nSkipped < n ) {
-        int toSkip = (int)Math.min(n-nSkipped, skipBuf.length);
-        int ret = read(skipBuf, 0, toSkip);
-        if ( ret <= 0 ) {
-          return nSkipped;
-        }
-        nSkipped += ret;
-      }
-      return nSkipped;
-    }
-
-    @Override
-    public int read() throws IOException {
-      throw new IOException("read() is not expected to be invoked. " +
-                            "Use read(buf, off, len) instead.");
-    }
-    
-    @Override
-    public boolean seekToNewSource(long targetPos) throws IOException {
-      /* Checksum errors are handled outside the BlockReader. 
-       * DFSInputStream does not always call 'seekToNewSource'. In the 
-       * case of pread(), it just tries a different replica without seeking.
-       */ 
-      return false;
-    }
-    
-    @Override
-    public void seek(long pos) throws IOException {
-      throw new IOException("Seek() is not supported in BlockInputChecker");
-    }
-
-    @Override
-    protected long getChunkPosition(long pos) {
-      throw new RuntimeException("getChunkPosition() is not supported, " +
-                                 "since seek is not required");
-    }
-    
-    /**
-     * Makes sure that checksumBytes has enough capacity 
-     * and limit is set to the number of checksum bytes needed 
-     * to be read.
-     */
-    private void adjustChecksumBytes(int dataLen) {
-      int requiredSize = 
-        ((dataLen + bytesPerChecksum - 1)/bytesPerChecksum)*checksumSize;
-      if (checksumBytes == null || requiredSize > checksumBytes.capacity()) {
-        checksumBytes =  ByteBuffer.wrap(new byte[requiredSize]);
-      } else {
-        checksumBytes.clear();
-      }
-      checksumBytes.limit(requiredSize);
-    }
-    
-    @Override
-    protected synchronized int readChunk(long pos, byte[] buf, int offset, 
-                                         int len, byte[] checksumBuf) 
-                                         throws IOException {
-      // Read one chunk.
-      if ( gotEOS ) {
-        // Already hit EOF
-        return -1;
-      }
-      
-      // Read one DATA_CHUNK.
-      long chunkOffset = lastChunkOffset;
-      if ( lastChunkLen > 0 ) {
-        chunkOffset += lastChunkLen;
-      }
-      
-      // pos is relative to the start of the first chunk of the read.
-      // chunkOffset is relative to the start of the block.
-      // This makes sure that the read passed from FSInputChecker is the
-      // for the same chunk we expect to be reading from the DN.
-      if ( (pos + firstChunkOffset) != chunkOffset ) {
-        throw new IOException("Mismatch in pos : " + pos + " + " + 
-                              firstChunkOffset + " != " + chunkOffset);
-      }
-
-      // Read next packet if the previous packet has been read completely.
-      if (dataLeft <= 0) {
-        //Read packet headers.
-        int packetLen = in.readInt();
-        long offsetInBlock = in.readLong();
-        long seqno = in.readLong();
-        boolean lastPacketInBlock = in.readBoolean();
-      
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("DFSClient readChunk got seqno " + seqno +
-                    " offsetInBlock " + offsetInBlock +
-                    " lastPacketInBlock " + lastPacketInBlock +
-                    " packetLen " + packetLen);
-        }
-        
-        int dataLen = in.readInt();
-      
-        // Sanity check the lengths
-        if ( ( dataLen <= 0 && !lastPacketInBlock ) ||
-             ( dataLen != 0 && lastPacketInBlock) ||
-             (seqno != (lastSeqNo + 1)) ) {
-             throw new IOException("BlockReader: error in packet header" +
-                                   "(chunkOffset : " + chunkOffset + 
-                                   ", dataLen : " + dataLen +
-                                   ", seqno : " + seqno + 
-                                   " (last: " + lastSeqNo + "))");
-        }
-        
-        lastSeqNo = seqno;
-        dataLeft = dataLen;
-        adjustChecksumBytes(dataLen);
-        if (dataLen > 0) {
-          IOUtils.readFully(in, checksumBytes.array(), 0,
-                            checksumBytes.limit());
-        }
-      }
-
-      // Sanity checks
-      assert len >= bytesPerChecksum;
-      assert checksum != null;
-      assert checksumSize == 0 || (checksumBuf.length % checksumSize == 0);
-
-
-      int checksumsToRead, bytesToRead;
-
-      if (checksumSize > 0) {
-
-        // How many chunks left in our stream - this is a ceiling
-        // since we may have a partial chunk at the end of the file
-        int chunksLeft = (dataLeft - 1) / bytesPerChecksum + 1;
-
-        // How many chunks we can fit in databuffer
-        //  - note this is a floor since we always read full chunks
-        int chunksCanFit = Math.min(len / bytesPerChecksum,
-                                    checksumBuf.length / checksumSize);
-
-        // How many chunks should we read
-        checksumsToRead = Math.min(chunksLeft, chunksCanFit);
-        // How many bytes should we actually read
-        bytesToRead = Math.min(
-          checksumsToRead * bytesPerChecksum, // full chunks
-          dataLeft); // in case we have a partial
-      } else {
-        // no checksum
-        bytesToRead = Math.min(dataLeft, len);
-        checksumsToRead = 0;
-      }
-
-      if ( bytesToRead > 0 ) {
-        // Assert we have enough space
-        assert bytesToRead <= len;
-        assert checksumBytes.remaining() >= checksumSize * checksumsToRead;
-        assert checksumBuf.length >= checksumSize * checksumsToRead;
-        IOUtils.readFully(in, buf, offset, bytesToRead);
-        checksumBytes.get(checksumBuf, 0, checksumSize * checksumsToRead);
-      }
-
-      dataLeft -= bytesToRead;
-      assert dataLeft >= 0;
-
-      lastChunkOffset = chunkOffset;
-      lastChunkLen = bytesToRead;
-
-      // If there's no data left in the current packet after satisfying
-      // this read, and we have satisfied the client read, we expect
-      // an empty packet header from the DN to signify this.
-      // Note that pos + bytesToRead may in fact be greater since the
-      // DN finishes off the entire last chunk.
-      if (dataLeft == 0 &&
-          pos + bytesToRead >= bytesNeededToFinish) {
-
-        // Read header
-        int packetLen = in.readInt();
-        long offsetInBlock = in.readLong();
-        long seqno = in.readLong();
-        boolean lastPacketInBlock = in.readBoolean();
-        int dataLen = in.readInt();
-
-        if (!lastPacketInBlock ||
-            dataLen != 0) {
-          throw new IOException("Expected empty end-of-read packet! Header: " +
-                                "(packetLen : " + packetLen + 
-                                ", offsetInBlock : " + offsetInBlock +
-                                ", seqno : " + seqno + 
-                                ", lastInBlock : " + lastPacketInBlock +
-                                ", dataLen : " + dataLen);
-        }
-
-        gotEOS = true;
-      }
-
-      if ( bytesToRead == 0 ) {
-        return -1;
-      }
-
-      return bytesToRead;
+  public static class DFSDataInputStream extends FSDataInputStream {
+    public DFSDataInputStream(DFSInputStream in)
+      throws IOException {
+      super(in);
     }
-    
-    private BlockReader( String file, long blockId, DataInputStream in, 
-                         DataChecksum checksum, boolean verifyChecksum,
-                         long startOffset, long firstChunkOffset,
-                         long bytesToRead,
-                         Socket dnSock ) {
-      super(new Path("/blk_" + blockId + ":of:" + file)/*too non path-like?*/,
-            1, verifyChecksum,
-            checksum.getChecksumSize() > 0? checksum : null, 
-            checksum.getBytesPerChecksum(),
-            checksum.getChecksumSize());
       
-      this.dnSock = dnSock;
-      this.in = in;
-      this.checksum = checksum;
-      this.startOffset = Math.max( startOffset, 0 );
-
-      // The total number of bytes that we need to transfer from the DN is
-      // the amount that the user wants (bytesToRead), plus the padding at
-      // the beginning in order to chunk-align. Note that the DN may elect
-      // to send more than this amount if the read ends mid-chunk.
-      this.bytesNeededToFinish = bytesToRead + (startOffset - firstChunkOffset);
-
-      this.firstChunkOffset = firstChunkOffset;
-      lastChunkOffset = firstChunkOffset;
-      lastChunkLen = -1;
-
-      bytesPerChecksum = this.checksum.getBytesPerChecksum();
-      checksumSize = this.checksum.getChecksumSize();
-    }
-
-    public static BlockReader newBlockReader(Socket sock, String file, long blockId, BlockAccessToken accessToken, 
-        long genStamp, long startOffset, long len, int bufferSize) throws IOException {
-      return newBlockReader(sock, file, blockId, accessToken, genStamp, startOffset, len, bufferSize,
-          true);
-    }
-
-    /** Java Doc required */
-    public static BlockReader newBlockReader( Socket sock, String file, long blockId, 
-                                       BlockAccessToken accessToken,
-                                       long genStamp,
-                                       long startOffset, long len,
-                                       int bufferSize, boolean verifyChecksum)
-                                       throws IOException {
-      return newBlockReader(sock, file, blockId, accessToken, genStamp, startOffset,
-                            len, bufferSize, verifyChecksum, "");
-    }
-
-    public static BlockReader newBlockReader( Socket sock, String file,
-                                       long blockId, 
-                                       BlockAccessToken accessToken,
-                                       long genStamp,
-                                       long startOffset, long len,
-                                       int bufferSize, boolean verifyChecksum,
-                                       String clientName)
-                                       throws IOException {
-      // in and out will be closed when sock is closed (by the caller)
-      DataTransferProtocol.Sender.opReadBlock(
-          new DataOutputStream(new BufferedOutputStream(
-              NetUtils.getOutputStream(sock,HdfsConstants.WRITE_TIMEOUT))),
-          blockId, genStamp, startOffset, len, clientName, accessToken);
-      
-      //
-      // Get bytes in block, set streams
-      //
-
-      DataInputStream in = new DataInputStream(
-          new BufferedInputStream(NetUtils.getInputStream(sock), 
-                                  bufferSize));
-      
-      DataTransferProtocol.Status status = DataTransferProtocol.Status.read(in);
-      if (status != SUCCESS) {
-        if (status == ERROR_ACCESS_TOKEN) {
-          throw new InvalidAccessTokenException(
-              "Got access token error for OP_READ_BLOCK, self="
-                  + sock.getLocalSocketAddress() + ", remote="
-                  + sock.getRemoteSocketAddress() + ", for file " + file
-                  + ", for block " + blockId + "_" + genStamp);
-        } else {
-          throw new IOException("Got error for OP_READ_BLOCK, self="
-              + sock.getLocalSocketAddress() + ", remote="
-              + sock.getRemoteSocketAddress() + ", for file " + file
-              + ", for block " + blockId + "_" + genStamp);
-        }
-      }
-      DataChecksum checksum = DataChecksum.newDataChecksum( in );
-      //Warning when we get CHECKSUM_NULL?
-      
-      // Read the first chunk offset.
-      long firstChunkOffset = in.readLong();
-      
-      if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
-          firstChunkOffset >= (startOffset + checksum.getBytesPerChecksum())) {
-        throw new IOException("BlockReader: error in first chunk offset (" +
-                              firstChunkOffset + ") startOffset is " + 
-                              startOffset + " for file " + file);
-      }
-
-      return new BlockReader( file, blockId, in, checksum, verifyChecksum,
-                              startOffset, firstChunkOffset, len,
-                              sock );
-    }
-
-    @Override
-    public synchronized void close() throws IOException {
-      startOffset = -1;
-      checksum = null;
-      // in will be closed when its Socket is closed.
-    }
-    
-    /** kind of like readFully(). Only reads as much as possible.
-     * And allows use of protected readFully().
-     */
-    public int readAll(byte[] buf, int offset, int len) throws IOException {
-      return readFully(this, buf, offset, len);
-    }
-    
-    /* When the reader reaches end of a block and there are no checksum
-     * errors, we send OP_STATUS_CHECKSUM_OK to datanode to inform that 
-     * checksum was verified and there was no error.
-     */ 
-    void checksumOk(Socket sock) {
-      try {
-        OutputStream out = NetUtils.getOutputStream(sock, HdfsConstants.WRITE_TIMEOUT);
-        CHECKSUM_OK.writeOutputStream(out);
-        out.flush();
-      } catch (IOException e) {
-        // its ok not to be able to send this.
-        LOG.debug("Could not write to datanode " + sock.getInetAddress() +
-                  ": " + e.getMessage());
-      }
-    }
-  }
-    
-  /****************************************************************
-   * DFSInputStream provides bytes from a named file.  It handles 
-   * negotiation of the namenode and various datanodes as necessary.
-   ****************************************************************/
-  class DFSInputStream extends FSInputStream {
-    private Socket s = null;
-    private boolean closed = false;
-
-    private String src;
-    private long prefetchSize = 10 * defaultBlockSize;
-    private BlockReader blockReader = null;
-    private boolean verifyChecksum;
-    private LocatedBlocks locatedBlocks = null;
-    private long lastBlockBeingWrittenLength = 0;
-    private DatanodeInfo currentNode = null;
-    private Block currentBlock = null;
-    private long pos = 0;
-    private long blockEnd = -1;
-
-    /**
-     * This variable tracks the number of failures since the start of the
-     * most recent user-facing operation. That is to say, it should be reset
-     * whenever the user makes a call on this stream, and if at any point
-     * during the retry logic, the failure count exceeds a threshold,
-     * the errors will be thrown back to the operation.
-     *
-     * Specifically this counts the number of times the client has gone
-     * back to the namenode to get a new list of block locations, and is
-     * capped at maxBlockAcquireFailures
-     */
-    private int failures = 0;
-    private int timeWindow = 3000; // wait time window (in msec) if BlockMissingException is caught
-
-    /* XXX Use of CocurrentHashMap is temp fix. Need to fix 
-     * parallel accesses to DFSInputStream (through ptreads) properly */
-    private ConcurrentHashMap<DatanodeInfo, DatanodeInfo> deadNodes = 
-               new ConcurrentHashMap<DatanodeInfo, DatanodeInfo>();
-    private int buffersize = 1;
-    
-    private byte[] oneByteBuf = new byte[1]; // used for 'int read()'
-    
-    void addToDeadNodes(DatanodeInfo dnInfo) {
-      deadNodes.put(dnInfo, dnInfo);
-    }
-    
-    DFSInputStream(String src, int buffersize, boolean verifyChecksum
-                   ) throws IOException {
-      this.verifyChecksum = verifyChecksum;
-      this.buffersize = buffersize;
-      this.src = src;
-      prefetchSize = conf.getLong(DFSConfigKeys.DFS_CLIENT_READ_PREFETCH_SIZE_KEY, prefetchSize);
-      timeWindow = conf.getInt(DFSConfigKeys.DFS_CLIENT_RETRY_WINDOW_BASE, timeWindow);
-      openInfo();
-    }
-
-    /**
-     * Grab the open-file info from namenode
-     */
-    synchronized void openInfo() throws IOException {
-      LocatedBlocks newInfo = callGetBlockLocations(namenode, src, 0, prefetchSize);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("newInfo = " + newInfo);
-      }
-      if (newInfo == null) {
-        throw new IOException("Cannot open filename " + src);
-      }
-
-      if (locatedBlocks != null) {
-        Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
-        Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
-        while (oldIter.hasNext() && newIter.hasNext()) {
-          if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
-            throw new IOException("Blocklist for " + src + " has changed!");
-          }
-        }
-      }
-      this.locatedBlocks = newInfo;
-      this.lastBlockBeingWrittenLength = 0;
-      if (!locatedBlocks.isLastBlockComplete()) {
-        final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
-        if (last != null) {
-          final long len = readBlockLength(last);
-          last.getBlock().setNumBytes(len);
-          this.lastBlockBeingWrittenLength = len; 
-        }
-      }
-
-      this.currentNode = null;
-    }
-
-    /** Read the block length from one of the datanodes. */
-    private long readBlockLength(LocatedBlock locatedblock) throws IOException {
-      if (locatedblock == null || locatedblock.getLocations().length == 0) {
-        return 0;
-      }
-      for(DatanodeInfo datanode : locatedblock.getLocations()) {
-        try {
-          final ClientDatanodeProtocol cdp = createClientDatanodeProtocolProxy(
-              datanode, conf);
-          final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
-          if (n >= 0) {
-            return n;
-          }
-        }
-        catch(IOException ioe) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("Faild to getReplicaVisibleLength from datanode "
-                + datanode + " for block " + locatedblock.getBlock(), ioe);
-          }
-        }
-      }
-      throw new IOException("Cannot obtain block length for " + locatedblock);
-    }
-    
-    public synchronized long getFileLength() {
-      return locatedBlocks == null? 0:
-          locatedBlocks.getFileLength() + lastBlockBeingWrittenLength;
-    }
-
     /**
      * Returns the datanode from which the stream is currently reading.
      */
     public DatanodeInfo getCurrentDatanode() {
-      return currentNode;
+      return ((DFSInputStream)in).getCurrentDatanode();
     }
-
+      
     /**
      * Returns the block containing the target position. 
      */
     public Block getCurrentBlock() {
-      return currentBlock;
+      return ((DFSInputStream)in).getCurrentBlock();
     }
 
     /**
      * Return collection of blocks that has already been located.
      */
     synchronized List<LocatedBlock> getAllBlocks() throws IOException {
-      return getBlockRange(0, this.getFileLength());
-    }
-
-    /**
-     * Get block at the specified position.
-     * Fetch it from the namenode if not cached.
-     * 
-     * @param offset
-     * @param updatePosition whether to update current position
-     * @return located block
-     * @throws IOException
-     */
-    private synchronized LocatedBlock getBlockAt(long offset,
-        boolean updatePosition) throws IOException {
-      assert (locatedBlocks != null) : "locatedBlocks is null";
-
-      final LocatedBlock blk;
-
-      //check offset
-      if (offset < 0 || offset >= getFileLength()) {
-        throw new IOException("offset < 0 || offset > getFileLength(), offset="
-            + offset
-            + ", updatePosition=" + updatePosition
-            + ", locatedBlocks=" + locatedBlocks);
-      }
-      else if (offset >= locatedBlocks.getFileLength()) {
-        // offset to the portion of the last block,
-        // which is not known to the name-node yet;
-        // getting the last block 
-        blk = locatedBlocks.getLastLocatedBlock();
-      }
-      else {
-        // search cached blocks first
-        int targetBlockIdx = locatedBlocks.findBlock(offset);
-        if (targetBlockIdx < 0) { // block is not cached
-          targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
-          // fetch more blocks
-          LocatedBlocks newBlocks;
-          newBlocks = callGetBlockLocations(namenode, src, offset, prefetchSize);
-          assert (newBlocks != null) : "Could not find target position " + offset;
-          locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
-        }
-        blk = locatedBlocks.get(targetBlockIdx);
-      }
-
-      // update current position
-      if (updatePosition) {
-        this.pos = offset;
-        this.blockEnd = blk.getStartOffset() + blk.getBlockSize() - 1;
-        this.currentBlock = blk.getBlock();
-      }
-      return blk;
-    }
-
-    /** Fetch a block from namenode and cache it */
-    private synchronized void fetchBlockAt(long offset) throws IOException {
-      int targetBlockIdx = locatedBlocks.findBlock(offset);
-      if (targetBlockIdx < 0) { // block is not cached
-        targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
-      }
-      // fetch blocks
-      LocatedBlocks newBlocks;
-      newBlocks = callGetBlockLocations(namenode, src, offset, prefetchSize);
-      if (newBlocks == null) {
-        throw new IOException("Could not find target position " + offset);
-      }
-      locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
-    }
-
-    /**
-     * Get blocks in the specified range.
-     * Fetch them from the namenode if not cached.
-     * 
-     * @param offset
-     * @param length
-     * @return consequent segment of located blocks
-     * @throws IOException
-     */
-    private synchronized List<LocatedBlock> getBlockRange(long offset, 
-                                                          long length) 
-                                                        throws IOException {
-      final List<LocatedBlock> blocks;
-      if (locatedBlocks.isLastBlockComplete()) {
-        blocks = getFinalizedBlockRange(offset, length);
-      }
-      else {
-        if (length + offset > locatedBlocks.getFileLength()) {
-          length = locatedBlocks.getFileLength() - offset;
-        }
-        blocks = getFinalizedBlockRange(offset, length);
-        blocks.add(locatedBlocks.getLastLocatedBlock());
-      }
-      return blocks;
-    }
-
-    /**
-     * Get blocks in the specified range.
-     * Includes only the complete blocks.
-     * Fetch them from the namenode if not cached.
-     */
-    private synchronized List<LocatedBlock> getFinalizedBlockRange(
-        long offset, long length) throws IOException {
-      assert (locatedBlocks != null) : "locatedBlocks is null";
-      List<LocatedBlock> blockRange = new ArrayList<LocatedBlock>();
-      // search cached blocks first
-      int blockIdx = locatedBlocks.findBlock(offset);
-      if (blockIdx < 0) { // block is not cached
-        blockIdx = LocatedBlocks.getInsertIndex(blockIdx);
-      }
-      long remaining = length;
-      long curOff = offset;
-      while(remaining > 0) {
-        LocatedBlock blk = null;
-        if(blockIdx < locatedBlocks.locatedBlockCount())
-          blk = locatedBlocks.get(blockIdx);
-        if (blk == null || curOff < blk.getStartOffset()) {
-          LocatedBlocks newBlocks;
-          newBlocks = callGetBlockLocations(namenode, src, curOff, remaining);
-          locatedBlocks.insertRange(blockIdx, newBlocks.getLocatedBlocks());
-          continue;
-        }
-        assert curOff >= blk.getStartOffset() : "Block not found";
-        blockRange.add(blk);
-        long bytesRead = blk.getStartOffset() + blk.getBlockSize() - curOff;
-        remaining -= bytesRead;
-        curOff += bytesRead;
-        blockIdx++;
-      }
-      return blockRange;
-    }
-
-    /**
-     * Open a DataInputStream to a DataNode so that it can be read from.
-     * We get block ID and the IDs of the destinations at startup, from the namenode.
-     */
-    private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
-      if (target >= getFileLength()) {
-        throw new IOException("Attempted to read past end of file");
-      }
-
-      if ( blockReader != null ) {
-        blockReader.close(); 
-        blockReader = null;
-      }
-      
-      if (s != null) {
-        s.close();
-        s = null;
-      }
-
-      //
-      // Connect to best DataNode for desired Block, with potential offset
-      //
-      DatanodeInfo chosenNode = null;
-      int refetchToken = 1; // only need to get a new access token once
-      
-      while (true) {
-        //
-        // Compute desired block
-        //
-        LocatedBlock targetBlock = getBlockAt(target, true);
-        assert (target==this.pos) : "Wrong postion " + pos + " expect " + target;
-        long offsetIntoBlock = target - targetBlock.getStartOffset();
-
-        DNAddrPair retval = chooseDataNode(targetBlock);
-        chosenNode = retval.info;
-        InetSocketAddress targetAddr = retval.addr;
-
-        try {
-          s = socketFactory.createSocket();
-          NetUtils.connect(s, targetAddr, socketTimeout);
-          s.setSoTimeout(socketTimeout);
-          Block blk = targetBlock.getBlock();
-          BlockAccessToken accessToken = targetBlock.getAccessToken();
-          
-          blockReader = BlockReader.newBlockReader(s, src, blk.getBlockId(), 
-              accessToken, 
-              blk.getGenerationStamp(),
-              offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
-              buffersize, verifyChecksum, clientName);
-          return chosenNode;
-        } catch (IOException ex) {
-          if (ex instanceof InvalidAccessTokenException && refetchToken > 0) {
-            LOG.info("Will fetch a new access token and retry, " 
-                + "access token was invalid when connecting to " + targetAddr
-                + " : " + ex);
-            /*
-             * Get a new access token and retry. Retry is needed in 2 cases. 1)
-             * When both NN and DN re-started while DFSClient holding a cached
-             * access token. 2) In the case that NN fails to update its
-             * access key at pre-set interval (by a wide margin) and
-             * subsequently restarts. In this case, DN re-registers itself with
-             * NN and receives a new access key, but DN will delete the old
-             * access key from its memory since it's considered expired based on
-             * the estimated expiration date.
-             */
-            refetchToken--;
-            fetchBlockAt(target);
-          } else {
-            LOG.info("Failed to connect to " + targetAddr
-                + ", add to deadNodes and continue", ex);
-            // Put chosen node into dead list, continue
-            addToDeadNodes(chosenNode);
-          }
-          if (s != null) {
-            try {
-              s.close();
-            } catch (IOException iex) {
-            }                        
-          }
-          s = null;
-        }
-      }
-    }
-
-    /**
-     * Close it down!
-     */
-    @Override
-    public synchronized void close() throws IOException {
-      if (closed) {
-        return;
-      }
-      checkOpen();
-      
-      if ( blockReader != null ) {
-        blockReader.close();
-        blockReader = null;
-      }
-      
-      if (s != null) {
-        s.close();
-        s = null;
-      }
-      super.close();
-      closed = true;
-    }
-
-    @Override
-    public synchronized int read() throws IOException {
-      int ret = read( oneByteBuf, 0, 1 );
-      return ( ret <= 0 ) ? -1 : (oneByteBuf[0] & 0xff);
-    }
-
-    /* This is a used by regular read() and handles ChecksumExceptions.
-     * name readBuffer() is chosen to imply similarity to readBuffer() in
-     * ChecksuFileSystem
-     */ 
-    private synchronized int readBuffer(byte buf[], int off, int len) 
-                                                    throws IOException {
-      IOException ioe;
-      
-      /* we retry current node only once. So this is set to true only here.
-       * Intention is to handle one common case of an error that is not a
-       * failure on datanode or client : when DataNode closes the connection
-       * since client is idle. If there are other cases of "non-errors" then
-       * then a datanode might be retried by setting this to true again.
-       */
-      boolean retryCurrentNode = true;
- 
-      while (true) {
-        // retry as many times as seekToNewSource allows.
-        try {
-          return blockReader.read(buf, off, len);
-        } catch ( ChecksumException ce ) {
-          LOG.warn("Found Checksum error for " + currentBlock + " from " +
-                   currentNode.getName() + " at " + ce.getPos());          
-          reportChecksumFailure(src, currentBlock, currentNode);
-          ioe = ce;
-          retryCurrentNode = false;
-        } catch ( IOException e ) {
-          if (!retryCurrentNode) {
-            LOG.warn("Exception while reading from " + currentBlock +
-                     " of " + src + " from " + currentNode + ": " +
-                     StringUtils.stringifyException(e));
-          }
-          ioe = e;
-        }
-        boolean sourceFound = false;
-        if (retryCurrentNode) {
-          /* possibly retry the same node so that transient errors don't
-           * result in application level failures (e.g. Datanode could have
-           * closed the connection because the client is idle for too long).
-           */ 
-          sourceFound = seekToBlockSource(pos);
-        } else {
-          addToDeadNodes(currentNode);
-          sourceFound = seekToNewSource(pos);
-        }
-        if (!sourceFound) {
-          throw ioe;
-        }
-        retryCurrentNode = false;
-      }
+      return ((DFSInputStream)in).getAllBlocks();
     }
 
     /**
-     * Read the entire buffer.
+     * @return The visible length of the file.
      */
-    @Override
-    public synchronized int read(byte buf[], int off, int len) throws IOException {
-      checkOpen();
-      if (closed) {
-        throw new IOException("Stream closed");
-      }
-      failures = 0;
-      if (pos < getFileLength()) {
-        int retries = 2;
-        while (retries > 0) {
-          try {
-            if (pos > blockEnd) {
-              currentNode = blockSeekTo(pos);
-            }
-            int realLen = Math.min(len, (int) (blockEnd - pos + 1));
-            int result = readBuffer(buf, off, realLen);
-            
-            if (result >= 0) {
-              pos += result;
-            } else {
-              // got a EOS from reader though we expect more data on it.
-              throw new IOException("Unexpected EOS from the reader");
-            }
-            if (stats != null && result != -1) {
-              stats.incrementBytesRead(result);
-            }
-            return result;
-          } catch (ChecksumException ce) {
-            throw ce;            
-          } catch (IOException e) {
-            if (retries == 1) {
-              LOG.warn("DFS Read: " + StringUtils.stringifyException(e));
-            }
-            blockEnd = -1;
-            if (currentNode != null) { addToDeadNodes(currentNode); }
-            if (--retries == 0) {
-              throw e;
-            }
-          }
-        }
-      }
-      return -1;
+    public long getVisibleLength() throws IOException {
+      return ((DFSInputStream)in).getFileLength();
     }
-
-        
-    private DNAddrPair chooseDataNode(LocatedBlock block)
-      throws IOException {
-      while (true) {
-        DatanodeInfo[] nodes = block.getLocations();
-        try {
-          DatanodeInfo chosenNode = bestNode(nodes, deadNodes);
-          InetSocketAddress targetAddr = 
-                            NetUtils.createSocketAddr(chosenNode.getName());
-          return new DNAddrPair(chosenNode, targetAddr);
-        } catch (IOException ie) {
-          String blockInfo = block.getBlock() + " file=" + src;
-          if (failures >= maxBlockAcquireFailures) {
-            throw new BlockMissingException(src, "Could not obtain block: " + blockInfo,
-                                            block.getStartOffset());
-          }
-          
-          if (nodes == null || nodes.length == 0) {
-            LOG.info("No node available for block: " + blockInfo);
-          }
-          LOG.info("Could not obtain block " + block.getBlock()
-              + " from any node: " + ie
-              + ". Will get new block locations from namenode and retry...");
-          try {
-            // Introducing a random factor to the wait time before another retry.
-            // The wait time is dependent on # of failures and a random factor.
-            // At the first time of getting a BlockMissingException, the wait time
-            // is a random number between 0..3000 ms. If the first retry
-            // still fails, we will wait 3000 ms grace period before the 2nd retry.
-            // Also at the second retry, the waiting window is expanded to 6000 ms
-            // alleviating the request rate from the server. Similarly the 3rd retry
-            // will wait 6000ms grace period before retry and the waiting window is
-            // expanded to 9000ms. 
-            double waitTime = timeWindow * failures +       // grace period for the last round of attempt
-              timeWindow * (failures + 1) * r.nextDouble(); // expanding time window for each failure
-            LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec.");
-            Thread.sleep((long)waitTime);
-          } catch (InterruptedException iex) {
-          }
-          deadNodes.clear(); //2nd option is to remove only nodes[blockId]
-          openInfo();
-          block = getBlockAt(block.getStartOffset(), false);
-          failures++;
-          continue;
-        }
-      }
-    } 
-        
-    private void fetchBlockByteRange(LocatedBlock block, long start,
-                                     long end, byte[] buf, int offset) throws IOException {
-      //
-      // Connect to best DataNode for desired Block, with potential offset
-      //
-      Socket dn = null;
-      int refetchToken = 1; // only need to get a new access token once
-      
-      while (true) {
-        // cached block locations may have been updated by chooseDataNode()
-        // or fetchBlockAt(). Always get the latest list of locations at the 
-        // start of the loop.
-        block = getBlockAt(block.getStartOffset(), false);
-        DNAddrPair retval = chooseDataNode(block);
-        DatanodeInfo chosenNode = retval.info;
-        InetSocketAddress targetAddr = retval.addr;
-        BlockReader reader = null;
-            
-        try {
-          dn = socketFactory.createSocket();
-          NetUtils.connect(dn, targetAddr, socketTimeout);
-          dn.setSoTimeout(socketTimeout);
-          BlockAccessToken accessToken = block.getAccessToken();
-              
-          int len = (int) (end - start + 1);
-              
-          reader = BlockReader.newBlockReader(dn, src, 
-                                              block.getBlock().getBlockId(),
-                                              accessToken,
-                                              block.getBlock().getGenerationStamp(),
-                                              start, len, buffersize, 
-                                              verifyChecksum, clientName);
-          int nread = reader.readAll(buf, offset, len);
-          if (nread != len) {
-            throw new IOException("truncated return from reader.read(): " +
-                                  "excpected " + len + ", got " + nread);
-          }
-          return;
-        } catch (ChecksumException e) {
-          LOG.warn("fetchBlockByteRange(). Got a checksum exception for " +
-                   src + " at " + block.getBlock() + ":" + 
-                   e.getPos() + " from " + chosenNode.getName());
-          reportChecksumFailure(src, block.getBlock(), chosenNode);
-        } catch (IOException e) {
-          if (e instanceof InvalidAccessTokenException && refetchToken > 0) {
-            LOG.info("Will get a new access token and retry, "
-                + "access token was invalid when connecting to " + targetAddr
-                + " : " + e);
-            refetchToken--;
-            fetchBlockAt(block.getStartOffset());
-            continue;
-          } else {
-            LOG.warn("Failed to connect to " + targetAddr + " for file " + src
-                + " for block " + block.getBlock() + ":"
-                + StringUtils.stringifyException(e));
-          }
-        } finally {
-          IOUtils.closeStream(reader);
-          IOUtils.closeSocket(dn);
-        }
-        // Put chosen node into dead list, continue
-        addToDeadNodes(chosenNode);
-      }
-    }
-
-    /**
-     * Read bytes starting from the specified position.
-     * 
-     * @param position start read from this position
-     * @param buffer read buffer
-     * @param offset offset into buffer
-     * @param length number of bytes to read
-     * 
-     * @return actual number of bytes read
-     */
-    @Override
-    public int read(long position, byte[] buffer, int offset, int length)
-      throws IOException {
-      // sanity checks
-      checkOpen();
-      if (closed) {
-        throw new IOException("Stream closed");
-      }
-      failures = 0;
-      long filelen = getFileLength();
-      if ((position < 0) || (position >= filelen)) {
-        return -1;
-      }
-      int realLen = length;
-      if ((position + length) > filelen) {
-        realLen = (int)(filelen - position);
-      }
-      
-      // determine the block and byte range within the block
-      // corresponding to position and realLen
-      List<LocatedBlock> blockRange = getBlockRange(position, realLen);
-      int remaining = realLen;
-      for (LocatedBlock blk : blockRange) {
-        long targetStart = position - blk.getStartOffset();
-        long bytesToRead = Math.min(remaining, blk.getBlockSize() - targetStart);
-        fetchBlockByteRange(blk, targetStart, 
-                            targetStart + bytesToRead - 1, buffer, offset);
-        remaining -= bytesToRead;
-        position += bytesToRead;
-        offset += bytesToRead;
-      }
-      assert remaining == 0 : "Wrong number of bytes read.";
-      if (stats != null) {
-        stats.incrementBytesRead(realLen);
-      }
-      return realLen;
-    }
-     
-    @Override
-    public long skip(long n) throws IOException {
-      if ( n > 0 ) {
-        long curPos = getPos();
-        long fileLen = getFileLength();
-        if( n+curPos > fileLen ) {
-          n = fileLen - curPos;
-        }
-        seek(curPos+n);
-        return n;
-      }
-      return n < 0 ? -1 : 0;
-    }
-
-    /**
-     * Seek to a new arbitrary location
-     */
-    @Override
-    public synchronized void seek(long targetPos) throws IOException {
-      if (targetPos > getFileLength()) {
-        throw new IOException("Cannot seek after EOF");
-      }
-      boolean done = false;
-      if (pos <= targetPos && targetPos <= blockEnd) {
-        //
-        // If this seek is to a positive position in the current
-        // block, and this piece of data might already be lying in
-        // the TCP buffer, then just eat up the intervening data.
-        //
-        int diff = (int)(targetPos - pos);
-        if (diff <= TCP_WINDOW_SIZE) {
-          try {
-            pos += blockReader.skip(diff);
-            if (pos == targetPos) {
-              done = true;
-            }
-          } catch (IOException e) {//make following read to retry
-            LOG.debug("Exception while seek to " + targetPos + " from "
-                      + currentBlock +" of " + src + " from " + currentNode + 
-                      ": " + StringUtils.stringifyException(e));
-          }
-        }
-      }
-      if (!done) {
-        pos = targetPos;
-        blockEnd = -1;
-      }
-    }
-
-    /**
-     * Same as {@link #seekToNewSource(long)} except that it does not exclude
-     * the current datanode and might connect to the same node.
-     */
-    private synchronized boolean seekToBlockSource(long targetPos)
-                                                   throws IOException {
-      currentNode = blockSeekTo(targetPos);
-      return true;
-    }
-    
-    /**
-     * Seek to given position on a node other than the current node.  If
-     * a node other than the current node is found, then returns true. 
-     * If another node could not be found, then returns false.
-     */
-    @Override
-    public synchronized boolean seekToNewSource(long targetPos) throws IOException {
-      boolean markedDead = deadNodes.containsKey(currentNode);
-      addToDeadNodes(currentNode);
-      DatanodeInfo oldNode = currentNode;
-      DatanodeInfo newNode = blockSeekTo(targetPos);
-      if (!markedDead) {
-        /* remove it from deadNodes. blockSeekTo could have cleared 
-         * deadNodes and added currentNode again. Thats ok. */
-        deadNodes.remove(oldNode);
-      }
-      if (!oldNode.getStorageID().equals(newNode.getStorageID())) {
-        currentNode = newNode;
-        return true;
-      } else {
-        return false;
-      }
-    }
-        
-    /**
-     */
-    @Override
-    public synchronized long getPos() throws IOException {
-      return pos;
-    }
-
-    /** Return the size of the remaining available bytes
-     * if the size is less than or equal to {@link Integer#MAX_VALUE},
-     * otherwise, return {@link Integer#MAX_VALUE}.
-     */
-    @Override
-    public synchronized int available() throws IOException {
-      if (closed) {
-        throw new IOException("Stream closed");
-      }
-
-      final long remaining = getFileLength() - pos;
-      return remaining <= Integer.MAX_VALUE? (int)remaining: Integer.MAX_VALUE;
-    }
-
-    /**
-     * We definitely don't support marks
-     */
-    @Override
-    public boolean markSupported() {
-      return false;
-    }
-    @Override
-    public void mark(int readLimit) {
-    }
-    @Override
-    public void reset() throws IOException {
-      throw new IOException("Mark/reset not supported");
-    }
-  }
-    
-  /**
-   * The Hdfs implementation of {@link FSDataInputStream}
-   */
-  public static class DFSDataInputStream extends FSDataInputStream {
-    public DFSDataInputStream(DFSInputStream in)
-      throws IOException {
-      super(in);
-    }
-      
-    /**
-     * Returns the datanode from which the stream is currently reading.
-     */
-    public DatanodeInfo getCurrentDatanode() {
-      return ((DFSInputStream)in).getCurrentDatanode();
-    }
-      
-    /**
-     * Returns the block containing the target position. 
-     */
-    public Block getCurrentBlock() {
-      return ((DFSInputStream)in).getCurrentBlock();
-    }
-
-    /**
-     * Return collection of blocks that have already been located.
-     */
-    synchronized List<LocatedBlock> getAllBlocks() throws IOException {
-      return ((DFSInputStream)in).getAllBlocks();
-    }
-
-    /**
-     * @return The visible length of the file.
-     */
-    public long getVisibleLength() throws IOException {
-      return ((DFSInputStream)in).getFileLength();
-    }
-  }
-
-  /****************************************************************
-   * DFSOutputStream creates files from a stream of bytes.
-   *
-   * The client application writes data that is cached internally by
-   * this stream. Data is broken up into packets, each packet is
-   * typically 64K in size. A packet is made up of chunks. Each chunk
-   * is typically 512 bytes and has an associated checksum.
-   *
-   * When a client application fills up the currentPacket, it is
-   * enqueued into dataQueue.  The DataStreamer thread picks up
-   * packets from the dataQueue, sends each one to the first datanode in
-   * the pipeline and moves it from the dataQueue to the ackQueue.
-   * The ResponseProcessor receives acks from the datanodes. When a
-   * successful ack for a packet is received from all datanodes, the
-   * ResponseProcessor removes the corresponding packet from the
-   * ackQueue.
-   *
-   * In case of error, all outstanding packets are moved from the
-   * ackQueue back to the dataQueue. A new pipeline is set up by
-   * eliminating the bad datanode from the original pipeline. The
-   * DataStreamer now starts sending packets from the dataQueue.
-  ****************************************************************/
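
A minimal, self-contained sketch of the dataQueue/ackQueue hand-off the
comment above describes; the class below is illustrative only and is not part
of the DFSClient API:

    import java.util.LinkedList;

    // Illustrative sketch: the writer enqueues packets, the streamer thread
    // moves each packet from dataQueue to ackQueue once it has been sent, and
    // the ack-processing thread drops it when every datanode has acked.
    class PacketQueuesSketch {
      private final LinkedList<byte[]> dataQueue = new LinkedList<byte[]>();
      private final LinkedList<byte[]> ackQueue = new LinkedList<byte[]>();

      // writer side: called by the client application
      synchronized void enqueue(byte[] packet) {
        dataQueue.addLast(packet);
        notifyAll();                          // wake the streamer
      }

      // streamer side: take the next packet and park it until it is acked
      synchronized byte[] takeForSending() throws InterruptedException {
        while (dataQueue.isEmpty()) {
          wait();
        }
        byte[] packet = dataQueue.removeFirst();
        ackQueue.addLast(packet);
        return packet;
      }

      // ack side: discard the oldest packet once all datanodes acknowledged it
      synchronized void ackOldest() {
        ackQueue.removeFirst();
        notifyAll();                          // wake writers waiting for space
      }
    }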
-  class DFSOutputStream extends FSOutputSummer implements Syncable {
-    private static final int MAX_PACKETS = 80; // each packet 64K, total 5MB
-    private Socket s;
-    // closed is accessed by different threads under different locks.
-    private volatile boolean closed = false;
-  
-    private String src;
-    private final long blockSize;
-    private final DataChecksum checksum;
-    // both dataQueue and ackQueue are protected by dataQueue lock
-    private final LinkedList<Packet> dataQueue = new LinkedList<Packet>();
-    private final LinkedList<Packet> ackQueue = new LinkedList<Packet>();
-    private Packet currentPacket = null;
-    private DataStreamer streamer;
-    private long currentSeqno = 0;
-    private long bytesCurBlock = 0; // bytes written in current block
-    private int packetSize = 0; // write packet size, including the header.
-    private int chunksPerPacket = 0;
-    private volatile IOException lastException = null;
-    private long artificialSlowdown = 0;
-    private long lastFlushOffset = -1; // offset when flush was invoked
-    //persist blocks on namenode
-    private final AtomicBoolean persistBlocks = new AtomicBoolean(false);
-    private volatile boolean appendChunk = false;   // appending to existing partial block
-    private long initialFileSize = 0; // at time of file open
-    private Progressable progress;
-    
-    private class Packet {
-      ByteBuffer buffer;           // only one of buf and buffer is non-null
-      byte[]  buf;
-      long    seqno;               // sequence number of buffer in block
-      long    offsetInBlock;       // offset in block
-      boolean lastPacketInBlock;   // is this the last packet in block?
-      int     numChunks;           // number of chunks currently in packet
-      int     maxChunks;           // max chunks in packet
-      int     dataStart;
-      int     dataPos;
-      int     checksumStart;
-      int     checksumPos;      
-      private static final long HEART_BEAT_SEQNO = -1L;
-
-      /**
-       *  create a heartbeat packet
-       */
-      Packet() {
-        this.lastPacketInBlock = false;
-        this.numChunks = 0;
-        this.offsetInBlock = 0;
-        this.seqno = HEART_BEAT_SEQNO;
-        
-        buffer = null;
-        int packetSize = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
-        buf = new byte[packetSize];
-        
-        checksumStart = dataStart = packetSize;
-        checksumPos = checksumStart;
-        dataPos = dataStart;
-        maxChunks = 0;
-      }
-      
-      // create a new packet
-      Packet(int pktSize, int chunksPerPkt, long offsetInBlock) {
-        this.lastPacketInBlock = false;
-        this.numChunks = 0;
-        this.offsetInBlock = offsetInBlock;
-        this.seqno = currentSeqno;
-        currentSeqno++;
-        
-        buffer = null;
-        buf = new byte[pktSize];
-        
-        checksumStart = DataNode.PKT_HEADER_LEN + SIZE_OF_INTEGER;
-        checksumPos = checksumStart;
-        dataStart = checksumStart + chunksPerPkt * checksum.getChecksumSize();
-        dataPos = dataStart;
-        maxChunks = chunksPerPkt;
-      }
-
-      void writeData(byte[] inarray, int off, int len) {
-        if ( dataPos + len > buf.length) {
-          throw new BufferOverflowException();
-        }
-        System.arraycopy(inarray, off, buf, dataPos, len);
-        dataPos += len;
-      }
-  
-      void  writeChecksum(byte[] inarray, int off, int len) {
-        if (checksumPos + len > dataStart) {
-          throw new BufferOverflowException();
-        }
-        System.arraycopy(inarray, off, buf, checksumPos, len);
-        checksumPos += len;
-      }
-      
-      /**
-       * Returns ByteBuffer that contains one full packet, including header.
-       */
-      ByteBuffer getBuffer() {
-        /* Once this is called, no more data can be added to the packet.
-         * setting 'buf' to null ensures that.
-         * This is called only when the packet is ready to be sent.
-         */
-        if (buffer != null) {
-          return buffer;
-        }
-        
-        //prepare the header and close any gap between checksum and data.
-        
-        int dataLen = dataPos - dataStart;
-        int checksumLen = checksumPos - checksumStart;
-        
-        if (checksumPos != dataStart) {
-          /* move the checksum to cover the gap.
-           * This can happen for the last packet.
-           */
-          System.arraycopy(buf, checksumStart, buf, 
-                           dataStart - checksumLen , checksumLen); 
-        }
-        
-        int pktLen = SIZE_OF_INTEGER + dataLen + checksumLen;
-        
-        //normally dataStart == checksumPos, i.e., offset is zero.
-        buffer = ByteBuffer.wrap(buf, dataStart - checksumPos,
-                                 DataNode.PKT_HEADER_LEN + pktLen);
-        buf = null;
-        buffer.mark();
-        
-        /* write the header and data length.
-         * The format is described in comment before DataNode.BlockSender
-         */
-        buffer.putInt(pktLen);  // pktSize
-        buffer.putLong(offsetInBlock); 
-        buffer.putLong(seqno);
-        buffer.put((byte) ((lastPacketInBlock) ? 1 : 0));
-        //end of pkt header
-        buffer.putInt(dataLen); // actual data length, excluding checksum.
-        
-        buffer.reset();
-        return buffer;
-      }
-      
-      // get the packet's last byte's offset in the block
-      long getLastByteOffsetBlock() {
-        return offsetInBlock + dataPos - dataStart;
-      }
-      
-      /**
-       * Check if this packet is a heartbeat packet
-       * @return true if the sequence number is HEART_BEAT_SEQNO
-       */
-      private boolean isHeartbeatPacket() {
-        return seqno == HEART_BEAT_SEQNO;
-      }
-      
-      public String toString() {
-        return "packet seqno:" + this.seqno +
-        " offsetInBlock:" + this.offsetInBlock + 
-        " lastPacketInBlock:" + this.lastPacketInBlock +
-        " lastByteOffsetInBlock: " + this.getLastByteOffsetBlock();
-      }
-    }
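
The wire layout that Packet.getBuffer() assembles above is: packet length,
offset in block, sequence number, last-packet flag, data length, then the
checksum bytes immediately followed by the data bytes. A hedged sketch of that
serialisation with plain java.io types (the real format is owned by the
DataNode block-sender code referenced in the comment):

    import java.io.ByteArrayOutputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    // Illustrative sketch: serialise one data packet in the field order used
    // by Packet.getBuffer().
    class PacketHeaderSketch {
      static byte[] encode(long offsetInBlock, long seqno,
                           boolean lastPacketInBlock, byte[] checksums,
                           byte[] data) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        int pktLen = 4 + data.length + checksums.length;   // payload length field
        out.writeInt(pktLen);
        out.writeLong(offsetInBlock);
        out.writeLong(seqno);
        out.writeBoolean(lastPacketInBlock);
        out.writeInt(data.length);        // actual data length, excluding checksum
        out.write(checksums);
        out.write(data);
        out.flush();
        return bytes.toByteArray();
      }
    }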
-  
-    //
-    // The DataStreamer class is responsible for sending data packets to the
-    // datanodes in the pipeline. It retrieves a new blockid and block locations
-    // from the namenode, and starts streaming packets to the pipeline of
-    // Datanodes. Every packet has a sequence number associated with
-    // it. When all the packets for a block are sent out and acks for each
-    // of them are received, the DataStreamer closes the current block.
-    //
-    class DataStreamer extends Daemon {
-      private volatile boolean streamerClosed = false;
-      private Block block; // its length is number of bytes acked
-      private BlockAccessToken accessToken;
-      private DataOutputStream blockStream;
-      private DataInputStream blockReplyStream;
-      private ResponseProcessor response = null;
-      private volatile DatanodeInfo[] nodes = null; // list of targets for current block
-      private ArrayList<DatanodeInfo> excludedNodes = new ArrayList<DatanodeInfo>();
-      volatile boolean hasError = false;
-      volatile int errorIndex = -1;
-      private BlockConstructionStage stage;  // block construction stage
-      private long bytesSent = 0; // number of bytes that've been sent
-
-      /**
-       * Default constructor for file create
-       */
-      private DataStreamer() {
-        stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
-      }
-      
-      /**
-       * Construct a data streamer for append
-       * @param lastBlock last block of the file to be appended
-       * @param stat status of the file to be appended
-       * @param bytesPerChecksum number of bytes per checksum
-       * @throws IOException if error occurs
-       */
-      private DataStreamer(LocatedBlock lastBlock, FileStatus stat,
-          int bytesPerChecksum) throws IOException {
-        stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
-        block = lastBlock.getBlock();
-        bytesSent = block.getNumBytes();
-        accessToken = lastBlock.getAccessToken();
-        long usedInLastBlock = stat.getLen() % blockSize;
-        int freeInLastBlock = (int)(blockSize - usedInLastBlock);
-
-        // calculate the amount of free space in the pre-existing 
-        // last crc chunk
-        int usedInCksum = (int)(stat.getLen() % bytesPerChecksum);
-        int freeInCksum = bytesPerChecksum - usedInCksum;
-
-        // if there is space in the last block, then we have to 
-        // append to that block
-        if (freeInLastBlock == blockSize) {
-          throw new IOException("The last block for file " + 
-              src + " is full.");
-        }
-
-        if (usedInCksum > 0 && freeInCksum > 0) {
-          // if there is space in the last partial chunk, then 
-          // setup in such a way that the next packet will have only 
-          // one chunk that fills up the partial chunk.
-          //
-          computePacketChunkSize(0, freeInCksum);
-          resetChecksumChunk(freeInCksum);
-          appendChunk = true;
-        } else {
-          // if the remaining space in the block is smaller than 
-          // the expected size of a packet, then create a 
-          // smaller packet.
-          //
-          computePacketChunkSize(Math.min(writePacketSize, freeInLastBlock), 
-              bytesPerChecksum);
-        }
-
-        // setup pipeline to append to the last block XXX retries??
-        nodes = lastBlock.getLocations();
-        errorIndex = -1;   // no errors yet.
-        if (nodes.length < 1) {
-          throw new IOException("Unable to retrieve blocks locations " +
-              " for last block " + block +
-              "of file " + src);
-
-        }
-      }
-      
-      /**
-       * Initialize for data streaming
-       */
-      private void initDataStreaming() {
-        this.setName("DataStreamer for file " + src +
-            " block " + block);
-        response = new ResponseProcessor(nodes);
-        response.start();
-        stage = BlockConstructionStage.DATA_STREAMING;
-      }
-      
-      private void endBlock() {
-        LOG.debug("Closing old block " + block);
-        this.setName("DataStreamer for file " + src);
-        closeResponder();
-        closeStream();
-        nodes = null;
-        stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
-      }
-      
-      /*
-       * The streamer thread is the only thread that opens streams to datanodes
-       * and closes them. Any error recovery is also done by this thread.
-       */
-      public void run() {
-        long lastPacket = System.currentTimeMillis();
-        while (!streamerClosed && clientRunning) {
-
-          // if the Responder encountered an error, shutdown Responder
-          if (hasError && response != null) {
-            try {
-              response.close();
-              response.join();
-              response = null;
-            } catch (InterruptedException  e) {
-            }
-          }
-
-          Packet one = null;
-
-          try {
-            // process datanode IO errors if any
-            boolean doSleep = false;
-            if (hasError && errorIndex>=0) {
-              doSleep = processDatanodeError();
-            }
-
-            synchronized (dataQueue) {
-              // wait for a packet to be sent.
-              long now = System.currentTimeMillis();
-              while ((!streamerClosed && !hasError && clientRunning 
-                  && dataQueue.size() == 0 && 
-                  (stage != BlockConstructionStage.DATA_STREAMING || 
-                   stage == BlockConstructionStage.DATA_STREAMING && 
-                   now - lastPacket < socketTimeout/2)) || doSleep ) {
-                long timeout = socketTimeout/2 - (now-lastPacket);
-                timeout = timeout <= 0 ? 1000 : timeout;
-                timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
-                   timeout : 1000;
-                try {
-                  dataQueue.wait(timeout);
-                } catch (InterruptedException  e) {
-                }
-                doSleep = false;
-                now = System.currentTimeMillis();
-              }
-              if (streamerClosed || hasError || !clientRunning) {
-                continue;
-              }
-              // get packet to be sent.
-              if (dataQueue.isEmpty()) {
-                one = new Packet();  // heartbeat packet
-              } else {
-                one = dataQueue.getFirst(); // regular data packet
-              }
-            }
-
-            // get new block from namenode.
-            if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
-              LOG.debug("Allocating new block");
-              nodes = nextBlockOutputStream(src);
-              initDataStreaming();
-            } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
-              LOG.debug("Append to block " + block);
-              setupPipelineForAppendOrRecovery();
-              initDataStreaming();
-            }
-
-            long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
-            if (lastByteOffsetInBlock > blockSize) {
-              throw new IOException("BlockSize " + blockSize +
-                  " is smaller than data size. " +
-                  " Offset of packet in block " + 
-                  lastByteOffsetInBlock +
-                  " Aborting file " + src);
-            }
-
-            if (one.lastPacketInBlock) {
-              // wait for all data packets to be successfully acked
-              synchronized (dataQueue) {
-                while (!streamerClosed && !hasError && 
-                    ackQueue.size() != 0 && clientRunning) {
-                  try {
-                    // wait for acks to arrive from datanodes
-                    dataQueue.wait(1000);
-                  } catch (InterruptedException  e) {
-                  }
-                }
-              }
-              if (streamerClosed || hasError || !clientRunning) {
-                continue;
-              }
-              stage = BlockConstructionStage.PIPELINE_CLOSE;
-            }
-            
-            // send the packet
-            ByteBuffer buf = one.getBuffer();
-
-            synchronized (dataQueue) {
-              // move packet from dataQueue to ackQueue
-              if (!one.isHeartbeatPacket()) {
-                dataQueue.removeFirst();
-                ackQueue.addLast(one);
-                dataQueue.notifyAll();
-              }
-            }
-
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("DataStreamer block " + block +
-                  " sending packet " + one);
-            }
-
-            // write out data to remote datanode
-            blockStream.write(buf.array(), buf.position(), buf.remaining());
-            blockStream.flush();
-            lastPacket = System.currentTimeMillis();
-            
-            if (one.isHeartbeatPacket()) {  //heartbeat packet
-            }
-            
-            // update bytesSent
-            long tmpBytesSent = one.getLastByteOffsetBlock();
-            if (bytesSent < tmpBytesSent) {
-              bytesSent = tmpBytesSent;
-            }
-
-            if (streamerClosed || hasError || !clientRunning) {
-              continue;
-            }
-
-            // Is this block full?
-            if (one.lastPacketInBlock) {
-              // wait for the close packet to be acked
-              synchronized (dataQueue) {
-                while (!streamerClosed && !hasError && 
-                    ackQueue.size() != 0 && clientRunning) {
-                  dataQueue.wait(1000);// wait for acks to arrive from datanodes
-                }
-              }
-              if (streamerClosed || hasError || !clientRunning) {
-                continue;
-              }
-
-              endBlock();
-            }
-            if (progress != null) { progress.progress(); }
-
-            // This is used by unit test to trigger race conditions.
-            if (artificialSlowdown != 0 && clientRunning) {
-              Thread.sleep(artificialSlowdown); 
-            }
-          } catch (Throwable e) {
-            LOG.warn("DataStreamer Exception: " + 
-                StringUtils.stringifyException(e));
-            if (e instanceof IOException) {
-              setLastException((IOException)e);
-            }
-            hasError = true;
-            if (errorIndex == -1) { // not a datanode error
-              streamerClosed = true;
-            }
-          }
-        }
-        closeInternal();
-      }
-
-      private void closeInternal() {
-        closeResponder();       // close and join
-        closeStream();
-        streamerClosed = true;
-        closed = true;
-        synchronized (dataQueue) {
-          dataQueue.notifyAll();
-        }
-      }
-
-      /*
-       * Close both the streamer and the DFSOutputStream. Should be called only 
-       * by an external thread and only after all data to be sent has 
-       * been flushed to the datanode.
-       * 
-       * Interrupt this data streamer if force is true
-       * 
-       * @param force if this data stream is forced to be closed 
-       */
-      void close(boolean force) {
-        streamerClosed = true;
-        synchronized (dataQueue) {
-          dataQueue.notifyAll();
-        }
-        if (force) {
-          this.interrupt();
-        }
-      }
-
-      private void closeResponder() {
-        if (response != null) {
-          try {
-            response.close();
-            response.join();
-          } catch (InterruptedException  e) {
-          } finally {
-            response = null;
-          }
-        }
-      }
-
-      private void closeStream() {
-        if (blockStream != null) {
-          try {
-            blockStream.close();
-          } catch (IOException e) {
-          } finally {
-            blockStream = null;
-          }
-        }
-        if (blockReplyStream != null) {
-          try {
-            blockReplyStream.close();
-          } catch (IOException e) {
-          } finally {
-            blockReplyStream = null;
-          }
-        }
-      }
-
-      //
-      // Processes responses from the datanodes.  A packet is removed 
-      // from the ackQueue when its response arrives.
-      //
-      private class ResponseProcessor extends Daemon {
-
-        private volatile boolean responderClosed = false;
-        private DatanodeInfo[] targets = null;
-        private boolean isLastPacketInBlock = false;
-
-        ResponseProcessor (DatanodeInfo[] targets) {
-          this.targets = targets;
-        }
-
-        public void run() {
-
-          this.setName("ResponseProcessor for block " + block);
-          PipelineAck ack = new PipelineAck();
-
-          while (!responderClosed && clientRunning && !isLastPacketInBlock) {
-            // process responses from datanodes.
-            try {
-              // read an ack from the pipeline
-              ack.readFields(blockReplyStream);
-              if (LOG.isDebugEnabled()) {
-                LOG.debug("DFSClient " + ack);
-              }
-              
-              long seqno = ack.getSeqno();
-              // processes response status from datanodes.
-              for (int i = ack.getNumOfReplies()-1; i >=0  && clientRunning; i--) {
-                final DataTransferProtocol.Status reply = ack.getReply(i);
-                if (reply != SUCCESS) {
-                  errorIndex = i; // first bad datanode
-                  throw new IOException("Bad response " + reply +
-                      " for block " + block +
-                      " from datanode " + 
-                      targets[i].getName());
-                }
-              }
-              
-              assert seqno != PipelineAck.UNKOWN_SEQNO : 
-                "Ack for unkown seqno should be a failed ack: " + ack;
-              if (seqno == Packet.HEART_BEAT_SEQNO) {  // a heartbeat ack
-                continue;
-              }
-
-              // a success ack for a data packet
-              Packet one = null;
-              synchronized (dataQueue) {
-                one = ackQueue.getFirst();
-              }
-              if (one.seqno != seqno) {
-                throw new IOException("Responseprocessor: Expecting seqno " +
-                                      " for block " + block +
-                                      one.seqno + " but received " + seqno);
-              }
-              isLastPacketInBlock = one.lastPacketInBlock;
-              // update bytesAcked
-              block.setNumBytes(one.getLastByteOffsetBlock());
-
-              synchronized (dataQueue) {
-                ackQueue.removeFirst();
-                dataQueue.notifyAll();
-              }
-            } catch (Exception e) {
-              if (!responderClosed) {
-                if (e instanceof IOException) {
-                  setLastException((IOException)e);
-                }
-                hasError = true;
-                errorIndex = errorIndex==-1 ? 0 : errorIndex;
-                synchronized (dataQueue) {
-                  dataQueue.notifyAll();
-                }
-                LOG.warn("DFSOutputStream ResponseProcessor exception " + 
-                    " for block " + block +
-                    StringUtils.stringifyException(e));
-                responderClosed = true;
-              }
-            }
-          }
-        }
-
-        void close() {
-          responderClosed = true;
-          this.interrupt();
-        }
-      }
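
The loop above boils down to: read one pipeline ack, treat the first
non-success reply as identifying the bad datanode, ignore heartbeat acks, and
otherwise pop the matching packet off the ack queue. A sketch with plain Java
stand-ins (not the DataTransferProtocol types):

    import java.io.IOException;
    import java.util.Deque;

    // Illustrative sketch: process one acknowledgement from the pipeline.
    class AckSketch {
      static final long HEART_BEAT_SEQNO = -1L;

      static void processAck(long ackSeqno, boolean[] replySuccess,
                             Deque<Long> ackedSeqnos) throws IOException {
        // any failed reply marks the first bad datanode in the pipeline
        for (int i = replySuccess.length - 1; i >= 0; i--) {
          if (!replySuccess[i]) {
            throw new IOException("Bad response from datanode at index " + i);
          }
        }
        if (ackSeqno == HEART_BEAT_SEQNO) {
          return;                             // heartbeat ack: nothing queued
        }
        Long expected = ackedSeqnos.peekFirst();
        if (expected == null || expected != ackSeqno) {
          throw new IOException("Expecting seqno " + expected
              + " but received " + ackSeqno);
        }
        ackedSeqnos.removeFirst();            // packet fully acknowledged
      }
    }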
-
-      // If this stream has encountered any errors so far, shut down 
-      // threads and mark the stream as closed. Returns true if we should
-      // sleep for a while after returning from this call.
-      //
-      private boolean processDatanodeError() throws IOException {
-        if (response != null) {
-          LOG.info("Error Recovery for block " + block +
-          " waiting for responder to exit. ");
-          return true;
-        }
-        closeStream();
-
-        // move packets from ack queue to front of the data queue
-        synchronized (dataQueue) {
-          dataQueue.addAll(0, ackQueue);
-          ackQueue.clear();
-        }
-
-        boolean doSleep = setupPipelineForAppendOrRecovery();
-        
-        if (!streamerClosed && clientRunning) {
-          if (stage == BlockConstructionStage.PIPELINE_CLOSE) {
-            synchronized (dataQueue) {
-              dataQueue.remove();  // remove the end of block packet
-              dataQueue.notifyAll();
-            }
-            endBlock();
-          } else {
-            initDataStreaming();
-          }
-        }
-        
-        return doSleep;
-      }
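
The requeue step above simply puts everything that was sent but not yet
acknowledged back at the front of the data queue, so it is re-sent once a new
pipeline exists; a minimal sketch, assuming plain LinkedLists in place of the
real queues:

    import java.util.LinkedList;

    // Illustrative sketch: move un-acked packets back ahead of any new data.
    class RequeueSketch {
      static <P> void requeueUnacked(LinkedList<P> dataQueue,
                                     LinkedList<P> ackQueue) {
        synchronized (dataQueue) {           // both queues share this lock
          dataQueue.addAll(0, ackQueue);     // keep the original send order
          ackQueue.clear();
        }
      }
    }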
-
-
-      /**
-       * Open a DataOutputStream to a DataNode pipeline so that 
-       * it can be written to.
-       * This happens when a file is appended or data streaming fails.
-       * It keeps on trying until a pipeline is set up.
-       */
-      private boolean setupPipelineForAppendOrRecovery() throws IOException {
-        // check number of datanodes
-        if (nodes == null || nodes.length == 0) {
-          String msg = "Could not get block locations. " + "Source file \""
-              + src + "\" - Aborting...";
-          LOG.warn(msg);
-          setLastException(new IOException(msg));
-          streamerClosed = true;
-          return false;
-        }
-        
-        boolean success = false;
-        long newGS = 0L;
-        while (!success && !streamerClosed && clientRunning) {
-          boolean isRecovery = hasError;
-          // remove bad datanode from list of datanodes.
-          // If errorIndex was not set (i.e. appends), then do not remove 
-          // any datanodes
-          // 
-          if (errorIndex >= 0) {
-            StringBuilder pipelineMsg = new StringBuilder();
-            for (int j = 0; j < nodes.length; j++) {
-              pipelineMsg.append(nodes[j].getName());
-              if (j < nodes.length - 1) {
-                pipelineMsg.append(", ");
-              }
-            }
-            if (nodes.length <= 1) {
-              lastException = new IOException("All datanodes " + pipelineMsg
-                  + " are bad. Aborting...");
-              streamerClosed = true;
-              return false;
-            }
-            LOG.warn("Error Recovery for block " + block +
-                " in pipeline " + pipelineMsg + 
-                ": bad datanode " + nodes[errorIndex].getName());
-            DatanodeInfo[] newnodes = new DatanodeInfo[nodes.length-1];
-            System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
-            System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
-                newnodes.length-errorIndex);
-            nodes = newnodes;
-            this.hasError = false;
-            lastException = null;
-            errorIndex = -1;
-          }
-
-          // get a new generation stamp and an access token
-          LocatedBlock lb = namenode.updateBlockForPipeline(block, clientName);
-          newGS = lb.getBlock().getGenerationStamp();
-          accessToken = lb.getAccessToken();
-          
-          // set up the pipeline again with the remaining nodes
-          success = createBlockOutputStream(nodes, newGS, isRecovery);
-        }
-
-        if (success) {
-          // update pipeline at the namenode
-          Block newBlock = new Block(
-              block.getBlockId(), block.getNumBytes(), newGS);
-          namenode.updatePipeline(clientName, block, newBlock, nodes);
-          // update client side generation stamp
-          block = newBlock;
-        }
-        return false; // do not sleep, continue processing
-      }
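
Dropping nodes[errorIndex] above is a plain array copy around the bad entry; a
sketch with a generic array, assuming the caller has already checked that more
than one node remains:

    import java.util.Arrays;

    // Illustrative sketch: build a new pipeline that omits the datanode at
    // errorIndex, preserving the order of the remaining nodes.
    class PipelineShrinkSketch {
      static <T> T[] withoutBadNode(T[] nodes, int errorIndex) {
        T[] newNodes = Arrays.copyOf(nodes, nodes.length - 1);
        System.arraycopy(nodes, 0, newNodes, 0, errorIndex);
        System.arraycopy(nodes, errorIndex + 1, newNodes, errorIndex,
                         newNodes.length - errorIndex);
        return newNodes;
      }
    }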
-
-      /**
-       * Open a DataOutputStream to a DataNode so that it can be written to.
-       * This happens when a file is created and each time a new block is allocated.
-       * Must get block ID and the IDs of the destinations from the namenode.
-       * Returns the list of target datanodes.
-       */
-      private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException {
-        LocatedBlock lb = null;
-        boolean retry = false;
-        DatanodeInfo[] nodes = null;
-        int count = conf.getInt("dfs.client.block.write.retries", 3);
-        boolean success = false;
-        do {
-          hasError = false;
-          lastException = null;
-          errorIndex = -1;
-          retry = false;
-          success = false;
-
-          long startTime = System.currentTimeMillis();
-          DatanodeInfo[] w = excludedNodes.toArray(
-              new DatanodeInfo[excludedNodes.size()]);
-          lb = locateFollowingBlock(startTime, w.length > 0 ? w : null);
-          block = lb.getBlock();
-          block.setNumBytes(0);
-          accessToken = lb.getAccessToken();
-          nodes = lb.getLocations();
-
-          //
-          // Connect to first DataNode in the list.
-          //
-          success = createBlockOutputStream(nodes, 0L, false);
-
-          if (!success) {
-            LOG.info("Abandoning block " + block);
-            namenode.abandonBlock(block, src, clientName);
-            block = null;
-
-            LOG.debug("Excluding datanode " + nodes[errorIndex]);
-            excludedNodes.add(nodes[errorIndex]);
-
-            // Connection failed.  Let's wait a little bit and retry
-            retry = true;
-            try {
-              if (System.currentTimeMillis() - startTime > 5000) {
-                LOG.info("Waiting to find target node: " + nodes[0].getName());
-              }
-              //TODO fix this timeout. Extract it to a constant, maybe make it available from conf
-              Thread.sleep(6000);
-            } catch (InterruptedException iex) {
-            }
-          }
-        } while (retry && --count >= 0);
-
-        if (!success) {
-          throw new IOException("Unable to create new block.");
-        }
-        return nodes;
-      }
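
The allocation loop above follows a bounded-retry pattern: ask the namenode
for a new block (excluding nodes that failed before), try to open the
pipeline, and on failure abandon the block, exclude the bad node, and try
again. A hedged outline with stand-in method names (the real code issues
namenode RPCs and datanode connections):

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    // Illustrative sketch of the bounded-retry block allocation loop.
    abstract class BlockAllocationSketch {
      private final List<String> excludedNodes = new ArrayList<String>();

      String[] nextBlockPipeline(int retries) throws IOException {
        for (int attempt = 0; attempt <= retries; attempt++) {
          String[] nodes = locateFollowingBlock(excludedNodes); // ask namenode
          int badIndex = tryCreatePipeline(nodes);              // -1 on success
          if (badIndex < 0) {
            return nodes;
          }
          abandonBlock();                                       // give block back
          excludedNodes.add(nodes[badIndex]);                   // avoid bad node
        }
        throw new IOException("Unable to create new block.");
      }

      abstract String[] locateFollowingBlock(List<String> excluded) throws IOException;
      abstract int tryCreatePipeline(String[] nodes) throws IOException;
      abstract void abandonBlock() throws IOException;
    }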
-
-      // connects to the first datanode in the pipeline
-      // Returns true on success, false otherwise.
-      //
-      private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
-          boolean recoveryFlag) {
-        DataTransferProtocol.Status pipelineStatus = SUCCESS;
-        String firstBadLink = "";
-        if (LOG.isDebugEnabled()) {
-          for (int i = 0; i < nodes.length; i++) {
-            LOG.debug("pipeline = " + nodes[i].getName());
-          }
-        }
-
-        // persist blocks on namenode on next flush
-        persistBlocks.set(true);
-
-        try {
-          LOG.debug("Connecting to " + nodes[0].getName());
-          InetSocketAddress target = NetUtils.createSocketAddr(nodes[0].getName());
-          s = socketFactory.createSocket();
-          int timeoutValue = (socketTimeout > 0) ? (HdfsConstants.READ_TIMEOUT_EXTENSION
-              * nodes.length + socketTimeout) : 0;
-          NetUtils.connect(s, target, timeoutValue);
-          s.setSoTimeout(timeoutValue);
-          s.setSendBufferSize(DEFAULT_DATA_SOCKET_SIZE);
-          LOG.debug("Send buf size " + s.getSendBufferSize());
-          long writeTimeout = (datanodeWriteTimeout > 0) ? 
-              (HdfsConstants.WRITE_TIMEOUT_EXTENSION * nodes.length +
-                  datanodeWriteTimeout) : 0;
-
-          //
-          // Xmit header info to datanode
-          //
-          DataOutputStream out = new DataOutputStream(new BufferedOutputStream(
-              NetUtils.getOutputStream(s, writeTimeout),
-              DataNode.SMALL_BUFFER_SIZE));
-          blockReplyStream = new DataInputStream(NetUtils.getInputStream(s));
-
-          // send the request
-          DataTransferProtocol.Sender.opWriteBlock(out,
-              block.getBlockId(), block.getGenerationStamp(),
-              nodes.length, recoveryFlag?stage.getRecoveryStage():stage, newGS,
-              block.getNumBytes(), bytesSent, clientName, null, nodes, accessToken);
-          checksum.writeHeader(out);
-          out.flush();
-
-          // receive ack for connect
-          pipelineStatus = DataTransferProtocol.Status.read(blockReplyStream);
-          firstBadLink = Text.readString(blockReplyStream);
-          if (pipelineStatus != SUCCESS) {
-            if (pipelineStatus == ERROR_ACCESS_TOKEN) {
-              throw new InvalidAccessTokenException(
-                  "Got access token error for connect ack with firstBadLink as "
-                      + firstBadLink);
-            } else {
-              throw new IOException("Bad connect ack with firstBadLink as "
-                  + firstBadLink);
-            }
-          }
-
-          blockStream = out;
-          return true; // success
-

[... 537 lines stripped ...]

