From common-commits-return-78142-archive-asf-public=cust-asf.ponee.io@hadoop.apache.org Mon Jan 29 05:12:31 2018 Return-Path: X-Original-To: archive-asf-public@eu.ponee.io Delivered-To: archive-asf-public@eu.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by mx-eu-01.ponee.io (Postfix) with ESMTP id 0A76218064F for ; Mon, 29 Jan 2018 05:12:31 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id EE6A4160C52; Mon, 29 Jan 2018 04:12:30 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 1AE79160C43 for ; Mon, 29 Jan 2018 05:12:29 +0100 (CET) Received: (qmail 90011 invoked by uid 500); 29 Jan 2018 04:12:12 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 87617 invoked by uid 99); 29 Jan 2018 04:12:10 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 29 Jan 2018 04:12:10 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 77939F4DBB; Mon, 29 Jan 2018 04:12:06 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rakeshr@apache.org To: common-commits@hadoop.apache.org Date: Mon, 29 Jan 2018 04:12:32 -0000 Message-Id: <5671bf65a67048689ac2c2bdcad51c53@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [27/50] [abbrv] hadoop git commit: HDFS-12291: [SPS]: Provide a mechanism to recursively iterate and satisfy storage policy of all the files under the given dir. Contributed by Surendra Singh Lilhore. http://git-wip-us.apache.org/repos/asf/hadoop/blob/e94f1deb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java index 3375590..57e9f94 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java @@ -21,6 +21,9 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KE import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.slf4j.LoggerFactory.getLogger; import java.io.FileNotFoundException; import java.io.IOException; @@ -61,8 +64,10 @@ import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils.LogCapturer; import org.junit.Assert; import org.junit.Test; +import org.mockito.Mockito; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.slf4j.event.Level; import com.google.common.base.Supplier; @@ -71,6 +76,12 @@ import com.google.common.base.Supplier; * moved and finding its suggested target locations to move. */ public class TestStoragePolicySatisfier { + + { + GenericTestUtils.setLogLevel( + getLogger(FSTreeTraverser.class), Level.DEBUG); + } + private static final String ONE_SSD = "ONE_SSD"; private static final String COLD = "COLD"; private static final Logger LOG = @@ -341,7 +352,9 @@ public class TestStoragePolicySatisfier { // take no effect for the sub-dir's file in the directory. DFSTestUtil.waitExpectedStorageType( - subFile2, StorageType.DEFAULT, 3, 30000, dfs); + subFile2, StorageType.SSD, 1, 30000, dfs); + DFSTestUtil.waitExpectedStorageType( + subFile2, StorageType.DISK, 2, 30000, dfs); } finally { shutdownCluster(); } @@ -1083,6 +1096,368 @@ public class TestStoragePolicySatisfier { } } + /** + * Test SPS for empty directory, xAttr should be removed. + */ + @Test(timeout = 300000) + public void testSPSForEmptyDirectory() throws IOException, TimeoutException, + InterruptedException { + MiniDFSCluster cluster = null; + try { + Configuration conf = new Configuration(); + conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, + true); + cluster = new MiniDFSCluster.Builder(conf).build(); + cluster.waitActive(); + DistributedFileSystem fs = cluster.getFileSystem(); + Path emptyDir = new Path("/emptyDir"); + fs.mkdirs(emptyDir); + fs.satisfyStoragePolicy(emptyDir); + // Make sure satisfy xattr has been removed. + DFSTestUtil.waitForXattrRemoved("/emptyDir", + XATTR_SATISFY_STORAGE_POLICY, cluster.getNamesystem(), 30000); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + /** + * Test SPS for not exist directory. + */ + @Test(timeout = 300000) + public void testSPSForNonExistDirectory() throws Exception { + MiniDFSCluster cluster = null; + try { + Configuration conf = new Configuration(); + conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, + true); + cluster = new MiniDFSCluster.Builder(conf).build(); + cluster.waitActive(); + DistributedFileSystem fs = cluster.getFileSystem(); + Path emptyDir = new Path("/emptyDir"); + try { + fs.satisfyStoragePolicy(emptyDir); + fail("FileNotFoundException should throw"); + } catch (FileNotFoundException e) { + // nothing to do + } + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + /** + * Test SPS for directory tree which doesn't have files. + */ + @Test(timeout = 300000) + public void testSPSWithDirectoryTreeWithoutFile() throws Exception { + MiniDFSCluster cluster = null; + try { + Configuration conf = new Configuration(); + conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, + true); + cluster = new MiniDFSCluster.Builder(conf).build(); + cluster.waitActive(); + // Create directories + /* + * root + * | + * A--------C--------D + * | + * G----H----I + * | + * O + */ + DistributedFileSystem fs = cluster.getFileSystem(); + fs.mkdirs(new Path("/root/C/H/O")); + fs.mkdirs(new Path("/root/A")); + fs.mkdirs(new Path("/root/D")); + fs.mkdirs(new Path("/root/C/G")); + fs.mkdirs(new Path("/root/C/I")); + fs.satisfyStoragePolicy(new Path("/root")); + // Make sure satisfy xattr has been removed. + DFSTestUtil.waitForXattrRemoved("/root", + XATTR_SATISFY_STORAGE_POLICY, cluster.getNamesystem(), 30000); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } + + /** + * Test SPS for directory which has multilevel directories. + */ + @Test(timeout = 300000) + public void testMultipleLevelDirectoryForSatisfyStoragePolicy() + throws Exception { + try { + StorageType[][] diskTypes = new StorageType[][] { + {StorageType.DISK, StorageType.ARCHIVE}, + {StorageType.ARCHIVE, StorageType.SSD}, + {StorageType.DISK, StorageType.DISK}}; + config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE); + config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, + true); + hdfsCluster = startCluster(config, diskTypes, diskTypes.length, + storagesPerDatanode, capacity); + dfs = hdfsCluster.getFileSystem(); + createDirectoryTree(dfs); + + List files = getDFSListOfTree(); + dfs.setStoragePolicy(new Path("/root"), COLD); + dfs.satisfyStoragePolicy(new Path("/root")); + for (String fileName : files) { + // Wait till the block is moved to ARCHIVE + DFSTestUtil.waitExpectedStorageType(fileName, StorageType.ARCHIVE, 2, + 30000, dfs); + } + } finally { + shutdownCluster(); + } + } + + /** + * Test SPS for batch processing. + */ + @Test(timeout = 300000) + public void testBatchProcessingForSPSDirectory() throws Exception { + try { + StorageType[][] diskTypes = new StorageType[][] { + {StorageType.DISK, StorageType.ARCHIVE}, + {StorageType.ARCHIVE, StorageType.SSD}, + {StorageType.DISK, StorageType.DISK}}; + config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE); + config.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY, + true); + // Set queue max capacity + config.setInt(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_QUEUE_LIMIT_KEY, + 5); + hdfsCluster = startCluster(config, diskTypes, diskTypes.length, + storagesPerDatanode, capacity); + dfs = hdfsCluster.getFileSystem(); + createDirectoryTree(dfs); + List files = getDFSListOfTree(); + LogCapturer logs = GenericTestUtils.LogCapturer.captureLogs(LogFactory + .getLog(FSTreeTraverser.class)); + + dfs.setStoragePolicy(new Path("/root"), COLD); + dfs.satisfyStoragePolicy(new Path("/root")); + for (String fileName : files) { + // Wait till the block is moved to ARCHIVE + DFSTestUtil.waitExpectedStorageType(fileName, StorageType.ARCHIVE, 2, + 30000, dfs); + } + waitForBlocksMovementResult(files.size(), 30000); + String expectedLogMessage = "StorageMovementNeeded queue remaining" + + " capacity is zero"; + assertTrue("Log output does not contain expected log message: " + + expectedLogMessage, logs.getOutput().contains(expectedLogMessage)); + } finally { + shutdownCluster(); + } + } + + + /** + * Test traverse when parent got deleted. + * 1. Delete /root when traversing Q + * 2. U, R, S should not be in queued. + */ + @Test + public void testTraverseWhenParentDeleted() throws Exception { + StorageType[][] diskTypes = new StorageType[][] { + {StorageType.DISK, StorageType.ARCHIVE}, + {StorageType.ARCHIVE, StorageType.SSD}, + {StorageType.DISK, StorageType.DISK}}; + config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE); + hdfsCluster = startCluster(config, diskTypes, diskTypes.length, + storagesPerDatanode, capacity); + dfs = hdfsCluster.getFileSystem(); + createDirectoryTree(dfs); + + List expectedTraverseOrder = getDFSListOfTree(); + + //Remove files which will not be traverse when parent is deleted + expectedTraverseOrder.remove("/root/D/L/R"); + expectedTraverseOrder.remove("/root/D/L/S"); + expectedTraverseOrder.remove("/root/D/L/Q/U"); + FSDirectory fsDir = hdfsCluster.getNamesystem().getFSDirectory(); + + //Queue limit can control the traverse logic to wait for some free + //entry in queue. After 10 files, traverse control will be on U. + StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class); + Mockito.when(sps.isRunning()).thenReturn(true); + BlockStorageMovementNeeded movmentNeededQueue = + new BlockStorageMovementNeeded(fsDir.getFSNamesystem(), sps, 10); + INode rootINode = fsDir.getINode("/root"); + movmentNeededQueue.addToPendingDirQueue(rootINode.getId()); + movmentNeededQueue.init(); + + //Wait for thread to reach U. + Thread.sleep(1000); + + dfs.delete(new Path("/root/D/L"), true); + + // Remove 10 element and make queue free, So other traversing will start. + for (int i = 0; i < 10; i++) { + String path = expectedTraverseOrder.remove(0); + long trackId = movmentNeededQueue.get().getTrackId(); + INode inode = fsDir.getInode(trackId); + assertTrue("Failed to traverse tree, expected " + path + " but got " + + inode.getFullPathName(), path.equals(inode.getFullPathName())); + } + //Wait to finish tree traverse + Thread.sleep(5000); + + // Check other element traversed in order and R,S should not be added in + // queue which we already removed from expected list + for (String path : expectedTraverseOrder) { + long trackId = movmentNeededQueue.get().getTrackId(); + INode inode = fsDir.getInode(trackId); + assertTrue("Failed to traverse tree, expected " + path + " but got " + + inode.getFullPathName(), path.equals(inode.getFullPathName())); + } + dfs.delete(new Path("/root"), true); + } + + /** + * Test traverse when root parent got deleted. + * 1. Delete L when traversing Q + * 2. E, M, U, R, S should not be in queued. + */ + @Test + public void testTraverseWhenRootParentDeleted() throws Exception { + StorageType[][] diskTypes = new StorageType[][] { + {StorageType.DISK, StorageType.ARCHIVE}, + {StorageType.ARCHIVE, StorageType.SSD}, + {StorageType.DISK, StorageType.DISK}}; + config.setLong("dfs.block.size", DEFAULT_BLOCK_SIZE); + hdfsCluster = startCluster(config, diskTypes, diskTypes.length, + storagesPerDatanode, capacity); + dfs = hdfsCluster.getFileSystem(); + createDirectoryTree(dfs); + + List expectedTraverseOrder = getDFSListOfTree(); + + // Remove files which will not be traverse when parent is deleted + expectedTraverseOrder.remove("/root/D/L/R"); + expectedTraverseOrder.remove("/root/D/L/S"); + expectedTraverseOrder.remove("/root/D/L/Q/U"); + expectedTraverseOrder.remove("/root/D/M"); + expectedTraverseOrder.remove("/root/E"); + FSDirectory fsDir = hdfsCluster.getNamesystem().getFSDirectory(); + StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class); + Mockito.when(sps.isRunning()).thenReturn(true); + // Queue limit can control the traverse logic to wait for some free + // entry in queue. After 10 files, traverse control will be on U. + BlockStorageMovementNeeded movmentNeededQueue = + new BlockStorageMovementNeeded(fsDir.getFSNamesystem(), sps, 10); + movmentNeededQueue.init(); + INode rootINode = fsDir.getINode("/root"); + movmentNeededQueue.addToPendingDirQueue(rootINode.getId()); + // Wait for thread to reach U. + Thread.sleep(1000); + + dfs.delete(new Path("/root/D/L"), true); + + // Remove 10 element and make queue free, So other traversing will start. + for (int i = 0; i < 10; i++) { + String path = expectedTraverseOrder.remove(0); + long trackId = movmentNeededQueue.get().getTrackId(); + INode inode = fsDir.getInode(trackId); + assertTrue("Failed to traverse tree, expected " + path + " but got " + + inode.getFullPathName(), path.equals(inode.getFullPathName())); + } + // Wait to finish tree traverse + Thread.sleep(5000); + + // Check other element traversed in order and E, M, U, R, S should not be + // added in queue which we already removed from expected list + for (String path : expectedTraverseOrder) { + long trackId = movmentNeededQueue.get().getTrackId(); + INode inode = fsDir.getInode(trackId); + assertTrue("Failed to traverse tree, expected " + path + " but got " + + inode.getFullPathName(), path.equals(inode.getFullPathName())); + } + dfs.delete(new Path("/root"), true); + } + + private static void createDirectoryTree(DistributedFileSystem dfs) + throws Exception { + // tree structure + /* + * root + * | + * A--------B--------C--------D--------E + * | | + * F----G----H----I J----K----L----M + * | | + * N----O----P Q----R----S + * | | + * T U + */ + // create root Node and child + dfs.mkdirs(new Path("/root")); + DFSTestUtil.createFile(dfs, new Path("/root/A"), 1024, (short) 3, 0); + dfs.mkdirs(new Path("/root/B")); + DFSTestUtil.createFile(dfs, new Path("/root/C"), 1024, (short) 3, 0); + dfs.mkdirs(new Path("/root/D")); + DFSTestUtil.createFile(dfs, new Path("/root/E"), 1024, (short) 3, 0); + + // Create /root/B child + DFSTestUtil.createFile(dfs, new Path("/root/B/F"), 1024, (short) 3, 0); + dfs.mkdirs(new Path("/root/B/G")); + DFSTestUtil.createFile(dfs, new Path("/root/B/H"), 1024, (short) 3, 0); + DFSTestUtil.createFile(dfs, new Path("/root/B/I"), 1024, (short) 3, 0); + + // Create /root/D child + DFSTestUtil.createFile(dfs, new Path("/root/D/J"), 1024, (short) 3, 0); + DFSTestUtil.createFile(dfs, new Path("/root/D/K"), 1024, (short) 3, 0); + dfs.mkdirs(new Path("/root/D/L")); + DFSTestUtil.createFile(dfs, new Path("/root/D/M"), 1024, (short) 3, 0); + + // Create /root/B/G child + DFSTestUtil.createFile(dfs, new Path("/root/B/G/N"), 1024, (short) 3, 0); + DFSTestUtil.createFile(dfs, new Path("/root/B/G/O"), 1024, (short) 3, 0); + dfs.mkdirs(new Path("/root/B/G/P")); + + // Create /root/D/L child + dfs.mkdirs(new Path("/root/D/L/Q")); + DFSTestUtil.createFile(dfs, new Path("/root/D/L/R"), 1024, (short) 3, 0); + DFSTestUtil.createFile(dfs, new Path("/root/D/L/S"), 1024, (short) 3, 0); + + // Create /root/B/G/P child + DFSTestUtil.createFile(dfs, new Path("/root/B/G/P/T"), 1024, (short) 3, 0); + + // Create /root/D/L/Q child + DFSTestUtil.createFile(dfs, new Path("/root/D/L/Q/U"), 1024, (short) 3, 0); + } + + private List getDFSListOfTree() { + List dfsList = new ArrayList<>(); + dfsList.add("/root/A"); + dfsList.add("/root/B/F"); + dfsList.add("/root/B/G/N"); + dfsList.add("/root/B/G/O"); + dfsList.add("/root/B/G/P/T"); + dfsList.add("/root/B/H"); + dfsList.add("/root/B/I"); + dfsList.add("/root/C"); + dfsList.add("/root/D/J"); + dfsList.add("/root/D/K"); + dfsList.add("/root/D/L/Q/U"); + dfsList.add("/root/D/L/R"); + dfsList.add("/root/D/L/S"); + dfsList.add("/root/D/M"); + dfsList.add("/root/E"); + return dfsList; + } + private String createFileAndSimulateFavoredNodes(int favoredNodesCount) throws IOException { ArrayList dns = hdfsCluster.getDataNodes(); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org