hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ka...@apache.org
Subject [06/41] hadoop git commit: HDFS-6735. A minor optimization to avoid pread() be blocked by read() inside the same DFSInputStream (Lars Hofhansl via stack)
Date Tue, 09 Dec 2014 03:31:00 GMT
HDFS-6735. A minor optimization to avoid pread() be blocked by read() inside the same DFSInputStream
(Lars Hofhansl via stack)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7caa3bc9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7caa3bc9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7caa3bc9

Branch: refs/heads/YARN-2139
Commit: 7caa3bc98e6880f98c5c32c486a0c539f9fd3f5f
Parents: 92ce6ed
Author: stack <stack@apache.org>
Authored: Tue Dec 2 20:54:03 2014 -0800
Committer: stack <stack@duboce.net>
Committed: Tue Dec 2 20:57:38 2014 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../dev-support/findbugsExcludeFile.xml         |   9 +
 .../org/apache/hadoop/hdfs/DFSInputStream.java  | 320 +++++++++++--------
 .../hadoop/hdfs/protocol/LocatedBlocks.java     |   9 +-
 .../hdfs/shortcircuit/ShortCircuitCache.java    |  19 +-
 .../hdfs/shortcircuit/ShortCircuitReplica.java  |  12 +-
 6 files changed, 217 insertions(+), 155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7caa3bc9/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 13dda88..85d00b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -422,6 +422,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7446. HDFS inotify should have the ability to determine what txid it
     has read up to (cmccabe)
 
+    HDFS-6735. A minor optimization to avoid pread() be blocked by read()
+    inside the same DFSInputStream (Lars Hofhansl via stack)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7caa3bc9/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
index 2ddc4cc..dedeece 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
@@ -207,4 +207,13 @@
       <Bug pattern="NP_LOAD_OF_KNOWN_NULL_VALUE" />
     </Match>
 
+    <!--
+     We use a separate lock to guard cachingStrategy in order to separate
+     locks for p-reads from seek + read invocations.
+    -->
+    <Match>
+        <Class name="org.apache.hadoop.hdfs.DFSInputStream" />
+        <Field name="cachingStrategy" />
+        <Bug pattern="IS2_INCONSISTENT_SYNC" />
+    </Match>
  </FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7caa3bc9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
index 9794eec..b8b1d90 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
@@ -92,17 +92,32 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   private final DFSClient dfsClient;
   private boolean closed = false;
   private final String src;
-  private BlockReader blockReader = null;
   private final boolean verifyChecksum;
-  private LocatedBlocks locatedBlocks = null;
-  private long lastBlockBeingWrittenLength = 0;
-  private FileEncryptionInfo fileEncryptionInfo = null;
+
+  // state by stateful read only:
+  // (protected by lock on this)
+  /////
   private DatanodeInfo currentNode = null;
   private LocatedBlock currentLocatedBlock = null;
   private long pos = 0;
   private long blockEnd = -1;
+  private BlockReader blockReader = null;
+  ////
+
+  // state shared by stateful and positional read:
+  // (protected by lock on infoLock)
+  ////
+  private LocatedBlocks locatedBlocks = null;
+  private long lastBlockBeingWrittenLength = 0;
+  private FileEncryptionInfo fileEncryptionInfo = null;
   private CachingStrategy cachingStrategy;
+  ////
+
   private final ReadStatistics readStatistics = new ReadStatistics();
+  // lock for state shared between read and pread
+  // Note: Never acquire a lock on <this> with this lock held to avoid deadlocks
+  //       (it's OK to acquire this lock when the lock on <this> is held)
+  private final Object infoLock = new Object();
 
   /**
    * Track the ByteBuffers that we have handed out to readers.
@@ -226,35 +241,38 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     this.dfsClient = dfsClient;
     this.verifyChecksum = verifyChecksum;
     this.src = src;
-    this.cachingStrategy =
-        dfsClient.getDefaultReadCachingStrategy();
+    synchronized (infoLock) {
+      this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
+    }
     openInfo();
   }
 
   /**
    * Grab the open-file info from namenode
    */
-  synchronized void openInfo() throws IOException, UnresolvedLinkException {
-    lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
-    int retriesForLastBlockLength = dfsClient.getConf().retryTimesForGetLastBlockLength;
-    while (retriesForLastBlockLength > 0) {
-      // Getting last block length as -1 is a special case. When cluster
-      // restarts, DNs may not report immediately. At this time partial block
-      // locations will not be available with NN for getting the length. Lets
-      // retry for 3 times to get the length.
-      if (lastBlockBeingWrittenLength == -1) {
-        DFSClient.LOG.warn("Last block locations not available. "
-            + "Datanodes might not have reported blocks completely."
-            + " Will retry for " + retriesForLastBlockLength + " times");
-        waitFor(dfsClient.getConf().retryIntervalForGetLastBlockLength);
-        lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
-      } else {
-        break;
+  void openInfo() throws IOException, UnresolvedLinkException {
+    synchronized(infoLock) {
+      lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
+      int retriesForLastBlockLength = dfsClient.getConf().retryTimesForGetLastBlockLength;
+      while (retriesForLastBlockLength > 0) {
+        // Getting last block length as -1 is a special case. When cluster
+        // restarts, DNs may not report immediately. At this time partial block
+        // locations will not be available with NN for getting the length. Lets
+        // retry for 3 times to get the length.
+        if (lastBlockBeingWrittenLength == -1) {
+          DFSClient.LOG.warn("Last block locations not available. "
+              + "Datanodes might not have reported blocks completely."
+              + " Will retry for " + retriesForLastBlockLength + " times");
+          waitFor(dfsClient.getConf().retryIntervalForGetLastBlockLength);
+          lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
+        } else {
+          break;
+        }
+        retriesForLastBlockLength--;
+      }
+      if (retriesForLastBlockLength == 0) {
+        throw new IOException("Could not obtain the last block locations.");
       }
-      retriesForLastBlockLength--;
-    }
-    if (retriesForLastBlockLength == 0) {
-      throw new IOException("Could not obtain the last block locations.");
     }
   }
 
@@ -306,7 +324,6 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
 
     fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();
 
-    currentNode = null;
     return lastBlockBeingWrittenLength;
   }
 
@@ -359,21 +376,25 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     throw new IOException("Cannot obtain block length for " + locatedblock);
   }
   
-  public synchronized long getFileLength() {
-    return locatedBlocks == null? 0:
-        locatedBlocks.getFileLength() + lastBlockBeingWrittenLength;
+  public long getFileLength() {
+    synchronized(infoLock) {
+      return locatedBlocks == null? 0:
+          locatedBlocks.getFileLength() + lastBlockBeingWrittenLength;
+    }
   }
 
   // Short circuit local reads are forbidden for files that are
   // under construction.  See HDFS-2757.
-  synchronized boolean shortCircuitForbidden() {
-    return locatedBlocks.isUnderConstruction();
+  boolean shortCircuitForbidden() {
+    synchronized(infoLock) {
+      return locatedBlocks.isUnderConstruction();
+    }
   }
 
   /**
    * Returns the datanode from which the stream is currently reading.
    */
-  public DatanodeInfo getCurrentDatanode() {
+  public synchronized DatanodeInfo getCurrentDatanode() {
     return currentNode;
   }
 
@@ -403,59 +424,67 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * @return located block
    * @throws IOException
    */
-  private synchronized LocatedBlock getBlockAt(long offset,
+  private LocatedBlock getBlockAt(long offset,
       boolean updatePosition) throws IOException {
-    assert (locatedBlocks != null) : "locatedBlocks is null";
+    synchronized(infoLock) {
+      assert (locatedBlocks != null) : "locatedBlocks is null";
 
-    final LocatedBlock blk;
+      final LocatedBlock blk;
 
-    //check offset
-    if (offset < 0 || offset >= getFileLength()) {
-      throw new IOException("offset < 0 || offset >= getFileLength(), offset="
-          + offset
-          + ", updatePosition=" + updatePosition
-          + ", locatedBlocks=" + locatedBlocks);
-    }
-    else if (offset >= locatedBlocks.getFileLength()) {
-      // offset to the portion of the last block,
-      // which is not known to the name-node yet;
-      // getting the last block 
-      blk = locatedBlocks.getLastLocatedBlock();
-    }
-    else {
-      // search cached blocks first
-      int targetBlockIdx = locatedBlocks.findBlock(offset);
-      if (targetBlockIdx < 0) { // block is not cached
-        targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
-        // fetch more blocks
-        final LocatedBlocks newBlocks = dfsClient.getLocatedBlocks(src, offset);
-        assert (newBlocks != null) : "Could not find target position " + offset;
-        locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
+      //check offset
+      if (offset < 0 || offset >= getFileLength()) {
+        throw new IOException("offset < 0 || offset >= getFileLength(), offset="
+            + offset
+            + ", updatePosition=" + updatePosition
+            + ", locatedBlocks=" + locatedBlocks);
+      }
+      else if (offset >= locatedBlocks.getFileLength()) {
+        // offset to the portion of the last block,
+        // which is not known to the name-node yet;
+        // getting the last block
+        blk = locatedBlocks.getLastLocatedBlock();
+      }
+      else {
+        // search cached blocks first
+        int targetBlockIdx = locatedBlocks.findBlock(offset);
+        if (targetBlockIdx < 0) { // block is not cached
+          targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
+          // fetch more blocks
+          final LocatedBlocks newBlocks = dfsClient.getLocatedBlocks(src, offset);
+          assert (newBlocks != null) : "Could not find target position " + offset;
+          locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
+        }
+        blk = locatedBlocks.get(targetBlockIdx);
       }
-      blk = locatedBlocks.get(targetBlockIdx);
-    }
 
-    // update current position
-    if (updatePosition) {
-      pos = offset;
-      blockEnd = blk.getStartOffset() + blk.getBlockSize() - 1;
-      currentLocatedBlock = blk;
+      // update current position
+      if (updatePosition) {
+        // synchronized not strictly needed, since we only get here
+        // from synchronized caller methods
+        synchronized(this) {
+          pos = offset;
+          blockEnd = blk.getStartOffset() + blk.getBlockSize() - 1;
+          currentLocatedBlock = blk;
+        }
+      }
+      return blk;
     }
-    return blk;
   }
 
   /** Fetch a block from namenode and cache it */
-  private synchronized void fetchBlockAt(long offset) throws IOException {
-    int targetBlockIdx = locatedBlocks.findBlock(offset);
-    if (targetBlockIdx < 0) { // block is not cached
-      targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
-    }
-    // fetch blocks
-    final LocatedBlocks newBlocks = dfsClient.getLocatedBlocks(src, offset);
-    if (newBlocks == null) {
-      throw new IOException("Could not find target position " + offset);
+  private void fetchBlockAt(long offset) throws IOException {
+    synchronized(infoLock) {
+      int targetBlockIdx = locatedBlocks.findBlock(offset);
+      if (targetBlockIdx < 0) { // block is not cached
+        targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
+      }
+      // fetch blocks
+      final LocatedBlocks newBlocks = dfsClient.getLocatedBlocks(src, offset);
+      if (newBlocks == null) {
+        throw new IOException("Could not find target position " + offset);
+      }
+      locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
     }
-    locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
   }
 
   /**
@@ -467,7 +496,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * @return consequent segment of located blocks
    * @throws IOException
    */
-  private synchronized List<LocatedBlock> getBlockRange(long offset,
+  private List<LocatedBlock> getBlockRange(long offset,
       long length)  throws IOException {
     // getFileLength(): returns total file length
     // locatedBlocks.getFileLength(): returns length of completed blocks
@@ -475,26 +504,27 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       throw new IOException("Offset: " + offset +
         " exceeds file length: " + getFileLength());
     }
+    synchronized(infoLock) {
+      final List<LocatedBlock> blocks;
+      final long lengthOfCompleteBlk = locatedBlocks.getFileLength();
+      final boolean readOffsetWithinCompleteBlk = offset < lengthOfCompleteBlk;
+      final boolean readLengthPastCompleteBlk = offset + length > lengthOfCompleteBlk;
 
-    final List<LocatedBlock> blocks;
-    final long lengthOfCompleteBlk = locatedBlocks.getFileLength();
-    final boolean readOffsetWithinCompleteBlk = offset < lengthOfCompleteBlk;
-    final boolean readLengthPastCompleteBlk = offset + length > lengthOfCompleteBlk;
+      if (readOffsetWithinCompleteBlk) {
+        //get the blocks of finalized (completed) block range
+        blocks = getFinalizedBlockRange(offset,
+          Math.min(length, lengthOfCompleteBlk - offset));
+      } else {
+        blocks = new ArrayList<LocatedBlock>(1);
+      }
 
-    if (readOffsetWithinCompleteBlk) {
-      //get the blocks of finalized (completed) block range
-      blocks = getFinalizedBlockRange(offset, 
-        Math.min(length, lengthOfCompleteBlk - offset));
-    } else {
-      blocks = new ArrayList<LocatedBlock>(1);
-    }
+      // get the blocks from incomplete block range
+      if (readLengthPastCompleteBlk) {
+         blocks.add(locatedBlocks.getLastLocatedBlock());
+      }
 
-    // get the blocks from incomplete block range
-    if (readLengthPastCompleteBlk) {
-       blocks.add(locatedBlocks.getLastLocatedBlock());
+      return blocks;
     }
-
-    return blocks;
   }
 
   /**
@@ -502,35 +532,37 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * Includes only the complete blocks.
    * Fetch them from the namenode if not cached.
    */
-  private synchronized List<LocatedBlock> getFinalizedBlockRange(
+  private List<LocatedBlock> getFinalizedBlockRange(
       long offset, long length) throws IOException {
-    assert (locatedBlocks != null) : "locatedBlocks is null";
-    List<LocatedBlock> blockRange = new ArrayList<LocatedBlock>();
-    // search cached blocks first
-    int blockIdx = locatedBlocks.findBlock(offset);
-    if (blockIdx < 0) { // block is not cached
-      blockIdx = LocatedBlocks.getInsertIndex(blockIdx);
-    }
-    long remaining = length;
-    long curOff = offset;
-    while(remaining > 0) {
-      LocatedBlock blk = null;
-      if(blockIdx < locatedBlocks.locatedBlockCount())
-        blk = locatedBlocks.get(blockIdx);
-      if (blk == null || curOff < blk.getStartOffset()) {
-        LocatedBlocks newBlocks;
-        newBlocks = dfsClient.getLocatedBlocks(src, curOff, remaining);
-        locatedBlocks.insertRange(blockIdx, newBlocks.getLocatedBlocks());
-        continue;
+    synchronized(infoLock) {
+      assert (locatedBlocks != null) : "locatedBlocks is null";
+      List<LocatedBlock> blockRange = new ArrayList<LocatedBlock>();
+      // search cached blocks first
+      int blockIdx = locatedBlocks.findBlock(offset);
+      if (blockIdx < 0) { // block is not cached
+        blockIdx = LocatedBlocks.getInsertIndex(blockIdx);
+      }
+      long remaining = length;
+      long curOff = offset;
+      while(remaining > 0) {
+        LocatedBlock blk = null;
+        if(blockIdx < locatedBlocks.locatedBlockCount())
+          blk = locatedBlocks.get(blockIdx);
+        if (blk == null || curOff < blk.getStartOffset()) {
+          LocatedBlocks newBlocks;
+          newBlocks = dfsClient.getLocatedBlocks(src, curOff, remaining);
+          locatedBlocks.insertRange(blockIdx, newBlocks.getLocatedBlocks());
+          continue;
+        }
+        assert curOff >= blk.getStartOffset() : "Block not found";
+        blockRange.add(blk);
+        long bytesRead = blk.getStartOffset() + blk.getBlockSize() - curOff;
+        remaining -= bytesRead;
+        curOff += bytesRead;
+        blockIdx++;
       }
-      assert curOff >= blk.getStartOffset() : "Block not found";
-      blockRange.add(blk);
-      long bytesRead = blk.getStartOffset() + blk.getBlockSize() - curOff;
-      remaining -= bytesRead;
-      curOff += bytesRead;
-      blockIdx++;
-    }
-    return blockRange;
+      return blockRange;
+    }
   }
 
   /**
@@ -573,6 +605,12 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       try {
         ExtendedBlock blk = targetBlock.getBlock();
         Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
+        CachingStrategy curCachingStrategy;
+        boolean shortCircuitForbidden;
+        synchronized(infoLock) {
+          curCachingStrategy = cachingStrategy;
+          shortCircuitForbidden = shortCircuitForbidden();
+        }
         blockReader = new BlockReaderFactory(dfsClient.getConf()).
             setInetSocketAddress(targetAddr).
             setRemotePeerFactory(dfsClient).
@@ -585,8 +623,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
             setVerifyChecksum(verifyChecksum).
             setClientName(dfsClient.clientName).
             setLength(blk.getNumBytes() - offsetIntoBlock).
-            setCachingStrategy(cachingStrategy).
-            setAllowShortCircuitLocalReads(!shortCircuitForbidden()).
+            setCachingStrategy(curCachingStrategy).
+            setAllowShortCircuitLocalReads(!shortCircuitForbidden).
             setClientCacheContext(dfsClient.getClientContext()).
             setUserGroupInformation(dfsClient.ugi).
             setConfiguration(dfsClient.getConfiguration()).
@@ -782,7 +820,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     }
   }
 
-  private int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
+  private synchronized int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
     dfsClient.checkOpen();
     if (closed) {
       throw new IOException("Stream closed");
@@ -800,9 +838,11 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
             currentNode = blockSeekTo(pos);
           }
           int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
-          if (locatedBlocks.isLastBlockComplete()) {
-            realLen = (int) Math.min(realLen,
-                locatedBlocks.getFileLength() - pos);
+          synchronized(infoLock) {
+            if (locatedBlocks.isLastBlockComplete()) {
+              realLen = (int) Math.min(realLen,
+                  locatedBlocks.getFileLength() - pos);
+            }
           }
           int result = readBuffer(strategy, off, realLen, corruptedBlockMap);
           
@@ -1055,8 +1095,8 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
       // start of the loop.
       CachingStrategy curCachingStrategy;
       boolean allowShortCircuitLocalReads;
-      synchronized (this) {
-        block = getBlockAt(block.getStartOffset(), false);
+      block = getBlockAt(block.getStartOffset(), false);
+      synchronized(infoLock) {
         curCachingStrategy = cachingStrategy;
         allowShortCircuitLocalReads = !shortCircuitForbidden();
       }
@@ -1488,7 +1528,7 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
    * Same as {@link #seekToNewSource(long)} except that it does not exclude
    * the current datanode and might connect to the same node.
    */
-  private synchronized boolean seekToBlockSource(long targetPos)
+  private boolean seekToBlockSource(long targetPos)
                                                  throws IOException {
     currentNode = blockSeekTo(targetPos);
     return true;
@@ -1575,11 +1615,13 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
     return new ReadStatistics(readStatistics);
   }
 
-  public synchronized FileEncryptionInfo getFileEncryptionInfo() {
-    return fileEncryptionInfo;
+  public FileEncryptionInfo getFileEncryptionInfo() {
+    synchronized(infoLock) {
+      return fileEncryptionInfo;
+    }
   }
 
-  private synchronized void closeCurrentBlockReader() {
+  private void closeCurrentBlockReader() {
     if (blockReader == null) return;
     // Close the current block reader so that the new caching settings can 
     // take effect immediately.
@@ -1594,18 +1636,20 @@ implements ByteBufferReadable, CanSetDropBehind, CanSetReadahead,
   @Override
   public synchronized void setReadahead(Long readahead)
       throws IOException {
-    this.cachingStrategy =
-        new CachingStrategy.Builder(this.cachingStrategy).
-            setReadahead(readahead).build();
+    synchronized (infoLock) {
+      this.cachingStrategy =
+          new CachingStrategy.Builder(this.cachingStrategy).setReadahead(readahead).build();
+    }
     closeCurrentBlockReader();
   }
 
   @Override
   public synchronized void setDropBehind(Boolean dropBehind)
       throws IOException {
-    this.cachingStrategy =
-        new CachingStrategy.Builder(this.cachingStrategy).
-            setDropBehind(dropBehind).build();
+    synchronized (infoLock) {
+      this.cachingStrategy =
+          new CachingStrategy.Builder(this.cachingStrategy).setDropBehind(dropBehind).build();
+    }
     closeCurrentBlockReader();
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7caa3bc9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java
index 436fa14..fc739cf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java
@@ -34,14 +34,17 @@ public class LocatedBlocks {
   private final long fileLength;
   private final List<LocatedBlock> blocks; // array of blocks with prioritized locations
   private final boolean underConstruction;
-  private LocatedBlock lastLocatedBlock = null;
-  private boolean isLastBlockComplete = false;
-  private FileEncryptionInfo fileEncryptionInfo = null;
+  private final LocatedBlock lastLocatedBlock;
+  private final boolean isLastBlockComplete;
+  private final FileEncryptionInfo fileEncryptionInfo;
 
   public LocatedBlocks() {
     fileLength = 0;
     blocks = null;
     underConstruction = false;
+    lastLocatedBlock = null;
+    isLastBlockComplete = false;
+    fileEncryptionInfo = null;
   }
 
   public LocatedBlocks(long flength, boolean isUnderConstuction,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7caa3bc9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
index 3edf9a9..bccfd6e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
@@ -400,7 +400,7 @@ public class ShortCircuitCache implements Closeable {
     lock.lock();
     try {
       Preconditions.checkArgument(replica.refCount > 0,
-          "can't ref " + replica + " because its refCount reached " +
+          "can't ref %s because its refCount reached %d", replica,
           replica.refCount);
       Long evictableTimeNs = replica.getEvictableTimeNs();
       replica.refCount++;
@@ -456,14 +456,13 @@ public class ShortCircuitCache implements Closeable {
       if (newRefCount == 0) {
         // Close replica, since there are no remaining references to it.
         Preconditions.checkArgument(replica.purged,
-            "Replica " + replica + " reached a refCount of 0 without " +
-            "being purged");
+          "Replica %s reached a refCount of 0 without being purged", replica);
         replica.close();
       } else if (newRefCount == 1) {
         Preconditions.checkState(null == replica.getEvictableTimeNs(),
-            "Replica " + replica + " had a refCount higher than 1, " +
-              "but was still evictable (evictableTimeNs = " +
-                replica.getEvictableTimeNs() + ")");
+            "Replica %s had a refCount higher than 1, " +
+              "but was still evictable (evictableTimeNs = %d)",
+              replica, replica.getEvictableTimeNs());
         if (!replica.purged) {
           // Add the replica to the end of an eviction list.
           // Eviction lists are sorted by time.
@@ -478,8 +477,8 @@ public class ShortCircuitCache implements Closeable {
         }
       } else {
         Preconditions.checkArgument(replica.refCount >= 0,
-            "replica's refCount went negative (refCount = " +
-            replica.refCount + " for " + replica + ")");
+            "replica's refCount went negative (refCount = %d" +
+            " for %s)", replica.refCount, replica);
       }
       if (LOG.isTraceEnabled()) {
         LOG.trace(this + ": unref replica " + replica +
@@ -602,7 +601,7 @@ public class ShortCircuitCache implements Closeable {
     Preconditions.checkNotNull(evictableTimeNs);
     ShortCircuitReplica removed = map.remove(evictableTimeNs);
     Preconditions.checkState(removed == replica,
-        "failed to make " + replica + " unevictable");
+        "failed to make %s unevictable", replica);
     replica.setEvictableTimeNs(null);
   }
 
@@ -859,7 +858,7 @@ public class ShortCircuitCache implements Closeable {
           Condition cond = (Condition)replica.mmapData;
           cond.awaitUninterruptibly();
         } else {
-          Preconditions.checkState(false, "invalid mmapData type " +
+          Preconditions.checkState(false, "invalid mmapData type %s",
               replica.mmapData.getClass().getName());
         }
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7caa3bc9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java
index 85c7947..1390cf3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitReplica.java
@@ -243,18 +243,22 @@ public class ShortCircuitReplica {
     String suffix = "";
     
     Preconditions.checkState(refCount == 0,
-        "tried to close replica with refCount " + refCount + ": " + this);
+        "tried to close replica with refCount %d: %s", refCount, this);
     refCount = -1;
     Preconditions.checkState(purged,
-        "tried to close unpurged replica " + this);
+        "tried to close unpurged replica %s", this);
     if (hasMmap()) {
       munmap();
-      suffix += "  munmapped.";
+      if (LOG.isTraceEnabled()) {
+        suffix += "  munmapped.";
+      }
     }
     IOUtils.cleanup(LOG, dataStream, metaStream);
     if (slot != null) {
       cache.scheduleSlotReleaser(slot);
-      suffix += "  scheduling " + slot + " for later release.";
+      if (LOG.isTraceEnabled()) {
+        suffix += "  scheduling " + slot + " for later release.";
+      }
     }
     if (LOG.isTraceEnabled()) {
       LOG.trace("closed " + this + suffix);


Mime
View raw message