Subject: svn commit: r1573436 [2/2] - in /hadoop/common/branches/branch-2.4/hadoop-hdfs-project: ./ hadoop-hdfs/ hadoop-hdfs/src/main/java/ hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ hadoop-hdfs/sr...
Date: Mon, 03 Mar 2014 04:12:50 -0000
From: cmccabe@apache.org
To: hdfs-commits@hadoop.apache.org
Reply-To: hdfs-dev@hadoop.apache.org
Message-Id: <20140303041252.412AD2388A4A@eris.apache.org>

Modified: hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java?rev=1573436&r1=1573435&r2=1573436&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java Mon Mar 3 04:12:49 2014
@@ -266,6 +266,15 @@ public class FsDatasetCache {
     ExtendedBlockId key = new ExtendedBlockId(blockId, bpid);
     Value prevValue = mappableBlockMap.get(key);
+    if (!dataset.datanode.getShortCircuitRegistry().
+            processBlockMunlockRequest(key)) {
+      // TODO: we probably want to forcibly uncache the block (and close the
+      // shm) after a certain timeout has elapsed.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(key + " is anchored, and can't be uncached now.");
+      }
+      return;
+    }
     if (prevValue == null) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Block with id " + blockId + ", pool " + bpid + " " +
@@ -380,6 +389,7 @@ public class FsDatasetCache {
           LOG.debug("Successfully cached " + key + ".
            We are now caching " + newUsedBytes + " bytes in total.");
        }
+       dataset.datanode.getShortCircuitRegistry().processBlockMlockEvent(key);
        numBlocksCached.addAndGet(1);
        success = true;
      } finally {

Modified: hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java?rev=1573436&r1=1573435&r2=1573436&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java Mon Mar 3 04:12:49 2014
@@ -47,8 +47,6 @@ import com.google.common.base.Preconditi
  */
 @InterfaceAudience.Private
 public final class CachePool {
-  public static final Log LOG = LogFactory.getLog(CachePool.class);
-
   @Nonnull
   private final String poolName;

Modified: hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto?rev=1573436&r1=1573435&r2=1573436&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto (original)
+++ hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/datatransfer.proto Mon Mar 3 04:12:49 2014
@@ -128,6 +128,22 @@ message OpBlockChecksumProto {
   required BaseHeaderProto header = 1;
 }

+/**
+ * An ID uniquely identifying a shared memory segment.
+ */
+message ShortCircuitShmIdProto {
+  required int64 hi = 1;
+  required int64 lo = 2;
+}
+
+/**
+ * An ID uniquely identifying a slot within a shared memory segment.
+ */
+message ShortCircuitShmSlotProto {
+  required ShortCircuitShmIdProto shmId = 1;
+  required int32 slotIdx = 2;
+}
+
 message OpRequestShortCircuitAccessProto {
   required BaseHeaderProto header = 1;

@@ -137,6 +153,32 @@ message OpRequestShortCircuitAccessProto
    * if the on-disk format changes.
    */
   required uint32 maxVersion = 2;
+
+  /**
+   * The shared memory slot to use, if we are using one.
+   */
+  optional ShortCircuitShmSlotProto slotId = 3;
+}
+
+message ReleaseShortCircuitAccessRequestProto {
+  required ShortCircuitShmSlotProto slotId = 1;
+}
+
+message ReleaseShortCircuitAccessResponseProto {
+  required Status status = 1;
+  optional string error = 2;
+}
+
+message ShortCircuitShmRequestProto {
+  // The name of the client requesting the shared memory segment. This is
+  // purely for logging / debugging purposes.
+  required string clientName = 1;
+}
+
+message ShortCircuitShmResponseProto {
+  required Status status = 1;
+  optional string error = 2;
+  optional ShortCircuitShmIdProto id = 3;
 }

 message PacketHeaderProto {
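For illustration, the two new ID messages nest one inside the other; below is a minimal sketch of building a release request from Java (assuming the generated classes land in the usual org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos wrapper; the hi/lo words and slot index are placeholder values, and the helper class is hypothetical):

import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReleaseShortCircuitAccessRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmIdProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitShmSlotProto;

public class ShmSlotProtoSketch {
  public static ReleaseShortCircuitAccessRequestProto buildReleaseRequest(
      long shmIdHi, long shmIdLo, int slotIdx) {
    // Identify the shared memory segment by its 128-bit ID (hi/lo halves).
    ShortCircuitShmIdProto shmId = ShortCircuitShmIdProto.newBuilder()
        .setHi(shmIdHi)
        .setLo(shmIdLo)
        .build();
    // Address one slot relative to that segment.
    ShortCircuitShmSlotProto slotId = ShortCircuitShmSlotProto.newBuilder()
        .setShmId(shmId)
        .setSlotIdx(slotIdx)
        .build();
    // Ask the DataNode to release the slot, as in the new
    // ReleaseShortCircuitAccess request added by this patch.
    return ReleaseShortCircuitAccessRequestProto.newBuilder()
        .setSlotId(slotId)
        .build();
  }
}

Splitting the segment ID into two int64 fields keeps the message fixed-size, and a slot is never addressed on its own, only relative to its segment.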
Modified: hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1573436&r1=1573435&r2=1573436&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Mon Mar 3 04:12:49 2014
@@ -1138,6 +1138,27 @@
 </property>

 <property>
+  <name>dfs.datanode.shared.file.descriptor.path</name>
+  <value>/dev/shm</value>
+  <description>
+    The path to use when creating file descriptors that will be shared
+    between the DataNode and the DFSClient. Typically we use /dev/shm, so
+    that the file descriptors will not be written to disk. Systems that
+    don't have /dev/shm should use /tmp.
+  </description>
+</property>
+
+<property>
+  <name>dfs.short.circuit.shared.memory.watcher.interrupt.check.ms</name>
+  <value>60000</value>
+  <description>
+    The length of time in milliseconds that the short-circuit shared memory
+    watcher will go between checking for java interruptions sent from other
+    threads. This is provided mainly for unit tests.
+  </description>
+</property>
+
+<property>
   <name>dfs.namenode.kerberos.internal.spnego.principal</name>
   <value>${dfs.web.authentication.kerberos.principal}</value>

Modified: hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java?rev=1573436&r1=1573435&r2=1573436&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java (original)
+++ hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/TestEnhancedByteBufferAccess.java Mon Mar 3 04:12:49 2014
@@ -17,9 +17,15 @@
  */
 package org.apache.hadoop.fs;

+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
+import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.util.concurrent.TimeoutException;
@@ -34,6 +40,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.BlockReaderTestUtil;
 import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.ClientContext;
 import org.apache.hadoop.hdfs.DFSClient;
@@ -42,18 +49,24 @@ import org.apache.hadoop.hdfs.DFSTestUti
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.ShortCircuitShm.Slot;
 import
org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.client.ShortCircuitCache; import org.apache.hadoop.hdfs.client.ShortCircuitCache.CacheVisitor; import org.apache.hadoop.hdfs.client.ShortCircuitReplica; +import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; +import org.apache.hadoop.hdfs.protocol.CachePoolInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.io.ByteBufferPool; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.nativeio.NativeIO; +import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator; import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.TemporarySocketDirectory; import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.test.GenericTestUtils; +import org.junit.AfterClass; import org.junit.Assert; import org.junit.Assume; import org.junit.BeforeClass; @@ -71,12 +84,28 @@ public class TestEnhancedByteBufferAcces private static final Log LOG = LogFactory.getLog(TestEnhancedByteBufferAccess.class.getName()); - static TemporarySocketDirectory sockDir; + static private TemporarySocketDirectory sockDir; + + static private CacheManipulator prevCacheManipulator; @BeforeClass public static void init() { sockDir = new TemporarySocketDirectory(); DomainSocket.disableBindPathValidation(); + prevCacheManipulator = NativeIO.POSIX.getCacheManipulator(); + NativeIO.POSIX.setCacheManipulator(new CacheManipulator() { + @Override + public void mlock(String identifier, + ByteBuffer mmap, long length) throws IOException { + LOG.info("mlocking " + identifier); + } + }); + } + + @AfterClass + public static void teardown() { + // Restore the original CacheManipulator + NativeIO.POSIX.setCacheManipulator(prevCacheManipulator); } private static byte[] byteBufferToArray(ByteBuffer buf) { @@ -86,12 +115,14 @@ public class TestEnhancedByteBufferAcces return resultArray; } + private static int BLOCK_SIZE = 4096; + public static HdfsConfiguration initZeroCopyTest() { Assume.assumeTrue(NativeIO.isAvailable()); Assume.assumeTrue(SystemUtils.IS_OS_UNIX); HdfsConfiguration conf = new HdfsConfiguration(); conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true); - conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); conf.setInt(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE, 3); conf.setLong(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS, 100); conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY, @@ -99,6 +130,9 @@ public class TestEnhancedByteBufferAcces "TestRequestMmapAccess._PORT.sock").getAbsolutePath()); conf.setBoolean(DFSConfigKeys. DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, true); + conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1); + conf.setLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 1000); + conf.setLong(DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 1000); return conf; } @@ -549,4 +583,119 @@ public class TestEnhancedByteBufferAcces new File(TEST_PATH).delete(); } } + + /** + * Test that we can zero-copy read cached data even without disabling + * checksums. 
+ */ + @Test(timeout=120000) + public void testZeroCopyReadOfCachedData() throws Exception { + BlockReaderTestUtil.enableShortCircuitShmTracing(); + BlockReaderTestUtil.enableBlockReaderFactoryTracing(); + BlockReaderTestUtil.enableHdfsCachingTracing(); + + final int TEST_FILE_LENGTH = 16385; + final Path TEST_PATH = new Path("/a"); + final int RANDOM_SEED = 23453; + HdfsConfiguration conf = initZeroCopyTest(); + conf.setBoolean(DFSConfigKeys. + DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false); + final String CONTEXT = "testZeroCopyReadOfCachedData"; + conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, CONTEXT); + conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, + DFSTestUtil.roundUpToMultiple(TEST_FILE_LENGTH, 4096)); + MiniDFSCluster cluster = null; + ByteBuffer result = null; + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + FsDatasetSpi fsd = cluster.getDataNodes().get(0).getFSDataset(); + DistributedFileSystem fs = cluster.getFileSystem(); + DFSTestUtil.createFile(fs, TEST_PATH, + TEST_FILE_LENGTH, (short)1, RANDOM_SEED); + DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1); + byte original[] = DFSTestUtil. + calculateFileContentsFromSeed(RANDOM_SEED, TEST_FILE_LENGTH); + + // Prior to caching, the file can't be read via zero-copy + FSDataInputStream fsIn = fs.open(TEST_PATH); + try { + result = fsIn.read(null, TEST_FILE_LENGTH / 2, + EnumSet.noneOf(ReadOption.class)); + Assert.fail("expected UnsupportedOperationException"); + } catch (UnsupportedOperationException e) { + // expected + } + // Cache the file + fs.addCachePool(new CachePoolInfo("pool1")); + long directiveId = fs.addCacheDirective(new CacheDirectiveInfo.Builder(). + setPath(TEST_PATH). + setReplication((short)1). + setPool("pool1"). + build()); + int numBlocks = (int)Math.ceil((double)TEST_FILE_LENGTH / BLOCK_SIZE); + DFSTestUtil.verifyExpectedCacheUsage( + DFSTestUtil.roundUpToMultiple(TEST_FILE_LENGTH, BLOCK_SIZE), + numBlocks, cluster.getDataNodes().get(0).getFSDataset()); + try { + result = fsIn.read(null, TEST_FILE_LENGTH, + EnumSet.noneOf(ReadOption.class)); + } catch (UnsupportedOperationException e) { + Assert.fail("expected to be able to read cached file via zero-copy"); + } + // Verify result + Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, + BLOCK_SIZE), byteBufferToArray(result)); + // check that the replica is anchored + final ExtendedBlock firstBlock = + DFSTestUtil.getFirstBlock(fs, TEST_PATH); + final ShortCircuitCache cache = ClientContext.get( + CONTEXT, new DFSClient.Conf(conf)). 
getShortCircuitCache(); + waitForReplicaAnchorStatus(cache, firstBlock, true, true, 1); + // Uncache the replica + fs.removeCacheDirective(directiveId); + waitForReplicaAnchorStatus(cache, firstBlock, false, true, 1); + fsIn.releaseBuffer(result); + waitForReplicaAnchorStatus(cache, firstBlock, false, false, 1); + DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd); + + fsIn.close(); + fs.close(); + cluster.shutdown(); + } + + private void waitForReplicaAnchorStatus(final ShortCircuitCache cache, + final ExtendedBlock block, final boolean expectedIsAnchorable, + final boolean expectedIsAnchored, final int expectedOutstandingMmaps) + throws Exception { + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + final MutableBoolean result = new MutableBoolean(false); + cache.accept(new CacheVisitor() { + @Override + public void visit(int numOutstandingMmaps, + Map replicas, + Map failedLoads, + Map evictable, + Map evictableMmapped) { + Assert.assertEquals(expectedOutstandingMmaps, numOutstandingMmaps); + ShortCircuitReplica replica = + replicas.get(ExtendedBlockId.fromExtendedBlock(block)); + Assert.assertNotNull(replica); + Slot slot = replica.getSlot(); + if ((expectedIsAnchorable != slot.isAnchorable()) || + (expectedIsAnchored != slot.isAnchored())) { + LOG.info("replica " + replica + " has isAnchorable = " + + slot.isAnchorable() + ", isAnchored = " + slot.isAnchored() + + ". Waiting for isAnchorable = " + expectedIsAnchorable + + ", isAnchored = " + expectedIsAnchored); + return; + } + result.setValue(true); + } + }); + return result.toBoolean(); + } + }, 10, 60000); + } } Modified: hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java?rev=1573436&r1=1573435&r2=1573436&view=diff ============================================================================== --- hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java (original) +++ hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/BlockReaderTestUtil.java Mon Mar 3 04:12:49 2014 @@ -31,6 +31,7 @@ import java.util.Random; import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.client.DfsClientShmManager; import org.apache.hadoop.hdfs.client.ShortCircuitCache; import org.apache.hadoop.hdfs.client.ShortCircuitReplica; import org.apache.hadoop.hdfs.net.Peer; @@ -38,9 +39,13 @@ import org.apache.hadoop.hdfs.net.TcpPee import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache; +import org.apache.hadoop.hdfs.server.namenode.CacheManager; import org.apache.hadoop.net.NetUtils; import org.apache.log4j.Level; import org.apache.log4j.LogManager; @@ -206,6 +211,15 @@ public class 
BlockReaderTestUtil { return cluster.getDataNode(ipcport); } + public static void enableHdfsCachingTracing() { + LogManager.getLogger(CacheReplicationMonitor.class.getName()).setLevel( + Level.TRACE); + LogManager.getLogger(CacheManager.class.getName()).setLevel( + Level.TRACE); + LogManager.getLogger(FsDatasetCache.class.getName()).setLevel( + Level.TRACE); + } + public static void enableBlockReaderFactoryTracing() { LogManager.getLogger(BlockReaderFactory.class.getName()).setLevel( Level.TRACE); @@ -213,5 +227,18 @@ public class BlockReaderTestUtil { Level.TRACE); LogManager.getLogger(ShortCircuitReplica.class.getName()).setLevel( Level.TRACE); + LogManager.getLogger(BlockReaderLocal.class.getName()).setLevel( + Level.TRACE); + } + + public static void enableShortCircuitShmTracing() { + LogManager.getLogger(DfsClientShmManager.class.getName()).setLevel( + Level.TRACE); + LogManager.getLogger(ShortCircuitRegistry.class.getName()).setLevel( + Level.TRACE); + LogManager.getLogger(ShortCircuitShm.class.getName()).setLevel( + Level.TRACE); + LogManager.getLogger(DataNode.class.getName()).setLevel( + Level.TRACE); } } \ No newline at end of file Modified: hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1573436&r1=1573435&r2=1573436&view=diff ============================================================================== --- hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java (original) +++ hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java Mon Mar 3 04:12:49 2014 @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs; import com.google.common.base.Charsets; import com.google.common.base.Joiner; +import com.google.common.base.Supplier; import com.google.common.collect.Lists; import org.apache.commons.io.FileUtils; @@ -48,15 +49,18 @@ import org.apache.hadoop.hdfs.server.com import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.TestTransferRbw; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.ShellBasedUnixGroupsMapping; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.VersionInfo; import java.io.*; @@ -1110,4 +1114,47 @@ public class DFSTestUtil { buf.duplicate().get(arr); return arr; } + + /** + * Blocks until cache usage hits the expected new value. 
+ */ + public static long verifyExpectedCacheUsage(final long expectedCacheUsed, + final long expectedBlocks, final FsDatasetSpi fsd) throws Exception { + GenericTestUtils.waitFor(new Supplier() { + private int tries = 0; + + @Override + public Boolean get() { + long curCacheUsed = fsd.getCacheUsed(); + long curBlocks = fsd.getNumBlocksCached(); + if ((curCacheUsed != expectedCacheUsed) || + (curBlocks != expectedBlocks)) { + if (tries++ > 10) { + LOG.info("verifyExpectedCacheUsage: have " + + curCacheUsed + "/" + expectedCacheUsed + " bytes cached; " + + curBlocks + "/" + expectedBlocks + " blocks cached. " + + "memlock limit = " + + NativeIO.POSIX.getCacheManipulator().getMemlockLimit() + + ". Waiting..."); + } + return false; + } + return true; + } + }, 100, 60000); + return expectedCacheUsed; + } + + /** + * Round a long value up to a multiple of a factor. + * + * @param val The value. + * @param factor The factor to round up to. Must be > 1. + * @return The rounded value. + */ + public static long roundUpToMultiple(long val, int factor) { + assert (factor > 1); + long c = (val + factor - 1) / factor; + return c * factor; + } } Modified: hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java?rev=1573436&r1=1573435&r2=1573436&view=diff ============================================================================== --- hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java (original) +++ hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderFactory.java Mon Mar 3 04:12:49 2014 @@ -18,7 +18,9 @@ package org.apache.hadoop.hdfs; import java.io.File; +import java.io.IOException; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicBoolean; @@ -29,8 +31,11 @@ import org.apache.commons.logging.Log; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.client.DfsClientShmManager.PerDatanodeVisitorInfo; +import org.apache.hadoop.hdfs.client.DfsClientShmManager.Visitor; import org.apache.hadoop.hdfs.client.ShortCircuitCache; import org.apache.hadoop.hdfs.client.ShortCircuitReplicaInfo; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.TemporarySocketDirectory; @@ -47,6 +52,7 @@ import static org.apache.hadoop.hdfs.DFS import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC; import static org.hamcrest.CoreMatchers.equalTo; @@ -56,10 +62,6 @@ public class TestBlockReaderFactory { @Before public void init() { DomainSocket.disableBindPathValidation(); - } - - @Before - public void before() { 
Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null)); } @@ -69,7 +71,7 @@ public class TestBlockReaderFactory { BlockReaderFactory.createShortCircuitReplicaInfoCallback = null; } - private static Configuration createShortCircuitConf(String testName, + public static Configuration createShortCircuitConf(String testName, TemporarySocketDirectory sockDir) { Configuration conf = new Configuration(); conf.set(DFS_CLIENT_CONTEXT, testName); @@ -99,6 +101,8 @@ public class TestBlockReaderFactory { // the client is. Both support UNIX domain reads. Configuration clientConf = createShortCircuitConf( "testFallbackFromShortCircuitToUnixDomainTraffic", sockDir); + clientConf.set(DFS_CLIENT_CONTEXT, + "testFallbackFromShortCircuitToUnixDomainTraffic_clientContext"); clientConf.setBoolean(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, true); Configuration serverConf = new Configuration(clientConf); serverConf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, false); @@ -289,4 +293,87 @@ public class TestBlockReaderFactory { sockDir.close(); Assert.assertFalse(testFailed.get()); } + + /** + * Test that a client which supports short-circuit reads using + * shared memory can fall back to not using shared memory when + * the server doesn't support it. + */ + @Test + public void testShortCircuitReadFromServerWithoutShm() throws Exception { + TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); + Configuration clientConf = createShortCircuitConf( + "testShortCircuitReadFromServerWithoutShm", sockDir); + Configuration serverConf = new Configuration(clientConf); + serverConf.setInt( + DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, 0); + DFSInputStream.tcpReadsDisabledForTesting = true; + final MiniDFSCluster cluster = + new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build(); + cluster.waitActive(); + clientConf.set(DFS_CLIENT_CONTEXT, + "testShortCircuitReadFromServerWithoutShm_clientContext"); + final DistributedFileSystem fs = + (DistributedFileSystem)FileSystem.get(cluster.getURI(0), clientConf); + final String TEST_FILE = "/test_file"; + final int TEST_FILE_LEN = 4000; + final int SEED = 0xFADEC; + DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN, + (short)1, SEED); + byte contents[] = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE)); + byte expected[] = DFSTestUtil. + calculateFileContentsFromSeed(SEED, TEST_FILE_LEN); + Assert.assertTrue(Arrays.equals(contents, expected)); + final ShortCircuitCache cache = + fs.dfs.getClientContext().getShortCircuitCache(); + final DatanodeInfo datanode = + new DatanodeInfo(cluster.getDataNodes().get(0).getDatanodeId()); + cache.getDfsClientShmManager().visit(new Visitor() { + @Override + public void visit(HashMap info) + throws IOException { + Assert.assertEquals(1, info.size()); + PerDatanodeVisitorInfo vinfo = info.get(datanode); + Assert.assertTrue(vinfo.disabled); + Assert.assertEquals(0, vinfo.full.size()); + Assert.assertEquals(0, vinfo.notFull.size()); + } + }); + cluster.shutdown(); + } + + /** + * Test that a client which does not support short-circuit reads using + * shared memory can talk with a server which supports it. 
+ */ + @Test + public void testShortCircuitReadFromClientWithoutShm() throws Exception { + TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); + Configuration clientConf = createShortCircuitConf( + "testShortCircuitReadWithoutShm", sockDir); + Configuration serverConf = new Configuration(clientConf); + DFSInputStream.tcpReadsDisabledForTesting = true; + final MiniDFSCluster cluster = + new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build(); + cluster.waitActive(); + clientConf.setInt( + DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, 0); + clientConf.set(DFS_CLIENT_CONTEXT, + "testShortCircuitReadFromClientWithoutShm_clientContext"); + final DistributedFileSystem fs = + (DistributedFileSystem)FileSystem.get(cluster.getURI(0), clientConf); + final String TEST_FILE = "/test_file"; + final int TEST_FILE_LEN = 4000; + final int SEED = 0xFADEC; + DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN, + (short)1, SEED); + byte contents[] = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE)); + byte expected[] = DFSTestUtil. + calculateFileContentsFromSeed(SEED, TEST_FILE_LEN); + Assert.assertTrue(Arrays.equals(contents, expected)); + final ShortCircuitCache cache = + fs.dfs.getClientContext().getShortCircuitCache(); + Assert.assertEquals(null, cache.getDfsClientShmManager()); + cluster.shutdown(); + } } Modified: hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java?rev=1573436&r1=1573435&r2=1573436&view=diff ============================================================================== --- hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java (original) +++ hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockReaderLocal.java Mon Mar 3 04:12:49 2014 @@ -23,19 +23,21 @@ import java.io.FileInputStream; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; +import java.util.UUID; import java.util.concurrent.TimeoutException; import org.apache.hadoop.fs.ChecksumException; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.ShortCircuitShm.ShmId; import org.apache.hadoop.hdfs.client.HdfsDataInputStream; import org.apache.hadoop.hdfs.client.ShortCircuitCache; import org.apache.hadoop.hdfs.client.ShortCircuitReplica; -import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.CachingStrategy; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.nativeio.SharedFileDescriptorFactory; import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.TemporarySocketDirectory; import org.apache.hadoop.util.Time; @@ -132,6 +134,8 @@ public class TestBlockReaderLocal { byte original[] = new byte[BlockReaderLocalTest.TEST_LENGTH]; FileSystem fs = null; + ShortCircuitShm shm = null; + RandomAccessFile raf = null; try { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); cluster.waitActive(); @@ -156,7 +160,6 @@ public class TestBlockReaderLocal { File dataFile = MiniDFSCluster.getBlockFile(0, block); File metaFile = 
MiniDFSCluster.getBlockMetadataFile(0, block); - DatanodeID datanodeID = cluster.getDataNodes().get(0).getDatanodeId(); ShortCircuitCache shortCircuitCache = ClientContext.getFromConf(conf).getShortCircuitCache(); cluster.shutdown(); @@ -168,15 +171,23 @@ public class TestBlockReaderLocal { }; dataIn = streams[0]; metaIn = streams[1]; - ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId()); - ShortCircuitReplica replica = new ShortCircuitReplica( - key, dataIn, metaIn, shortCircuitCache, Time.now()); + ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), + block.getBlockPoolId()); + raf = new RandomAccessFile( + new File(sockDir.getDir().getAbsolutePath(), + UUID.randomUUID().toString()), "rw"); + raf.setLength(8192); + FileInputStream shmStream = new FileInputStream(raf.getFD()); + shm = new ShortCircuitShm(ShmId.createRandom(), shmStream); + ShortCircuitReplica replica = + new ShortCircuitReplica(key, dataIn, metaIn, shortCircuitCache, + Time.now(), shm.allocAndRegisterSlot( + ExtendedBlockId.fromExtendedBlock(block))); blockReaderLocal = new BlockReaderLocal.Builder( new DFSClient.Conf(conf)). setFilename(TEST_PATH.getName()). setBlock(block). setShortCircuitReplica(replica). - setDatanodeID(datanodeID). setCachingStrategy(new CachingStrategy(false, readahead)). setVerifyChecksum(checksum). build(); @@ -193,6 +204,8 @@ public class TestBlockReaderLocal { if (dataIn != null) dataIn.close(); if (metaIn != null) metaIn.close(); if (blockReaderLocal != null) blockReaderLocal.close(); + if (shm != null) shm.free(); + if (raf != null) raf.close(); } } @@ -369,13 +382,13 @@ public class TestBlockReaderLocal { assertArrayRegionsEqual(original, 6657, DFSTestUtil.asArray(buf), 0, 1); - reader.setMlocked(true); + reader.forceAnchorable(); readFully(reader, buf, 0, 5120); buf.flip(); assertArrayRegionsEqual(original, 6658, DFSTestUtil.asArray(buf), 0, 5120); - reader.setMlocked(false); + reader.forceUnanchorable(); readFully(reader, buf, 0, 513); buf.flip(); assertArrayRegionsEqual(original, 11778, @@ -544,10 +557,10 @@ public class TestBlockReaderLocal { assertArrayRegionsEqual(original, 1, buf.array(), 1, 9); readFully(reader, buf, 10, 100); assertArrayRegionsEqual(original, 10, buf.array(), 10, 100); - reader.setMlocked(true); + reader.forceAnchorable(); readFully(reader, buf, 110, 700); assertArrayRegionsEqual(original, 110, buf.array(), 110, 700); - reader.setMlocked(false); + reader.forceUnanchorable(); reader.skip(1); // skip from offset 810 to offset 811 readFully(reader, buf, 811, 5); assertArrayRegionsEqual(original, 811, buf.array(), 811, 5); @@ -599,10 +612,10 @@ public class TestBlockReaderLocal { assertArrayRegionsEqual(original, 1, buf.array(), 1, 9); readFully(reader, buf, 10, 100); assertArrayRegionsEqual(original, 10, buf.array(), 10, 100); - reader.setMlocked(true); + reader.forceAnchorable(); readFully(reader, buf, 110, 700); assertArrayRegionsEqual(original, 110, buf.array(), 110, 700); - reader.setMlocked(false); + reader.forceUnanchorable(); reader.skip(1); // skip from offset 810 to offset 811 readFully(reader, buf, 811, 5); assertArrayRegionsEqual(original, 811, buf.array(), 811, 5); Modified: hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitCache.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitCache.java?rev=1573436&r1=1573435&r2=1573436&view=diff 
============================================================================== --- hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitCache.java (original) +++ hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitCache.java Mon Mar 3 04:12:49 2014 @@ -20,26 +20,50 @@ package org.apache.hadoop.hdfs; import org.apache.commons.lang.mutable.MutableBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.ShortCircuitShm.Slot; +import org.apache.hadoop.hdfs.client.DfsClientShmManager.PerDatanodeVisitorInfo; import org.apache.hadoop.hdfs.client.ShortCircuitCache; +import org.apache.hadoop.hdfs.client.ShortCircuitCache.CacheVisitor; import org.apache.hadoop.hdfs.client.ShortCircuitCache.ShortCircuitReplicaCreator; +import org.apache.hadoop.hdfs.client.DfsClientShmManager.Visitor; import org.apache.hadoop.hdfs.client.ShortCircuitReplica; import org.apache.hadoop.hdfs.client.ShortCircuitReplicaInfo; +import org.apache.hadoop.hdfs.net.DomainPeer; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.net.unix.DomainSocket; import org.apache.hadoop.net.unix.TemporarySocketDirectory; +import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Time; import org.junit.Assert; +import org.junit.Assume; import org.junit.Test; import com.google.common.base.Preconditions; import com.google.common.base.Supplier; import java.io.DataOutputStream; +import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC; +import static org.hamcrest.CoreMatchers.equalTo; public class TestShortCircuitCache { static final Log LOG = LogFactory.getLog(TestShortCircuitCache.class); @@ -104,7 +128,7 @@ public class TestShortCircuitCache { return new ShortCircuitReplicaInfo( new ShortCircuitReplica(key, pair.getFileInputStreams()[0], pair.getFileInputStreams()[1], - cache, Time.monotonicNow())); + cache, Time.monotonicNow(), null)); } catch (IOException e) { throw new RuntimeException(e); } @@ -114,14 +138,14 @@ public class TestShortCircuitCache { @Test(timeout=60000) public void testCreateAndDestroy() throws Exception { ShortCircuitCache cache = - new ShortCircuitCache(10, 1, 10, 1, 1, 10000); + new ShortCircuitCache(10, 1, 10, 1, 1, 10000, 0); cache.close(); } @Test(timeout=60000) public void testAddAndRetrieve() throws Exception { final ShortCircuitCache cache = - new ShortCircuitCache(10, 10000000, 
10, 10000000, 1, 10000); + new ShortCircuitCache(10, 10000000, 10, 10000000, 1, 10000, 0); final TestFileDescriptorPair pair = new TestFileDescriptorPair(); ShortCircuitReplicaInfo replicaInfo1 = cache.fetchOrCreate(new ExtendedBlockId(123, "test_bp1"), @@ -170,7 +194,7 @@ public class TestShortCircuitCache { @Test(timeout=60000) public void testExpiry() throws Exception { final ShortCircuitCache cache = - new ShortCircuitCache(2, 1, 1, 10000000, 1, 10000); + new ShortCircuitCache(2, 1, 1, 10000000, 1, 10000, 0); final TestFileDescriptorPair pair = new TestFileDescriptorPair(); ShortCircuitReplicaInfo replicaInfo1 = cache.fetchOrCreate( @@ -203,7 +227,7 @@ public class TestShortCircuitCache { @Test(timeout=60000) public void testEviction() throws Exception { final ShortCircuitCache cache = - new ShortCircuitCache(2, 10000000, 1, 10000000, 1, 10000); + new ShortCircuitCache(2, 10000000, 1, 10000000, 1, 10000, 0); final TestFileDescriptorPair pairs[] = new TestFileDescriptorPair[] { new TestFileDescriptorPair(), new TestFileDescriptorPair(), @@ -269,10 +293,10 @@ public class TestShortCircuitCache { } @Test(timeout=60000) - public void testStaleness() throws Exception { + public void testTimeBasedStaleness() throws Exception { // Set up the cache with a short staleness time. final ShortCircuitCache cache = - new ShortCircuitCache(2, 10000000, 1, 10000000, 1, 10); + new ShortCircuitCache(2, 10000000, 1, 10000000, 1, 10, 0); final TestFileDescriptorPair pairs[] = new TestFileDescriptorPair[] { new TestFileDescriptorPair(), new TestFileDescriptorPair(), @@ -294,7 +318,7 @@ public class TestShortCircuitCache { new ShortCircuitReplica(key, pairs[iVal].getFileInputStreams()[0], pairs[iVal].getFileInputStreams()[1], - cache, Time.monotonicNow() + (iVal * HOUR_IN_MS))); + cache, Time.monotonicNow() + (iVal * HOUR_IN_MS), null)); } catch (IOException e) { throw new RuntimeException(e); } @@ -343,4 +367,149 @@ public class TestShortCircuitCache { } cache.close(); } + + private static Configuration createShortCircuitConf(String testName, + TemporarySocketDirectory sockDir) { + Configuration conf = new Configuration(); + conf.set(DFS_CLIENT_CONTEXT, testName); + conf.setLong(DFS_BLOCK_SIZE_KEY, 4096); + conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(), + testName).getAbsolutePath()); + conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true); + conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, + false); + conf.setBoolean(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false); + DFSInputStream.tcpReadsDisabledForTesting = true; + DomainSocket.disableBindPathValidation(); + Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null)); + return conf; + } + + private static DomainPeer getDomainPeerToDn(Configuration conf) + throws IOException { + DomainSocket sock = + DomainSocket.connect(conf.get(DFS_DOMAIN_SOCKET_PATH_KEY)); + return new DomainPeer(sock); + } + + @Test(timeout=60000) + public void testAllocShm() throws Exception { + BlockReaderTestUtil.enableShortCircuitShmTracing(); + TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); + Configuration conf = createShortCircuitConf("testAllocShm", sockDir); + MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + DistributedFileSystem fs = cluster.getFileSystem(); + final ShortCircuitCache cache = + fs.dfs.getClientContext().getShortCircuitCache(); + cache.getDfsClientShmManager().visit(new Visitor() { + @Override + public void visit(HashMap info) + throws 
IOException { + // The ClientShmManager starts off empty + Assert.assertEquals(0, info.size()); + } + }); + DomainPeer peer = getDomainPeerToDn(conf); + MutableBoolean usedPeer = new MutableBoolean(false); + ExtendedBlockId blockId = new ExtendedBlockId(123, "xyz"); + final DatanodeInfo datanode = + new DatanodeInfo(cluster.getDataNodes().get(0).getDatanodeId()); + // Allocating the first shm slot requires using up a peer. + Slot slot = cache.allocShmSlot(datanode, peer, usedPeer, + blockId, "testAllocShm_client"); + Assert.assertNotNull(slot); + Assert.assertTrue(usedPeer.booleanValue()); + cache.getDfsClientShmManager().visit(new Visitor() { + @Override + public void visit(HashMap info) + throws IOException { + // The ClientShmManager starts off empty + Assert.assertEquals(1, info.size()); + PerDatanodeVisitorInfo vinfo = info.get(datanode); + Assert.assertFalse(vinfo.disabled); + Assert.assertEquals(0, vinfo.full.size()); + Assert.assertEquals(1, vinfo.notFull.size()); + } + }); + cache.scheduleSlotReleaser(slot); + // Wait for the slot to be released, and the shared memory area to be + // closed. Since we didn't register this shared memory segment on the + // server, it will also be a test of how well the server deals with + // bogus client behavior. + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + final MutableBoolean done = new MutableBoolean(false); + try { + cache.getDfsClientShmManager().visit(new Visitor() { + @Override + public void visit(HashMap info) + throws IOException { + done.setValue(info.get(datanode).full.isEmpty() && + info.get(datanode).notFull.isEmpty()); + } + }); + } catch (IOException e) { + LOG.error("error running visitor", e); + } + return done.booleanValue(); + } + }, 10, 60000); + cluster.shutdown(); + } + + @Test(timeout=60000) + public void testShmBasedStaleness() throws Exception { + BlockReaderTestUtil.enableShortCircuitShmTracing(); + TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); + Configuration conf = createShortCircuitConf("testShmBasedStaleness", sockDir); + MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + DistributedFileSystem fs = cluster.getFileSystem(); + final ShortCircuitCache cache = + fs.dfs.getClientContext().getShortCircuitCache(); + String TEST_FILE = "/test_file"; + final int TEST_FILE_LEN = 8193; + final int SEED = 0xFADED; + DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN, + (short)1, SEED); + FSDataInputStream fis = fs.open(new Path(TEST_FILE)); + int first = fis.read(); + final ExtendedBlock block = + DFSTestUtil.getFirstBlock(fs, new Path(TEST_FILE)); + Assert.assertTrue(first != -1); + cache.accept(new CacheVisitor() { + @Override + public void visit(int numOutstandingMmaps, + Map replicas, + Map failedLoads, + Map evictable, + Map evictableMmapped) { + ShortCircuitReplica replica = replicas.get( + ExtendedBlockId.fromExtendedBlock(block)); + Assert.assertNotNull(replica); + Assert.assertTrue(replica.getSlot().isValid()); + } + }); + // Stop the Namenode. This will close the socket keeping the client's + // shared memory segment alive, and make it stale. 
+ cluster.getDataNodes().get(0).shutdown(); + cache.accept(new CacheVisitor() { + @Override + public void visit(int numOutstandingMmaps, + Map replicas, + Map failedLoads, + Map evictable, + Map evictableMmapped) { + ShortCircuitReplica replica = replicas.get( + ExtendedBlockId.fromExtendedBlock(block)); + Assert.assertNotNull(replica); + Assert.assertFalse(replica.getSlot().isValid()); + } + }); + cluster.shutdown(); + } } Modified: hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java?rev=1573436&r1=1573435&r2=1573436&view=diff ============================================================================== --- hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java (original) +++ hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java Mon Mar 3 04:12:49 2014 @@ -421,7 +421,7 @@ public class TestShortCircuitLocalRead { } } - @Test + @Test(timeout=120000) public void testHandleTruncatedBlockFile() throws IOException { MiniDFSCluster cluster = null; HdfsConfiguration conf = new HdfsConfiguration(); Modified: hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java?rev=1573436&r1=1573435&r2=1573436&view=diff ============================================================================== --- hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java (original) +++ hadoop/common/branches/branch-2.4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java Mon Mar 3 04:12:49 2014 @@ -209,41 +209,11 @@ public class TestFsDatasetCache { return sizes; } - /** - * Blocks until cache usage hits the expected new value. - */ - private long verifyExpectedCacheUsage(final long expectedCacheUsed, - final long expectedBlocks) throws Exception { - GenericTestUtils.waitFor(new Supplier() { - private int tries = 0; - - @Override - public Boolean get() { - long curCacheUsed = fsd.getCacheUsed(); - long curBlocks = fsd.getNumBlocksCached(); - if ((curCacheUsed != expectedCacheUsed) || - (curBlocks != expectedBlocks)) { - if (tries++ > 10) { - LOG.info("verifyExpectedCacheUsage: have " + - curCacheUsed + "/" + expectedCacheUsed + " bytes cached; " + - curBlocks + "/" + expectedBlocks + " blocks cached. " + - "memlock limit = " + - NativeIO.POSIX.getCacheManipulator().getMemlockLimit() + - ". Waiting..."); - } - return false; - } - return true; - } - }, 100, 60000); - return expectedCacheUsed; - } - private void testCacheAndUncacheBlock() throws Exception { LOG.info("beginning testCacheAndUncacheBlock"); final int NUM_BLOCKS = 5; - verifyExpectedCacheUsage(0, 0); + DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd); assertEquals(0, fsd.getNumBlocksCached()); // Write a test file @@ -271,7 +241,8 @@ public class TestFsDatasetCache { // Cache each block in succession, checking each time for (int i=0; i
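The new and updated tests above all build their short-circuit configurations the same way. As a reference, a minimal sketch of that recipe is below, modeled on the createShortCircuitConf helpers in this patch; it uses only configuration keys that appear in the diff, while the socket directory, context name, and class name are placeholders:

import java.io.File;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class ShortCircuitConfSketch {
  /**
   * Build a Configuration enabling short-circuit reads with the
   * shared-memory slot mechanism exercised by the tests in this commit.
   */
  public static Configuration createShortCircuitConf(String contextName,
      File sockDir) {
    Configuration conf = new Configuration();
    // A distinct client context keeps each test's ShortCircuitCache isolated.
    conf.set(DFSConfigKeys.DFS_CLIENT_CONTEXT, contextName);
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
    // The DataNode and DFSClient rendezvous over this Unix domain socket.
    conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
        new File(sockDir, contextName + "._PORT.sock").getAbsolutePath());
    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
    conf.setBoolean(
        DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, false);
    conf.setBoolean(
        DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false);
    return conf;
  }
}

On the DataNode side, dfs.datanode.shared.file.descriptor.path and dfs.short.circuit.shared.memory.watcher.interrupt.check.ms (documented in the hdfs-default.xml hunk above) control where the shared file descriptors are created and how often the shared-memory watcher checks for interruption.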