Subject: svn commit: r1573433 [3/3] - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/client/ src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ src/main/java/org/ap...
Date: Mon, 03 Mar 2014 03:58:38 -0000
From: cmccabe@apache.org
To: hdfs-commits@hadoop.apache.org
Reply-To: hdfs-dev@hadoop.apache.org
Message-Id: <20140303035840.64BCA2388A4A@eris.apache.org>
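Two signature changes recur throughout the diffs below: every ShortCircuitCache constructor call gains a seventh trailing argument (0 in all of these tests), and every ShortCircuitReplica constructor gains a trailing argument (null wherever the test does not allocate a shared-memory slot). A minimal before/after sketch of the constructor call site; the role of the new argument is inferred from the new tests, and its parameter name is not shown anywhere in this diff:

    import org.apache.hadoop.hdfs.client.ShortCircuitCache;

    public class CallSiteSketch {
      static ShortCircuitCache sketch() {
        // Before this commit the constructor took six arguments:
        //   new ShortCircuitCache(10, 1, 10, 1, 1, 10000);
        // After it, every call site passes a seventh trailing argument
        // (0 in all of these tests). Judging by the new tests it relates
        // to the shared-memory slot machinery, but that is an inference;
        // the diff never shows the parameter's name.
        return new ShortCircuitCache(10, 1, 10, 1, 1, 10000, 0);
      }
    }

The ShortCircuitReplica change follows the same shape: the old five-argument call gains a trailing Slot, passed as null in tests that never attach a shared-memory slot.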
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitCache.java?rev=1573433&r1=1573432&r2=1573433&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitCache.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitCache.java Mon Mar  3 03:58:37 2014
@@ -20,26 +20,50 @@ package org.apache.hadoop.hdfs;
 import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.ShortCircuitShm.Slot;
+import org.apache.hadoop.hdfs.client.DfsClientShmManager.PerDatanodeVisitorInfo;
 import org.apache.hadoop.hdfs.client.ShortCircuitCache;
+import org.apache.hadoop.hdfs.client.ShortCircuitCache.CacheVisitor;
 import org.apache.hadoop.hdfs.client.ShortCircuitCache.ShortCircuitReplicaCreator;
+import org.apache.hadoop.hdfs.client.DfsClientShmManager.Visitor;
 import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
 import org.apache.hadoop.hdfs.client.ShortCircuitReplicaInfo;
+import org.apache.hadoop.hdfs.net.DomainPeer;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Time;
 import org.junit.Assert;
+import org.junit.Assume;
 import org.junit.Test;
 
 import com.google.common.base.Preconditions;
 import com.google.common.base.Supplier;
 
 import java.io.DataOutputStream;
+import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_CONTEXT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC;
+import static org.hamcrest.CoreMatchers.equalTo;
 
 public class TestShortCircuitCache {
   static final Log LOG = LogFactory.getLog(TestShortCircuitCache.class);
@@ -104,7 +128,7 @@ public class TestShortCircuitCache {
         return new ShortCircuitReplicaInfo(
             new ShortCircuitReplica(key,
               pair.getFileInputStreams()[0], pair.getFileInputStreams()[1],
-              cache, Time.monotonicNow()));
+              cache, Time.monotonicNow(), null));
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
@@ -114,14 +138,14 @@ public class TestShortCircuitCache {
   @Test(timeout=60000)
   public void testCreateAndDestroy() throws Exception {
     ShortCircuitCache cache =
-        new ShortCircuitCache(10, 1, 10, 1, 1, 10000);
+        new ShortCircuitCache(10, 1, 10, 1, 1, 10000, 0);
     cache.close();
   }
 
   @Test(timeout=60000)
   public void testAddAndRetrieve() throws Exception {
     final ShortCircuitCache cache =
-        new ShortCircuitCache(10, 10000000, 10, 10000000, 1, 10000);
+        new ShortCircuitCache(10, 10000000, 10, 10000000, 1, 10000, 0);
     final TestFileDescriptorPair pair = new TestFileDescriptorPair();
     ShortCircuitReplicaInfo replicaInfo1 =
       cache.fetchOrCreate(new ExtendedBlockId(123, "test_bp1"),
@@ -170,7 +194,7 @@ public class TestShortCircuitCache {
   @Test(timeout=60000)
   public void testExpiry() throws Exception {
     final ShortCircuitCache cache =
-        new ShortCircuitCache(2, 1, 1, 10000000, 1, 10000);
+        new ShortCircuitCache(2, 1, 1, 10000000, 1, 10000, 0);
     final TestFileDescriptorPair pair = new TestFileDescriptorPair();
     ShortCircuitReplicaInfo replicaInfo1 =
       cache.fetchOrCreate(
@@ -203,7 +227,7 @@ public class TestShortCircuitCache {
   @Test(timeout=60000)
   public void testEviction() throws Exception {
     final ShortCircuitCache cache =
-        new ShortCircuitCache(2, 10000000, 1, 10000000, 1, 10000);
+        new ShortCircuitCache(2, 10000000, 1, 10000000, 1, 10000, 0);
     final TestFileDescriptorPair pairs[] = new TestFileDescriptorPair[] {
       new TestFileDescriptorPair(),
       new TestFileDescriptorPair(),
@@ -269,10 +293,10 @@ public class TestShortCircuitCache {
   }
 
   @Test(timeout=60000)
-  public void testStaleness() throws Exception {
+  public void testTimeBasedStaleness() throws Exception {
     // Set up the cache with a short staleness time.
     final ShortCircuitCache cache =
-        new ShortCircuitCache(2, 10000000, 1, 10000000, 1, 10);
+        new ShortCircuitCache(2, 10000000, 1, 10000000, 1, 10, 0);
     final TestFileDescriptorPair pairs[] = new TestFileDescriptorPair[] {
       new TestFileDescriptorPair(),
       new TestFileDescriptorPair(),
@@ -294,7 +318,7 @@ public class TestShortCircuitCache {
             new ShortCircuitReplica(key,
               pairs[iVal].getFileInputStreams()[0],
               pairs[iVal].getFileInputStreams()[1],
-              cache, Time.monotonicNow() + (iVal * HOUR_IN_MS)));
+              cache, Time.monotonicNow() + (iVal * HOUR_IN_MS), null));
           } catch (IOException e) {
             throw new RuntimeException(e);
           }
@@ -343,4 +367,149 @@ public class TestShortCircuitCache {
     }
     cache.close();
   }
+
+  private static Configuration createShortCircuitConf(String testName,
+      TemporarySocketDirectory sockDir) {
+    Configuration conf = new Configuration();
+    conf.set(DFS_CLIENT_CONTEXT, testName);
+    conf.setLong(DFS_BLOCK_SIZE_KEY, 4096);
+    conf.set(DFS_DOMAIN_SOCKET_PATH_KEY, new File(sockDir.getDir(),
+        testName).getAbsolutePath());
+    conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
+    conf.setBoolean(DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY,
+        false);
+    conf.setBoolean(DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC, false);
+    DFSInputStream.tcpReadsDisabledForTesting = true;
+    DomainSocket.disableBindPathValidation();
+    Assume.assumeThat(DomainSocket.getLoadingFailureReason(), equalTo(null));
+    return conf;
+  }
+
+  private static DomainPeer getDomainPeerToDn(Configuration conf)
+      throws IOException {
+    DomainSocket sock =
+        DomainSocket.connect(conf.get(DFS_DOMAIN_SOCKET_PATH_KEY));
+    return new DomainPeer(sock);
+  }
+
+  @Test(timeout=60000)
+  public void testAllocShm() throws Exception {
+    BlockReaderTestUtil.enableShortCircuitShmTracing();
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    Configuration conf = createShortCircuitConf("testAllocShm", sockDir);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    final ShortCircuitCache cache =
+        fs.dfs.getClientContext().getShortCircuitCache();
+    cache.getDfsClientShmManager().visit(new Visitor() {
+      @Override
+      public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
+          throws IOException {
+        // The ClientShmManager starts off empty
+        Assert.assertEquals(0, info.size());
+      }
+    });
+    DomainPeer peer = getDomainPeerToDn(conf);
+    MutableBoolean usedPeer = new MutableBoolean(false);
+    ExtendedBlockId blockId = new ExtendedBlockId(123, "xyz");
+    final DatanodeInfo datanode =
+        new DatanodeInfo(cluster.getDataNodes().get(0).getDatanodeId());
+    // Allocating the first shm slot requires using up a peer.
+    Slot slot = cache.allocShmSlot(datanode, peer, usedPeer,
+        blockId, "testAllocShm_client");
+    Assert.assertNotNull(slot);
+    Assert.assertTrue(usedPeer.booleanValue());
+    cache.getDfsClientShmManager().visit(new Visitor() {
+      @Override
+      public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
+          throws IOException {
+        // The ClientShmManager should now have one segment for the datanode.
+        Assert.assertEquals(1, info.size());
+        PerDatanodeVisitorInfo vinfo = info.get(datanode);
+        Assert.assertFalse(vinfo.disabled);
+        Assert.assertEquals(0, vinfo.full.size());
+        Assert.assertEquals(1, vinfo.notFull.size());
+      }
+    });
+    cache.scheduleSlotReleaser(slot);
+    // Wait for the slot to be released, and the shared memory area to be
+    // closed.  Since we didn't register this shared memory segment on the
+    // server, it will also be a test of how well the server deals with
+    // bogus client behavior.
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        final MutableBoolean done = new MutableBoolean(false);
+        try {
+          cache.getDfsClientShmManager().visit(new Visitor() {
+            @Override
+            public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info)
+                throws IOException {
+              done.setValue(info.get(datanode).full.isEmpty() &&
+                  info.get(datanode).notFull.isEmpty());
+            }
+          });
+        } catch (IOException e) {
+          LOG.error("error running visitor", e);
+        }
+        return done.booleanValue();
+      }
+    }, 10, 60000);
+    cluster.shutdown();
+  }
+
+  @Test(timeout=60000)
+  public void testShmBasedStaleness() throws Exception {
+    BlockReaderTestUtil.enableShortCircuitShmTracing();
+    TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
+    Configuration conf = createShortCircuitConf("testShmBasedStaleness", sockDir);
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+    cluster.waitActive();
+    DistributedFileSystem fs = cluster.getFileSystem();
+    final ShortCircuitCache cache =
+        fs.dfs.getClientContext().getShortCircuitCache();
+    String TEST_FILE = "/test_file";
+    final int TEST_FILE_LEN = 8193;
+    final int SEED = 0xFADED;
+    DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
+        (short)1, SEED);
+    FSDataInputStream fis = fs.open(new Path(TEST_FILE));
+    int first = fis.read();
+    final ExtendedBlock block =
+        DFSTestUtil.getFirstBlock(fs, new Path(TEST_FILE));
+    Assert.assertTrue(first != -1);
+    cache.accept(new CacheVisitor() {
+      @Override
+      public void visit(int numOutstandingMmaps,
+          Map<ExtendedBlockId, ShortCircuitReplica> replicas,
+          Map<ExtendedBlockId, InvalidToken> failedLoads,
+          Map<Long, ShortCircuitReplica> evictable,
+          Map<Long, ShortCircuitReplica> evictableMmapped) {
+        ShortCircuitReplica replica = replicas.get(
+            ExtendedBlockId.fromExtendedBlock(block));
+        Assert.assertNotNull(replica);
+        Assert.assertTrue(replica.getSlot().isValid());
+      }
+    });
+    // Stop the DataNode.  This will close the socket keeping the client's
+    // shared memory segment alive, and make it stale.
+    cluster.getDataNodes().get(0).shutdown();
+    cache.accept(new CacheVisitor() {
+      @Override
+      public void visit(int numOutstandingMmaps,
+          Map<ExtendedBlockId, ShortCircuitReplica> replicas,
+          Map<ExtendedBlockId, InvalidToken> failedLoads,
+          Map<Long, ShortCircuitReplica> evictable,
+          Map<Long, ShortCircuitReplica> evictableMmapped) {
+        ShortCircuitReplica replica = replicas.get(
+            ExtendedBlockId.fromExtendedBlock(block));
+        Assert.assertNotNull(replica);
+        Assert.assertFalse(replica.getSlot().isValid());
+      }
+    });
+    cluster.shutdown();
+  }
 }
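The new tests above wait on a background slot-releaser thread via GenericTestUtils.waitFor, the standard Hadoop-test polling idiom. A stripped-down sketch of just that idiom, using the same waitFor(Supplier, checkEveryMillis, waitForMillis) overload as testAllocShm, with a stand-in AtomicBoolean in place of the shm-manager visitor:

    import java.util.concurrent.atomic.AtomicBoolean;

    import com.google.common.base.Supplier;
    import org.apache.hadoop.test.GenericTestUtils;

    public class WaitForSketch {
      static void awaitFlag(final AtomicBoolean flag) throws Exception {
        // Re-evaluates the Supplier every 10 ms and fails with a timeout
        // if it has not returned true within 60 seconds -- the same
        // cadence testAllocShm uses while waiting for slot release.
        GenericTestUtils.waitFor(new Supplier<Boolean>() {
          @Override
          public Boolean get() {
            return flag.get();  // checked fresh on every poll
          }
        }, 10, 60000);
      }
    }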
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java?rev=1573433&r1=1573432&r2=1573433&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestShortCircuitLocalRead.java Mon Mar  3 03:58:37 2014
@@ -420,7 +420,7 @@ public class TestShortCircuitLocalRead {
     }
   }
 
-  @Test
+  @Test(timeout=120000)
   public void testHandleTruncatedBlockFile() throws IOException {
     MiniDFSCluster cluster = null;
     HdfsConfiguration conf = new HdfsConfiguration();
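The next diff records a rename: TestShortCircuitSharedMemorySegment becomes TestShortCircuitShm (svn shows it as a copy from r1573432, with the old contents appearing below as deletions; the new contents are not in this part of the commit email). The deleted test remains the clearest documentation of the slot anchor protocol: a fresh slot refuses addAnchor() until makeAnchorable() is called, and every anchor is removed before the slot is closed. A condensed sketch of that lifecycle, using the pre-rename class names from the deleted file (this commit renames the class to ShortCircuitShm, so these names are only valid before r1573433):

    import java.io.FileInputStream;

    import org.apache.hadoop.hdfs.client.ShortCircuitSharedMemorySegment;
    import org.apache.hadoop.hdfs.client.ShortCircuitSharedMemorySegment.Slot;
    import org.apache.hadoop.io.nativeio.SharedFileDescriptorFactory;

    public class AnchorSketch {
      static void demo(String dir) throws Exception {
        SharedFileDescriptorFactory factory =
            new SharedFileDescriptorFactory("shm_", dir);
        FileInputStream stream = factory.createDescriptor(4096);
        ShortCircuitSharedMemorySegment shm =
            new ShortCircuitSharedMemorySegment(stream);
        Slot slot = shm.allocateNextSlot();  // returns null once the segment is full
        assert !slot.addAnchor();            // rejected: not anchorable yet
        slot.makeAnchorable();
        assert slot.addAnchor();             // anchors now stick
        slot.removeAnchor();                 // balance every addAnchor
        slot.close();
        shm.close();
        stream.close();
      }
    }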
Copied: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/TestShortCircuitShm.java (from r1573432, hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/TestShortCircuitSharedMemorySegment.java)
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/TestShortCircuitShm.java?p2=hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/TestShortCircuitShm.java&p1=hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/TestShortCircuitSharedMemorySegment.java&r1=1573432&r2=1573433&rev=1573433&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/TestShortCircuitSharedMemorySegment.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/TestShortCircuitShm.java Mon Mar  3 03:58:37 2014
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.client;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.util.ArrayList;
-
-import org.apache.commons.lang.SystemUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.io.nativeio.NativeIO;
-import org.apache.hadoop.io.nativeio.SharedFileDescriptorFactory;
-import org.apache.hadoop.hdfs.client.ShortCircuitSharedMemorySegment.Slot;
-import org.junit.Assume;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.Assert;
-
-public class TestShortCircuitSharedMemorySegment {
-  public static final Log LOG =
-      LogFactory.getLog(TestShortCircuitSharedMemorySegment.class);
-
-  private static final File TEST_BASE =
-      new File(System.getProperty("test.build.data", "/tmp"));
-
-  @Before
-  public void before() {
-    Assume.assumeTrue(NativeIO.isAvailable());
-    Assume.assumeTrue(SystemUtils.IS_OS_UNIX);
-  }
-
-  @Test(timeout=60000)
-  public void testStartupShutdown() throws Exception {
-    File path = new File(TEST_BASE, "testStartupShutdown");
-    path.mkdirs();
-    SharedFileDescriptorFactory factory =
-        new SharedFileDescriptorFactory("shm_", path.getAbsolutePath());
-    FileInputStream stream = factory.createDescriptor(4096);
-    ShortCircuitSharedMemorySegment shm =
-        new ShortCircuitSharedMemorySegment(stream);
-    shm.close();
-    stream.close();
-    FileUtil.fullyDelete(path);
-  }
-
-  @Test(timeout=60000)
-  public void testAllocateSlots() throws Exception {
-    File path = new File(TEST_BASE, "testAllocateSlots");
-    path.mkdirs();
-    SharedFileDescriptorFactory factory =
-        new SharedFileDescriptorFactory("shm_", path.getAbsolutePath());
-    FileInputStream stream = factory.createDescriptor(4096);
-    ShortCircuitSharedMemorySegment shm =
-        new ShortCircuitSharedMemorySegment(stream);
-    int numSlots = 0;
-    ArrayList<Slot> slots = new ArrayList<Slot>();
-    while (true) {
-      Slot slot = shm.allocateNextSlot();
-      if (slot == null) {
-        LOG.info("allocated " + numSlots + " slots before running out.");
-        break;
-      }
-      slots.add(slot);
-      numSlots++;
-    }
-    int slotIdx = 0;
-    for (Slot slot : slots) {
-      Assert.assertFalse(slot.addAnchor());
-      Assert.assertEquals(slotIdx++, slot.getIndex());
-    }
-    for (Slot slot : slots) {
-      slot.makeAnchorable();
-    }
-    for (Slot slot : slots) {
-      Assert.assertTrue(slot.addAnchor());
-    }
-    for (Slot slot : slots) {
-      slot.removeAnchor();
-    }
-    shm.close();
-    for (Slot slot : slots) {
-      slot.close();
-    }
-    stream.close();
-    FileUtil.fullyDelete(path);
-  }
-}
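In the final diff of this part, TestFsDatasetCache loses its private verifyExpectedCacheUsage helper; calls are replaced with a shared DFSTestUtil version that takes the dataset as an explicit third argument. A sketch of the new call shape, with the argument roles as this diff implies them (the full DFSTestUtil signature is not shown in this email, so treat it as an assumption):

    import org.apache.hadoop.hdfs.DFSTestUtil;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;

    public class CacheUsageSketch {
      // Blocks until the dataset reports the expected usage, as the removed
      // private helper did: expected bytes cached, expected blocks cached,
      // and the FsDatasetSpi to poll -- now a parameter rather than a field.
      static void expectEmpty(FsDatasetSpi<?> fsd) throws Exception {
        DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd);
      }
    }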
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java?rev=1573433&r1=1573432&r2=1573433&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java Mon Mar  3 03:58:37 2014
@@ -209,41 +209,11 @@ public class TestFsDatasetCache {
     return sizes;
   }
 
-  /**
-   * Blocks until cache usage hits the expected new value.
-   */
-  private long verifyExpectedCacheUsage(final long expectedCacheUsed,
-      final long expectedBlocks) throws Exception {
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      private int tries = 0;
-
-      @Override
-      public Boolean get() {
-        long curCacheUsed = fsd.getCacheUsed();
-        long curBlocks = fsd.getNumBlocksCached();
-        if ((curCacheUsed != expectedCacheUsed) ||
-            (curBlocks != expectedBlocks)) {
-          if (tries++ > 10) {
-            LOG.info("verifyExpectedCacheUsage: have " +
-                curCacheUsed + "/" + expectedCacheUsed + " bytes cached; " +
-                curBlocks + "/" + expectedBlocks + " blocks cached. " +
-                "memlock limit = " +
-                NativeIO.POSIX.getCacheManipulator().getMemlockLimit() +
-                ".  Waiting...");
-          }
-          return false;
-        }
-        return true;
-      }
-    }, 100, 60000);
-    return expectedCacheUsed;
-  }
-
   private void testCacheAndUncacheBlock() throws Exception {
     LOG.info("beginning testCacheAndUncacheBlock");
     final int NUM_BLOCKS = 5;
 
-    verifyExpectedCacheUsage(0, 0);
+    DFSTestUtil.verifyExpectedCacheUsage(0, 0, fsd);
     assertEquals(0, fsd.getNumBlocksCached());
 
     // Write a test file
@@ -271,7 +241,8 @@ public class TestFsDatasetCache {
 
     // Cache each block in succession, checking each time
     for (int i=0; i