hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sur...@apache.org
Subject svn commit: r1362670 - in /hadoop/common/branches/branch-1.1: ./ src/hdfs/org/apache/hadoop/hdfs/ src/hdfs/org/apache/hadoop/hdfs/server/datanode/ src/mapred/
Date Tue, 17 Jul 2012 21:38:18 GMT
Author: suresh
Date: Tue Jul 17 21:38:17 2012
New Revision: 1362670

URL: http://svn.apache.org/viewvc?rev=1362670&view=rev
Log:
HDFS-2465. Backporting from trunk - Add HDFS support for fadvise readahead and drop-behind.
Contributed by Todd Lipcon. Backported by Brandon Li.

Modified:
    hadoop/common/branches/branch-1.1/   (props changed)
    hadoop/common/branches/branch-1.1/CHANGES.txt   (contents, props changed)
    hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
    hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/branches/branch-1.1/src/mapred/   (props changed)

Propchange: hadoop/common/branches/branch-1.1/
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/branch-1:r1360944

Modified: hadoop/common/branches/branch-1.1/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/CHANGES.txt?rev=1362670&r1=1362669&r2=1362670&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/CHANGES.txt (original)
+++ hadoop/common/branches/branch-1.1/CHANGES.txt Tue Jul 17 21:38:17 2012
@@ -8,13 +8,8 @@ Release 1.1.1 - Unreleased
 
   IMPROVEMENTS
 
-    HADOOP-7753. Support fadvise and sync_file_range in NativeIO. Add
-    ReadaheadPool infrastructure for use in HDFS and MR.
-    (Brandon Li and todd via suresh)
-
   BUG FIXES
 
-
 Release 1.1.0 - 2012.07.09
 
   INCOMPATIBLE CHANGES
@@ -132,6 +127,14 @@ Release 1.1.0 - 2012.07.09
 
     HDFS-3516. Check content-type in WebHdfsFileSystem.  (szetszwo)
 
+    HADOOP-7753. Support fadvise and sync_file_range in NativeIO. Add
+    ReadaheadPool infrastructure for use in HDFS and MR.
+    (Brandon Li and todd via suresh)
+
+    HDFS-2465. Add HDFS support for fadvise readahead and drop-behind.
+    (todd via suresh)
+
+
   BUG FIXES
 
     MAPREDUCE-4087. [Gridmix] GenerateDistCacheData job of Gridmix can

Propchange: hadoop/common/branches/branch-1.1/CHANGES.txt
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/branch-1/CHANGES.txt:r1360944

Modified: hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1362670&r1=1362669&r2=1362670&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/DFSConfigKeys.java Tue Jul 17 21:38:17 2012
@@ -52,6 +52,15 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY = "dfs.namenode.backup.dnrpc-address";
   public static final String  DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY = "dfs.datanode.balance.bandwidthPerSec";
   public static final long    DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT = 1024*1024;
+  public static final String  DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes";
+  public static final long    DFS_DATANODE_READAHEAD_BYTES_DEFAULT = 0;
+  public static final String  DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY = "dfs.datanode.drop.cache.behind.writes";
+  public static final boolean DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT = false;
+  public static final String  DFS_DATANODE_SYNC_BEHIND_WRITES_KEY = "dfs.datanode.sync.behind.writes";
+  public static final boolean DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT = false;
+  public static final String  DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY = "dfs.datanode.drop.cache.behind.reads";
+  public static final boolean DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT = false;
+  
   public static final String  DFS_NAMENODE_HTTP_PORT_KEY = "dfs.http.port";
   public static final int     DFS_NAMENODE_HTTP_PORT_DEFAULT = 50070;
   public static final String  DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address";

Modified: hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1362670&r1=1362669&r2=1362670&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Tue Jul 17 21:38:17 2012
@@ -21,6 +21,8 @@ import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
+import java.io.FileDescriptor;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.nio.ByteBuffer;
@@ -38,6 +40,7 @@ import org.apache.hadoop.hdfs.protocol.F
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.DataTransferProtocol.PipelineAck;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.StringUtils;
@@ -51,11 +54,14 @@ class BlockReceiver implements java.io.C
   public static final Log LOG = DataNode.LOG;
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
   
+  private static final long CACHE_DROP_LAG_BYTES = 8 * 1024 * 1024;
+  
   private Block block; // the block to receive
   protected boolean finalized;
   private DataInputStream in = null; // from where data are read
   private DataChecksum checksum; // from where chunks of a block can be read
   private OutputStream out = null; // to block file at local disk
+  private FileDescriptor outFd;
   private DataOutputStream checksumOut = null; // to crc file at local disk
   private int bytesPerChecksum;
   private int checksumSize;
@@ -77,6 +83,11 @@ class BlockReceiver implements java.io.C
   private DataNode datanode = null;
   volatile private boolean mirrorError;
 
+  // Cache management state
+  private boolean dropCacheBehindWrites;
+  private boolean syncBehindWrites;
+  private long lastCacheDropOffset = 0;
+  
   BlockReceiver(Block block, DataInputStream in, String inAddr,
                 String myAddr, boolean isRecovery, String clientName, 
                 DatanodeInfo srcDataNode, DataNode datanode) throws IOException {
@@ -93,6 +104,8 @@ class BlockReceiver implements java.io.C
       this.checksum = DataChecksum.newDataChecksum(in);
       this.bytesPerChecksum = checksum.getBytesPerChecksum();
       this.checksumSize = checksum.getChecksumSize();
+      this.dropCacheBehindWrites = datanode.shouldDropCacheBehindWrites();
+      this.syncBehindWrites = datanode.shouldSyncBehindWrites();
       //
       // Open local disk out
       //
@@ -101,6 +114,12 @@ class BlockReceiver implements java.io.C
       this.finalized = false;
       if (streams != null) {
         this.out = streams.dataOut;
+        if (out instanceof FileOutputStream) {
+          this.outFd = ((FileOutputStream) out).getFD();
+        } else {
+          LOG.warn("Could not get file descriptor for outputstream of class "
+              + out.getClass());
+        }
         this.checksumOut = new DataOutputStream(new BufferedOutputStream(
                                                   streams.checksumOut, 
                                                   SMALL_BUFFER_SIZE));
@@ -471,12 +490,12 @@ class BlockReceiver implements java.io.C
             checksumOut.write(pktBuf, checksumOff, checksumLen);
           }
           datanode.myMetrics.incrBytesWritten(len);
-
           /// flush entire packet before sending ack
           flush();
           
           // update length only after flush to disk
           datanode.data.setVisibleLength(block, offsetInBlock);
+          dropOsCacheBehindWriter(offsetInBlock);
         }
       } catch (IOException iex) {
         datanode.checkDiskError(iex);
@@ -497,6 +516,28 @@ class BlockReceiver implements java.io.C
     return payloadLen;
   }
 
+  private void dropOsCacheBehindWriter(long offsetInBlock) throws IOException {
+    try {
+      if (outFd != null
+          && offsetInBlock > lastCacheDropOffset + CACHE_DROP_LAG_BYTES) {
+        long twoWindowsAgo = lastCacheDropOffset - CACHE_DROP_LAG_BYTES;
+        if (twoWindowsAgo > 0 && dropCacheBehindWrites) {
+          NativeIO.posixFadviseIfPossible(outFd, 0, lastCacheDropOffset,
+              NativeIO.POSIX_FADV_DONTNEED);
+        }
+
+        if (syncBehindWrites) {
+          NativeIO.syncFileRangeIfPossible(outFd, lastCacheDropOffset,
+              CACHE_DROP_LAG_BYTES, NativeIO.SYNC_FILE_RANGE_WRITE);
+        }
+
+        lastCacheDropOffset += CACHE_DROP_LAG_BYTES;
+      }
+    } catch (Throwable t) {
+      LOG.warn("Couldn't drop os cache behind writer for " + block, t);
+    }
+  }
+
   void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
     checksum.writeHeader(mirrorOut);
   }

Modified: hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1362670&r1=1362669&r2=1362670&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Tue Jul 17 21:38:17 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.da
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.FileDescriptor;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -34,6 +35,9 @@ import org.apache.hadoop.fs.ChecksumExce
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.FSConstants;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
+import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.net.SocketOutputStream;
 import org.apache.hadoop.util.ChecksumUtil;
 import org.apache.hadoop.util.DataChecksum;
@@ -51,6 +55,7 @@ class BlockSender implements java.io.Clo
   private long blockInPosition = -1; // updated while using transferTo().
   private DataInputStream checksumIn; // checksum datastream
   private DataChecksum checksum; // checksum stream
+  private long initialOffset; // initial position to read
   private long offset; // starting position to read
   private long endOffset; // ending position
   private long blockLength;
@@ -74,6 +79,22 @@ class BlockSender implements java.io.Clo
    */
   private static final int MIN_BUFFER_WITH_TRANSFERTO = 64*1024;
 
+  /** The file descriptor of the block being sent */
+  private FileDescriptor blockInFd;
+
+  // Cache-management related fields
+  private final long readaheadLength;
+  private boolean shouldDropCacheBehindRead;
+  private ReadaheadRequest curReadahead;
+  private long lastCacheDropOffset;
+  private static final long CACHE_DROP_INTERVAL_BYTES = 1024 * 1024; // 1MB
+  /**
+   * Minimum length of read below which management of the OS buffer cache is
+   * disabled.
+   */
+  private static final long LONG_READ_THRESHOLD_BYTES = 256 * 1024;
+
+  private static ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
   
   BlockSender(Block block, long startOffset, long length,
               boolean corruptChecksumOk, boolean chunkOffsetOK,
@@ -94,7 +115,9 @@ class BlockSender implements java.io.Clo
       this.blockLength = datanode.data.getVisibleLength(block);
       this.transferToAllowed = datanode.transferToAllowed;
       this.clientTraceFmt = clientTraceFmt;
-
+      this.readaheadLength = datanode.getReadaheadLength();
+      this.shouldDropCacheBehindRead = datanode.shouldDropCacheBehindReads();
+      
       if ( !corruptChecksumOk || datanode.data.metaFileExists(block) ) {
         checksumIn = new DataInputStream(
                 new BufferedInputStream(datanode.data.getMetaDataInputStream(block),
@@ -166,6 +189,11 @@ class BlockSender implements java.io.Clo
       seqno = 0;
 
       blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
+      if (blockIn instanceof FileInputStream) {
+        blockInFd = ((FileInputStream) blockIn).getFD();
+      } else {
+        blockInFd = null;
+      }
       memoizedBlock = new MemoizedBlock(blockIn, blockLength, datanode.data, block);
     } catch (IOException ioe) {
       IOUtils.closeStream(this);
@@ -178,6 +206,19 @@ class BlockSender implements java.io.Clo
    * close opened files.
    */
   public void close() throws IOException {
+    if (blockInFd != null && shouldDropCacheBehindRead) {
+      // drop the last few MB of the file from cache
+      try {
+        NativeIO.posixFadviseIfPossible(blockInFd, lastCacheDropOffset, offset
+            - lastCacheDropOffset, NativeIO.POSIX_FADV_DONTNEED);
+      } catch (Exception e) {
+        LOG.warn("Unable to drop cache on file close", e);
+      }
+    }
+    if (curReadahead != null) {
+      curReadahead.cancel();
+    }
+  
     IOException ioe = null;
     // close checksum file
     if(checksumIn!=null) {
@@ -196,6 +237,7 @@ class BlockSender implements java.io.Clo
         ioe = e;
       }
       blockIn = null;
+      blockInFd = null;
     }
     // throw IOException if there is any
     if(ioe!= null) {
@@ -387,10 +429,21 @@ class BlockSender implements java.io.Clo
     }
     this.throttler = throttler;
 
-    long initialOffset = offset;
+    initialOffset = offset;
     long totalRead = 0;
     OutputStream streamForSendChunks = out;
     
+    lastCacheDropOffset = initialOffset;
+
+    if (isLongRead() && blockInFd != null) {
+      // Advise that this file descriptor will be accessed sequentially.
+      NativeIO.posixFadviseIfPossible(blockInFd, 0, 0,
+          NativeIO.POSIX_FADV_SEQUENTIAL);
+    }
+
+    // Trigger readahead of beginning of file if configured.
+    manageOsCache();
+
     final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0; 
     try {
       try {
@@ -433,6 +486,7 @@ class BlockSender implements java.io.Clo
       ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
 
       while (endOffset > offset) {
+        manageOsCache();
         long len = sendChunks(pktBuf, maxChunksPerPacket, 
                               streamForSendChunks);
         offset += len;
@@ -465,6 +519,39 @@ class BlockSender implements java.io.Clo
     return totalRead;
   }
   
+  /**
+   * Manage the OS buffer cache by performing read-ahead and drop-behind.
+   */
+  private void manageOsCache() throws IOException {
+    if (!isLongRead() || blockInFd == null) {
+      // don't manage cache manually for short-reads, like
+      // HBase random read workloads.
+      return;
+    }
+
+    // Perform readahead if necessary
+    if (readaheadLength > 0 && readaheadPool != null) {
+      curReadahead = readaheadPool.readaheadStream(clientTraceFmt, blockInFd,
+          offset, readaheadLength, Long.MAX_VALUE, curReadahead);
+    }
+
+    // Drop what we've just read from cache, since we aren't
+    // likely to need it again
+    long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES;
+    if (shouldDropCacheBehindRead && offset >= nextCacheDropOffset) {
+      long dropLength = offset - lastCacheDropOffset;
+      if (dropLength >= 1024) {
+        NativeIO.posixFadviseIfPossible(blockInFd, lastCacheDropOffset,
+            dropLength, NativeIO.POSIX_FADV_DONTNEED);
+      }
+      lastCacheDropOffset += CACHE_DROP_INTERVAL_BYTES;
+    }
+  }
+
+  private boolean isLongRead() {
+    return (endOffset - offset) > LONG_READ_THRESHOLD_BYTES;
+  }
+
   boolean isBlockReadFully() {
     return blockReadFully;
   }

Modified: hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1362670&r1=1362669&r2=1362670&view=diff
==============================================================================
--- hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-1.1/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Jul 17 21:38:17 2012
@@ -227,6 +227,11 @@ public class DataNode extends Configured
   int socketTimeout;
   int socketWriteTimeout = 0;  
   boolean transferToAllowed = true;
+  private boolean dropCacheBehindWrites = false;
+  private boolean syncBehindWrites = false;
+  private boolean dropCacheBehindReads = false;
+  private long readaheadLength = 0;
+
   int writePacketSize = 0;
   private boolean durableSync;
   boolean isBlockTokenEnabled;
@@ -425,6 +430,19 @@ public class DataNode extends Configured
         new DataXceiverServer(ss, conf, this));
     this.threadGroup.setDaemon(true); // auto destroy when empty
 
+    this.readaheadLength = conf.getLong(
+        DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY,
+        DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+    this.dropCacheBehindWrites = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY,
+        DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT);
+    this.syncBehindWrites = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_KEY,
+        DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT);
+    this.dropCacheBehindReads = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY,
+        DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT);
+
     this.blockReportInterval =
       conf.getLong("dfs.blockreport.intervalMsec", BLOCKREPORT_INTERVAL);
     this.initialBlockReportDelay = conf.getLong("dfs.blockreport.initialDelay",
@@ -2172,4 +2190,20 @@ public class DataNode extends Configured
                        (DataXceiverServer) this.dataXceiverServer.getRunnable();
     return dxcs.balanceThrottler.getBandwidth();
   }
+
+  long getReadaheadLength() {
+    return readaheadLength;
+  }
+
+  boolean shouldDropCacheBehindWrites() {
+    return dropCacheBehindWrites;
+  }
+
+  boolean shouldDropCacheBehindReads() {
+    return dropCacheBehindReads;
+  }
+
+  boolean shouldSyncBehindWrites() {
+    return syncBehindWrites;
+  }
 }

Propchange: hadoop/common/branches/branch-1.1/src/mapred/
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/branch-1/src/mapred:r1360944



Mime
View raw message