Subject: svn commit: r1573821 [1/2] - in /hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/client/ src/main/java/org/apache/hadoop/hdfs/protocol/datatr...
Date: Mon, 03 Mar 2014 23:52:00 -0000
To: hdfs-commits@hadoop.apache.org
From: szetszwo@apache.org

Author: szetszwo
Date: Mon Mar 3 23:51:58 2014
New Revision: 1573821

URL: http://svn.apache.org/r1573821
Log:
Merge r1569890 through r1573813 from trunk.
Added:
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ShortCircuitShm.java
      - copied unchanged from r1573813, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ShortCircuitShm.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/DfsClientShm.java
      - copied unchanged from r1573813, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/DfsClientShm.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/DfsClientShmManager.java
      - copied unchanged from r1573813, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/DfsClientShmManager.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java
      - copied unchanged from r1573813, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/TestShortCircuitShm.java
      - copied unchanged from r1573813, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/TestShortCircuitShm.java
Removed:
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitSharedMemorySegment.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/TestShortCircuitSharedMemorySegment.java
Modified:
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/   (props changed)
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/   (props changed)
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitReplica.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/FileDistributionCalculator.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewerPB.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitCache.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
    hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/TestOfflineImageViewer.java

Propchange: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs:r1573120-1573813
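The substantive change in this merge is HDFS-5950: instead of the DataNode pushing mlock state to each BlockReader, the client and the DataNode now share a small memory-mapped segment per DFSClient, carved into slots, one per short-circuit replica. Each slot carries the replica's validity and anchor state. The class below is a purely illustrative model of the state one slot communicates, inferred from the semantics described in the hunks that follow; the real layout lives in the newly added ShortCircuitShm.java, which this email does not include.

    // Hypothetical model of one shared-memory slot; NOT the committed layout.
    class SlotModel {
      volatile boolean valid;      // DataNode clears this to invalidate the replica
      volatile boolean anchorable; // DataNode sets this while the block is mlocked
      int anchorCount;             // client anchors; DataNode won't munlock while > 0
    }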
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon Mar 3 23:51:58 2014
@@ -370,6 +370,9 @@ Release 2.4.0 - UNRELEASED
     HDFS-4200. Reduce the size of synchronized sections in PacketResponder.
     (suresh)
 
+    HDFS-5950. The DFSClient and DataNode should use shared memory segments to
+    communicate short-circuit information. (cmccabe)
+
   OPTIMIZATIONS
 
     HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery
@@ -514,6 +517,9 @@ Release 2.4.0 - UNRELEASED
     HDFS-5956. A file size is multiplied by the replication factor in 'hdfs oiv
     -p FileDistribution' option. (Akira Ajisaka via wheat9)
 
+    HDFS-5866. '-maxSize' and '-step' option fail in OfflineImageViewer.
+    (Akira Ajisaka via wheat9)
+
   BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS
 
     HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9)

Propchange: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java:r1573120-1573813

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReader.java Mon Mar 3 23:51:58 2014
@@ -23,7 +23,6 @@ import java.util.EnumSet;
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.hdfs.client.ClientMmap;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 
 /**
  * A BlockReader is responsible for reading a single block
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderFactory.java Mon Mar 3 23:51:58 2014
@@ -24,6 +24,7 @@ import java.io.FileInputStream;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
+import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -32,6 +33,8 @@ import org.apache.hadoop.hdfs.client.Sho
 import org.apache.hadoop.hdfs.client.ShortCircuitCache.ShortCircuitReplicaCreator;
 import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
 import org.apache.hadoop.hdfs.client.ShortCircuitReplicaInfo;
+import org.apache.hadoop.hdfs.ShortCircuitShm.Slot;
+import org.apache.hadoop.hdfs.ShortCircuitShm.SlotId;
 import org.apache.hadoop.hdfs.net.DomainPeer;
 import org.apache.hadoop.hdfs.net.Peer;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -410,7 +413,6 @@ public class BlockReaderFactory implemen
         setBlock(block).
         setStartOffset(startOffset).
         setShortCircuitReplica(info.getReplica()).
-        setDatanodeID(datanode).
         setVerifyChecksum(verifyChecksum).
         setCachingStrategy(cachingStrategy).
         build();
@@ -438,12 +440,31 @@ public class BlockReaderFactory implemen
     while (true) {
       curPeer = nextDomainPeer();
       if (curPeer == null) break;
+      if (curPeer.fromCache) remainingCacheTries--;
       DomainPeer peer = (DomainPeer)curPeer.peer;
+      Slot slot = null;
+      ShortCircuitCache cache = clientContext.getShortCircuitCache();
       try {
-        ShortCircuitReplicaInfo info = requestFileDescriptors(peer);
+        MutableBoolean usedPeer = new MutableBoolean(false);
+        slot = cache.allocShmSlot(datanode, peer, usedPeer,
+            new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()),
+            clientName);
+        if (usedPeer.booleanValue()) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(this + ": allocShmSlot used up our previous socket " +
+                peer.getDomainSocket() + ".  Allocating a new one...");
+          }
+          curPeer = nextDomainPeer();
+          if (curPeer == null) break;
+          peer = (DomainPeer)curPeer.peer;
+        }
+        ShortCircuitReplicaInfo info = requestFileDescriptors(peer, slot);
         clientContext.getPeerCache().put(datanode, peer);
         return info;
       } catch (IOException e) {
+        if (slot != null) {
+          cache.freeSlot(slot);
+        }
         if (curPeer.fromCache) {
           // Handle an I/O error we got when using a cached socket.
           // These are considered less serious, because the socket may be stale.
@@ -470,16 +491,22 @@ public class BlockReaderFactory implemen
   /**
    * Request file descriptors from a DomainPeer.
    *
+   * @param peer   The peer to use for communication.
+   * @param slot   If non-null, the shared memory slot to associate with the
+   *               new ShortCircuitReplica.
+   *
    * @return  A ShortCircuitReplica object if we could communicate with the
    *          datanode; null, otherwise.
    * @throws  IOException If we encountered an I/O exception while communicating
    *          with the datanode.
    */
-  private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer)
-      throws IOException {
+  private ShortCircuitReplicaInfo requestFileDescriptors(DomainPeer peer,
+      Slot slot) throws IOException {
+    ShortCircuitCache cache = clientContext.getShortCircuitCache();
     final DataOutputStream out =
         new DataOutputStream(new BufferedOutputStream(peer.getOutputStream()));
-    new Sender(out).requestShortCircuitFds(block, token, 1);
+    SlotId slotId = slot == null ? null : slot.getSlotId();
+    new Sender(out).requestShortCircuitFds(block, token, slotId, 1);
     DataInputStream in = new DataInputStream(peer.getInputStream());
     BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
         PBHelper.vintPrefixed(in));
@@ -491,9 +518,10 @@ public class BlockReaderFactory implemen
       sock.recvFileInputStreams(fis, buf, 0, buf.length);
       ShortCircuitReplica replica = null;
       try {
-        ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
-        replica = new ShortCircuitReplica(key, fis[0], fis[1],
-            clientContext.getShortCircuitCache(), Time.monotonicNow());
+        ExtendedBlockId key =
+            new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
+        replica = new ShortCircuitReplica(key, fis[0], fis[1], cache,
+            Time.monotonicNow(), slot);
       } catch (IOException e) {
         // This indicates an error reading from disk, or a format error.  Since
         // it's not a socket communication problem, we return null rather than
@@ -527,8 +555,9 @@ public class BlockReaderFactory implemen
       }
       return new ShortCircuitReplicaInfo(new InvalidToken(msg));
     default:
-      LOG.warn(this + "unknown response code " + resp.getStatus() + " while " +
-          "attempting to set up short-circuit access. " + resp.getMessage());
+      LOG.warn(this + ": unknown response code " + resp.getStatus() +
+          " while attempting to set up short-circuit access. " +
+          resp.getMessage());
       clientContext.getDomainSocketFactory()
           .disableShortCircuitForPath(pathInfo.getPath());
       return null;
@@ -565,6 +594,7 @@ public class BlockReaderFactory implemen
     while (true) {
       BlockReaderPeer curPeer = nextDomainPeer();
       if (curPeer == null) break;
+      if (curPeer.fromCache) remainingCacheTries--;
       DomainPeer peer = (DomainPeer)curPeer.peer;
       BlockReader blockReader = null;
       try {
@@ -630,6 +660,7 @@ public class BlockReaderFactory implemen
       try {
         curPeer = nextTcpPeer();
         if (curPeer == null) break;
+        if (curPeer.fromCache) remainingCacheTries--;
        peer = curPeer.peer;
        blockReader = getRemoteBlockReader(peer);
        return blockReader;
@@ -662,7 +693,7 @@ public class BlockReaderFactory implemen
     return null;
   }
 
-  private static class BlockReaderPeer {
+  public static class BlockReaderPeer {
     final Peer peer;
     final boolean fromCache;
 
@@ -681,7 +712,6 @@ public class BlockReaderFactory implemen
     if (remainingCacheTries > 0) {
       Peer peer = clientContext.getPeerCache().get(datanode, true);
       if (peer != null) {
-        remainingCacheTries--;
         if (LOG.isTraceEnabled()) {
          LOG.trace("nextDomainPeer: reusing existing peer " + peer);
         }
@@ -706,7 +736,6 @@ public class BlockReaderFactory implemen
     if (remainingCacheTries > 0) {
       Peer peer = clientContext.getPeerCache().get(datanode, false);
       if (peer != null) {
-        remainingCacheTries--;
         if (LOG.isTraceEnabled()) {
          LOG.trace("nextTcpPeer: reusing existing peer " + peer);
         }
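To restate the control flow the BlockReaderFactory hunks above add: allocating a shm slot can itself consume the cached domain socket (the DataNode keeps the socket for its shm watcher, signalled via usedPeer), in which case a fresh peer is fetched before requesting descriptors; on any I/O error a slot the DataNode never learned about is freed immediately. A condensed, hedged restatement, assuming cache, datanode, peer, block, clientName, and nextDomainPeer() are in scope as in the class above:

    // Condensed sketch of the slot-aware descriptor request path; not the
    // literal committed code.
    MutableBoolean usedPeer = new MutableBoolean(false);
    Slot slot = cache.allocShmSlot(datanode, peer, usedPeer,
        new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()),
        clientName);                                  // null if shm is disabled
    try {
      if (usedPeer.booleanValue()) {
        peer = (DomainPeer) nextDomainPeer().peer;    // old socket now owned by the shm
      }
      return requestFileDescriptors(peer, slot);      // sends slot.getSlotId() in the request
    } catch (IOException e) {
      if (slot != null) {
        cache.freeSlot(slot);                         // DataNode never saw this slot
      }
      throw e;
    }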
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java Mon Mar 3 23:51:58 2014
@@ -17,26 +17,21 @@
  */
 package org.apache.hadoop.hdfs;
 
-import java.io.FileInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.EnumSet;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.ReadOption;
 import org.apache.hadoop.hdfs.client.ClientMmap;
-import org.apache.hadoop.hdfs.client.ShortCircuitCache;
 import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
 import org.apache.hadoop.hdfs.DFSClient.Conf;
-import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.hdfs.util.DirectBufferPool;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.DataChecksum;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -70,8 +65,6 @@ class BlockReaderLocal implements BlockR
     private String filename;
     private ShortCircuitReplica replica;
     private long dataPos;
-    private DatanodeID datanodeID;
-    private boolean mlocked;
     private ExtendedBlock block;
 
     public Builder(Conf conf) {
@@ -108,16 +101,6 @@ class BlockReaderLocal implements BlockR
       return this;
     }
 
-    public Builder setDatanodeID(DatanodeID datanodeID) {
-      this.datanodeID = datanodeID;
-      return this;
-    }
-
-    public Builder setMlocked(boolean mlocked) {
-      this.mlocked = mlocked;
-      return this;
-    }
-
     public Builder setBlock(ExtendedBlock block) {
       this.block = block;
       return this;
@@ -165,19 +148,9 @@ class BlockReaderLocal implements BlockR
   private final boolean verifyChecksum;
 
   /**
-   * If true, this block is mlocked on the DataNode.
-   */
-  private final AtomicBoolean mlocked;
-
-  /**
    * Name of the block, for logging purposes.
    */
   private final String filename;
-
-  /**
-   * DataNode which contained this block.
-   */
-  private final DatanodeID datanodeID;
 
   /**
    * Block ID and Block Pool ID.
@@ -220,8 +193,6 @@ class BlockReaderLocal implements BlockR
    */
   private int maxReadaheadLength;
 
-  private ClientMmap clientMmap;
-
   /**
    * Buffers data starting at the current dataPos and extending on
    * for dataBuf.limit().
@@ -247,9 +218,7 @@ class BlockReaderLocal implements BlockR
     this.checksum = header.getChecksum();
     this.verifyChecksum = builder.verifyChecksum &&
        (this.checksum.getChecksumType().id != DataChecksum.CHECKSUM_NULL);
-    this.mlocked = new AtomicBoolean(builder.mlocked);
     this.filename = builder.filename;
-    this.datanodeID = builder.datanodeID;
     this.block = builder.block;
     this.bytesPerChecksum = checksum.getBytesPerChecksum();
     this.checksumSize = checksum.getChecksumSize();
@@ -380,42 +349,55 @@ class BlockReaderLocal implements BlockR
     return total;
   }
 
-  private boolean getCanSkipChecksum() {
-    return (!verifyChecksum) || mlocked.get();
+  private boolean createNoChecksumContext() {
+    if (verifyChecksum) {
+      return replica.addNoChecksumAnchor();
+    } else {
+      return true;
+    }
   }
-
+
+  private void releaseNoChecksumContext() {
+    if (verifyChecksum) {
+      replica.removeNoChecksumAnchor();
+    }
+  }
+
   @Override
   public synchronized int read(ByteBuffer buf) throws IOException {
-    boolean canSkipChecksum = getCanSkipChecksum();
-
-    String traceString = null;
-    if (LOG.isTraceEnabled()) {
-      traceString = new StringBuilder().
-          append("read(").
-          append("buf.remaining=").append(buf.remaining()).
-          append(", block=").append(block).
-          append(", filename=").append(filename).
-          append(", canSkipChecksum=").append(canSkipChecksum).
-          append(")").toString();
-      LOG.info(traceString + ": starting");
-    }
-    int nRead;
+    boolean canSkipChecksum = createNoChecksumContext();
     try {
-      if (canSkipChecksum && zeroReadaheadRequested) {
-        nRead = readWithoutBounceBuffer(buf);
-      } else {
-        nRead = readWithBounceBuffer(buf, canSkipChecksum);
+      String traceString = null;
+      if (LOG.isTraceEnabled()) {
+        traceString = new StringBuilder().
+            append("read(").
+            append("buf.remaining=").append(buf.remaining()).
+            append(", block=").append(block).
+            append(", filename=").append(filename).
+            append(", canSkipChecksum=").append(canSkipChecksum).
+            append(")").toString();
+        LOG.info(traceString + ": starting");
+      }
+      int nRead;
+      try {
+        if (canSkipChecksum && zeroReadaheadRequested) {
+          nRead = readWithoutBounceBuffer(buf);
+        } else {
+          nRead = readWithBounceBuffer(buf, canSkipChecksum);
+        }
+      } catch (IOException e) {
+        if (LOG.isTraceEnabled()) {
+          LOG.info(traceString + ": I/O error", e);
+        }
+        throw e;
      }
-    } catch (IOException e) {
       if (LOG.isTraceEnabled()) {
-        LOG.info(traceString + ": I/O error", e);
+        LOG.info(traceString + ": returning " + nRead);
      }
-      throw e;
-    }
-    if (LOG.isTraceEnabled()) {
-      LOG.info(traceString + ": returning " + nRead);
+      return nRead;
+    } finally {
+      if (canSkipChecksum) releaseNoChecksumContext();
     }
-    return nRead;
   }
 
   private synchronized int readWithoutBounceBuffer(ByteBuffer buf)
@@ -531,34 +513,38 @@ class BlockReaderLocal implements BlockR
   @Override
   public synchronized int read(byte[] arr, int off, int len)
         throws IOException {
-    boolean canSkipChecksum = getCanSkipChecksum();
-    String traceString = null;
-    if (LOG.isTraceEnabled()) {
-      traceString = new StringBuilder().
-          append("read(arr.length=").append(arr.length).
-          append(", off=").append(off).
-          append(", len=").append(len).
-          append(", filename=").append(filename).
-          append(", block=").append(block).
-          append(", canSkipChecksum=").append(canSkipChecksum).
-          append(")").toString();
-      LOG.trace(traceString + ": starting");
-    }
+    boolean canSkipChecksum = createNoChecksumContext();
     int nRead;
     try {
-      if (canSkipChecksum && zeroReadaheadRequested) {
-        nRead = readWithoutBounceBuffer(arr, off, len);
-      } else {
-        nRead = readWithBounceBuffer(arr, off, len, canSkipChecksum);
+      String traceString = null;
+      if (LOG.isTraceEnabled()) {
+        traceString = new StringBuilder().
+            append("read(arr.length=").append(arr.length).
+            append(", off=").append(off).
+            append(", len=").append(len).
+            append(", filename=").append(filename).
+            append(", block=").append(block).
+            append(", canSkipChecksum=").append(canSkipChecksum).
+            append(")").toString();
+        LOG.trace(traceString + ": starting");
+      }
+      try {
+        if (canSkipChecksum && zeroReadaheadRequested) {
+          nRead = readWithoutBounceBuffer(arr, off, len);
+        } else {
+          nRead = readWithBounceBuffer(arr, off, len, canSkipChecksum);
+        }
+      } catch (IOException e) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(traceString + ": I/O error", e);
+        }
+        throw e;
      }
-    } catch (IOException e) {
       if (LOG.isTraceEnabled()) {
-        LOG.trace(traceString + ": I/O error", e);
+        LOG.trace(traceString + ": returning " + nRead);
      }
-      throw e;
-    }
-    if (LOG.isTraceEnabled()) {
-      LOG.trace(traceString + ": returning " + nRead);
+    } finally {
+      if (canSkipChecksum) releaseNoChecksumContext();
     }
     return nRead;
   }
@@ -648,28 +634,45 @@ class BlockReaderLocal implements BlockR
     return true;
   }
 
+  /**
+   * Get or create a memory map for this replica.
+   *
+   * There are two kinds of ClientMmap objects we could fetch here: one that
+   * will always read pre-checksummed data, and one that may read data that
+   * hasn't been checksummed.
+   *
+   * If we fetch the former, "safe" kind of ClientMmap, we have to increment
+   * the anchor count on the shared memory slot.  This will tell the DataNode
+   * not to munlock the block until this ClientMmap is closed.
+   * If we fetch the latter, we don't bother with anchoring.
+   *
+   * @param opts     The options to use, such as SKIP_CHECKSUMS.
+   *
+   * @return         null on failure; the ClientMmap otherwise.
+   */
   @Override
   public ClientMmap getClientMmap(EnumSet<ReadOption> opts) {
-    if ((!opts.contains(ReadOption.SKIP_CHECKSUMS)) &&
-        verifyChecksum && (!mlocked.get())) {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace("can't get an mmap for " + block + " of " + filename +
-            " since SKIP_CHECKSUMS was not given, " +
-            "we aren't skipping checksums, and the block is not mlocked.");
+    boolean anchor = verifyChecksum &&
+        (opts.contains(ReadOption.SKIP_CHECKSUMS) == false);
+    if (anchor) {
+      if (!createNoChecksumContext()) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("can't get an mmap for " + block + " of " + filename +
+              " since SKIP_CHECKSUMS was not given, " +
+              "we aren't skipping checksums, and the block is not mlocked.");
+        }
+        return null;
      }
-      return null;
     }
-    return replica.getOrCreateClientMmap();
-  }
-
-  /**
-   * Set the mlocked state of the BlockReader.
-   * This method does NOT need to be synchronized because mlocked is atomic.
-   *
-   * @param mlocked  the new mlocked state of the BlockReader.
-   */
-  public void setMlocked(boolean mlocked) {
-    this.mlocked.set(mlocked);
+    ClientMmap clientMmap = null;
+    try {
+      clientMmap = replica.getOrCreateClientMmap(anchor);
+    } finally {
+      if ((clientMmap == null) && anchor) {
+        releaseNoChecksumContext();
+      }
+    }
+    return clientMmap;
   }
 
   @VisibleForTesting
@@ -681,4 +684,22 @@ class BlockReaderLocal implements BlockR
   int getMaxReadaheadLength() {
     return this.maxReadaheadLength;
   }
+
+  /**
+   * Make the replica anchorable.  Normally this can only be done by the
+   * DataNode.  This method is only for testing.
+   */
+  @VisibleForTesting
+  void forceAnchorable() {
+    replica.getSlot().makeAnchorable();
+  }
+
+  /**
+   * Make the replica unanchorable.  Normally this can only be done by the
+   * DataNode.  This method is only for testing.
+   */
+  @VisibleForTesting
+  void forceUnanchorable() {
+    replica.getSlot().makeUnanchorable();
+  }
 }

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ClientContext.java Mon Mar 3 23:51:58 2014
@@ -99,7 +99,8 @@ public class ClientContext {
         conf.shortCircuitMmapCacheSize,
         conf.shortCircuitMmapCacheExpiryMs,
         conf.shortCircuitMmapCacheRetryTimeout,
-        conf.shortCircuitCacheStaleThresholdMs);
+        conf.shortCircuitCacheStaleThresholdMs,
+        conf.shortCircuitSharedMemoryWatcherInterruptCheckMs);
     this.peerCache = new PeerCache(conf.socketCacheCapacity, conf.socketCacheExpiry);
     this.useLegacyBlockReaderLocal = conf.useLegacyBlockReaderLocal;
@@ -129,7 +130,9 @@ public class ClientContext {
       append(", useLegacyBlockReaderLocal = ").
       append(conf.useLegacyBlockReaderLocal).
       append(", domainSocketDataTraffic = ").
-      append(conf.domainSocketDataTraffic);
+      append(conf.domainSocketDataTraffic).
+      append(", shortCircuitSharedMemoryWatcherInterruptCheckMs = ").
+      append(conf.shortCircuitSharedMemoryWatcherInterruptCheckMs);
     return builder.toString();
   }

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java Mon Mar 3 23:51:58 2014
@@ -282,6 +282,7 @@ public class DFSClient implements java.i
     final boolean domainSocketDataTraffic;
     final int shortCircuitStreamsCacheSize;
     final long shortCircuitStreamsCacheExpiryMs;
+    final int shortCircuitSharedMemoryWatcherInterruptCheckMs;
 
     final int shortCircuitMmapCacheSize;
     final long shortCircuitMmapCacheExpiryMs;
@@ -414,6 +415,9 @@ public class DFSClient implements java.i
       shortCircuitCacheStaleThresholdMs = conf.getLong(
           DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS,
           DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS_DEFAULT);
+      shortCircuitSharedMemoryWatcherInterruptCheckMs = conf.getInt(
+          DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS,
+          DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT);
 
       datanodeRestartTimeout = conf.getLong(
           DFS_CLIENT_DATANODE_RESTART_TIMEOUT_KEY,
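Client-side, the feature hangs off one new configuration key, read in the DFSClient.Conf hunk above and defined in the DFSConfigKeys hunk below. A non-positive value disables shared memory entirely: the ShortCircuitCache constructor later in this diff only creates a DfsClientShmManager when the value is greater than zero, and allocShmSlot then returns null. A minimal illustration:

    // How a client configuration might opt out of short-circuit shared memory.
    Configuration conf = new Configuration();
    conf.setInt("dfs.short.circuit.shared.memory.watcher.interrupt.check.ms", 0);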
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Mon Mar 3 23:51:58 2014
@@ -477,6 +477,10 @@ public class DFSConfigKeys extends Commo
   public static final String  DFS_NAMENODE_STARTUP_KEY = "dfs.namenode.startup";
   public static final String  DFS_DATANODE_KEYTAB_FILE_KEY = "dfs.datanode.keytab.file";
   public static final String  DFS_DATANODE_USER_NAME_KEY = "dfs.datanode.kerberos.principal";
+  public static final String  DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATH = "dfs.datanode.shared.file.descriptor.path";
+  public static final String  DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATH_DEFAULT = "/dev/shm";
+  public static final String  DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS = "dfs.short.circuit.shared.memory.watcher.interrupt.check.ms";
+  public static final int     DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT = 60000;
   public static final String  DFS_NAMENODE_KEYTAB_FILE_KEY = "dfs.namenode.keytab.file";
   public static final String  DFS_NAMENODE_USER_NAME_KEY = "dfs.namenode.kerberos.principal";
   public static final String  DFS_NAMENODE_INTERNAL_SPNEGO_USER_NAME_KEY = "dfs.namenode.kerberos.internal.spnego.principal";

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Mon Mar 3 23:51:58 2014
@@ -39,6 +39,7 @@ import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.ByteBufferReadable;
 import org.apache.hadoop.fs.ByteBufferUtil;
@@ -1630,7 +1631,7 @@ implements ByteBufferReadable, CanSetDro
       success = true;
     } finally {
       if (!success) {
-        clientMmap.unref();
+        IOUtils.closeQuietly(clientMmap);
       }
     }
     return buffer;
@@ -1644,7 +1645,7 @@ implements ByteBufferReadable, CanSetDro
           "that was not created by this stream, " + buffer);
     }
     if (val instanceof ClientMmap) {
-      ((ClientMmap)val).unref();
+      IOUtils.closeQuietly((ClientMmap)val);
    } else if (val instanceof ByteBufferPool) {
      ((ByteBufferPool)val).putBuffer(buffer);
    }
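ClientMmap is now a Closeable (see its hunk below), so DFSInputStream retires the manual unref() calls in favor of commons-io's IOUtils.closeQuietly. Callers reach this code through the zero-copy read API; roughly, and assuming fs, path, and a buffer pool are in scope (a usage sketch, not code from this patch):

    // Zero-copy read; the ClientMmap is held internally and closed when the
    // buffer is released via releaseBuffer().
    FSDataInputStream in = fs.open(path);
    ByteBuffer buf = in.read(bufferPool, 4096,
        EnumSet.of(ReadOption.SKIP_CHECKSUMS));
    try {
      // ... consume buf ...
    } finally {
      in.releaseBuffer(buf);  // ends up in IOUtils.closeQuietly((ClientMmap)val)
    }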
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ExtendedBlockId.java Mon Mar 3 23:51:58 2014
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs;
 
 import org.apache.commons.lang.builder.EqualsBuilder;
 import org.apache.commons.lang.builder.HashCodeBuilder;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 
 /**
  * An immutable key which identifies a block.
@@ -34,6 +35,10 @@ final public class ExtendedBlockId {
    */
   private final String bpId;
 
+  public static ExtendedBlockId fromExtendedBlock(ExtendedBlock block) {
+    return new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
+  }
+
   public ExtendedBlockId(long blockId, String bpId) {
     this.blockId = blockId;
     this.bpId = bpId;
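The new static factory trims a pattern repeated throughout this patch; for example, the key-construction call sites in BlockReaderFactory above could be written either way (ExtendedBlock "block" assumed in scope):

    // Equivalent spellings of the same key; the factory avoids repeating
    // the two getters at every call site.
    ExtendedBlockId key1 =
        new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
    ExtendedBlockId key2 = ExtendedBlockId.fromExtendedBlock(block);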
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemotePeerFactory.java Mon Mar 3 23:51:58 2014
@@ -20,9 +20,7 @@ package org.apache.hadoop.hdfs;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.net.Peer;
-import org.apache.hadoop.security.UserGroupInformation;
 
 public interface RemotePeerFactory {
   /**

Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ClientMmap.java Mon Mar 3 23:51:58 2014
@@ -19,26 +19,23 @@ package org.apache.hadoop.hdfs.client;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 
+import java.io.Closeable;
 import java.nio.MappedByteBuffer;
-import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 /**
- * A memory-mapped region used by an HDFS client.
- *
- * This class includes a reference count and some other information used by
- * ClientMmapManager to track and cache mmaps.
+ * A reference to a memory-mapped region used by an HDFS client.
  */
 @InterfaceAudience.Private
-public class ClientMmap {
+public class ClientMmap implements Closeable {
   static final Log LOG = LogFactory.getLog(ClientMmap.class);
 
   /**
    * A reference to the block replica which this mmap relates to.
    */
-  private final ShortCircuitReplica replica;
+  private ShortCircuitReplica replica;
 
   /**
    * The java ByteBuffer object.
@@ -46,33 +43,30 @@ public class ClientMmap {
   private final MappedByteBuffer map;
 
   /**
-   * Reference count of this ClientMmap object.
+   * Whether or not this ClientMmap anchors the replica into memory while
+   * it exists.  Closing an anchored ClientMmap unanchors the replica.
    */
-  private final AtomicInteger refCount = new AtomicInteger(1);
+  private final boolean anchored;
 
-  ClientMmap(ShortCircuitReplica replica, MappedByteBuffer map) {
+  ClientMmap(ShortCircuitReplica replica, MappedByteBuffer map,
+      boolean anchored) {
     this.replica = replica;
     this.map = map;
+    this.anchored = anchored;
   }
 
   /**
-   * Increment the reference count.
-   *
-   * @return   The new reference count.
+   * Close the ClientMmap object.
    */
-  void ref() {
-    refCount.addAndGet(1);
-  }
-
-  /**
-   * Decrement the reference count.
-   *
-   * The parent replica gets unreferenced each time the reference count
-   * of this object goes to 0.
-   */
-  public void unref() {
-    refCount.addAndGet(-1);
-    replica.unref();
+  @Override
+  public void close() {
+    if (replica != null) {
+      if (anchored) {
+        replica.removeNoChecksumAnchor();
+      }
+      replica.unref();
+    }
+    replica = null;
  }
 
   public MappedByteBuffer getMappedByteBuffer() {
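With reference counting gone, each fetch returns a fresh ClientMmap whose close() both unanchors the replica (if anchored) and drops the replica reference taken on its behalf, so the natural usage is exactly one close per fetch. Illustrative only, assuming Java 7 try-with-resources and a ShortCircuitReplica whose no-checksum anchor was already added by the caller:

    // One ClientMmap per caller; closing it releases the anchor and the
    // replica reference. Sketch, not code from this patch.
    try (ClientMmap mmap = replica.getOrCreateClientMmap(true)) {
      MappedByteBuffer buf = mmap.getMappedByteBuffer();
      // ... read pre-checksummed data through buf ...
    }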
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java Mon Mar 3 23:51:58 2014
@@ -17,7 +17,10 @@
  */
 package org.apache.hadoop.hdfs.client;
 
+import java.io.BufferedOutputStream;
 import java.io.Closeable;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
 
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -33,14 +36,23 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
+import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
+import org.apache.hadoop.hdfs.ShortCircuitShm.Slot;
+import org.apache.hadoop.hdfs.net.DomainPeer;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
+import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RetriableException;
+import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
@@ -154,6 +166,69 @@ public class ShortCircuitCache implement
     }
   }
 
+  /**
+   * A task which asks the DataNode to release a short-circuit shared memory
+   * slot.  If successful, this will tell the DataNode to stop monitoring
+   * changes to the mlock status of the replica associated with the slot.
+   * It will also allow us (the client) to re-use this slot for another
+   * replica.  If we can't communicate with the DataNode for some reason,
+   * we tear down the shared memory segment to avoid being in an inconsistent
+   * state.
+   */
+  private class SlotReleaser implements Runnable {
+    /**
+     * The slot that we need to release.
+     */
+    private final Slot slot;
+
+    SlotReleaser(Slot slot) {
+      this.slot = slot;
+    }
+
+    @Override
+    public void run() {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace(ShortCircuitCache.this + ": about to release " + slot);
+      }
+      final DfsClientShm shm = (DfsClientShm)slot.getShm();
+      final DomainSocket shmSock = shm.getPeer().getDomainSocket();
+      DomainSocket sock = null;
+      DataOutputStream out = null;
+      final String path = shmSock.getPath();
+      boolean success = false;
+      try {
+        sock = DomainSocket.connect(path);
+        out = new DataOutputStream(
+            new BufferedOutputStream(sock.getOutputStream()));
+        new Sender(out).releaseShortCircuitFds(slot.getSlotId());
+        DataInputStream in = new DataInputStream(sock.getInputStream());
+        ReleaseShortCircuitAccessResponseProto resp =
+            ReleaseShortCircuitAccessResponseProto.parseFrom(
+                PBHelper.vintPrefixed(in));
+        if (resp.getStatus() != Status.SUCCESS) {
+          String error = resp.hasError() ? resp.getError() : "(unknown)";
+          throw new IOException(resp.getStatus().toString() + ": " + error);
+        }
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(ShortCircuitCache.this + ": released " + slot);
+        }
+        success = true;
+      } catch (IOException e) {
+        LOG.error(ShortCircuitCache.this + ": failed to release " +
+            "short-circuit shared memory slot " + slot + " by sending " +
+            "ReleaseShortCircuitAccessRequestProto to " + path +
+            ".  Closing shared memory segment.", e);
+      } finally {
+        if (success) {
+          shmManager.freeSlot(slot);
+        } else {
+          shm.getEndpointShmManager().shutdown(shm);
+        }
+        IOUtils.cleanup(LOG, sock, out);
+      }
+    }
+  }
+
   public interface ShortCircuitReplicaCreator {
     /**
      * Attempt to create a ShortCircuitReplica object.
@@ -173,9 +248,17 @@ public class ShortCircuitCache implement
   /**
    * The executor service that runs the cacheCleaner.
    */
-  private final ScheduledThreadPoolExecutor executor
+  private final ScheduledThreadPoolExecutor cleanerExecutor
+      = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().
+          setDaemon(true).setNameFormat("ShortCircuitCache_Cleaner").
+          build());
+
+  /**
+   * The executor service that runs the slot releaser.
+   */
+  private final ScheduledThreadPoolExecutor releaserExecutor
       = new ScheduledThreadPoolExecutor(1, new ThreadFactoryBuilder().
-          setDaemon(true).setNameFormat("ShortCircuitCache Cleaner").
+          setDaemon(true).setNameFormat("ShortCircuitCache_SlotReleaser").
           build());
@@ -253,6 +336,11 @@ public class ShortCircuitCache implement
   private int outstandingMmapCount = 0;
 
   /**
+   * Manages short-circuit shared memory segments for the client.
+   */
+  private final DfsClientShmManager shmManager;
+
+  /**
    * Create a {@link ShortCircuitCache} object from a {@link Configuration}
    */
   public static ShortCircuitCache fromConf(Configuration conf) {
@@ -268,12 +356,14 @@ public class ShortCircuitCache implement
         conf.getLong(DFSConfigKeys.DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS,
             DFSConfigKeys.DFS_CLIENT_MMAP_RETRY_TIMEOUT_MS_DEFAULT),
         conf.getLong(DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS,
-            DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS_DEFAULT));
+            DFSConfigKeys.DFS_CLIENT_SHORT_CIRCUIT_REPLICA_STALE_THRESHOLD_MS_DEFAULT),
+        conf.getInt(DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS,
+            DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT));
   }
 
   public ShortCircuitCache(int maxTotalSize, long maxNonMmappedEvictableLifespanMs,
       int maxEvictableMmapedSize, long maxEvictableMmapedLifespanMs,
-      long mmapRetryTimeoutMs, long staleThresholdMs) {
+      long mmapRetryTimeoutMs, long staleThresholdMs, int shmInterruptCheckMs) {
     Preconditions.checkArgument(maxTotalSize >= 0);
     this.maxTotalSize = maxTotalSize;
     Preconditions.checkArgument(maxNonMmappedEvictableLifespanMs >= 0);
@@ -284,6 +374,15 @@ public class ShortCircuitCache implement
     this.maxEvictableMmapedLifespanMs = maxEvictableMmapedLifespanMs;
     this.mmapRetryTimeoutMs = mmapRetryTimeoutMs;
     this.staleThresholdMs = staleThresholdMs;
+    DfsClientShmManager shmManager = null;
+    if (shmInterruptCheckMs > 0) {
+      try {
+        shmManager = new DfsClientShmManager(shmInterruptCheckMs);
+      } catch (IOException e) {
+        LOG.error("failed to create ShortCircuitShmManager", e);
+      }
+    }
+    this.shmManager = shmManager;
   }
 
   public long getMmapRetryTimeoutMs() {
@@ -339,7 +438,14 @@ public class ShortCircuitCache implement
   void unref(ShortCircuitReplica replica) {
     lock.lock();
     try {
+      // If the replica is stale, but we haven't purged it yet, let's do that.
+      // It would be a shame to evict a non-stale replica so that we could put
+      // a stale one into the cache.
+      if ((!replica.purged) && replica.isStale()) {
+        purge(replica);
+      }
       String addedString = "";
+      boolean shouldTrimEvictionMaps = false;
       int newRefCount = --replica.refCount;
       if (newRefCount == 0) {
         // Close replica, since there are no remaining references to it.
@@ -362,7 +468,7 @@ public class ShortCircuitCache implement
             insertEvictable(System.nanoTime(), replica, evictable);
             addedString = "added to evictable, ";
           }
-          trimEvictionMaps();
+          shouldTrimEvictionMaps = true;
         }
       } else {
         Preconditions.checkArgument(replica.refCount >= 0,
@@ -375,6 +481,9 @@ public class ShortCircuitCache implement
             (newRefCount + 1) + " -> " + newRefCount +
             StringUtils.getStackTrace(Thread.currentThread()));
       }
+      if (shouldTrimEvictionMaps) {
+        trimEvictionMaps();
+      }
     } finally {
       lock.unlock();
     }
@@ -442,7 +551,7 @@ public class ShortCircuitCache implement
         replica = evictable.firstEntry().getValue();
       }
       if (LOG.isTraceEnabled()) {
-        LOG.trace(this + ": trimEvictionMaps is purging " +
+        LOG.trace(this + ": trimEvictionMaps is purging " + replica +
             StringUtils.getStackTrace(Thread.currentThread()));
       }
       purge(replica);
@@ -542,7 +651,7 @@ public class ShortCircuitCache implement
     }
     if (LOG.isTraceEnabled()) {
       StringBuilder builder = new StringBuilder();
-      builder.append(this).append(": ").append(": removed ").
+      builder.append(this).append(": ").append(": purged ").
           append(replica).append(" from the cache.");
       if (removedFromInfoMap) {
         builder.append(" Removed from the replicaInfoMap.");
@@ -706,7 +815,7 @@ public class ShortCircuitCache implement
       cacheCleaner = new CacheCleaner();
       long rateMs = cacheCleaner.getRateInMs();
       ScheduledFuture<?> future =
-          executor.scheduleAtFixedRate(cacheCleaner, rateMs, rateMs,
+          cleanerExecutor.scheduleAtFixedRate(cacheCleaner, rateMs, rateMs,
              TimeUnit.MILLISECONDS);
       cacheCleaner.setFuture(future);
       if (LOG.isDebugEnabled()) {
@@ -716,16 +825,16 @@ public class ShortCircuitCache implement
     }
   }
 
-  ClientMmap getOrCreateClientMmap(ShortCircuitReplica replica) {
+  ClientMmap getOrCreateClientMmap(ShortCircuitReplica replica,
+      boolean anchored) {
     Condition newCond;
     lock.lock();
     try {
       while (replica.mmapData != null) {
-        if (replica.mmapData instanceof ClientMmap) {
+        if (replica.mmapData instanceof MappedByteBuffer) {
           ref(replica);
-          ClientMmap clientMmap = (ClientMmap)replica.mmapData;
-          clientMmap.ref();
-          return clientMmap;
+          MappedByteBuffer mmap = (MappedByteBuffer)replica.mmapData;
+          return new ClientMmap(replica, mmap, anchored);
         } else if (replica.mmapData instanceof Long) {
           long lastAttemptTimeMs = (Long)replica.mmapData;
           long delta = Time.monotonicNow() - lastAttemptTimeMs;
@@ -762,12 +871,11 @@ public class ShortCircuitCache implement
         newCond.signalAll();
         return null;
       } else {
-        ClientMmap clientMmap = new ClientMmap(replica, map);
         outstandingMmapCount++;
-        replica.mmapData = clientMmap;
+        replica.mmapData = map;
         ref(replica);
         newCond.signalAll();
-        return clientMmap;
+        return new ClientMmap(replica, map, anchored);
       }
     } finally {
       lock.unlock();
@@ -878,4 +986,58 @@ public class ShortCircuitCache implement
     return "ShortCircuitCache(0x" +
         Integer.toHexString(System.identityHashCode(this)) + ")";
   }
+
+  /**
+   * Allocate a new shared memory slot.
+   *
+   * @param datanode     The datanode to allocate a shm slot with.
+   * @param peer         A peer connected to the datanode.
+   * @param usedPeer     Will be set to true if we use up the provided peer.
+   * @param blockId      The block id and block pool id of the block we're
+   *                     allocating this slot for.
+   * @param clientName   The name of the DFSClient allocating the shared
+   *                     memory.
+   * @return             Null if short-circuit shared memory is disabled;
+   *                     a short-circuit memory slot otherwise.
+   * @throws IOException An exception if there was an error talking to
+   *                     the datanode.
+   */
+  public Slot allocShmSlot(DatanodeInfo datanode,
+      DomainPeer peer, MutableBoolean usedPeer,
+      ExtendedBlockId blockId, String clientName) throws IOException {
+    if (shmManager != null) {
+      return shmManager.allocSlot(datanode, peer, usedPeer,
+          blockId, clientName);
+    } else {
+      return null;
+    }
+  }
+
+  /**
+   * Free a slot immediately.
+   *
+   * ONLY use this if the DataNode is not yet aware of the slot.
+   *
+   * @param slot         The slot to free.
+   */
+  public void freeSlot(Slot slot) {
+    Preconditions.checkState(shmManager != null);
+    slot.makeInvalid();
+    shmManager.freeSlot(slot);
+  }
+
+  /**
+   * Schedule a shared memory slot to be released.
+   *
+   * @param slot         The slot to release.
+   */
+  public void scheduleSlotReleaser(Slot slot) {
+    Preconditions.checkState(shmManager != null);
+    releaserExecutor.execute(new SlotReleaser(slot));
+  }
+
+  @VisibleForTesting
+  public DfsClientShmManager getDfsClientShmManager() {
+    return shmManager;
+  }
 }
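The failure branch in SlotReleaser is deliberate: if the DataNode cannot be told about the release, the client shuts the whole segment down rather than risk the two sides disagreeing about slot ownership. Reduced to its core, the wire round trip above is (a sketch with setup, logging, and cleanup elided; shmSock and slot as in SlotReleaser.run()):

    // Core of the release exchange; not the literal committed code.
    DomainSocket sock = DomainSocket.connect(shmSock.getPath());
    DataOutputStream out = new DataOutputStream(
        new BufferedOutputStream(sock.getOutputStream()));
    new Sender(out).releaseShortCircuitFds(slot.getSlotId());
    DataInputStream in = new DataInputStream(sock.getInputStream());
    ReleaseShortCircuitAccessResponseProto resp =
        ReleaseShortCircuitAccessResponseProto.parseFrom(PBHelper.vintPrefixed(in));
    if (resp.getStatus() != Status.SUCCESS) {
      throw new IOException(resp.getStatus() + ": "
          + (resp.hasError() ? resp.getError() : "(unknown)"));
    }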
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitReplica.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitReplica.java?rev=1573821&r1=1573820&r2=1573821&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitReplica.java (original)
+++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitReplica.java Mon Mar 3 23:51:58 2014
@@ -28,6 +28,7 @@ import java.nio.channels.FileChannel.Map
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.ShortCircuitShm.Slot;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
@@ -81,6 +82,11 @@ public class ShortCircuitReplica {
   private final long creationTimeMs;
 
   /**
+   * If non-null, the shared memory slot associated with this replica.
+   */
+  private final Slot slot;
+
+  /**
    * Current mmap state.
    *
    * Protected by the cache lock.
@@ -114,7 +120,7 @@ public class ShortCircuitReplica {
   public ShortCircuitReplica(ExtendedBlockId key,
       FileInputStream dataStream, FileInputStream metaStream,
-      ShortCircuitCache cache, long creationTimeMs) throws IOException {
+      ShortCircuitCache cache, long creationTimeMs, Slot slot) throws IOException {
     this.key = key;
     this.dataStream = dataStream;
     this.metaStream = metaStream;
@@ -126,6 +132,7 @@ public class ShortCircuitReplica {
     }
     this.cache = cache;
     this.creationTimeMs = creationTimeMs;
+    this.slot = slot;
   }
 
   /**
@@ -141,21 +148,61 @@ public class ShortCircuitReplica {
    * Must be called with the cache lock held.
    */
   boolean isStale() {
-    long deltaMs = Time.monotonicNow() - creationTimeMs;
-    long staleThresholdMs = cache.getStaleThresholdMs();
-    if (deltaMs > staleThresholdMs) {
+    if (slot != null) {
+      // Check staleness by looking at the shared memory area we use to
+      // communicate with the DataNode.
+      boolean stale = !slot.isValid();
       if (LOG.isTraceEnabled()) {
-        LOG.trace(this + " is stale because it's " + deltaMs +
-            " ms old, and staleThresholdMs = " + staleThresholdMs);
+        LOG.trace(this + ": checked shared memory segment.  isStale=" + stale);
      }
-      return true;
+      return stale;
     } else {
-      if (LOG.isTraceEnabled()) {
-        LOG.trace(this + " is not stale because it's only " + deltaMs +
-            " ms old, and staleThresholdMs = " + staleThresholdMs);
+      // Fall back to old, time-based staleness method.
+      long deltaMs = Time.monotonicNow() - creationTimeMs;
+      long staleThresholdMs = cache.getStaleThresholdMs();
+      if (deltaMs > staleThresholdMs) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(this + " is stale because it's " + deltaMs +
+              " ms old, and staleThresholdMs = " + staleThresholdMs);
+        }
+        return true;
+      } else {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(this + " is not stale because it's only " + deltaMs +
+              " ms old, and staleThresholdMs = " + staleThresholdMs);
+        }
+        return false;
      }
+    }
+  }
+
+  /**
+   * Try to add a no-checksum anchor to our shared memory slot.
+   *
+   * It is only possible to add this anchor when the block is mlocked on the Datanode.
+   * The DataNode will not munlock the block until the number of no-checksum anchors
+   * for the block reaches zero.
+   *
+   * This method does not require any synchronization.
+   *
+   * @return     True if we successfully added a no-checksum anchor.
+   */
+  public boolean addNoChecksumAnchor() {
+    if (slot == null) {
      return false;
    }
+    return slot.addAnchor();
+  }
+
+  /**
+   * Remove a no-checksum anchor for our shared memory slot.
+   *
+   * This method does not require any synchronization.
+   */
+  public void removeNoChecksumAnchor() {
+    if (slot != null) {
+      slot.removeAnchor();
+    }
   }
 
   /**
@@ -165,7 +212,7 @@ public class ShortCircuitReplica {
    */
   @VisibleForTesting
   public boolean hasMmap() {
-    return ((mmapData != null) && (mmapData instanceof ClientMmap));
+    return ((mmapData != null) && (mmapData instanceof MappedByteBuffer));
   }
 
   /**
@@ -174,8 +221,8 @@ public class ShortCircuitReplica {
    * Must be called with the cache lock held.
    */
   void munmap() {
-    ClientMmap clientMmap = (ClientMmap)mmapData;
-    NativeIO.POSIX.munmap(clientMmap.getMappedByteBuffer());
+    MappedByteBuffer mmap = (MappedByteBuffer)mmapData;
+    NativeIO.POSIX.munmap(mmap);
     mmapData = null;
   }
 
@@ -186,12 +233,25 @@ public class ShortCircuitReplica {
    * cache or elsewhere.
    */
   void close() {
+    String suffix = "";
+
     Preconditions.checkState(refCount == 0,
         "tried to close replica with refCount " + refCount + ": " + this);
+    refCount = -1;
     Preconditions.checkState(purged,
         "tried to close unpurged replica " + this);
-    if (hasMmap()) munmap();
+    if (hasMmap()) {
+      munmap();
+      suffix += "  munmapped.";
+    }
     IOUtils.cleanup(LOG, dataStream, metaStream);
+    if (slot != null) {
+      cache.scheduleSlotReleaser(slot);
+      suffix += "  scheduling " + slot + " for later release.";
+    }
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("closed " + this + suffix);
+    }
   }
 
   public FileInputStream getDataStream() {
@@ -210,8 +270,8 @@ public class ShortCircuitReplica {
     return key;
   }
 
-  public ClientMmap getOrCreateClientMmap() {
-    return cache.getOrCreateClientMmap(this);
+  public ClientMmap getOrCreateClientMmap(boolean anchor) {
+    return cache.getOrCreateClientMmap(this, anchor);
   }
 
   MappedByteBuffer loadMmapInternal() {
@@ -250,6 +310,11 @@ public class ShortCircuitReplica {
     this.evictableTimeNs = evictableTimeNs;
   }
 
+  @VisibleForTesting
+  public Slot getSlot() {
+    return slot;
+  }
+
   /**
    * Convert the replica to a string for debugging purposes.
    * Note that we can't take the lock here.
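Staleness now has two tiers, as the isStale() hunk above shows: slot-backed replicas consult the shared memory slot's validity bit, which the DataNode clears when the replica goes away, while slot-less replicas keep the old age-based heuristic. The decision structure, restated as a sketch using the same fields:

    // Restatement of the new isStale() decision; not the committed method.
    boolean isStaleSketch() {
      if (slot != null) {
        return !slot.isValid();                    // DataNode-driven invalidation
      }
      long age = Time.monotonicNow() - creationTimeMs;
      return age > cache.getStaleThresholdMs();    // legacy time-based fallback
    }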
Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java?rev=1573821&r1=1573820&r2=1573821&view=diff ============================================================================== --- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java (original) +++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java Mon Mar 3 23:51:58 2014 @@ -23,6 +23,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.ShortCircuitShm.SlotId; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; @@ -116,14 +117,30 @@ public interface DataTransferProtocol { * * @param blk The block to get file descriptors for. * @param blockToken Security token for accessing the block. + * @param slotId The shared memory slot id to use, or null + * to use no slot id. * @param maxVersion Maximum version of the block data the client - * can understand. + * can understand. */ public void requestShortCircuitFds(final ExtendedBlock blk, final Token<BlockTokenIdentifier> blockToken, - int maxVersion) throws IOException; + SlotId slotId, int maxVersion) throws IOException; /** + * Release a pair of short-circuit FDs requested earlier. + * + * @param slotId SlotId used by the earlier file descriptors. + */ + public void releaseShortCircuitFds(final SlotId slotId) throws IOException; + + /** + * Request a short-circuit shared memory area from a DataNode. + * + * @param clientName The name of the client. + */ + public void requestShortCircuitShm(String clientName) throws IOException; + + /** * Receive a block from a source datanode * and then notify the namenode * to remove the copy from the original datanode. Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java?rev=1573821&r1=1573820&r2=1573821&view=diff ============================================================================== --- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java (original) +++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Op.java Mon Mar 3 23:51:58 2014 @@ -35,7 +35,9 @@ public enum Op { COPY_BLOCK((byte)84), BLOCK_CHECKSUM((byte)85), TRANSFER_BLOCK((byte)86), - REQUEST_SHORT_CIRCUIT_FDS((byte)87); + REQUEST_SHORT_CIRCUIT_FDS((byte)87), + RELEASE_SHORT_CIRCUIT_FDS((byte)88), + REQUEST_SHORT_CIRCUIT_SHM((byte)89); /** The code for this operation.
*/ public final byte code; Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java?rev=1573821&r1=1573820&r2=1573821&view=diff ============================================================================== --- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java (original) +++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java Mon Mar 3 23:51:58 2014 @@ -25,6 +25,7 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.ShortCircuitShm.SlotId; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpReadBlockProto; @@ -33,6 +34,8 @@ import org.apache.hadoop.hdfs.protocol.p import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmRequestProto; import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; @@ -82,6 +85,12 @@ public abstract class Receiver implement case REQUEST_SHORT_CIRCUIT_FDS: opRequestShortCircuitFds(in); break; + case RELEASE_SHORT_CIRCUIT_FDS: + opReleaseShortCircuitFds(in); + break; + case REQUEST_SHORT_CIRCUIT_SHM: + opRequestShortCircuitShm(in); + break; default: throw new IOException("Unknown op " + op + " in data stream"); } @@ -141,9 +150,26 @@ public abstract class Receiver implement private void opRequestShortCircuitFds(DataInputStream in) throws IOException { final OpRequestShortCircuitAccessProto proto = OpRequestShortCircuitAccessProto.parseFrom(vintPrefixed(in)); + SlotId slotId = (proto.hasSlotId()) ? 
+ PBHelper.convert(proto.getSlotId()) : null; requestShortCircuitFds(PBHelper.convert(proto.getHeader().getBlock()), PBHelper.convert(proto.getHeader().getToken()), - proto.getMaxVersion()); + slotId, proto.getMaxVersion()); + } + + /** Receive {@link Op#RELEASE_SHORT_CIRCUIT_FDS} */ + private void opReleaseShortCircuitFds(DataInputStream in) + throws IOException { + final ReleaseShortCircuitAccessRequestProto proto = + ReleaseShortCircuitAccessRequestProto.parseFrom(vintPrefixed(in)); + releaseShortCircuitFds(PBHelper.convert(proto.getSlotId())); + } + + /** Receive {@link Op#REQUEST_SHORT_CIRCUIT_SHM} */ + private void opRequestShortCircuitShm(DataInputStream in) throws IOException { + final ShortCircuitShmRequestProto proto = + ShortCircuitShmRequestProto.parseFrom(vintPrefixed(in)); + requestShortCircuitShm(proto.getClientName()); } /** Receive OP_REPLACE_BLOCK */ Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java?rev=1573821&r1=1573820&r2=1573821&view=diff ============================================================================== --- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java (original) +++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java Mon Mar 3 23:51:58 2014 @@ -25,6 +25,7 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.ShortCircuitShm.SlotId; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto; @@ -37,6 +38,8 @@ import org.apache.hadoop.hdfs.protocol.p import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpRequestShortCircuitAccessProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.CachingStrategyProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmRequestProto; import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; @@ -161,15 +164,37 @@ public class Sender implements DataTrans @Override public void requestShortCircuitFds(final ExtendedBlock blk, final Token blockToken, - int maxVersion) throws IOException { - OpRequestShortCircuitAccessProto proto = + SlotId slotId, int maxVersion) throws IOException { + OpRequestShortCircuitAccessProto.Builder builder = OpRequestShortCircuitAccessProto.newBuilder() .setHeader(DataTransferProtoUtil.buildBaseHeader( - blk, blockToken)).setMaxVersion(maxVersion).build(); + blk, blockToken)).setMaxVersion(maxVersion); + if (slotId != null) { + builder.setSlotId(PBHelper.convert(slotId)); + } + OpRequestShortCircuitAccessProto proto = builder.build(); send(out, Op.REQUEST_SHORT_CIRCUIT_FDS, proto); } @Override + public void releaseShortCircuitFds(SlotId slotId) throws 
IOException { + ReleaseShortCircuitAccessRequestProto proto = + ReleaseShortCircuitAccessRequestProto.newBuilder(). + setSlotId(PBHelper.convert(slotId)). + build(); + send(out, Op.RELEASE_SHORT_CIRCUIT_FDS, proto); + } + + @Override + public void requestShortCircuitShm(String clientName) throws IOException { + ShortCircuitShmRequestProto proto = + ShortCircuitShmRequestProto.newBuilder(). + setClientName(clientName). + build(); + send(out, Op.REQUEST_SHORT_CIRCUIT_SHM, proto); + } + + @Override public void replaceBlock(final ExtendedBlock blk, final Token blockToken, final String delHint, Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1573821&r1=1573820&r2=1573821&view=diff ============================================================================== --- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original) +++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Mon Mar 3 23:51:58 2014 @@ -41,6 +41,8 @@ import org.apache.hadoop.fs.permission.F import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.proto.HAServiceProtocolProtos; import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.ShortCircuitShm.ShmId; +import org.apache.hadoop.hdfs.ShortCircuitShm.SlotId; import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; @@ -91,6 +93,8 @@ import org.apache.hadoop.hdfs.protocol.p import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeActionProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeInfoProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmIdProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto; @@ -2055,5 +2059,29 @@ public class PBHelper { .addAllEntries(convertAclEntryProto(e.getEntries())).build(); return GetAclStatusResponseProto.newBuilder().setResult(r).build(); } + + public static ShortCircuitShmSlotProto convert(SlotId slotId) { + return ShortCircuitShmSlotProto.newBuilder(). + setShmId(convert(slotId.getShmId())). + setSlotIdx(slotId.getSlotIdx()). + build(); + } + + public static ShortCircuitShmIdProto convert(ShmId shmId) { + return ShortCircuitShmIdProto.newBuilder(). + setHi(shmId.getHi()). + setLo(shmId.getLo()). 
+ build(); + + } + + public static SlotId convert(ShortCircuitShmSlotProto slotId) { + return new SlotId(PBHelper.convert(slotId.getShmId()), + slotId.getSlotIdx()); + } + + public static ShmId convert(ShortCircuitShmIdProto shmId) { + return new ShmId(shmId.getHi(), shmId.getLo()); + } } Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1573821&r1=1573820&r2=1573821&view=diff ============================================================================== --- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original) +++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Mon Mar 3 23:51:58 2014 @@ -185,6 +185,7 @@ public class DataNode extends Configured AtomicInteger xmitsInProgress = new AtomicInteger(); Daemon dataXceiverServer = null; Daemon localDataXceiverServer = null; + ShortCircuitRegistry shortCircuitRegistry = null; ThreadGroup threadGroup = null; private DNConf dnConf; private volatile boolean heartbeatsDisabledForTests = false; @@ -540,6 +541,7 @@ public class DataNode extends Configured domainPeerServer.getBindPath()); } } + this.shortCircuitRegistry = new ShortCircuitRegistry(conf); } static DomainPeerServer getDomainPeerServer(Configuration conf, @@ -1304,6 +1306,7 @@ public class DataNode extends Configured MBeans.unregister(dataNodeInfoBeanName); dataNodeInfoBeanName = null; } + if (shortCircuitRegistry != null) shortCircuitRegistry.shutdown(); LOG.info("Shutdown complete."); synchronized(this) { // it is already false, but setting it again to avoid a findbug warning. 
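The four PBHelper overloads added above form two inverse pairs, so a slot identifier survives a round trip through its wire form. A minimal sketch of that property (illustrative values; it assumes only the ShmId and SlotId constructors and getters visible in the diff):

    // Build a slot id, convert it to its protobuf form, and convert it back.
    ShmId shmId = new ShmId(0x0123456789abcdefL, 0x0fedcba987654321L);
    SlotId original = new SlotId(shmId, 7);
    ShortCircuitShmSlotProto wire = PBHelper.convert(original);
    SlotId copy = PBHelper.convert(wire);
    // copy.getShmId() carries the same hi/lo words as shmId, and
    // copy.getSlotIdx() == 7.

The 128-bit ShmId travels as two longs (hi, lo), which matches the %016x%016x formatting used for shmId in the DataXceiver trace logging below.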
@@ -1957,7 +1960,8 @@ public class DataNode extends Configured * * @return the fsdataset that stores the blocks */ - FsDatasetSpi getFSDataset() { + @VisibleForTesting + public FsDatasetSpi getFSDataset() { return data; } @@ -2568,4 +2572,8 @@ public class DataNode extends Configured DataStorage getStorage() { return storage; } + + public ShortCircuitRegistry getShortCircuitRegistry() { + return shortCircuitRegistry; + } } Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java?rev=1573821&r1=1573820&r2=1573821&view=diff ============================================================================== --- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java (original) +++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java Mon Mar 3 23:51:58 2014 @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.datanode; import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR; +import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_INVALID; import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_UNSUPPORTED; import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.ERROR_ACCESS_TOKEN; import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SUCCESS; @@ -42,6 +43,9 @@ import java.nio.channels.ClosedChannelEx import java.util.Arrays; import org.apache.commons.logging.Log; +import org.apache.hadoop.fs.InvalidRequestException; +import org.apache.hadoop.hdfs.ExtendedBlockId; +import org.apache.hadoop.hdfs.ShortCircuitShm.SlotId; import org.apache.hadoop.hdfs.net.Peer; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; @@ -58,6 +62,8 @@ import org.apache.hadoop.hdfs.protocol.p import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; @@ -65,11 +71,13 @@ import org.apache.hadoop.hdfs.security.t import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsUnsupportedException; import org.apache.hadoop.hdfs.server.datanode.DataNode.ShortCircuitFdsVersionException; +import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.NewShmInfo; import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.MD5Hash; import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.net.unix.DomainSocket; import 
org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.DataChecksum; @@ -84,7 +92,7 @@ class DataXceiver extends Receiver imple public static final Log LOG = DataNode.LOG; static final Log ClientTraceLog = DataNode.ClientTraceLog; - private final Peer peer; + private Peer peer; private final String remoteAddress; // address of remote side private final String localAddress; // local address of this daemon private final DataNode datanode; @@ -220,7 +228,8 @@ class DataXceiver extends Receiver imple opStartTime = now(); processOp(op); ++opsProcessed; - } while (!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0); + } while ((peer != null) && + (!peer.isClosed() && dnConf.socketKeepaliveTimeout > 0)); } catch (Throwable t) { LOG.error(datanode.getDisplayName() + ":DataXceiver error processing " + ((op == null) ? "unknown" : op.name()) + " operation " + @@ -232,15 +241,17 @@ class DataXceiver extends Receiver imple + datanode.getXceiverCount()); } updateCurrentThreadName("Cleaning up"); - dataXceiverServer.closePeer(peer); - IOUtils.closeStream(in); + if (peer != null) { + dataXceiverServer.closePeer(peer); + IOUtils.closeStream(in); + } } } @Override public void requestShortCircuitFds(final ExtendedBlock blk, final Token token, - int maxVersion) throws IOException { + SlotId slotId, int maxVersion) throws IOException { updateCurrentThreadName("Passing file descriptors for block " + blk); BlockOpResponseProto.Builder bld = BlockOpResponseProto.newBuilder(); FileInputStream fis[] = null; @@ -249,7 +260,17 @@ class DataXceiver extends Receiver imple throw new IOException("You cannot pass file descriptors over " + "anything but a UNIX domain socket."); } - fis = datanode.requestShortCircuitFdsForRead(blk, token, maxVersion); + if (slotId != null) { + datanode.shortCircuitRegistry.registerSlot( + ExtendedBlockId.fromExtendedBlock(blk), slotId); + } + try { + fis = datanode.requestShortCircuitFdsForRead(blk, token, maxVersion); + } finally { + if ((fis == null) && (slotId != null)) { + datanode.shortCircuitRegistry.unregisterSlot(slotId); + } + } bld.setStatus(SUCCESS); bld.setShortCircuitAccessVersion(DataNode.CURRENT_BLOCK_FORMAT_VERSION); } catch (ShortCircuitFdsVersionException e) { @@ -294,6 +315,122 @@ class DataXceiver extends Receiver imple } @Override + public void releaseShortCircuitFds(SlotId slotId) throws IOException { + boolean success = false; + try { + String error; + Status status; + try { + datanode.shortCircuitRegistry.unregisterSlot(slotId); + error = null; + status = Status.SUCCESS; + } catch (UnsupportedOperationException e) { + error = "unsupported operation"; + status = Status.ERROR_UNSUPPORTED; + } catch (Throwable e) { + error = e.getMessage(); + status = Status.ERROR_INVALID; + } + ReleaseShortCircuitAccessResponseProto.Builder bld = + ReleaseShortCircuitAccessResponseProto.newBuilder(); + bld.setStatus(status); + if (error != null) { + bld.setError(error); + } + bld.build().writeDelimitedTo(socketOut); + success = true; + } finally { + if (ClientTraceLog.isInfoEnabled()) { + BlockSender.ClientTraceLog.info(String.format( + "src: 127.0.0.1, dest: 127.0.0.1, op: RELEASE_SHORT_CIRCUIT_FDS," + + " shmId: %016x%016x, slotIdx: %d, srvID: %s, success: %b", + slotId.getShmId().getHi(), slotId.getShmId().getLo(), + slotId.getSlotIdx(), datanode.getDatanodeUuid(), success)); + } + } + } + + private void sendShmErrorResponse(Status status, String error) + throws IOException { + 
ShortCircuitShmResponseProto.newBuilder().setStatus(status). + setError(error).build().writeDelimitedTo(socketOut); + } + + private void sendShmSuccessResponse(DomainSocket sock, NewShmInfo shmInfo) + throws IOException { + ShortCircuitShmResponseProto.newBuilder().setStatus(SUCCESS). + setId(PBHelper.convert(shmInfo.shmId)).build(). + writeDelimitedTo(socketOut); + // Send the file descriptor for the shared memory segment. + byte buf[] = new byte[] { (byte)0 }; + FileDescriptor shmFdArray[] = + new FileDescriptor[] { shmInfo.stream.getFD() }; + sock.sendFileDescriptors(shmFdArray, buf, 0, buf.length); + } + + @Override + public void requestShortCircuitShm(String clientName) throws IOException { + NewShmInfo shmInfo = null; + boolean success = false; + DomainSocket sock = peer.getDomainSocket(); + try { + if (sock == null) { + sendShmErrorResponse(ERROR_INVALID, "Bad request from " + + peer + ": must request a shared " + + "memory segment over a UNIX domain socket."); + return; + } + try { + shmInfo = datanode.shortCircuitRegistry. + createNewMemorySegment(clientName, sock); + // After calling #{ShortCircuitRegistry#createNewMemorySegment}, the + // socket is managed by the DomainSocketWatcher, not the DataXceiver. + releaseSocket(); + } catch (UnsupportedOperationException e) { + sendShmErrorResponse(ERROR_UNSUPPORTED, + "This datanode has not been configured to support " + + "short-circuit shared memory segments."); + return; + } catch (IOException e) { + sendShmErrorResponse(ERROR, + "Failed to create shared file descriptor: " + e.getMessage()); + return; + } + sendShmSuccessResponse(sock, shmInfo); + success = true; + } finally { + if (ClientTraceLog.isInfoEnabled()) { + if (success) { + BlockSender.ClientTraceLog.info(String.format( + "cliID: %s, src: 127.0.0.1, dest: 127.0.0.1, " + + "op: REQUEST_SHORT_CIRCUIT_SHM," + + " shmId: %016x%016x, srvID: %s, success: true", + clientName, shmInfo.shmId.getHi(), shmInfo.shmId.getLo(), + datanode.getDatanodeUuid())); + } else { + BlockSender.ClientTraceLog.info(String.format( + "cliID: %s, src: 127.0.0.1, dest: 127.0.0.1, " + + "op: REQUEST_SHORT_CIRCUIT_SHM, " + + "shmId: n/a, srvID: %s, success: false", + clientName, datanode.getDatanodeUuid())); + } + } + if ((!success) && (peer == null)) { + // If we failed to pass the shared memory segment to the client, + // close the UNIX domain socket now. This will trigger the + // DomainSocketWatcher callback, cleaning up the segment. 
+ IOUtils.cleanup(null, sock); + } + IOUtils.cleanup(null, shmInfo); + } + } + + void releaseSocket() { + dataXceiverServer.releasePeer(peer); + peer = null; + } + + @Override public void readBlock(final ExtendedBlock block, final Token blockToken, final String clientName, Modified: hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java?rev=1573821&r1=1573820&r2=1573821&view=diff ============================================================================== --- hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java (original) +++ hadoop/common/branches/HDFS-5535/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java Mon Mar 3 23:51:58 2014 @@ -248,4 +248,8 @@ class DataXceiverServer implements Runna synchronized int getNumPeers() { return peers.size(); } + + synchronized void releasePeer(Peer peer) { + peers.remove(peer); + } }
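Putting the Sender and DataXceiver halves together, the client side of the new REQUEST_SHORT_CIRCUIT_SHM exchange looks roughly as follows. This is an illustrative sketch, not code from this commit: it assumes an already-connected DomainSocket sock whose streams are wrapped as out and in, and it uses DomainSocket#recvFileInputStreams as the receiving counterpart of the sendFileDescriptors() call in sendShmSuccessResponse() above.

    // 1. Send the shared memory request over the UNIX domain socket.
    new Sender(out).requestShortCircuitShm(clientName);
    // 2. Read the delimited response written by the DataXceiver.
    ShortCircuitShmResponseProto resp =
        ShortCircuitShmResponseProto.parseDelimitedFrom(in);
    if (resp.getStatus() != Status.SUCCESS) {
      throw new IOException("shm request failed: " + resp.getError());
    }
    ShmId shmId = PBHelper.convert(resp.getId());
    // 3. Receive the segment's file descriptor, which the DataNode passes
    //    alongside a single sentinel byte.
    FileInputStream[] fis = new FileInputStream[1];
    byte[] buf = new byte[1];
    sock.recvFileInputStreams(fis, buf, 0, buf.length);
    // fis[0] now refers to the shared memory segment identified by shmId.

If the segment was created but the descriptor could not be delivered, the server closes the socket (the IOUtils.cleanup(null, sock) path above), which fires the DomainSocketWatcher callback and reclaims the segment.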