hadoop-hdfs-commits mailing list archives

From szets...@apache.org
Subject svn commit: r1555021 [2/15] - in /hadoop/common/branches/HDFS-5535/hadoop-hdfs-project: hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs/ hadoop-hdfs/dev-support/ hadoop-hdfs/src/main/java/ hadoop-hdfs/src/main/java/org/apache...
Date Fri, 03 Jan 2014 07:27:01 GMT
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java Fri Jan  3 07:26:52 2014
@@ -17,25 +17,30 @@
  */
 package org.apache.hadoop.hdfs;
 
-import java.io.BufferedInputStream;
-import java.io.DataInputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import org.apache.hadoop.conf.Configuration;
+import java.nio.channels.FileChannel;
+import java.util.EnumSet;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.hdfs.client.ClientMmap;
+import org.apache.hadoop.hdfs.DFSClient.Conf;
 import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
+import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.util.DirectBufferPool;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DataChecksum;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
 /**
  * BlockReaderLocal enables local short circuited reads. If the DFS client is on
  * the same machine as the datanode, then the client can read files directly
@@ -55,446 +60,581 @@ import org.apache.hadoop.util.DataChecks
 class BlockReaderLocal implements BlockReader {
   static final Log LOG = LogFactory.getLog(BlockReaderLocal.class);
 
-  private final FileInputStream dataIn; // reader for the data file
-  private final FileInputStream checksumIn;   // reader for the checksum file
-  private final boolean verifyChecksum;
+  private static DirectBufferPool bufferPool = new DirectBufferPool();
+
+  public static class Builder {
+    private int bufferSize;
+    private boolean verifyChecksum;
+    private int maxReadahead;
+    private String filename;
+    private FileInputStream streams[];
+    private long dataPos;
+    private DatanodeID datanodeID;
+    private FileInputStreamCache fisCache;
+    private boolean mlocked;
+    private BlockMetadataHeader header;
+    private ExtendedBlock block;
+
+    public Builder(Conf conf) {
+      this.maxReadahead = Integer.MAX_VALUE;
+      this.verifyChecksum = !conf.skipShortCircuitChecksums;
+      this.bufferSize = conf.shortCircuitBufferSize;
+    }
+
+    public Builder setVerifyChecksum(boolean verifyChecksum) {
+      this.verifyChecksum = verifyChecksum;
+      return this;
+    }
+
+    public Builder setCachingStrategy(CachingStrategy cachingStrategy) {
+      long readahead = cachingStrategy.getReadahead() != null ?
+          cachingStrategy.getReadahead() :
+              DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT;
+      this.maxReadahead = (int)Math.min(Integer.MAX_VALUE, readahead);
+      return this;
+    }
+
+    public Builder setFilename(String filename) {
+      this.filename = filename;
+      return this;
+    }
+
+    public Builder setStreams(FileInputStream streams[]) {
+      this.streams = streams;
+      return this;
+    }
+
+    public Builder setStartOffset(long startOffset) {
+      this.dataPos = Math.max(0, startOffset);
+      return this;
+    }
+
+    public Builder setDatanodeID(DatanodeID datanodeID) {
+      this.datanodeID = datanodeID;
+      return this;
+    }
+
+    public Builder setFileInputStreamCache(FileInputStreamCache fisCache) {
+      this.fisCache = fisCache;
+      return this;
+    }
+
+    public Builder setMlocked(boolean mlocked) {
+      this.mlocked = mlocked;
+      return this;
+    }
+
+    public Builder setBlockMetadataHeader(BlockMetadataHeader header) {
+      this.header = header;
+      return this;
+    }
+
+    public Builder setBlock(ExtendedBlock block) {
+      this.block = block;
+      return this;
+    }
+
+    public BlockReaderLocal build() {
+      Preconditions.checkNotNull(streams);
+      Preconditions.checkArgument(streams.length == 2);
+      Preconditions.checkNotNull(header);
+      return new BlockReaderLocal(this);
+    }
+  }
+
+  private boolean closed = false;
 
   /**
-   * Offset from the most recent chunk boundary at which the next read should
-   * take place. Is only set to non-zero at construction time, and is
-   * decremented (usually to 0) by subsequent reads. This avoids having to do a
-   * checksum read at construction to position the read cursor correctly.
+   * Pair of streams for this block.
    */
-  private int offsetFromChunkBoundary;
-  
-  private byte[] skipBuf = null;
+  private final FileInputStream streams[];
 
   /**
-   * Used for checksummed reads that need to be staged before copying to their
-   * output buffer because they are either a) smaller than the checksum chunk
-   * size or b) issued by the slower read(byte[]...) path
+   * The data FileChannel.
    */
-  private ByteBuffer slowReadBuff = null;
-  private ByteBuffer checksumBuff = null;
-  private DataChecksum checksum;
+  private final FileChannel dataIn;
 
-  private static DirectBufferPool bufferPool = new DirectBufferPool();
+  /**
+   * The next place we'll read from in the block data FileChannel.
+   *
+   * If data is buffered in dataBuf, this offset will be larger than the
+   * offset of the next byte which a read() operation will give us.
+   */
+  private long dataPos;
+
+  /**
+   * The Checksum FileChannel.
+   */
+  private final FileChannel checksumIn;
+  
+  /**
+   * Checksum type and size.
+   */
+  private final DataChecksum checksum;
 
-  private final int bytesPerChecksum;
-  private final int checksumSize;
+  /**
+   * If false, we will always skip the checksum.
+   */
+  private final boolean verifyChecksum;
+
+  /**
+   * If true, this block is mlocked on the DataNode.
+   */
+  private final AtomicBoolean mlocked;
 
-  /** offset in block where reader wants to actually read */
-  private long startOffset;
+  /**
+   * Name of the block, for logging purposes.
+   */
   private final String filename;
 
+  /**
+   * DataNode which contained this block.
+   */
   private final DatanodeID datanodeID;
+  
+  /**
+   * Block ID and Block Pool ID.
+   */
   private final ExtendedBlock block;
   
+  /**
+   * Cache of Checksum#bytesPerChecksum.
+   */
+  private int bytesPerChecksum;
+
+  /**
+   * Cache of Checksum#checksumSize.
+   */
+  private int checksumSize;
+
+  /**
+   * FileInputStream cache to return the streams to upon closing,
+   * or null if we should just close them unconditionally.
+   */
   private final FileInputStreamCache fisCache;
+
+  /**
+   * Maximum number of chunks to allocate.
+   *
+   * This is used to allocate dataBuf and checksumBuf, in the event that
+   * we need them.
+   */
+  private final int maxAllocatedChunks;
+
+  /**
+   * True if zero readahead was requested.
+   */
+  private final boolean zeroReadaheadRequested;
+
+  /**
+   * Maximum amount of readahead we'll do.  This will always be at least the
+   * size of a single chunk, even if {@link #zeroReadaheadRequested} is true,
+   * because we need to do a certain amount of buffering in order to do
+   * checksumming.
+   * 
+   * This determines how many bytes we'll use out of dataBuf and checksumBuf.
+   * Why do we allocate buffers, and then (potentially) only use part of them?
+   * The rationale is that allocating a lot of buffers of different sizes would
+   * make it very difficult for the DirectBufferPool to re-use buffers. 
+   */
+  private int maxReadaheadLength;
+
   private ClientMmap clientMmap;
-  private boolean mmapDisabled;
-  
-  private static int getSlowReadBufferNumChunks(int bufSize,
-      int bytesPerChecksum) {
-    if (bufSize < bytesPerChecksum) {
-      throw new IllegalArgumentException("Configured BlockReaderLocal buffer size (" +
-          bufSize + ") is not large enough to hold a single chunk (" +
-          bytesPerChecksum +  "). Please configure " +
-          DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_BUFFER_SIZE_KEY + " appropriately");
-    }
-
-    // Round down to nearest chunk size
-    return bufSize / bytesPerChecksum;
-  }
-
-  public BlockReaderLocal(DFSClient.Conf conf, String filename,
-      ExtendedBlock block, long startOffset, long length,
-      FileInputStream dataIn, FileInputStream checksumIn,
-      DatanodeID datanodeID, boolean verifyChecksum,
-      FileInputStreamCache fisCache) throws IOException {
-    this.dataIn = dataIn;
-    this.checksumIn = checksumIn;
-    this.startOffset = Math.max(startOffset, 0);
-    this.filename = filename;
-    this.datanodeID = datanodeID;
-    this.block = block;
-    this.fisCache = fisCache;
-    this.clientMmap = null;
-    this.mmapDisabled = false;
-
-    // read and handle the common header here. For now just a version
-    checksumIn.getChannel().position(0);
-    BlockMetadataHeader header = BlockMetadataHeader
-        .readHeader(new DataInputStream(
-            new BufferedInputStream(checksumIn,
-                BlockMetadataHeader.getHeaderSize())));
-    short version = header.getVersion();
-    if (version != BlockMetadataHeader.VERSION) {
-      throw new IOException("Wrong version (" + version + ") of the " +
-          "metadata file for " + filename + ".");
-    }
-    this.verifyChecksum = verifyChecksum && !conf.skipShortCircuitChecksums;
-    long firstChunkOffset;
-    if (this.verifyChecksum) {
-      this.checksum = header.getChecksum();
-      this.bytesPerChecksum = this.checksum.getBytesPerChecksum();
-      this.checksumSize = this.checksum.getChecksumSize();
-      firstChunkOffset = startOffset
-          - (startOffset % checksum.getBytesPerChecksum());
-      this.offsetFromChunkBoundary = (int) (startOffset - firstChunkOffset);
-
-      int chunksPerChecksumRead = getSlowReadBufferNumChunks(
-          conf.shortCircuitBufferSize, bytesPerChecksum);
-      slowReadBuff = bufferPool.getBuffer(bytesPerChecksum * chunksPerChecksumRead);
-      checksumBuff = bufferPool.getBuffer(checksumSize * chunksPerChecksumRead);
-      // Initially the buffers have nothing to read.
-      slowReadBuff.flip();
-      checksumBuff.flip();
-      long checkSumOffset = (firstChunkOffset / bytesPerChecksum) * checksumSize;
-      IOUtils.skipFully(checksumIn, checkSumOffset);
+
+  /**
+   * Buffers data starting at the current dataPos and extending for
+   * dataBuf.limit() bytes.
+   *
+   * This may be null if we don't need it.
+   */
+  private ByteBuffer dataBuf;
+
+  /**
+   * Buffers checksums starting at the current checksumPos and extending
+   * for checksumBuf.limit() bytes.
+   *
+   * This may be null if we don't need it.
+   */
+  private ByteBuffer checksumBuf;
+
+  private boolean mmapDisabled = false;
+
+  private BlockReaderLocal(Builder builder) {
+    this.streams = builder.streams;
+    this.dataIn = builder.streams[0].getChannel();
+    this.dataPos = builder.dataPos;
+    this.checksumIn = builder.streams[1].getChannel();
+    this.checksum = builder.header.getChecksum();
+    this.verifyChecksum = builder.verifyChecksum &&
+        (this.checksum.getChecksumType().id != DataChecksum.CHECKSUM_NULL);
+    this.mlocked = new AtomicBoolean(builder.mlocked);
+    this.filename = builder.filename;
+    this.datanodeID = builder.datanodeID;
+    this.fisCache = builder.fisCache;
+    this.block = builder.block;
+    this.bytesPerChecksum = checksum.getBytesPerChecksum();
+    this.checksumSize = checksum.getChecksumSize();
+
+    this.maxAllocatedChunks = (bytesPerChecksum == 0) ? 0 :
+        ((builder.bufferSize + bytesPerChecksum - 1) / bytesPerChecksum);
+    // Calculate the effective maximum readahead.
+    // We can't do more readahead than there is space in the buffer.
+    int maxReadaheadChunks = (bytesPerChecksum == 0) ? 0 :
+        ((Math.min(builder.bufferSize, builder.maxReadahead) +
+            bytesPerChecksum - 1) / bytesPerChecksum);
+    if (maxReadaheadChunks == 0) {
+      this.zeroReadaheadRequested = true;
+      maxReadaheadChunks = 1;
     } else {
-      firstChunkOffset = startOffset;
-      this.checksum = null;
-      this.bytesPerChecksum = 0;
-      this.checksumSize = 0;
-      this.offsetFromChunkBoundary = 0;
+      this.zeroReadaheadRequested = false;
     }
-    
-    boolean success = false;
+    this.maxReadaheadLength = maxReadaheadChunks * bytesPerChecksum;
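+    // For example (illustrative values): bufferSize = 1 MiB,
+    // bytesPerChecksum = 512, and maxReadahead = 4 MiB gives
+    // maxReadaheadChunks = 2048, so maxReadaheadLength = 1 MiB.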
+  }
+
+  private synchronized void createDataBufIfNeeded() {
+    if (dataBuf == null) {
+      dataBuf = bufferPool.getBuffer(maxAllocatedChunks * bytesPerChecksum);
+      dataBuf.position(0);
+      dataBuf.limit(0);
+    }
+  }
+
+  private synchronized void freeDataBufIfExists() {
+    if (dataBuf != null) {
+      // When disposing of a dataBuf, we have to move our stored file index
+      // backwards.
+      dataPos -= dataBuf.remaining();
+      dataBuf.clear();
+      bufferPool.returnBuffer(dataBuf);
+      dataBuf = null;
+    }
+  }
+
+  private synchronized void createChecksumBufIfNeeded() {
+    if (checksumBuf == null) {
+      checksumBuf = bufferPool.getBuffer(maxAllocatedChunks * checksumSize);
+      checksumBuf.position(0);
+      checksumBuf.limit(0);
+    }
+  }
+
+  private synchronized void freeChecksumBufIfExists() {
+    if (checksumBuf != null) {
+      checksumBuf.clear();
+      bufferPool.returnBuffer(checksumBuf);
+      checksumBuf = null;
+    }
+  }
+
+  private synchronized int drainDataBuf(ByteBuffer buf)
+      throws IOException {
+    if (dataBuf == null) return 0;
+    int oldLimit = dataBuf.limit();
+    int nRead = Math.min(dataBuf.remaining(), buf.remaining());
+    if (nRead == 0) return 0;
     try {
-      // Reposition both input streams to the beginning of the chunk
-      // containing startOffset
-      this.dataIn.getChannel().position(firstChunkOffset);
-      success = true;
+      dataBuf.limit(dataBuf.position() + nRead);
+      buf.put(dataBuf);
     } finally {
-      if (success) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("Created BlockReaderLocal for file " + filename
-              + " block " + block + " in datanode " + datanodeID);
-        }
-      } else {
-        if (slowReadBuff != null) bufferPool.returnBuffer(slowReadBuff);
-        if (checksumBuff != null) bufferPool.returnBuffer(checksumBuff);
-      }
+      dataBuf.limit(oldLimit);
     }
+    return nRead;
   }
 
   /**
-   * Reads bytes into a buffer until EOF or the buffer's limit is reached
+   * Read from the block file into a buffer.
+   *
+   * This function overwrites checksumBuf.  It will increment dataPos.
+   *
+   * @param buf   The buffer to read into.  May be dataBuf.
+   *              The position and limit of this buffer should be set to
+   *              multiples of the checksum size.
+   * @param canSkipChecksum  True if we can skip checksumming.
+   *
+   * @return      Total bytes read.  0 on EOF.
    */
-  private int fillBuffer(FileInputStream stream, ByteBuffer buf)
+  private synchronized int fillBuffer(ByteBuffer buf, boolean canSkipChecksum)
       throws IOException {
-    int bytesRead = stream.getChannel().read(buf);
-    if (bytesRead < 0) {
-      //EOF
-      return bytesRead;
-    }
-    while (buf.remaining() > 0) {
-      int n = stream.getChannel().read(buf);
-      if (n < 0) {
-        //EOF
-        return bytesRead;
-      }
-      bytesRead += n;
+    int total = 0;
+    long startDataPos = dataPos;
+    int startBufPos = buf.position();
+    while (buf.hasRemaining()) {
+      int nRead = dataIn.read(buf, dataPos);
+      if (nRead < 0) {
+        break;
+      }
+      dataPos += nRead;
+      total += nRead;
+    }
+    if (canSkipChecksum) {
+      freeChecksumBufIfExists();
+      return total;
     }
-    return bytesRead;
-  }
+    if (total > 0) {
+      try {
+        buf.limit(buf.position());
+        buf.position(startBufPos);
+        createChecksumBufIfNeeded();
+        int checksumsNeeded = (total + bytesPerChecksum - 1) / bytesPerChecksum;
+        checksumBuf.clear();
+        checksumBuf.limit(checksumsNeeded * checksumSize);
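+        // 7 == BlockMetadataHeader.getHeaderSize(): the 2-byte version field
+        // plus the 5-byte checksum header that precede the checksum data.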
+        long checksumPos =
+          7 + ((startDataPos / bytesPerChecksum) * checksumSize);
+        while (checksumBuf.hasRemaining()) {
+          int nRead = checksumIn.read(checksumBuf, checksumPos);
+          if (nRead < 0) {
+            throw new IOException("Got unexpected checksum file EOF at " +
+                checksumPos + ", block file position " + startDataPos + " for " +
+                "block " + block + " of file " + filename);
+          }
+          checksumPos += nRead;
+        }
+        checksumBuf.flip();
   
-  /**
-   * Utility method used by read(ByteBuffer) to partially copy a ByteBuffer into
-   * another.
-   */
-  private void writeSlice(ByteBuffer from, ByteBuffer to, int length) {
-    int oldLimit = from.limit();
-    from.limit(from.position() + length);
-    try {
-      to.put(from);
-    } finally {
-      from.limit(oldLimit);
+        checksum.verifyChunkedSums(buf, checksumBuf, filename, startDataPos);
+      } finally {
+        buf.position(buf.limit());
+      }
     }
+    return total;
   }
 
+  private boolean getCanSkipChecksum() {
+    return (!verifyChecksum) || mlocked.get();
+  }
+  
   @Override
   public synchronized int read(ByteBuffer buf) throws IOException {
-    int nRead = 0;
-    if (verifyChecksum) {
-      // A 'direct' read actually has three phases. The first drains any
-      // remaining bytes from the slow read buffer. After this the read is
-      // guaranteed to be on a checksum chunk boundary. If there are still bytes
-      // to read, the fast direct path is used for as many remaining bytes as
-      // possible, up to a multiple of the checksum chunk size. Finally, any
-      // 'odd' bytes remaining at the end of the read cause another slow read to
-      // be issued, which involves an extra copy.
-
-      // Every 'slow' read tries to fill the slow read buffer in one go for
-      // efficiency's sake. As described above, all non-checksum-chunk-aligned
-      // reads will be served from the slower read path.
-
-      if (slowReadBuff.hasRemaining()) {
-        // There are remaining bytes from a small read available. This usually
-        // means this read is unaligned, which falls back to the slow path.
-        int fromSlowReadBuff = Math.min(buf.remaining(), slowReadBuff.remaining());
-        writeSlice(slowReadBuff, buf, fromSlowReadBuff);
-        nRead += fromSlowReadBuff;
-      }
-
-      if (buf.remaining() >= bytesPerChecksum && offsetFromChunkBoundary == 0) {
-        // Since we have drained the 'small read' buffer, we are guaranteed to
-        // be chunk-aligned
-        int len = buf.remaining() - (buf.remaining() % bytesPerChecksum);
-
-        // There's only enough checksum buffer space available to checksum one
-        // entire slow read buffer. This saves keeping the number of checksum
-        // chunks around.
-        len = Math.min(len, slowReadBuff.capacity());
-        int oldlimit = buf.limit();
-        buf.limit(buf.position() + len);
-        int readResult = 0;
-        try {
-          readResult = doByteBufferRead(buf);
-        } finally {
-          buf.limit(oldlimit);
-        }
-        if (readResult == -1) {
-          return nRead;
-        } else {
-          nRead += readResult;
-          buf.position(buf.position() + readResult);
-        }
-      }
-
-      // offsetFromChunkBoundary > 0 => unaligned read, use slow path to read
-      // until chunk boundary
-      if ((buf.remaining() > 0 && buf.remaining() < bytesPerChecksum) || offsetFromChunkBoundary > 0) {
-        int toRead = Math.min(buf.remaining(), bytesPerChecksum - offsetFromChunkBoundary);
-        int readResult = fillSlowReadBuffer(toRead);
-        if (readResult == -1) {
-          return nRead;
-        } else {
-          int fromSlowReadBuff = Math.min(readResult, buf.remaining());
-          writeSlice(slowReadBuff, buf, fromSlowReadBuff);
-          nRead += fromSlowReadBuff;
-        }
+    boolean canSkipChecksum = getCanSkipChecksum();
+    
+    String traceString = null;
+    if (LOG.isTraceEnabled()) {
+      traceString = new StringBuilder().
+          append("read(").
+          append("buf.remaining=").append(buf.remaining()).
+          append(", block=").append(block).
+          append(", filename=").append(filename).
+          append(", canSkipChecksum=").append(canSkipChecksum).
+          append(")").toString();
+      LOG.trace(traceString + ": starting");
+    }
+    int nRead;
+    try {
+      if (canSkipChecksum && zeroReadaheadRequested) {
+        nRead = readWithoutBounceBuffer(buf);
+      } else {
+        nRead = readWithBounceBuffer(buf, canSkipChecksum);
       }
-    } else {
-      // Non-checksummed reads are much easier; we can just fill the buffer directly.
-      nRead = doByteBufferRead(buf);
-      if (nRead > 0) {
-        buf.position(buf.position() + nRead);
+    } catch (IOException e) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(traceString + ": I/O error", e);
       }
+      throw e;
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(traceString + ": returning " + nRead);
     }
     return nRead;
   }
 
+  private synchronized int readWithoutBounceBuffer(ByteBuffer buf)
+      throws IOException {
+    freeDataBufIfExists();
+    freeChecksumBufIfExists();
+    int total = 0;
+    while (buf.hasRemaining()) {
+      int nRead = dataIn.read(buf, dataPos);
+      if (nRead < 0) {
+        break;
+      }
+      dataPos += nRead;
+      total += nRead;
+    }
+    return (total == 0) ? -1 : total;
+  }
+
   /**
-   * Tries to read as many bytes as possible into supplied buffer, checksumming
-   * each chunk if needed.
-   *
-   * <b>Preconditions:</b>
-   * <ul>
-   * <li>
-   * If checksumming is enabled, buf.remaining must be a multiple of
-   * bytesPerChecksum. Note that this is not a requirement for clients of
-   * read(ByteBuffer) - in the case of non-checksum-sized read requests,
-   * read(ByteBuffer) will substitute a suitably sized buffer to pass to this
-   * method.
-   * </li>
-   * </ul>
-   * <b>Postconditions:</b>
-   * <ul>
-   * <li>buf.limit and buf.mark are unchanged.</li>
-   * <li>buf.position += min(offsetFromChunkBoundary, totalBytesRead) - so the
-   * requested bytes can be read straight from the buffer</li>
-   * </ul>
-   *
-   * @param buf
-   *          byte buffer to write bytes to. If checksums are not required, buf
-   *          can have any number of bytes remaining, otherwise there must be a
-   *          multiple of the checksum chunk size remaining.
-   * @return <tt>max(min(totalBytesRead, len) - offsetFromChunkBoundary, 0)</tt>
-   *         that is, the the number of useful bytes (up to the amount
-   *         requested) readable from the buffer by the client.
-   */
-  private synchronized int doByteBufferRead(ByteBuffer buf) throws IOException {
-    if (verifyChecksum) {
-      assert buf.remaining() % bytesPerChecksum == 0;
-    }
-    int dataRead = -1;
-
-    int oldpos = buf.position();
-    // Read as much as we can into the buffer.
-    dataRead = fillBuffer(dataIn, buf);
-
-    if (dataRead == -1) {
-      return -1;
-    }
-
-    if (verifyChecksum) {
-      ByteBuffer toChecksum = buf.duplicate();
-      toChecksum.position(oldpos);
-      toChecksum.limit(oldpos + dataRead);
-
-      checksumBuff.clear();
-      // Equivalent to (int)Math.ceil(toChecksum.remaining() * 1.0 / bytesPerChecksum );
-      int numChunks =
-        (toChecksum.remaining() + bytesPerChecksum - 1) / bytesPerChecksum;
-      checksumBuff.limit(checksumSize * numChunks);
-
-      fillBuffer(checksumIn, checksumBuff);
-      checksumBuff.flip();
-
-      checksum.verifyChunkedSums(toChecksum, checksumBuff, filename,
-          this.startOffset);
-    }
-
-    if (dataRead >= 0) {
-      buf.position(oldpos + Math.min(offsetFromChunkBoundary, dataRead));
-    }
-
-    if (dataRead < offsetFromChunkBoundary) {
-      // yikes, didn't even get enough bytes to honour offset. This can happen
-      // even if we are verifying checksums if we are at EOF.
-      offsetFromChunkBoundary -= dataRead;
-      dataRead = 0;
+   * Fill the data buffer.  If necessary, validate the data against the
+   * checksums.
+   * 
+   * We always want the offsets of the data contained in dataBuf to be
+   * aligned to the chunk boundary.  If we are validating checksums, we
+   * accomplish this by seeking backwards in the file until we're on a
+   * chunk boundary.  (This is necessary because we can't checksum a
+   * partial chunk.)  If we are not validating checksums, we simply only
+   * fill the latter part of dataBuf.
+   * 
+   * @param canSkipChecksum  true if we can skip checksumming.
+   * @return                 true if we hit EOF.
+   * @throws IOException
+   */
+  private synchronized boolean fillDataBuf(boolean canSkipChecksum)
+      throws IOException {
+    createDataBufIfNeeded();
+    final int slop = (int)(dataPos % bytesPerChecksum);
+    final long oldDataPos = dataPos;
+    dataBuf.limit(maxReadaheadLength);
+    if (canSkipChecksum) {
+      dataBuf.position(slop);
+      fillBuffer(dataBuf, canSkipChecksum);
     } else {
-      dataRead -= offsetFromChunkBoundary;
-      offsetFromChunkBoundary = 0;
+      dataPos -= slop;
+      dataBuf.position(0);
+      fillBuffer(dataBuf, canSkipChecksum);
     }
-
-    return dataRead;
+    dataBuf.limit(dataBuf.position());
+    dataBuf.position(Math.min(dataBuf.position(), slop));
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("loaded " + dataBuf.remaining() + " bytes into bounce " +
+          "buffer from offset " + oldDataPos + " of " + block);
+    }
+    return dataBuf.limit() != maxReadaheadLength;
   }
 
   /**
-   * Ensures that up to len bytes are available and checksummed in the slow read
-   * buffer. The number of bytes available to read is returned. If the buffer is
-   * not already empty, the number of remaining bytes is returned and no actual
-   * read happens.
+   * Read using the bounce buffer.
+   *
+   * A 'direct' read actually has three phases. The first drains any
+   * remaining bytes from the bounce buffer. After this the read is
+   * guaranteed to be on a checksum chunk boundary. If there are still bytes
+   * to read, the fast direct path is used for as many remaining bytes as
+   * possible, up to a multiple of the checksum chunk size. Finally, any
+   * 'odd' bytes remaining at the end of the read cause another slow read to
+   * be issued, which involves an extra copy.
+   *
+   * Every 'slow' read tries to fill the bounce buffer in one go for
+   * efficiency's sake. As described above, all non-checksum-chunk-aligned
+   * reads will be served from the slower read path.
    *
-   * @param len
-   *          the maximum number of bytes to make available. After len bytes
-   *          are read, the underlying bytestream <b>must</b> be at a checksum
-   *          boundary, or EOF. That is, (len + currentPosition) %
-   *          bytesPerChecksum == 0.
-   * @return the number of bytes available to read, or -1 if EOF.
+   * @param buf              The buffer to read into. 
+   * @param canSkipChecksum  True if we can skip checksums.
    */
-  private synchronized int fillSlowReadBuffer(int len) throws IOException {
-    int nRead = -1;
-    if (slowReadBuff.hasRemaining()) {
-      // Already got data, good to go.
-      nRead = Math.min(len, slowReadBuff.remaining());
-    } else {
-      // Round a complete read of len bytes (plus any implicit offset) to the
-      // next chunk boundary, since we try and read in multiples of a chunk
-      int nextChunk = len + offsetFromChunkBoundary +
-          (bytesPerChecksum - ((len + offsetFromChunkBoundary) % bytesPerChecksum));
-      int limit = Math.min(nextChunk, slowReadBuff.capacity());
-      assert limit % bytesPerChecksum == 0;
-
-      slowReadBuff.clear();
-      slowReadBuff.limit(limit);
-
-      nRead = doByteBufferRead(slowReadBuff);
-
-      if (nRead > 0) {
-        // So that next time we call slowReadBuff.hasRemaining(), we don't get a
-        // false positive.
-        slowReadBuff.limit(nRead + slowReadBuff.position());
+  private synchronized int readWithBounceBuffer(ByteBuffer buf,
+        boolean canSkipChecksum) throws IOException {
+    int total = 0;
+    boolean eof = false;
+    while (true) {
+      int bb = drainDataBuf(buf); // drain bounce buffer if possible
+      total += bb;
+      int needed = buf.remaining();
+      if (eof || (needed == 0)) {
+        break;
+      } else if (buf.isDirect() && (needed >= maxReadaheadLength)
+          && ((dataPos % bytesPerChecksum) == 0)) {
+        // Fast lane: try to read directly into user-supplied buffer, bypassing
+        // bounce buffer.
+        int oldLimit = buf.limit();
+        int nRead;
+        try {
+          buf.limit(buf.position() + maxReadaheadLength);
+          nRead = fillBuffer(buf, canSkipChecksum);
+        } finally {
+          buf.limit(oldLimit);
+        }
+        if (nRead < maxReadaheadLength) {
+          eof = true;
+        }
+        total += nRead;
+      } else {
+        // Slow lane: refill bounce buffer.
+        if (fillDataBuf(canSkipChecksum)) {
+          eof = true;
+        }
       }
     }
-    return nRead;
+    return total == 0 ? -1 : total;
   }
 
   @Override
-  public synchronized int read(byte[] buf, int off, int len) throws IOException {
+  public synchronized int read(byte[] arr, int off, int len)
+        throws IOException {
+    boolean canSkipChecksum = getCanSkipChecksum();
+    String traceString = null;
     if (LOG.isTraceEnabled()) {
-      LOG.trace("read off " + off + " len " + len);
+      traceString = new StringBuilder().
+          append("read(arr.length=").append(arr.length).
+          append(", off=").append(off).
+          append(", len=").append(len).
+          append(", filename=").append(filename).
+          append(", block=").append(block).
+          append(", canSkipChecksum=").append(canSkipChecksum).
+          append(")").toString();
+      LOG.trace(traceString + ": starting");
     }
-    if (!verifyChecksum) {
-      return dataIn.read(buf, off, len);
+    int nRead;
+    try {
+      if (canSkipChecksum && zeroReadaheadRequested) {
+        nRead = readWithoutBounceBuffer(arr, off, len);
+      } else {
+        nRead = readWithBounceBuffer(arr, off, len, canSkipChecksum);
+      }
+    } catch (IOException e) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(traceString + ": I/O error", e);
+      }
+      throw e;
     }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace(traceString + ": returning " + nRead);
+    }
+    return nRead;
+  }
 
-    int nRead = fillSlowReadBuffer(slowReadBuff.capacity());
-
+  private synchronized int readWithoutBounceBuffer(byte arr[], int off,
+        int len) throws IOException {
+    freeDataBufIfExists();
+    freeChecksumBufIfExists();
+    int nRead = dataIn.read(ByteBuffer.wrap(arr, off, len), dataPos);
     if (nRead > 0) {
-      // Possible that buffer is filled with a larger read than we need, since
-      // we tried to read as much as possible at once
-      nRead = Math.min(len, nRead);
-      slowReadBuff.get(buf, off, nRead);
+      dataPos += nRead;
     }
+    return nRead == 0 ? -1 : nRead;
+  }
 
-    return nRead;
+  private synchronized int readWithBounceBuffer(byte arr[], int off, int len,
+        boolean canSkipChecksum) throws IOException {
+    createDataBufIfNeeded();
+    if (!dataBuf.hasRemaining()) {
+      dataBuf.position(0);
+      dataBuf.limit(maxReadaheadLength);
+      fillDataBuf(canSkipChecksum);
+    }
+    int toRead = Math.min(dataBuf.remaining(), len);
+    dataBuf.get(arr, off, toRead);
+    return toRead == 0 ? -1 : toRead;
   }
 
   @Override
   public synchronized long skip(long n) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("skip " + n);
-    }
-    if (n <= 0) {
-      return 0;
-    }
-    if (!verifyChecksum) {
-      return dataIn.skip(n);
-    }
-  
-    // caller made sure newPosition is not beyond EOF.
-    int remaining = slowReadBuff.remaining();
-    int position = slowReadBuff.position();
-    int newPosition = position + (int)n;
-  
-    // if the new offset is already read into dataBuff, just reposition
-    if (n <= remaining) {
-      assert offsetFromChunkBoundary == 0;
-      slowReadBuff.position(newPosition);
-      return n;
-    }
-  
-    // for small gap, read through to keep the data/checksum in sync
-    if (n - remaining <= bytesPerChecksum) {
-      slowReadBuff.position(position + remaining);
-      if (skipBuf == null) {
-        skipBuf = new byte[bytesPerChecksum];
-      }
-      int ret = read(skipBuf, 0, (int)(n - remaining));
-      return ret;
+    int discardedFromBuf = 0;
+    long remaining = n;
+    if ((dataBuf != null) && dataBuf.hasRemaining()) {
+      discardedFromBuf = (int)Math.min(dataBuf.remaining(), n);
+      dataBuf.position(dataBuf.position() + discardedFromBuf);
+      remaining -= discardedFromBuf;
     }
-  
-    // optimize for big gap: discard the current buffer, skip to
-    // the beginning of the appropriate checksum chunk and then
-    // read to the middle of that chunk to be in sync with checksums.
-  
-    // We can't use this.offsetFromChunkBoundary because we need to know how
-    // many bytes of the offset were really read. Calling read(..) with a
-    // positive this.offsetFromChunkBoundary causes that many bytes to get
-    // silently skipped.
-    int myOffsetFromChunkBoundary = newPosition % bytesPerChecksum;
-    long toskip = n - remaining - myOffsetFromChunkBoundary;
-
-    slowReadBuff.position(slowReadBuff.limit());
-    checksumBuff.position(checksumBuff.limit());
-  
-    IOUtils.skipFully(dataIn, toskip);
-    long checkSumOffset = (toskip / bytesPerChecksum) * checksumSize;
-    IOUtils.skipFully(checksumIn, checkSumOffset);
-
-    // read into the middle of the chunk
-    if (skipBuf == null) {
-      skipBuf = new byte[bytesPerChecksum];
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("skip(n=" + n + ", block=" + block + ", filename=" + 
+        filename + "): discarded " + discardedFromBuf + " bytes from " +
+        "dataBuf and advanced dataPos by " + remaining);
     }
-    assert skipBuf.length == bytesPerChecksum;
-    assert myOffsetFromChunkBoundary < bytesPerChecksum;
-
-    int ret = read(skipBuf, 0, myOffsetFromChunkBoundary);
+    dataPos += remaining;
+    return n;
+  }
 
-    if (ret == -1) {  // EOS
-      return toskip;
-    } else {
-      return (toskip + ret);
-    }
+  @Override
+  public int available() throws IOException {
+    // We never do network I/O in BlockReaderLocal.
+    return Integer.MAX_VALUE;
   }
 
   @Override
   public synchronized void close() throws IOException {
+    if (closed) return;
+    closed = true;
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("close(filename=" + filename + ", block=" + block + ")");
+    }
     if (clientMmap != null) {
       clientMmap.unref();
       clientMmap = null;
@@ -504,58 +644,55 @@ class BlockReaderLocal implements BlockR
         LOG.debug("putting FileInputStream for " + filename +
             " back into FileInputStreamCache");
       }
-      fisCache.put(datanodeID, block, new FileInputStream[] {dataIn, checksumIn});
+      fisCache.put(datanodeID, block, streams);
     } else {
       LOG.debug("closing FileInputStream for " + filename);
       IOUtils.cleanup(LOG, dataIn, checksumIn);
     }
-    if (slowReadBuff != null) {
-      bufferPool.returnBuffer(slowReadBuff);
-      slowReadBuff = null;
-    }
-    if (checksumBuff != null) {
-      bufferPool.returnBuffer(checksumBuff);
-      checksumBuff = null;
-    }
-    startOffset = -1;
-    checksum = null;
+    freeDataBufIfExists();
+    freeChecksumBufIfExists();
   }
 
   @Override
-  public int readAll(byte[] buf, int offset, int len) throws IOException {
-    return BlockReaderUtil.readAll(this, buf, offset, len);
-  }
-
-  @Override
-  public void readFully(byte[] buf, int off, int len) throws IOException {
-    BlockReaderUtil.readFully(this, buf, off, len);
+  public synchronized void readFully(byte[] arr, int off, int len)
+      throws IOException {
+    BlockReaderUtil.readFully(this, arr, off, len);
   }
 
   @Override
-  public int available() throws IOException {
-    // We never do network I/O in BlockReaderLocal.
-    return Integer.MAX_VALUE;
+  public synchronized int readAll(byte[] buf, int off, int len)
+      throws IOException {
+    return BlockReaderUtil.readAll(this, buf, off, len);
   }
 
   @Override
   public boolean isLocal() {
     return true;
   }
-  
+
   @Override
   public boolean isShortCircuit() {
     return true;
   }
 
   @Override
-  public ClientMmap getClientMmap(LocatedBlock curBlock,
-      ClientMmapManager mmapManager) {
+  public synchronized ClientMmap getClientMmap(EnumSet<ReadOption> opts,
+        ClientMmapManager mmapManager) {
+    if ((!opts.contains(ReadOption.SKIP_CHECKSUMS)) &&
+          verifyChecksum && (!mlocked.get())) {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("can't get an mmap for " + block + " of " + filename + 
+            " since SKIP_CHECKSUMS was not given, " +
+            "we aren't skipping checksums, and the block is not mlocked.");
+      }
+      return null;
+    }
     if (clientMmap == null) {
       if (mmapDisabled) {
         return null;
       }
       try {
-        clientMmap = mmapManager.fetch(datanodeID, block, dataIn);
+        clientMmap = mmapManager.fetch(datanodeID, block, streams[0]);
         if (clientMmap == null) {
           mmapDisabled = true;
           return null;
@@ -572,4 +709,24 @@ class BlockReaderLocal implements BlockR
     }
     return clientMmap;
   }
+
+  /**
+   * Set the mlocked state of the BlockReader.
+   * This method does NOT need to be synchronized because mlocked is atomic.
+   *
+   * @param mlocked  the new mlocked state of the BlockReader.
+   */
+  public void setMlocked(boolean mlocked) {
+    this.mlocked.set(mlocked);
+  }
+  
+  @VisibleForTesting
+  boolean getVerifyChecksum() {
+    return this.verifyChecksum;
+  }
+
+  @VisibleForTesting
+  int getMaxReadaheadLength() {
+    return this.maxReadaheadLength;
+  }
 }
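
For reference, a minimal sketch (not part of this commit) of constructing the
new reader through the Builder, mirroring the DFSInputStream.getBlockReader()
hunk further down in this message. The helper name and parameters here are
illustrative; fis[0] is assumed to be the block data stream and fis[1] the
checksum (metadata) stream, e.g. as returned by the FileInputStreamCache:

    import java.io.FileInputStream;
    import java.io.IOException;
    import org.apache.hadoop.hdfs.protocol.DatanodeID;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;

    // Hypothetical helper: builds a BlockReaderLocal from already-open streams.
    static BlockReaderLocal newLocalReader(DFSClient.Conf conf, String file,
        ExtendedBlock block, long startOffset, FileInputStream[] fis,
        DatanodeID datanode, boolean verifyChecksum,
        FileInputStreamCache fisCache) throws IOException {
      return new BlockReaderLocal.Builder(conf).
          setFilename(file).
          setBlock(block).
          setStartOffset(startOffset).
          setStreams(fis).
          setDatanodeID(datanode).
          setVerifyChecksum(verifyChecksum).
          setBlockMetadataHeader(
              BlockMetadataHeader.preadHeader(fis[1].getChannel())).
          setFileInputStreamCache(fisCache).
          build();
    }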

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocalLegacy.java Fri Jan  3 07:26:52 2014
@@ -24,10 +24,12 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
 
+import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.hdfs.client.ClientMmap;
 import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.commons.logging.Log;
@@ -706,8 +708,8 @@ class BlockReaderLocalLegacy implements 
   }
 
   @Override
-  public ClientMmap getClientMmap(LocatedBlock curBlock,
-      ClientMmapManager mmapManager) {
+  public ClientMmap getClientMmap(EnumSet<ReadOption> opts,
+        ClientMmapManager mmapManager) {
     return null;
   }
 }
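
A brief sketch of the new call shape from the caller's side (the reader and
ClientMmapManager are assumed to come from existing client plumbing):

    import java.util.EnumSet;
    import org.apache.hadoop.fs.ReadOption;
    import org.apache.hadoop.hdfs.client.ClientMmap;
    import org.apache.hadoop.hdfs.client.ClientMmapManager;

    // Hypothetical caller: request an mmap while opting out of checksum
    // verification, which getClientMmap() above requires unless the block
    // is mlocked or checksums are already disabled.
    static ClientMmap tryMmap(BlockReader reader, ClientMmapManager manager) {
      return reader.getClientMmap(
          EnumSet.of(ReadOption.SKIP_CHECKSUMS), manager);
    }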

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Fri Jan  3 07:26:52 2014
@@ -85,6 +85,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.BlockStorageLocation;
+import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
@@ -98,10 +99,10 @@ import org.apache.hadoop.fs.MD5MD5CRC32C
 import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
 import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
 import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.Options.ChecksumOpt;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.VolumeId;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -109,6 +110,7 @@ import org.apache.hadoop.hdfs.client.Cli
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveIterator;
 import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
@@ -121,7 +123,6 @@ import org.apache.hadoop.hdfs.protocol.D
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -282,6 +283,8 @@ public class DFSClient implements java.i
     final boolean getHdfsBlocksMetadataEnabled;
     final int getFileBlockStorageLocationsNumThreads;
     final int getFileBlockStorageLocationsTimeout;
+    final int retryTimesForGetLastBlockLength;
+    final int retryIntervalForGetLastBlockLength;
 
     final boolean useLegacyBlockReader;
     final boolean useLegacyBlockReaderLocal;
@@ -355,6 +358,12 @@ public class DFSClient implements java.i
       getFileBlockStorageLocationsTimeout = conf.getInt(
           DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT,
           DFSConfigKeys.DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_DEFAULT);
+      retryTimesForGetLastBlockLength = conf.getInt(
+          DFSConfigKeys.DFS_CLIENT_RETRY_TIMES_GET_LAST_BLOCK_LENGTH,
+          DFSConfigKeys.DFS_CLIENT_RETRY_TIMES_GET_LAST_BLOCK_LENGTH_DEFAULT);
+      retryIntervalForGetLastBlockLength = conf.getInt(
+        DFSConfigKeys.DFS_CLIENT_RETRY_INTERVAL_GET_LAST_BLOCK_LENGTH,
+        DFSConfigKeys.DFS_CLIENT_RETRY_INTERVAL_GET_LAST_BLOCK_LENGTH_DEFAULT);
 
       useLegacyBlockReader = conf.getBoolean(
           DFSConfigKeys.DFS_CLIENT_USE_LEGACY_BLOCKREADER,
@@ -2295,20 +2304,20 @@ public class DFSClient implements java.i
   }
 
   public long addCacheDirective(
-      CacheDirectiveInfo info) throws IOException {
+      CacheDirectiveInfo info, EnumSet<CacheFlag> flags) throws IOException {
     checkOpen();
     try {
-      return namenode.addCacheDirective(info);
+      return namenode.addCacheDirective(info, flags);
     } catch (RemoteException re) {
       throw re.unwrapRemoteException();
     }
   }
   
   public void modifyCacheDirective(
-      CacheDirectiveInfo info) throws IOException {
+      CacheDirectiveInfo info, EnumSet<CacheFlag> flags) throws IOException {
     checkOpen();
     try {
-      namenode.modifyCacheDirective(info);
+      namenode.modifyCacheDirective(info, flags);
     } catch (RemoteException re) {
       throw re.unwrapRemoteException();
     }
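
The new EnumSet<CacheFlag> parameter lets callers pass flags such as
CacheFlag.FORCE. A small illustrative sketch (the client and directive are
assumed to exist; pass EnumSet.noneOf(CacheFlag.class) to keep the old
unforced behavior):

    import java.io.IOException;
    import java.util.EnumSet;
    import org.apache.hadoop.fs.CacheFlag;
    import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;

    // Hypothetical usage: FORCE asks the NameNode to bypass cache pool
    // resource checks when adding the directive.
    static long addForced(DFSClient client, CacheDirectiveInfo info)
        throws IOException {
      return client.addCacheDirective(info, EnumSet.of(CacheFlag.FORCE));
    }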

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Jan  3 07:26:52 2014
@@ -65,6 +65,10 @@ public class DFSConfigKeys extends Commo
   public static final int     DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_NUM_THREADS_DEFAULT = 10;
   public static final String  DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT = "dfs.client.file-block-storage-locations.timeout";
   public static final int     DFS_CLIENT_FILE_BLOCK_STORAGE_LOCATIONS_TIMEOUT_DEFAULT = 60;
+  public static final String  DFS_CLIENT_RETRY_TIMES_GET_LAST_BLOCK_LENGTH = "dfs.client.retry.times.get-last-block-length";
+  public static final int     DFS_CLIENT_RETRY_TIMES_GET_LAST_BLOCK_LENGTH_DEFAULT = 3;
+  public static final String  DFS_CLIENT_RETRY_INTERVAL_GET_LAST_BLOCK_LENGTH = "dfs.client.retry.interval-ms.get-last-block-length";
+  public static final int     DFS_CLIENT_RETRY_INTERVAL_GET_LAST_BLOCK_LENGTH_DEFAULT = 4000;
 
   // HA related configuration
   public static final String  DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX = "dfs.client.failover.proxy.provider";
@@ -104,8 +108,9 @@ public class DFSConfigKeys extends Commo
   public static final long    DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT = 0;
   public static final String  DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY = "dfs.datanode.fsdatasetcache.max.threads.per.volume";
   public static final int     DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT = 4;
-  public static final String  DFS_NAMENODE_CACHING_ENABLED_KEY = "dfs.namenode.caching.enabled";
-  public static final boolean DFS_NAMENODE_CACHING_ENABLED_DEFAULT = false;
+  public static final String  DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT =
+    "dfs.namenode.path.based.cache.block.map.allocation.percent";
+  public static final float    DFS_NAMENODE_PATH_BASED_CACHE_BLOCK_MAP_ALLOCATION_PERCENT_DEFAULT = 0.25f;
 
   public static final String  DFS_NAMENODE_HTTP_PORT_KEY = "dfs.http.port";
   public static final int     DFS_NAMENODE_HTTP_PORT_DEFAULT = 50070;
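
A minimal sketch of overriding the two new client retry keys added above
(values are illustrative; the defaults are 3 retries at 4000 ms):

    import org.apache.hadoop.conf.Configuration;

    static Configuration withFasterLastBlockRetries() {
      Configuration conf = new Configuration();
      // Retry fetching the last block's length up to 5 times, 2000 ms apart.
      conf.setInt(DFSConfigKeys.DFS_CLIENT_RETRY_TIMES_GET_LAST_BLOCK_LENGTH, 5);
      conf.setInt(
          DFSConfigKeys.DFS_CLIENT_RETRY_INTERVAL_GET_LAST_BLOCK_LENGTH, 2000);
      return conf;
    }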

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Fri Jan  3 07:26:52 2014
@@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.protocol.datatransfer.InvalidEncryptionKeyException;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
+import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.io.ByteBufferPool;
@@ -65,6 +66,7 @@ import org.apache.hadoop.ipc.RemoteExcep
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.IdentityHashStore;
 
@@ -226,7 +228,7 @@ implements ByteBufferReadable, CanSetDro
         dfsClient.getConf().shortCircuitStreamsCacheSize,
         dfsClient.getConf().shortCircuitStreamsCacheExpiryMs);
     this.cachingStrategy =
-        dfsClient.getDefaultReadCachingStrategy().duplicate();
+        dfsClient.getDefaultReadCachingStrategy();
     openInfo();
   }
 
@@ -235,7 +237,7 @@ implements ByteBufferReadable, CanSetDro
    */
   synchronized void openInfo() throws IOException, UnresolvedLinkException {
     lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
-    int retriesForLastBlockLength = 3;
+    int retriesForLastBlockLength = dfsClient.getConf().retryTimesForGetLastBlockLength;
     while (retriesForLastBlockLength > 0) {
       // Getting last block length as -1 is a special case. When cluster
       // restarts, DNs may not report immediately. At this time partial block
@@ -245,7 +247,7 @@ implements ByteBufferReadable, CanSetDro
         DFSClient.LOG.warn("Last block locations not available. "
             + "Datanodes might not have reported blocks completely."
             + " Will retry for " + retriesForLastBlockLength + " times");
-        waitFor(4000);
+        waitFor(dfsClient.getConf().retryIntervalForGetLastBlockLength);
         lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
       } else {
         break;
@@ -572,7 +574,7 @@ implements ByteBufferReadable, CanSetDro
         Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
         blockReader = getBlockReader(targetAddr, chosenNode, src, blk,
             accessToken, offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
-            buffersize, verifyChecksum, dfsClient.clientName);
+            buffersize, verifyChecksum, dfsClient.clientName, cachingStrategy);
         if(connectFailedOnce) {
           DFSClient.LOG.info("Successfully connected to " + targetAddr +
                              " for " + blk);
@@ -590,20 +592,7 @@ implements ByteBufferReadable, CanSetDro
           // The encryption key used is invalid.
           refetchEncryptionKey--;
           dfsClient.clearDataEncryptionKey();
-        } else if (ex instanceof InvalidBlockTokenException && refetchToken > 0) {
-          DFSClient.LOG.info("Will fetch a new access token and retry, " 
-              + "access token was invalid when connecting to " + targetAddr
-              + " : " + ex);
-          /*
-           * Get a new access token and retry. Retry is needed in 2 cases. 1)
-           * When both NN and DN re-started while DFSClient holding a cached
-           * access token. 2) In the case that NN fails to update its
-           * access key at pre-set interval (by a wide margin) and
-           * subsequently restarts. In this case, DN re-registers itself with
-           * NN and receives a new access key, but DN will delete the old
-           * access key from its memory since it's considered expired based on
-           * the estimated expiration date.
-           */
+        } else if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
           refetchToken--;
           fetchBlockAt(target);
         } else {
@@ -939,7 +928,11 @@ implements ByteBufferReadable, CanSetDro
       // cached block locations may have been updated by chooseDataNode()
       // or fetchBlockAt(). Always get the latest list of locations at the 
       // start of the loop.
-      block = getBlockAt(block.getStartOffset(), false);
+      CachingStrategy curCachingStrategy;
+      synchronized (this) {
+        block = getBlockAt(block.getStartOffset(), false);
+        curCachingStrategy = cachingStrategy;
+      }
       DNAddrPair retval = chooseDataNode(block);
       DatanodeInfo chosenNode = retval.info;
       InetSocketAddress targetAddr = retval.addr;
@@ -951,7 +944,7 @@ implements ByteBufferReadable, CanSetDro
         int len = (int) (end - start + 1);
         reader = getBlockReader(targetAddr, chosenNode, src, block.getBlock(),
             blockToken, start, len, buffersize, verifyChecksum,
-            dfsClient.clientName);
+            dfsClient.clientName, curCachingStrategy);
         int nread = reader.readAll(buf, offset, len);
         if (nread != len) {
           throw new IOException("truncated return from reader.read(): " +
@@ -976,10 +969,7 @@ implements ByteBufferReadable, CanSetDro
           // The encryption key used is invalid.
           refetchEncryptionKey--;
           dfsClient.clearDataEncryptionKey();
-        } else if (e instanceof InvalidBlockTokenException && refetchToken > 0) {
-          DFSClient.LOG.info("Will get a new access token and retry, "
-              + "access token was invalid when connecting to " + targetAddr
-              + " : " + e);
+        } else if (refetchToken > 0 && tokenRefetchNeeded(e, targetAddr)) {
           refetchToken--;
           fetchBlockAt(block.getStartOffset());
           continue;
@@ -1000,6 +990,34 @@ implements ByteBufferReadable, CanSetDro
     }
   }
 
+  /**
+   * Should the block access token be refetched on an exception
+   * 
+   * @param ex Exception received
+   * @param targetAddr Target datanode address from where exception was received
+   * @return true if block access token has expired or invalid and it should be
+   *         refetched
+   */
+  private static boolean tokenRefetchNeeded(IOException ex,
+      InetSocketAddress targetAddr) {
+    /*
+     * Get a new access token and retry. Retry is needed in 2 cases. 1)
+     * When both NN and DN re-started while DFSClient holding a cached
+     * access token. 2) In the case that NN fails to update its
+     * access key at pre-set interval (by a wide margin) and
+     * subsequently restarts. In this case, DN re-registers itself with
+     * NN and receives a new access key, but DN will delete the old
+     * access key from its memory since it's considered expired based on
+     * the estimated expiration date.
+     */
+    if (ex instanceof InvalidBlockTokenException || ex instanceof InvalidToken) {
+      DFSClient.LOG.info("Access token was invalid when connecting to "
+          + targetAddr + " : " + ex);
+      return true;
+    }
+    return false;
+  }
+
   private Peer newTcpPeer(InetSocketAddress addr) throws IOException {
     Peer peer = null;
     boolean success = false;
@@ -1039,6 +1057,7 @@ implements ByteBufferReadable, CanSetDro
    * @param bufferSize  The IO buffer size (not the client buffer size)
    * @param verifyChecksum  Whether to verify checksum
    * @param clientName  Client name
+   * @param curCachingStrategy  caching strategy to use
    * @return New BlockReader instance
    */
   protected BlockReader getBlockReader(InetSocketAddress dnAddr,
@@ -1050,7 +1069,8 @@ implements ByteBufferReadable, CanSetDro
                                        long len,
                                        int bufferSize,
                                        boolean verifyChecksum,
-                                       String clientName)
+                                       String clientName,
+                                       CachingStrategy curCachingStrategy)
       throws IOException {
     // Firstly, we check to see if we have cached any file descriptors for
     // local blocks.  If so, we can just re-use those file descriptors.
@@ -1060,9 +1080,18 @@ implements ByteBufferReadable, CanSetDro
         DFSClient.LOG.debug("got FileInputStreams for " + block + " from " +
             "the FileInputStreamCache.");
       }
-      return new BlockReaderLocal(dfsClient.getConf(), file,
-        block, startOffset, len, fis[0], fis[1], chosenNode, verifyChecksum,
-        fileInputStreamCache);
+      return new BlockReaderLocal.Builder(dfsClient.getConf()).
+          setFilename(file).
+          setBlock(block).
+          setStartOffset(startOffset).
+          setStreams(fis).
+          setDatanodeID(chosenNode).
+          setVerifyChecksum(verifyChecksum).
+          setBlockMetadataHeader(BlockMetadataHeader.
+              preadHeader(fis[1].getChannel())).
+          setFileInputStreamCache(fileInputStreamCache).
+          setCachingStrategy(curCachingStrategy).
+          build();
     }
     
     // If the legacy local block reader is enabled and we are reading a local
@@ -1096,7 +1125,7 @@ implements ByteBufferReadable, CanSetDro
             dfsClient.getConf(), file, block, blockToken, startOffset,
             len, verifyChecksum, clientName, peer, chosenNode, 
             dsFactory, peerCache, fileInputStreamCache,
-            allowShortCircuitLocalReads, cachingStrategy);
+            allowShortCircuitLocalReads, curCachingStrategy);
         return reader;
       } catch (IOException ex) {
         DFSClient.LOG.debug("Error making BlockReader with DomainSocket. " +
@@ -1119,7 +1148,7 @@ implements ByteBufferReadable, CanSetDro
             dfsClient.getConf(), file, block, blockToken, startOffset,
             len, verifyChecksum, clientName, peer, chosenNode,
             dsFactory, peerCache, fileInputStreamCache,
-            allowShortCircuitLocalReads, cachingStrategy);
+            allowShortCircuitLocalReads, curCachingStrategy);
         return reader;
       } catch (IOException e) {
         DFSClient.LOG.warn("failed to connect to " + domSock, e);
@@ -1143,7 +1172,7 @@ implements ByteBufferReadable, CanSetDro
             dfsClient.getConf(), file, block, blockToken, startOffset,
             len, verifyChecksum, clientName, peer, chosenNode, 
             dsFactory, peerCache, fileInputStreamCache, false,
-            cachingStrategy);
+            curCachingStrategy);
         return reader;
       } catch (IOException ex) {
         DFSClient.LOG.debug("Error making BlockReader. Closing stale " +
@@ -1159,11 +1188,21 @@ implements ByteBufferReadable, CanSetDro
     }
     // Try to create a new remote peer.
     Peer peer = newTcpPeer(dnAddr);
-    return BlockReaderFactory.newBlockReader(
-        dfsClient.getConf(), file, block, blockToken, startOffset,
-        len, verifyChecksum, clientName, peer, chosenNode, 
-        dsFactory, peerCache, fileInputStreamCache, false,
-        cachingStrategy);
+    try {
+      reader = BlockReaderFactory.newBlockReader(dfsClient.getConf(), file,
+          block, blockToken, startOffset, len, verifyChecksum, clientName,
+          peer, chosenNode, dsFactory, peerCache, fileInputStreamCache, false,
+          curCachingStrategy);
+      return reader;
+    } catch (IOException ex) {
+      DFSClient.LOG.debug(
+          "Exception while getting block reader, closing stale " + peer, ex);
+      throw ex;
+    } finally {
+      if (reader == null) {
+        IOUtils.closeQuietly(peer);
+      }
+    }
   }
 
 
@@ -1344,7 +1383,7 @@ implements ByteBufferReadable, CanSetDro
       * deadNodes and added currentNode again. That's ok. */
       deadNodes.remove(oldNode);
     }
-    if (!oldNode.getStorageID().equals(newNode.getStorageID())) {
+    if (!oldNode.getDatanodeUuid().equals(newNode.getDatanodeUuid())) {
       currentNode = newNode;
       return true;
     } else {
@@ -1437,14 +1476,18 @@ implements ByteBufferReadable, CanSetDro
   @Override
   public synchronized void setReadahead(Long readahead)
       throws IOException {
-    this.cachingStrategy.setReadahead(readahead);
+    this.cachingStrategy =
+        new CachingStrategy.Builder(this.cachingStrategy).
+            setReadahead(readahead).build();
     closeCurrentBlockReader();
   }
 
   @Override
   public synchronized void setDropBehind(Boolean dropBehind)
       throws IOException {
-    this.cachingStrategy.setDropBehind(dropBehind);
+    this.cachingStrategy =
+        new CachingStrategy.Builder(this.cachingStrategy).
+            setDropBehind(dropBehind).build();
     closeCurrentBlockReader();
   }
 
@@ -1466,23 +1509,19 @@ implements ByteBufferReadable, CanSetDro
             "at position " + pos);
       }
     }
-    boolean canSkipChecksums = opts.contains(ReadOption.SKIP_CHECKSUMS);
-    if (canSkipChecksums) {
-      ByteBuffer buffer = tryReadZeroCopy(maxLength);
-      if (buffer != null) {
-        return buffer;
-      }
+    ByteBuffer buffer = tryReadZeroCopy(maxLength, opts);
+    if (buffer != null) {
+      return buffer;
     }
-    ByteBuffer buffer = ByteBufferUtil.
-        fallbackRead(this, bufferPool, maxLength);
+    buffer = ByteBufferUtil.fallbackRead(this, bufferPool, maxLength);
     if (buffer != null) {
       extendedReadBuffers.put(buffer, bufferPool);
     }
     return buffer;
   }
 
-  private synchronized ByteBuffer tryReadZeroCopy(int maxLength)
-      throws IOException {
+  private synchronized ByteBuffer tryReadZeroCopy(int maxLength,
+      EnumSet<ReadOption> opts) throws IOException {
     // Java ByteBuffers can't be longer than 2 GB, because they use
     // 4-byte signed integers to represent capacity, etc.
     // So we can't mmap the parts of the block higher than the 2 GB offset.
@@ -1505,8 +1544,7 @@ implements ByteBufferReadable, CanSetDro
     long blockPos = curPos - blockStartInFile;
     long limit = blockPos + length;
     ClientMmap clientMmap =
-        blockReader.getClientMmap(currentLocatedBlock,
-            dfsClient.getMmapManager());
+        blockReader.getClientMmap(opts, dfsClient.getMmapManager());
     if (clientMmap == null) {
       if (DFSClient.LOG.isDebugEnabled()) {
         DFSClient.LOG.debug("unable to perform a zero-copy read from offset " +

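The DFSInputStream hunks above snapshot the shared cachingStrategy into a
local variable while holding the stream lock, then use that snapshot for
the slow network work outside the lock. A minimal sketch of the pattern,
with invented names (Strategy, StreamSketch, useForRemoteRead are stand-ins
for CachingStrategy and the block-reader call, not classes from the patch):

// Immutable snapshot value, playing the role of CachingStrategy.
final class Strategy {
    final long readahead;
    Strategy(long readahead) { this.readahead = readahead; }
}

class StreamSketch {
    private Strategy strategy = new Strategy(4 * 1024 * 1024);

    void read() {
        final Strategy snapshot;
        synchronized (this) {
            snapshot = strategy;   // consistent view under the lock
        }
        // Slow work (e.g. talking to a datanode) happens outside the
        // lock, so concurrent setStrategy() calls never wait on I/O.
        useForRemoteRead(snapshot);
    }

    synchronized void setStrategy(Strategy s) { strategy = s; }

    private void useForRemoteRead(Strategy s) { /* network I/O */ }
}
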
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Fri Jan  3 07:26:52 2014
@@ -150,7 +150,7 @@ public class DFSOutputStream extends FSO
   private Progressable progress;
   private final short blockReplication; // replication factor of file
   private boolean shouldSyncBlock = false; // force blocks to disk upon close
-  private CachingStrategy cachingStrategy;
+  private AtomicReference<CachingStrategy> cachingStrategy;
   private boolean failPacket = false;
   
   private static class Packet {
@@ -312,6 +312,7 @@ public class DFSOutputStream extends FSO
     private DataInputStream blockReplyStream;
     private ResponseProcessor response = null;
     private volatile DatanodeInfo[] nodes = null; // list of targets for current block
+    private volatile String[] storageIDs = null;
     private LoadingCache<DatanodeInfo, DatanodeInfo> excludedNodes =
         CacheBuilder.newBuilder()
         .expireAfterWrite(
@@ -402,7 +403,7 @@ public class DFSOutputStream extends FSO
       }
 
       // setup pipeline to append to the last block XXX retries??
-      nodes = lastBlock.getLocations();
+      setPipeline(lastBlock);
       errorIndex = -1;   // no errors yet.
       if (nodes.length < 1) {
         throw new IOException("Unable to retrieve blocks locations " +
@@ -411,6 +412,14 @@ public class DFSOutputStream extends FSO
 
       }
     }
+    
+    private void setPipeline(LocatedBlock lb) {
+      setPipeline(lb.getLocations(), lb.getStorageIDs());
+    }
+    private void setPipeline(DatanodeInfo[] nodes, String[] storageIDs) {
+      this.nodes = nodes;
+      this.storageIDs = storageIDs;
+    }
 
     private void setFavoredNodes(String[] favoredNodes) {
       this.favoredNodes = favoredNodes;
@@ -434,7 +443,7 @@ public class DFSOutputStream extends FSO
       this.setName("DataStreamer for file " + src);
       closeResponder();
       closeStream();
-      nodes = null;
+      setPipeline(null, null);
       stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
     }
     
@@ -503,7 +512,7 @@ public class DFSOutputStream extends FSO
             if(DFSClient.LOG.isDebugEnabled()) {
               DFSClient.LOG.debug("Allocating new block");
             }
-            nodes = nextBlockOutputStream();
+            setPipeline(nextBlockOutputStream());
             initDataStreaming();
           } else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
             if(DFSClient.LOG.isDebugEnabled()) {
@@ -917,9 +926,10 @@ public class DFSOutputStream extends FSO
       //get a new datanode
       final DatanodeInfo[] original = nodes;
       final LocatedBlock lb = dfsClient.namenode.getAdditionalDatanode(
-          src, block, nodes, failed.toArray(new DatanodeInfo[failed.size()]),
+          src, block, nodes, storageIDs,
+          failed.toArray(new DatanodeInfo[failed.size()]),
           1, dfsClient.clientName);
-      nodes = lb.getLocations();
+      setPipeline(lb);
 
       //find the new datanode
       final int d = findNewDatanode(original);
@@ -1019,7 +1029,14 @@ public class DFSOutputStream extends FSO
           System.arraycopy(nodes, 0, newnodes, 0, errorIndex);
           System.arraycopy(nodes, errorIndex+1, newnodes, errorIndex,
               newnodes.length-errorIndex);
-          nodes = newnodes;
+
+          final String[] newStorageIDs = new String[newnodes.length];
+          System.arraycopy(storageIDs, 0, newStorageIDs, 0, errorIndex);
+          System.arraycopy(storageIDs, errorIndex+1, newStorageIDs, errorIndex,
+              newStorageIDs.length-errorIndex);
+          
+          setPipeline(newnodes, newStorageIDs);
+
           hasError = false;
           lastException.set(null);
           errorIndex = -1;
@@ -1055,7 +1072,8 @@ public class DFSOutputStream extends FSO
         // update pipeline at the namenode
         ExtendedBlock newBlock = new ExtendedBlock(
             block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS);
-        dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock, nodes);
+        dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock,
+            nodes, storageIDs);
         // update client side generation stamp
         block = newBlock;
       }
@@ -1068,7 +1086,7 @@ public class DFSOutputStream extends FSO
      * Must get block ID and the IDs of the destinations from the namenode.
      * Returns the list of target datanodes.
      */
-    private DatanodeInfo[] nextBlockOutputStream() throws IOException {
+    private LocatedBlock nextBlockOutputStream() throws IOException {
       LocatedBlock lb = null;
       DatanodeInfo[] nodes = null;
       int count = dfsClient.getConf().nBlockWriteRetry;
@@ -1110,7 +1128,7 @@ public class DFSOutputStream extends FSO
       if (!success) {
         throw new IOException("Unable to create new block.");
       }
-      return nodes;
+      return lb;
     }
 
     // connects to the first datanode in the pipeline
@@ -1165,7 +1183,7 @@ public class DFSOutputStream extends FSO
           new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
               nodes, null, recoveryFlag? stage.getRecoveryStage() : stage, 
               nodes.length, block.getNumBytes(), bytesSent, newGS, checksum,
-              cachingStrategy);
+              cachingStrategy.get());
   
           // receive ack for connect
           BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
@@ -1360,8 +1378,8 @@ public class DFSOutputStream extends FSO
     this.blockSize = stat.getBlockSize();
     this.blockReplication = stat.getReplication();
     this.progress = progress;
-    this.cachingStrategy =
-        dfsClient.getDefaultWriteCachingStrategy().duplicate();
+    this.cachingStrategy = new AtomicReference<CachingStrategy>(
+        dfsClient.getDefaultWriteCachingStrategy());
     if ((progress != null) && DFSClient.LOG.isDebugEnabled()) {
       DFSClient.LOG.debug(
           "Set non-null progress callback on DFSOutputStream " + src);
@@ -1975,7 +1993,14 @@ public class DFSOutputStream extends FSO
 
   @Override
   public void setDropBehind(Boolean dropBehind) throws IOException {
-    this.cachingStrategy.setDropBehind(dropBehind);
+    CachingStrategy prevStrategy, nextStrategy;
+    // CachingStrategy is immutable.  So build a new CachingStrategy with the
+    // modifications we want, and compare-and-swap it in.
+    do {
+      prevStrategy = this.cachingStrategy.get();
+      nextStrategy = new CachingStrategy.Builder(prevStrategy).
+                        setDropBehind(dropBehind).build();
+    } while (!this.cachingStrategy.compareAndSet(prevStrategy, nextStrategy));
   }
 
   @VisibleForTesting

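The DFSOutputStream change above makes the caching strategy an
AtomicReference over an immutable value and updates it with a
compare-and-swap loop, so racing setters cannot silently drop each other's
updates. A self-contained sketch of the same loop; the Settings type here
is invented for illustration:

import java.util.concurrent.atomic.AtomicReference;

public class CasSettingsSketch {
    // Immutable value object, standing in for CachingStrategy.
    static final class Settings {
        final Boolean dropBehind;
        final Long readahead;
        Settings(Boolean dropBehind, Long readahead) {
            this.dropBehind = dropBehind;
            this.readahead = readahead;
        }
    }

    private final AtomicReference<Settings> settings =
        new AtomicReference<Settings>(new Settings(null, null));

    public void setDropBehind(Boolean dropBehind) {
        Settings prev, next;
        do {
            prev = settings.get();
            // Rebuild from the previous value so a concurrent change to
            // readahead is carried forward, not overwritten.
            next = new Settings(dropBehind, prev.readahead);
        } while (!settings.compareAndSet(prev, next));
    }
}
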
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java Fri Jan  3 07:26:52 2014
@@ -145,6 +145,23 @@ public class DFSUtil {
     return SECURE_RANDOM.get();
   }
 
+  /** Shuffle the elements in the given array. */
+  public static <T> T[] shuffle(final T[] array) {
+    if (array != null && array.length > 0) {
+      final Random random = getRandom();
+      for (int n = array.length; n > 1; ) {
+        final int randomIndex = random.nextInt(n);
+        n--;
+        if (n != randomIndex) {
+          final T tmp = array[randomIndex];
+          array[randomIndex] = array[n];
+          array[n] = tmp;
+        }
+      }
+    }
+    return array;
+  }
+
   /**
   * Comparator for sorting DatanodeInfo[] based on decommissioned states.
    * Decommissioned nodes are moved to the end of the array on sorting with
@@ -1529,7 +1546,11 @@ public class DFSUtil {
    * Converts a time duration in milliseconds into DDD:HH:MM:SS format.
    */
   public static String durationToString(long durationMs) {
-    Preconditions.checkArgument(durationMs >= 0, "Invalid negative duration");
+    boolean negative = false;
+    if (durationMs < 0) {
+      negative = true;
+      durationMs = -durationMs;
+    }
     // Chop off the milliseconds
     long durationSec = durationMs / 1000;
     final int secondsPerMinute = 60;
@@ -1542,7 +1563,12 @@ public class DFSUtil {
     final long minutes = durationSec / secondsPerMinute;
     durationSec -= minutes * secondsPerMinute;
     final long seconds = durationSec;
-    return String.format("%03d:%02d:%02d:%02d", days, hours, minutes, seconds);
+    final long milliseconds = durationMs % 1000;
+    String format = "%03d:%02d:%02d:%02d.%03d";
+    if (negative)  {
+      format = "-" + format;
+    }
+    return String.format(format, days, hours, minutes, seconds, milliseconds);
   }
 
   /**
@@ -1554,9 +1580,9 @@ public class DFSUtil {
           + ": too short");
     }
     String ttlString = relTime.substring(0, relTime.length()-1);
-    int ttl;
+    long ttl;
     try {
-      ttl = Integer.parseInt(ttlString);
+      ttl = Long.parseLong(ttlString);
     } catch (NumberFormatException e) {
       throw new IOException("Unable to parse relative time value of " + relTime
           + ": " + ttlString + " is not a number");

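For the durationToString change above, a quick check of the new signed,
millisecond-aware format. The helper below re-derives the same arithmetic
for illustration only; the printed values follow directly from the format
string in the patch:

public class DurationFormatSketch {
    public static void main(String[] args) {
        System.out.println(format(61001));   // 000:00:01:01.001
        System.out.println(format(-61001));  // -000:00:01:01.001
    }

    // Mirrors DFSUtil#durationToString after this patch.
    static String format(long durationMs) {
        boolean negative = durationMs < 0;
        if (negative) {
            durationMs = -durationMs;
        }
        long sec = durationMs / 1000;
        final long days = sec / 86400;
        sec -= days * 86400;
        final long hours = sec / 3600;
        sec -= hours * 3600;
        final long minutes = sec / 60;
        sec -= minutes * 60;
        String fmt = (negative ? "-" : "") + "%03d:%02d:%02d:%02d.%03d";
        return String.format(fmt, days, hours, minutes, sec,
            durationMs % 1000);
    }
}
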
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Fri Jan  3 07:26:52 2014
@@ -31,6 +31,7 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.BlockLocation;
 import org.apache.hadoop.fs.BlockStorageLocation;
+import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.FSDataInputStream;
@@ -1586,39 +1587,55 @@ public class DistributedFileSystem exten
   }
 
   /**
+   * @see #addCacheDirective(CacheDirectiveInfo, EnumSet)
+   */
+  public long addCacheDirective(CacheDirectiveInfo info) throws IOException {
+    return addCacheDirective(info, EnumSet.noneOf(CacheFlag.class));
+  }
+
+  /**
    * Add a new CacheDirective.
    * 
    * @param info Information about a directive to add.
+   * @param flags {@link CacheFlag}s to use for this operation.
    * @return the ID of the directive that was created.
    * @throws IOException if the directive could not be added
    */
   public long addCacheDirective(
-      CacheDirectiveInfo info) throws IOException {
+      CacheDirectiveInfo info, EnumSet<CacheFlag> flags) throws IOException {
     Preconditions.checkNotNull(info.getPath());
     Path path = new Path(getPathName(fixRelativePart(info.getPath()))).
         makeQualified(getUri(), getWorkingDirectory());
     return dfs.addCacheDirective(
         new CacheDirectiveInfo.Builder(info).
             setPath(path).
-            build());
+            build(),
+        flags);
   }
-  
+
+  /**
+   * @see #modifyCacheDirective(CacheDirectiveInfo, EnumSet)
+   */
+  public void modifyCacheDirective(CacheDirectiveInfo info) throws IOException {
+    modifyCacheDirective(info, EnumSet.noneOf(CacheFlag.class));
+  }
+
   /**
    * Modify a CacheDirective.
    * 
-   * @param info Information about the directive to modify.
-   *             You must set the ID to indicate which CacheDirective you want
-   *             to modify.
+   * @param info Information about the directive to modify. You must set the ID
+   *          to indicate which CacheDirective you want to modify.
+   * @param flags {@link CacheFlag}s to use for this operation.
    * @throws IOException if the directive could not be modified
    */
   public void modifyCacheDirective(
-      CacheDirectiveInfo info) throws IOException {
+      CacheDirectiveInfo info, EnumSet<CacheFlag> flags) throws IOException {
     if (info.getPath() != null) {
       info = new CacheDirectiveInfo.Builder(info).
           setPath(new Path(getPathName(fixRelativePart(info.getPath()))).
               makeQualified(getUri(), getWorkingDirectory())).build();
     }
-    dfs.modifyCacheDirective(info);
+    dfs.modifyCacheDirective(info, flags);
   }
 
   /**

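The new flagged addCacheDirective can be exercised as below. This is a
usage sketch, not part of the patch: the path and pool name are made up,
and CacheFlag.FORCE asks the NameNode to ignore cache pool resource limits
for this operation:

import java.io.IOException;
import java.util.EnumSet;

import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;

public class AddDirectiveSketch {
    // Caches a (hypothetical) hot directory, forcing past pool limits.
    static long cacheHotData(DistributedFileSystem dfs) throws IOException {
        CacheDirectiveInfo info = new CacheDirectiveInfo.Builder()
            .setPath(new Path("/data/hot"))   // hypothetical path
            .setPool("analytics")             // hypothetical pool
            .build();
        return dfs.addCacheDirective(info, EnumSet.of(CacheFlag.FORCE));
    }
}
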
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Fri Jan  3 07:26:52 2014
@@ -23,10 +23,12 @@ import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.EnumSet;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FSInputChecker;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.hdfs.client.ClientMmap;
 import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.net.Peer;
@@ -490,8 +492,8 @@ public class RemoteBlockReader extends F
   }
 
   @Override
-  public ClientMmap getClientMmap(LocatedBlock curBlock,
-      ClientMmapManager mmapManager) {
+  public ClientMmap getClientMmap(EnumSet<ReadOption> opts,
+        ClientMmapManager mmapManager) {
     return null;
   }
 }

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java Fri Jan  3 07:26:52 2014
@@ -25,10 +25,12 @@ import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
+import java.util.EnumSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.hdfs.client.ClientMmap;
 import org.apache.hadoop.hdfs.client.ClientMmapManager;
 import org.apache.hadoop.hdfs.net.Peer;
@@ -455,8 +457,8 @@ public class RemoteBlockReader2  impleme
   }
 
   @Override
-  public ClientMmap getClientMmap(LocatedBlock curBlock,
-      ClientMmapManager manager) {
+  public ClientMmap getClientMmap(EnumSet<ReadOption> opts,
+        ClientMmapManager mmapManager) {
     return null;
   }
 }

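Both remote readers above return null from the reworked getClientMmap,
which is exactly what pushes DFSInputStream#read onto the copying fallback
path shown earlier. A simplified sketch of that caller-side contract; the
Reader, Opt, and MappedRegion types are stand-ins, not HDFS classes:

import java.nio.ByteBuffer;
import java.util.EnumSet;

public class MmapFallbackSketch {
    enum Opt { SKIP_CHECKSUMS }

    // Stand-in for ClientMmap: a memory-mapped view of the block.
    static final class MappedRegion {
        final ByteBuffer buf = ByteBuffer.allocateDirect(8192);
    }

    interface Reader {
        // Remote readers return null: there is no local file to mmap.
        MappedRegion getClientMmap(EnumSet<Opt> opts);
    }

    static ByteBuffer read(Reader reader, EnumSet<Opt> opts) {
        MappedRegion region = reader.getClientMmap(opts);
        if (region != null) {
            return region.buf;              // zero-copy path
        }
        return ByteBuffer.allocate(8192);   // copying fallback
    }
}
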
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java Fri Jan  3 07:26:52 2014
@@ -19,10 +19,12 @@ package org.apache.hadoop.hdfs.client;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.EnumSet;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CacheFlag;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
@@ -131,25 +133,26 @@ public class HdfsAdmin {
    * Add a new CacheDirectiveInfo.
    * 
    * @param info Information about a directive to add.
+   * @param flags {@link CacheFlag}s to use for this operation.
    * @return the ID of the directive that was created.
    * @throws IOException if the directive could not be added
    */
-  public long addCacheDirective(CacheDirectiveInfo info)
-      throws IOException {
-    return dfs.addCacheDirective(info);
+  public long addCacheDirective(CacheDirectiveInfo info,
+      EnumSet<CacheFlag> flags) throws IOException {
+    return dfs.addCacheDirective(info, flags);
   }
   
   /**
    * Modify a CacheDirective.
    * 
-   * @param info Information about the directive to modify.
-   *             You must set the ID to indicate which CacheDirective you want
-   *             to modify.
+   * @param info Information about the directive to modify. You must set the ID
+   *          to indicate which CacheDirective you want to modify.
+   * @param flags {@link CacheFlag}s to use for this operation.
    * @throws IOException if the directive could not be modified
    */
-  public void modifyCacheDirective(CacheDirectiveInfo info)
-      throws IOException {
-    dfs.modifyCacheDirective(info);
+  public void modifyCacheDirective(CacheDirectiveInfo info,
+      EnumSet<CacheFlag> flags) throws IOException {
+    dfs.modifyCacheDirective(info, flags);
   }
 
   /**

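To go with the HdfsAdmin changes above, a usage sketch for the new
modifyCacheDirective signature. The directive ID and replication value are
invented; an empty flag set keeps the default behavior (no force):

import java.io.IOException;
import java.util.EnumSet;

import org.apache.hadoop.fs.CacheFlag;
import org.apache.hadoop.hdfs.client.HdfsAdmin;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;

public class ModifyDirectiveSketch {
    // Bumps replication on an existing directive; the ID says which one.
    static void bumpReplication(HdfsAdmin admin, long directiveId)
            throws IOException {
        admin.modifyCacheDirective(
            new CacheDirectiveInfo.Builder()
                .setId(directiveId)
                .setReplication((short) 3)
                .build(),
            EnumSet.noneOf(CacheFlag.class));
    }
}
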
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java Fri Jan  3 07:26:52 2014
@@ -19,7 +19,9 @@ package org.apache.hadoop.hdfs.protocol;
 
 import java.util.Iterator;
 import java.util.List;
+import java.util.Random;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@@ -250,33 +252,28 @@ public class BlockListAsLongs implements
   }
 
   /**
-   * The block-id of the indexTh block
-   * @param index - the block whose block-id is desired
-   * @return the block-id
-   */
-  @Deprecated
-  public long getBlockId(final int index)  {
-    return blockId(index);
-  }
-  
-  /**
-   * The block-len of the indexTh block
-   * @param index - the block whose block-len is desired
-   * @return - the block-len
+   * Corrupt the generation stamp of the block with the given index.
+   * Not meant to be used outside of tests.
    */
-  @Deprecated
-  public long getBlockLen(final int index)  {
-    return blockLength(index);
+  @VisibleForTesting
+  public long corruptBlockGSForTesting(final int blockIndex, Random rand) {
+    long oldGS = blockList[index2BlockId(blockIndex) + 2];
+    while (blockList[index2BlockId(blockIndex) + 2] == oldGS) {
+      blockList[index2BlockId(blockIndex) + 2] = rand.nextInt();
+    }
+    return oldGS;
   }
 
   /**
-   * The generation stamp of the indexTh block
-   * @param index - the block whose block-len is desired
-   * @return - the generation stamp
+   * Corrupt the length of the block with the given index by truncation.
+   * Not meant to be used outside of tests.
    */
-  @Deprecated
-  public long getBlockGenStamp(final int index)  {
-    return blockGenerationStamp(index);
+  @VisibleForTesting
+  public long corruptBlockLengthForTesting(final int blockIndex, Random rand) {
+    long oldLength = blockList[index2BlockId(blockIndex) + 1];
+    blockList[index2BlockId(blockIndex) + 1] =
+        rand.nextInt((int) oldLength - 1);
+    return oldLength;
   }
   
   /**

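The corruptBlockGSForTesting helper above loops on the random draw so the
new generation stamp is guaranteed to differ from the old one. A minimal
standalone sketch of that redraw-until-different idiom (the flat long[]
layout is only a stand-in for the real block list encoding):

import java.util.Random;

public class RedrawSketch {
    // Overwrite slot i with a random value guaranteed to differ from
    // the old one, returning the old value for the caller to assert on.
    static long corruptSlot(long[] slots, int i, Random rand) {
        final long old = slots[i];
        while (slots[i] == old) {   // redraw on the unlucky collision
            slots[i] = rand.nextInt();
        }
        return old;
    }

    public static void main(String[] args) {
        long[] slots = { 42L };
        long old = corruptSlot(slots, 0, new Random());
        System.out.println(old == 42L && slots[0] != 42L);  // true
    }
}
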
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirective.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirective.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirective.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirective.java Fri Jan  3 07:26:52 2014
@@ -52,6 +52,14 @@ public final class CacheDirective implem
   private Element prev;
   private Element next;
 
+  public CacheDirective(CacheDirectiveInfo info) {
+    this(
+        info.getId(),
+        info.getPath().toUri().getPath(),
+        info.getReplication(),
+        info.getExpiration().getAbsoluteMillis());
+  }
+
   public CacheDirective(long id, String path,
       short replication, long expiryTime) {
     Preconditions.checkArgument(id > 0);

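The new CacheDirective constructor above is a converting constructor: it
unpacks the info object and delegates to the canonical constructor via
this(...), so the Preconditions checks stay in one place. A generic sketch
of the idiom, with invented names:

public class ConvertingCtorSketch {
    static class Info {
        long getId() { return 7; }
        String getPath() { return "/tmp/example"; }
    }

    static class Directive {
        final long id;
        final String path;

        // Converting constructor: unpack and delegate.
        Directive(Info info) {
            this(info.getId(), info.getPath());
        }

        // Canonical constructor: the only place that validates.
        Directive(long id, String path) {
            if (id <= 0) {
                throw new IllegalArgumentException("id must be positive");
            }
            this.id = id;
            this.path = path;
        }
    }
}
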
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveInfo.java?rev=1555021&r1=1555020&r2=1555021&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveInfo.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveInfo.java Fri Jan  3 07:26:52 2014
@@ -26,6 +26,8 @@ import org.apache.hadoop.classification.
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSUtil;
 
+import com.google.common.base.Preconditions;
+
 /**
  * Describes a path-based cache directive.
  */
@@ -138,11 +140,22 @@ public class CacheDirectiveInfo {
    */
   public static class Expiration {
 
-    /** Denotes a CacheDirectiveInfo that never expires **/
-    public static final int EXPIRY_NEVER = -1;
+    /**
+     * The maximum value we accept for a relative expiry.
+     */
+    public static final long MAX_RELATIVE_EXPIRY_MS =
+        Long.MAX_VALUE / 4; // guards against overflow when converted to an absolute time
+
+    /**
+     * A relative Expiration that never expires.
+     */
+    public static final Expiration NEVER = newRelative(MAX_RELATIVE_EXPIRY_MS);
 
     /**
      * Create a new relative Expiration.
+     * <p>
+     * Use {@link Expiration#NEVER} to indicate an Expiration that never
+     * expires.
      * 
      * @param ms how long until the CacheDirective expires, in milliseconds
      * @return A relative Expiration
@@ -153,6 +166,9 @@ public class CacheDirectiveInfo {
 
     /**
      * Create a new absolute Expiration.
+     * <p>
+     * Use {@link Expiration#NEVER} to indicate an Expiration that never
+     * expires.
      * 
      * @param date when the CacheDirective expires
      * @return An absolute Expiration
@@ -163,6 +179,9 @@ public class CacheDirectiveInfo {
 
     /**
      * Create a new absolute Expiration.
+     * <p>
+     * Use {@link Expiration#NEVER} to indicate an Expiration that never
+     * expires.
      * 
      * @param ms when the CacheDirective expires, in milliseconds since the Unix
      *          epoch.
@@ -176,6 +195,10 @@ public class CacheDirectiveInfo {
     private final boolean isRelative;
 
     private Expiration(long ms, boolean isRelative) {
+      if (isRelative) {
+        Preconditions.checkArgument(ms <= MAX_RELATIVE_EXPIRY_MS,
+            "Expiration time is too far in the future!");
+      }
       this.ms = ms;
       this.isRelative = isRelative;
     }

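Finally, on why MAX_RELATIVE_EXPIRY_MS is Long.MAX_VALUE / 4 rather than
Long.MAX_VALUE: an unchecked relative expiry overflows when added to the
current time, turning "never" into a timestamp in the past. A sketch of
the failure and of the headroom the cap leaves:

public class ExpiryOverflowSketch {
    public static void main(String[] args) {
        long now = System.currentTimeMillis();

        long naive = Long.MAX_VALUE;          // "never expires", unchecked
        long absolute = now + naive;          // wraps negative
        System.out.println(absolute < 0);     // true: already "expired"

        long capped = Long.MAX_VALUE / 4;     // MAX_RELATIVE_EXPIRY_MS
        System.out.println(now + capped > 0); // true: still far future
    }
}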

