Subject: svn commit: r1190625 - in /hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/server/datanode/
Date: Fri, 28 Oct 2011 22:18:24 -0000
To: hdfs-commits@hadoop.apache.org
From: todd@apache.org
Message-Id: <20111028221824.A27F023888CD@eris.apache.org>

Author: todd
Date: Fri Oct 28 22:18:23 2011
New Revision: 1190625

URL: http://svn.apache.org/viewvc?rev=1190625&view=rev
Log:
HDFS-2465. Add HDFS support for fadvise readahead and drop-behind. Contributed by Todd Lipcon.

Modified:
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
    hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java

Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1190625&r1=1190624&r2=1190625&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Fri Oct 28 22:18:23 2011
@@ -772,6 +772,8 @@ Release 0.23.0 - Unreleased
     HDFS-2500. Avoid file system operations in BPOfferService thread while
     processing deletes. (todd)
 
+    HDFS-2465. Add HDFS support for fadvise readahead and drop-behind. (todd)
+
     BUG FIXES
 
     HDFS-2344. Fix the TestOfflineEditsViewer test failure in 0.23 branch.
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1190625&r1=1190624&r2=1190625&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Fri Oct 28 22:18:23 2011
@@ -54,6 +54,15 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_NAMENODE_BACKUP_SERVICE_RPC_ADDRESS_KEY = "dfs.namenode.backup.dnrpc-address";
   public static final String  DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY = "dfs.datanode.balance.bandwidthPerSec";
   public static final long    DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT = 1024*1024;
+  public static final String  DFS_DATANODE_READAHEAD_BYTES_KEY = "dfs.datanode.readahead.bytes";
+  public static final long    DFS_DATANODE_READAHEAD_BYTES_DEFAULT = 0;
+  public static final String  DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY = "dfs.datanode.drop.cache.behind.writes";
+  public static final boolean DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT = false;
+  public static final String  DFS_DATANODE_SYNC_BEHIND_WRITES_KEY = "dfs.datanode.sync.behind.writes";
+  public static final boolean DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT = false;
+  public static final String  DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY = "dfs.datanode.drop.cache.behind.reads";
+  public static final boolean DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT = false;
+
   public static final String  DFS_NAMENODE_HTTP_ADDRESS_KEY = "dfs.namenode.http-address";
   public static final String  DFS_NAMENODE_HTTP_ADDRESS_DEFAULT = "0.0.0.0:50070";
   public static final String  DFS_NAMENODE_RPC_ADDRESS_KEY = "dfs.namenode.rpc-address";
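
For context: all four keys above default to 0/false, so a datanode behaves exactly as it did before this change unless an operator opts in (normally by setting the keys in the datanode's hdfs-site.xml). A minimal, illustrative sketch of enabling them programmatically; the values shown are examples only, not tuning recommendations:

// Illustrative only: the values below are examples, not recommendations.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class CacheTuningExample {
  public static Configuration example() {
    Configuration conf = new Configuration();
    // Ask the datanode to read ahead 4MB on long sequential block reads (example value).
    conf.setLong(DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY, 4 * 1024 * 1024);
    // Drop already-written data from the OS buffer cache behind the writer.
    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY, true);
    // Ask the kernel to start writeback early behind the writer.
    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_KEY, true);
    // Drop already-sent data from the cache behind long sequential reads.
    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY, true);
    return conf;
  }
}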
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1190625&r1=1190624&r2=1190625&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Fri Oct 28 22:18:23 2011
@@ -24,6 +24,7 @@ import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.EOFException;
+import java.io.FileDescriptor;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -46,6 +47,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.PureJavaCrc32;
@@ -57,10 +59,13 @@ import org.apache.hadoop.util.PureJavaCr
 class BlockReceiver implements Closeable {
   public static final Log LOG = DataNode.LOG;
   static final Log ClientTraceLog = DataNode.ClientTraceLog;
+
+  private static final long CACHE_DROP_LAG_BYTES = 8 * 1024 * 1024;
 
   private DataInputStream in = null; // from where data are read
   private DataChecksum checksum; // from where chunks of a block can be read
   private OutputStream out = null; // to block file at local disk
+  private FileDescriptor outFd;
   private OutputStream cout = null; // output stream for cehcksum file
   private DataOutputStream checksumOut = null; // to crc file at local disk
   private int bytesPerChecksum;
@@ -80,6 +85,11 @@ class BlockReceiver implements Closeable
   private final DataNode datanode;
   volatile private boolean mirrorError;
 
+  // Cache management state
+  private boolean dropCacheBehindWrites;
+  private boolean syncBehindWrites;
+  private long lastCacheDropOffset = 0;
+
   /** The client name. It is empty if a datanode is the client */
   private final String clientname;
   private final boolean isClient;
@@ -170,6 +180,8 @@ class BlockReceiver implements Closeable
       this.checksum = DataChecksum.newDataChecksum(in);
       this.bytesPerChecksum = checksum.getBytesPerChecksum();
       this.checksumSize = checksum.getChecksumSize();
+      this.dropCacheBehindWrites = datanode.shouldDropCacheBehindWrites();
+      this.syncBehindWrites = datanode.shouldSyncBehindWrites();
 
       final boolean isCreate = isDatanode || isTransfer
           || stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
@@ -177,6 +189,12 @@ class BlockReceiver implements Closeable
           this.bytesPerChecksum, this.checksumSize);
       if (streams != null) {
         this.out = streams.dataOut;
+        if (out instanceof FileOutputStream) {
+          this.outFd = ((FileOutputStream)out).getFD();
+        } else {
+          LOG.warn("Could not get file descriptor for outputstream of class " +
+              out.getClass());
+        }
         this.cout = streams.checksumOut;
         this.checksumOut = new DataOutputStream(new BufferedOutputStream(
             streams.checksumOut, HdfsConstants.SMALL_BUFFER_SIZE));
@@ -631,6 +649,8 @@ class BlockReceiver implements Closeable
               );
       datanode.metrics.incrBytesWritten(len);
+
+      dropOsCacheBehindWriter(offsetInBlock);
     }
   } catch (IOException iex) {
     datanode.checkDiskError(iex);
@@ -645,6 +665,28 @@ class BlockReceiver implements Closeable
     return lastPacketInBlock?-1:len;
   }
 
+  private void dropOsCacheBehindWriter(long offsetInBlock) throws IOException {
+    try {
+      if (outFd != null &&
+          offsetInBlock > lastCacheDropOffset + CACHE_DROP_LAG_BYTES) {
+        long twoWindowsAgo = lastCacheDropOffset - CACHE_DROP_LAG_BYTES;
+        if (twoWindowsAgo > 0 && dropCacheBehindWrites) {
+          NativeIO.posixFadviseIfPossible(outFd, 0, lastCacheDropOffset,
+              NativeIO.POSIX_FADV_DONTNEED);
+        }
+
+        if (syncBehindWrites) {
+          NativeIO.syncFileRangeIfPossible(outFd, lastCacheDropOffset, CACHE_DROP_LAG_BYTES,
+              NativeIO.SYNC_FILE_RANGE_WRITE);
+        }
+
+        lastCacheDropOffset += CACHE_DROP_LAG_BYTES;
+      }
+    } catch (Throwable t) {
+      LOG.warn("Couldn't drop os cache behind writer for " + block, t);
+    }
+  }
+
   void writeChecksumHeader(DataOutputStream mirrorOut) throws IOException {
     checksum.writeHeader(mirrorOut);
   }
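
The dropOsCacheBehindWriter() method above stays at least one 8 MB window (CACHE_DROP_LAG_BYTES) behind the current write offset before issuing POSIX_FADV_DONTNEED, so recently written pages stay cached, and it can optionally kick off sync_file_range() writeback so pages are clean by the time they are dropped. A standalone sketch of the same pattern using the NativeIO helpers the patch relies on (illustrative only, not part of the patch; the class name and window size are made up for the example):

// Sketch of drop-behind-writes: lag one window behind the write offset so the
// tail of the data stays cached, and start writeback before dropping pages.
import java.io.FileDescriptor;
import java.io.FileOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.nativeio.NativeIO;

public class DropBehindWriter {
  private static final long WINDOW = 8 * 1024 * 1024;  // mirrors CACHE_DROP_LAG_BYTES
  private final FileOutputStream out;
  private long written = 0;
  private long lastDropOffset = 0;

  public DropBehindWriter(FileOutputStream out) {
    this.out = out;
  }

  public void write(byte[] buf) throws IOException {
    out.write(buf);
    written += buf.length;
    if (written > lastDropOffset + WINDOW) {
      FileDescriptor fd = out.getFD();
      // Kick off asynchronous writeback of the window we just filled ...
      NativeIO.syncFileRangeIfPossible(fd, lastDropOffset, WINDOW,
          NativeIO.SYNC_FILE_RANGE_WRITE);
      // ... and advise the kernel that everything before it will not be re-read.
      NativeIO.posixFadviseIfPossible(fd, 0, lastDropOffset,
          NativeIO.POSIX_FADV_DONTNEED);
      lastDropOffset += WINDOW;
    }
  }
}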
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java?rev=1190625&r1=1190624&r2=1190625&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java Fri Oct 28 22:18:23 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.da
 import java.io.BufferedInputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
+import java.io.FileDescriptor;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.io.InputStream;
@@ -36,6 +37,9 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
+import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.net.SocketOutputStream;
 import org.apache.hadoop.util.DataChecksum;
 
@@ -118,7 +122,9 @@ class BlockSender implements java.io.Clo
   private DataInputStream checksumIn;
   /** Checksum utility */
   private final DataChecksum checksum;
-  /** Starting position to read */
+  /** Initial position to read */
+  private long initialOffset;
+  /** Current position of read */
   private long offset;
   /** Position of last byte to read from block file */
   private final long endOffset;
@@ -142,6 +148,24 @@ class BlockSender implements java.io.Clo
   private final String clientTraceFmt;
   private volatile ChunkChecksum lastChunkChecksum = null;
 
+  /** The file descriptor of the block being sent */
+  private FileDescriptor blockInFd;
+
+  // Cache-management related fields
+  private final long readaheadLength;
+  private boolean shouldDropCacheBehindRead;
+  private ReadaheadRequest curReadahead;
+  private long lastCacheDropOffset;
+  private static final long CACHE_DROP_INTERVAL_BYTES = 1024 * 1024; // 1MB
+  /**
+   * Minimum length of read below which management of the OS
+   * buffer cache is disabled.
+   */
+  private static final long LONG_READ_THRESHOLD_BYTES = 256 * 1024;
+
+  private static ReadaheadPool readaheadPool =
+      ReadaheadPool.getInstance();
+
   /**
    * Constructor
    *
@@ -165,6 +189,8 @@ class BlockSender implements java.io.Clo
       this.corruptChecksumOk = corruptChecksumOk;
       this.verifyChecksum = verifyChecksum;
      this.clientTraceFmt = clientTraceFmt;
+      this.readaheadLength = datanode.getReadaheadLength();
+      this.shouldDropCacheBehindRead = datanode.shouldDropCacheBehindReads();
 
      synchronized(datanode.data) {
        this.replica = getReplica(block, datanode);
@@ -277,6 +303,11 @@ class BlockSender implements java.io.Clo
        DataNode.LOG.debug("replica=" + replica);
      }
      blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
+      if (blockIn instanceof FileInputStream) {
+        blockInFd = ((FileInputStream)blockIn).getFD();
+      } else {
+        blockInFd = null;
+      }
    } catch (IOException ioe) {
      IOUtils.closeStream(this);
      IOUtils.closeStream(blockIn);
@@ -288,6 +319,20 @@ class BlockSender implements java.io.Clo
   * close opened files.
   */
  public void close() throws IOException {
+    if (blockInFd != null && shouldDropCacheBehindRead) {
+      // drop the last few MB of the file from cache
+      try {
+        NativeIO.posixFadviseIfPossible(
+            blockInFd, lastCacheDropOffset, offset - lastCacheDropOffset,
+            NativeIO.POSIX_FADV_DONTNEED);
+      } catch (Exception e) {
+        LOG.warn("Unable to drop cache on file close", e);
+      }
+    }
+    if (curReadahead != null) {
+      curReadahead.cancel();
+    }
+
    IOException ioe = null;
    if(checksumIn!=null) {
      try {
@@ -304,6 +349,7 @@ class BlockSender implements java.io.Clo
        ioe = e;
      }
      blockIn = null;
+      blockInFd = null;
    }
    // throw IOException if there is any
    if(ioe!= null) {
@@ -538,10 +584,20 @@ class BlockSender implements java.io.Clo
    if (out == null) {
      throw new IOException( "out stream is null" );
    }
-    final long initialOffset = offset;
+    initialOffset = offset;
    long totalRead = 0;
    OutputStream streamForSendChunks = out;
+    lastCacheDropOffset = initialOffset;
+
+    if (isLongRead() && blockInFd != null) {
+      // Advise that this file descriptor will be accessed sequentially.
+      NativeIO.posixFadviseIfPossible(blockInFd, 0, 0, NativeIO.POSIX_FADV_SEQUENTIAL);
+    }
+
+    // Trigger readahead of beginning of file if configured.
+    manageOsCache();
+
    final long startTime = ClientTraceLog.isInfoEnabled() ? System.nanoTime() : 0;
    try {
      writeChecksumHeader(out);
@@ -569,6 +625,7 @@ class BlockSender implements java.io.Clo
      ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
 
      while (endOffset > offset) {
+        manageOsCache();
        long len = sendPacket(pktBuf, maxChunksPerPacket, streamForSendChunks,
                              transferTo, throttler);
        offset += len;
@@ -595,6 +652,45 @@ class BlockSender implements java.io.Clo
    }
    return totalRead;
  }
+
+  /**
+   * Manage the OS buffer cache by performing read-ahead
+   * and drop-behind.
+   */
+  private void manageOsCache() throws IOException {
+    if (!isLongRead() || blockInFd == null) {
+      // don't manage cache manually for short-reads, like
+      // HBase random read workloads.
+      return;
+    }
+
+    // Perform readahead if necessary
+    if (readaheadLength > 0 && readaheadPool != null) {
+      curReadahead = readaheadPool.readaheadStream(
+          clientTraceFmt, blockInFd,
+          offset, readaheadLength, Long.MAX_VALUE,
+          curReadahead);
+    }
+
+    // Drop what we've just read from cache, since we aren't
+    // likely to need it again
+    long nextCacheDropOffset = lastCacheDropOffset + CACHE_DROP_INTERVAL_BYTES;
+    if (shouldDropCacheBehindRead &&
+        offset >= nextCacheDropOffset) {
+      long dropLength = offset - lastCacheDropOffset;
+      if (dropLength >= 1024) {
+        NativeIO.posixFadviseIfPossible(blockInFd,
+            lastCacheDropOffset, dropLength,
+            NativeIO.POSIX_FADV_DONTNEED);
+      }
+      lastCacheDropOffset += CACHE_DROP_INTERVAL_BYTES;
+    }
+  }
+
+  private boolean isLongRead() {
+    return (endOffset - offset) > LONG_READ_THRESHOLD_BYTES;
+  }
+
  /**
   * Write checksum header to the output stream
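
On the read side, sendBlock() above advises POSIX_FADV_SEQUENTIAL once, then manageOsCache() keeps an asynchronous readahead request running ahead of the current offset via ReadaheadPool and drops each 1 MB window (CACHE_DROP_INTERVAL_BYTES) from the buffer cache once it has been sent, but only for long reads (LONG_READ_THRESHOLD_BYTES), so HBase-style random reads are left alone. A standalone sketch of the same pattern (illustrative only, not part of the patch; the class name, buffer size, and readahead length are made up for the example):

// Sketch of readahead plus drop-behind while streaming a local file sequentially.
import java.io.FileDescriptor;
import java.io.FileInputStream;
import java.io.IOException;
import org.apache.hadoop.io.ReadaheadPool;
import org.apache.hadoop.io.ReadaheadPool.ReadaheadRequest;
import org.apache.hadoop.io.nativeio.NativeIO;

public class DropBehindReader {
  private static final long WINDOW = 1024 * 1024;        // mirrors CACHE_DROP_INTERVAL_BYTES
  private static final long READAHEAD = 4 * 1024 * 1024; // example readahead length

  public static void stream(FileInputStream in, String who) throws IOException {
    FileDescriptor fd = in.getFD();
    ReadaheadPool pool = ReadaheadPool.getInstance();
    ReadaheadRequest pending = null;
    long offset = 0, lastDropOffset = 0;

    // Hint that this descriptor will be read front to back.
    NativeIO.posixFadviseIfPossible(fd, 0, 0, NativeIO.POSIX_FADV_SEQUENTIAL);

    byte[] buf = new byte[64 * 1024];
    int n;
    while ((n = in.read(buf)) > 0) {
      // Keep an asynchronous readahead request running ahead of the reader.
      pending = pool.readaheadStream(who, fd, offset, READAHEAD, Long.MAX_VALUE, pending);
      offset += n;
      // Drop the window we have finished with from the buffer cache.
      if (offset >= lastDropOffset + WINDOW) {
        NativeIO.posixFadviseIfPossible(fd, lastDropOffset, offset - lastDropOffset,
            NativeIO.POSIX_FADV_DONTNEED);
        lastDropOffset += WINDOW;
      }
    }
    if (pending != null) {
      pending.cancel();
    }
  }
}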
Modified: hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1190625&r1=1190624&r2=1190625&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Oct 28 22:18:23 2011
@@ -104,6 +104,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -410,6 +411,11 @@ public class DataNode extends Configured
   int socketTimeout;
   int socketWriteTimeout = 0;
   boolean transferToAllowed = true;
+  private boolean dropCacheBehindWrites = false;
+  private boolean syncBehindWrites = false;
+  private boolean dropCacheBehindReads = false;
+  private long readaheadLength = 0;
+
   int writePacketSize = 0;
   boolean isBlockTokenEnabled;
   BlockPoolTokenSecretManager blockPoolTokenSecretManager;
@@ -493,6 +499,20 @@ public class DataNode extends Configured
                                           DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT);
     this.writePacketSize = conf.getInt(DFS_CLIENT_WRITE_PACKET_SIZE_KEY,
                                        DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT);
+
+    this.readaheadLength = conf.getLong(
+        DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY,
+        DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
+    this.dropCacheBehindWrites = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY,
+        DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT);
+    this.syncBehindWrites = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_KEY,
+        DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT);
+    this.dropCacheBehindReads = conf.getBoolean(
+        DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY,
+        DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT);
+
     this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
         DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
     this.initialBlockReportDelay = conf.getLong(
@@ -2859,4 +2879,20 @@ public class DataNode extends Configured
         (DataXceiverServer) this.dataXceiverServer.getRunnable();
     return dxcs.balanceThrottler.getBandwidth();
   }
+
+  long getReadaheadLength() {
+    return readaheadLength;
+  }
+
+  boolean shouldDropCacheBehindWrites() {
+    return dropCacheBehindWrites;
+  }
+
+  boolean shouldDropCacheBehindReads() {
+    return dropCacheBehindReads;
+  }
+
+  boolean shouldSyncBehindWrites() {
+    return syncBehindWrites;
+  }
 }
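
For completeness: the DataNode reads the four new keys once at startup and exposes them through the package-private getters above, which BlockReceiver and BlockSender consult per stream. A minimal sketch of the same read-the-keys pattern (illustrative only; the CacheSettings class is made up for the example, and since every default is 0/false the behavior is unchanged unless the keys are set):

// Sketch mirroring the configuration wiring above.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class CacheSettings {
  final long readaheadLength;
  final boolean dropCacheBehindWrites;
  final boolean syncBehindWrites;
  final boolean dropCacheBehindReads;

  CacheSettings(Configuration conf) {
    readaheadLength = conf.getLong(
        DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_KEY,
        DFSConfigKeys.DFS_DATANODE_READAHEAD_BYTES_DEFAULT);
    dropCacheBehindWrites = conf.getBoolean(
        DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_KEY,
        DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_WRITES_DEFAULT);
    syncBehindWrites = conf.getBoolean(
        DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_KEY,
        DFSConfigKeys.DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT);
    dropCacheBehindReads = conf.getBoolean(
        DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY,
        DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT);
  }
}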