Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 3FB2F200B92 for ; Tue, 13 Sep 2016 10:29:31 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 3E623160AD3; Tue, 13 Sep 2016 08:29:31 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E8E4C160AD8 for ; Tue, 13 Sep 2016 10:29:29 +0200 (CEST) Received: (qmail 64266 invoked by uid 500); 13 Sep 2016 08:29:17 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 63043 invoked by uid 99); 13 Sep 2016 08:29:16 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 13 Sep 2016 08:29:16 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 92B9FE08F2; Tue, 13 Sep 2016 08:29:16 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: drankye@apache.org To: common-commits@hadoop.apache.org Date: Tue, 13 Sep 2016 08:29:55 -0000 Message-Id: <79cbe66675c24c22a2bf439660b4a733@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [41/50] [abbrv] hadoop git commit: HDFS-10808. DiskBalancer does not execute multi-steps plan-redux. Contributed by Anu Engineer. archived-at: Tue, 13 Sep 2016 08:29:31 -0000 HDFS-10808. DiskBalancer does not execute multi-steps plan-redux. Contributed by Anu Engineer. Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bee9f57f Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bee9f57f Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bee9f57f Branch: refs/heads/HDFS-10285 Commit: bee9f57f5ca9f037ade932c6fd01b0dad47a1296 Parents: cba973f Author: Anu Engineer Authored: Fri Sep 9 15:00:39 2016 -0700 Committer: Anu Engineer Committed: Fri Sep 9 15:00:39 2016 -0700 ---------------------------------------------------------------------- .../hdfs/server/datanode/DiskBalancer.java | 31 +- .../server/diskbalancer/TestDiskBalancer.java | 492 ++++++++++++------- .../TestDiskBalancerWithMockMover.java | 17 +- 3 files changed, 339 insertions(+), 201 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/bee9f57f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java index e9e2e5b..d853ae9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java @@ -501,14 +501,11 @@ public class DiskBalancer { public void run() { Thread.currentThread().setName("DiskBalancerThread"); LOG.info("Executing Disk balancer plan. Plan File: {}, Plan ID: {}", - planFile, planID); - try { - for (Map.Entry entry : - workMap.entrySet()) { - blockMover.copyBlocks(entry.getKey(), entry.getValue()); - } - } finally { - blockMover.setExitFlag(); + planFile, planID); + for (Map.Entry entry : + workMap.entrySet()) { + blockMover.setRunnable(); + blockMover.copyBlocks(entry.getKey(), entry.getValue()); } } }); @@ -857,8 +854,8 @@ public class DiskBalancer { if (item.getErrorCount() >= getMaxError(item)) { item.setErrMsg("Error count exceeded."); - LOG.info("Maximum error count exceeded. Error count: {} Max error:{} " - , item.getErrorCount(), item.getMaxDiskErrors()); + LOG.info("Maximum error count exceeded. Error count: {} Max error:{} ", + item.getErrorCount(), item.getMaxDiskErrors()); } return null; @@ -962,7 +959,8 @@ public class DiskBalancer { LOG.error("Exceeded the max error count. source {}, dest: {} " + "error count: {}", source.getBasePath(), dest.getBasePath(), item.getErrorCount()); - break; + this.setExitFlag(); + continue; } // Check for the block tolerance constraint. @@ -971,7 +969,8 @@ public class DiskBalancer { "blocks.", source.getBasePath(), dest.getBasePath(), item.getBytesCopied(), item.getBlocksCopied()); - break; + this.setExitFlag(); + continue; } ExtendedBlock block = getNextBlock(poolIters, item); @@ -979,7 +978,8 @@ public class DiskBalancer { if (block == null) { LOG.error("No source blocks, exiting the copy. Source: {}, " + "dest:{}", source.getBasePath(), dest.getBasePath()); - break; + this.setExitFlag(); + continue; } // check if someone told us exit, treat this as an interruption @@ -987,7 +987,7 @@ public class DiskBalancer { // for the thread, since both getNextBlock and moveBlocAcrossVolume // can take some time. if (!shouldRun()) { - break; + continue; } long timeUsed; @@ -1006,7 +1006,8 @@ public class DiskBalancer { LOG.error("Destination volume: {} does not have enough space to" + " accommodate a block. Block Size: {} Exiting from" + " copyBlocks.", dest.getBasePath(), block.getNumBytes()); - break; + this.setExitFlag(); + continue; } LOG.debug("Moved block with size {} from {} to {}", http://git-wip-us.apache.org/repos/asf/hadoop/blob/bee9f57f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java index dc177fd..eb15bdc 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancer.java @@ -25,7 +25,6 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; -import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.server.balancer.TestBalancer; @@ -37,19 +36,18 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl; import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector; import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory; import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster; -import org.apache.hadoop.hdfs.server.diskbalancer.datamodel - .DiskBalancerDataNode; +import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode; import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume; import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Time; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; -import java.net.URISyntaxException; import java.util.LinkedList; import java.util.List; -import java.util.Random; import java.util.concurrent.TimeoutException; import static org.junit.Assert.assertEquals; @@ -62,6 +60,7 @@ import static org.junit.Assert.assertTrue; public class TestDiskBalancer { private static final String PLAN_FILE = "/system/current.plan.json"; + static final Logger LOG = LoggerFactory.getLogger(TestDiskBalancer.class); @Test public void testDiskBalancerNameNodeConnectivity() throws Exception { @@ -110,61 +109,278 @@ public class TestDiskBalancer { */ @Test public void testDiskBalancerEndToEnd() throws Exception { + Configuration conf = new HdfsConfiguration(); - final int defaultBlockSize = 100; conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true); - conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, defaultBlockSize); - conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, defaultBlockSize); - conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); - final int numDatanodes = 1; - final String fileName = "/tmp.txt"; - final Path filePath = new Path(fileName); - final int blocks = 100; - final int blocksSize = 1024; - final int fileLen = blocks * blocksSize; - - - // Write a file and restart the cluster - long[] capacities = new long[]{defaultBlockSize * 2 * fileLen, - defaultBlockSize * 2 * fileLen}; - MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) - .numDataNodes(numDatanodes) - .storageCapacities(capacities) - .storageTypes(new StorageType[]{StorageType.DISK, StorageType.DISK}) - .storagesPerDatanode(2) + final int blockCount = 100; + final int blockSize = 1024; + final int diskCount = 2; + final int dataNodeCount = 1; + final int dataNodeIndex = 0; + final int sourceDiskIndex = 0; + + MiniDFSCluster cluster = new ClusterBuilder() + .setBlockCount(blockCount) + .setBlockSize(blockSize) + .setDiskCount(diskCount) + .setNumDatanodes(dataNodeCount) + .setConf(conf) + .build(); + try { + DataMover dataMover = new DataMover(cluster, dataNodeIndex, + sourceDiskIndex, conf, blockSize, blockCount); + dataMover.moveDataToSourceDisk(); + NodePlan plan = dataMover.generatePlan(); + dataMover.executePlan(plan); + dataMover.verifyPlanExectionDone(); + dataMover.verifyAllVolumesHaveData(); + dataMover.verifyTolerance(plan, 0, sourceDiskIndex, 10); + } finally { + cluster.shutdown(); + } + } + + @Test + public void testBalanceDataBetweenMultiplePairsOfVolumes() + throws Exception { + + Configuration conf = new HdfsConfiguration(); + conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true); + final int blockCount = 1000; + final int blockSize = 1024; + + // create 3 disks, that means we will have 2 plans + // Move Data from disk0->disk1 and disk0->disk2. + final int diskCount = 3; + final int dataNodeCount = 1; + final int dataNodeIndex = 0; + final int sourceDiskIndex = 0; + + + MiniDFSCluster cluster = new ClusterBuilder() + .setBlockCount(blockCount) + .setBlockSize(blockSize) + .setDiskCount(diskCount) + .setNumDatanodes(dataNodeCount) + .setConf(conf) .build(); - FsVolumeImpl source = null; - FsVolumeImpl dest = null; + + try { + DataMover dataMover = new DataMover(cluster, dataNodeIndex, + sourceDiskIndex, conf, blockSize, blockCount); + dataMover.moveDataToSourceDisk(); + NodePlan plan = dataMover.generatePlan(); + + // 3 disks , The plan should move data both disks, + // so we must have 2 plan steps. + assertEquals(plan.getVolumeSetPlans().size(), 2); + + dataMover.executePlan(plan); + dataMover.verifyPlanExectionDone(); + dataMover.verifyAllVolumesHaveData(); + dataMover.verifyTolerance(plan, 0, sourceDiskIndex, 10); + } finally { + cluster.shutdown(); + } + } + + /** + * Sets alll Disks capacity to size specified. + * + * @param cluster - DiskBalancerCluster + * @param size - new size of the disk + */ + private void setVolumeCapacity(DiskBalancerCluster cluster, long size, + String diskType) { + Preconditions.checkNotNull(cluster); + for (DiskBalancerDataNode node : cluster.getNodes()) { + for (DiskBalancerVolume vol : + node.getVolumeSets().get(diskType).getVolumes()) { + vol.setCapacity(size); + } + node.getVolumeSets().get(diskType).computeVolumeDataDensity(); + } + } + + /** + * Helper class that allows us to create different kinds of MiniDFSClusters + * and populate data. + */ + static class ClusterBuilder { + private Configuration conf; + private int blockSize; + private int numDatanodes; + private int fileLen; + private int blockCount; + private int diskCount; + + public ClusterBuilder setConf(Configuration conf) { + this.conf = conf; + return this; + } + + public ClusterBuilder setBlockSize(int blockSize) { + this.blockSize = blockSize; + return this; + } + + public ClusterBuilder setNumDatanodes(int datanodeCount) { + this.numDatanodes = datanodeCount; + return this; + } + + public ClusterBuilder setBlockCount(int blockCount) { + this.blockCount = blockCount; + return this; + } + + public ClusterBuilder setDiskCount(int diskCount) { + this.diskCount = diskCount; + return this; + } + + private long[] getCapacities(int diskCount, int bSize, int fSize) { + Preconditions.checkState(diskCount > 0); + long[] capacities = new long[diskCount]; + for (int x = 0; x < diskCount; x++) { + capacities[x] = diskCount * bSize * fSize * 2L; + } + return capacities; + } + + private StorageType[] getStorageTypes(int diskCount) { + Preconditions.checkState(diskCount > 0); + StorageType[] array = new StorageType[diskCount]; + for (int x = 0; x < diskCount; x++) { + array[x] = StorageType.DISK; + } + return array; + } + + public MiniDFSCluster build() throws IOException, TimeoutException, + InterruptedException { + Preconditions.checkNotNull(this.conf); + Preconditions.checkState(blockSize > 0); + Preconditions.checkState(numDatanodes > 0); + fileLen = blockCount * blockSize; + Preconditions.checkState(fileLen > 0); + conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); + conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize); + conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); + + final String fileName = "/tmp.txt"; + Path filePath = new Path(fileName); + fileLen = blockCount * blockSize; + + + // Write a file and restart the cluster + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(numDatanodes) + .storageCapacities(getCapacities(diskCount, blockSize, fileLen)) + .storageTypes(getStorageTypes(diskCount)) + .storagesPerDatanode(diskCount) + .build(); + generateData(filePath, cluster); + cluster.restartDataNodes(); + cluster.waitActive(); + return cluster; + } + + private void generateData(Path filePath, MiniDFSCluster cluster) + throws IOException, InterruptedException, TimeoutException { cluster.waitActive(); - Random r = new Random(); FileSystem fs = cluster.getFileSystem(0); TestBalancer.createFile(cluster, filePath, fileLen, (short) 1, numDatanodes - 1); - DFSTestUtil.waitReplication(fs, filePath, (short) 1); cluster.restartDataNodes(); cluster.waitActive(); + } + } - // Get the data node and move all data to one disk. - DataNode dnNode = cluster.getDataNodes().get(numDatanodes - 1); - try (FsDatasetSpi.FsVolumeReferences refs = - dnNode.getFSDataset().getFsVolumeReferences()) { - source = (FsVolumeImpl) refs.get(0); - dest = (FsVolumeImpl) refs.get(1); - assertTrue(DiskBalancerTestUtil.getBlockCount(source) > 0); - DiskBalancerTestUtil.moveAllDataToDestVolume(dnNode.getFSDataset(), - source, dest); - assertTrue(DiskBalancerTestUtil.getBlockCount(source) == 0); - } + class DataMover { + private final MiniDFSCluster cluster; + private final int sourceDiskIndex; + private final int dataNodeIndex; + private final Configuration conf; + private final int blockCount; + private final int blockSize; + private DataNode node; + + /** + * Constructs a DataMover class. + * + * @param cluster - MiniDFSCluster. + * @param dataNodeIndex - Datanode to operate against. + * @param sourceDiskIndex - source Disk Index. + */ + public DataMover(MiniDFSCluster cluster, int dataNodeIndex, int + sourceDiskIndex, Configuration conf, int blockSize, int + blockCount) { + this.cluster = cluster; + this.dataNodeIndex = dataNodeIndex; + this.node = cluster.getDataNodes().get(dataNodeIndex); + this.sourceDiskIndex = sourceDiskIndex; + this.conf = conf; + this.blockCount = blockCount; + this.blockSize = blockSize; + } + /** + * Moves all data to a source disk to create disk imbalance so we can run a + * planner. + * + * @throws IOException + */ + public void moveDataToSourceDisk() throws IOException { + moveAllDataToDestDisk(this.node, sourceDiskIndex); cluster.restartDataNodes(); cluster.waitActive(); + } + + /** + * Moves all data in the data node to one disk. + * + * @param dataNode - Datanode + * @param destDiskindex - Index of the destination disk. + */ + private void moveAllDataToDestDisk(DataNode dataNode, int destDiskindex) + throws IOException { + Preconditions.checkNotNull(dataNode); + Preconditions.checkState(destDiskindex >= 0); + try (FsDatasetSpi.FsVolumeReferences refs = + dataNode.getFSDataset().getFsVolumeReferences()) { + if (refs.size() <= destDiskindex) { + throw new IllegalArgumentException("Invalid Disk index."); + } + FsVolumeImpl dest = (FsVolumeImpl) refs.get(destDiskindex); + for (int x = 0; x < refs.size(); x++) { + if (x == destDiskindex) { + continue; + } + FsVolumeImpl source = (FsVolumeImpl) refs.get(x); + DiskBalancerTestUtil.moveAllDataToDestVolume(dataNode.getFSDataset(), + source, dest); + + } + } + } + + /** + * Generates a NodePlan for the datanode specified. + * + * @return NodePlan. + */ + public NodePlan generatePlan() throws Exception { + // Start up a disk balancer and read the cluster info. - final DataNode newDN = cluster.getDataNodes().get(numDatanodes - 1); + node = cluster.getDataNodes().get(dataNodeIndex); ClusterConnector nameNodeConnector = - ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf); + ConnectorFactory.getCluster(cluster.getFileSystem(dataNodeIndex) + .getUri(), conf); DiskBalancerCluster diskBalancerCluster = new DiskBalancerCluster(nameNodeConnector); @@ -173,11 +389,11 @@ public class TestDiskBalancer { // Rewrite the capacity in the model to show that disks need // re-balancing. - setVolumeCapacity(diskBalancerCluster, defaultBlockSize * 2 * fileLen, + setVolumeCapacity(diskBalancerCluster, blockSize * 2L * blockCount, "DISK"); // Pick a node to process. - nodesToProcess.add(diskBalancerCluster.getNodeByUUID(dnNode - .getDatanodeUuid())); + nodesToProcess.add(diskBalancerCluster.getNodeByUUID( + node.getDatanodeUuid())); diskBalancerCluster.setNodesToProcess(nodesToProcess); // Compute a plan. @@ -188,169 +404,91 @@ public class TestDiskBalancer { assertTrue(clusterplan.size() == 1); NodePlan plan = clusterplan.get(0); - plan.setNodeUUID(dnNode.getDatanodeUuid()); + plan.setNodeUUID(node.getDatanodeUuid()); plan.setTimeStamp(Time.now()); - String planJson = plan.toJson(); - String planID = DigestUtils.shaHex(planJson); + assertNotNull(plan.getVolumeSetPlans()); assertTrue(plan.getVolumeSetPlans().size() > 0); plan.getVolumeSetPlans().get(0).setTolerancePercent(10); + return plan; + } + + /** + * Waits for a plan executing to finish. + */ + public void executePlan(NodePlan plan) throws + IOException, TimeoutException, InterruptedException { + + node = cluster.getDataNodes().get(dataNodeIndex); + String planJson = plan.toJson(); + String planID = DigestUtils.shaHex(planJson); // Submit the plan and wait till the execution is done. - newDN.submitDiskBalancerPlan(planID, 1, PLAN_FILE, planJson, false); - String jmxString = newDN.getDiskBalancerStatus(); + node.submitDiskBalancerPlan(planID, 1, PLAN_FILE, planJson, + false); + String jmxString = node.getDiskBalancerStatus(); assertNotNull(jmxString); DiskBalancerWorkStatus status = DiskBalancerWorkStatus.parseJson(jmxString); - DiskBalancerWorkStatus realStatus = newDN.queryDiskBalancerPlan(); + DiskBalancerWorkStatus realStatus = node.queryDiskBalancerPlan(); assertEquals(realStatus.getPlanID(), status.getPlanID()); GenericTestUtils.waitFor(new Supplier() { @Override public Boolean get() { try { - return newDN.queryDiskBalancerPlan().getResult() == + return node.queryDiskBalancerPlan().getResult() == DiskBalancerWorkStatus.Result.PLAN_DONE; } catch (IOException ex) { return false; } } }, 1000, 100000); - - - //verify that it worked. - dnNode = cluster.getDataNodes().get(numDatanodes - 1); - assertEquals(dnNode.queryDiskBalancerPlan().getResult(), - DiskBalancerWorkStatus.Result.PLAN_DONE); - try (FsDatasetSpi.FsVolumeReferences refs = - dnNode.getFSDataset().getFsVolumeReferences()) { - source = (FsVolumeImpl) refs.get(0); - assertTrue(DiskBalancerTestUtil.getBlockCount(source) > 0); - } - - - // Tolerance - long delta = (plan.getVolumeSetPlans().get(0).getBytesToMove() - * 10) / 100; - assertTrue( - (DiskBalancerTestUtil.getBlockCount(source) * - defaultBlockSize + delta) >= - plan.getVolumeSetPlans().get(0).getBytesToMove()); - - } finally { - cluster.shutdown(); } - } - - @Test(timeout=60000) - public void testBalanceDataBetweenMultiplePairsOfVolumes() - throws Exception { - Configuration conf = new HdfsConfiguration(); - final int DEFAULT_BLOCK_SIZE = 2048; - conf.setBoolean(DFSConfigKeys.DFS_DISK_BALANCER_ENABLED, true); - conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE); - conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, DEFAULT_BLOCK_SIZE); - conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); - final int NUM_DATANODES = 1; - final long CAP = 512 * 1024; - final Path testFile = new Path("/testfile"); - MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) - .numDataNodes(NUM_DATANODES) - .storageCapacities(new long[]{CAP, CAP, CAP, CAP}) - .storagesPerDatanode(4) - .build(); - try { - cluster.waitActive(); - DistributedFileSystem fs = cluster.getFileSystem(); - TestBalancer.createFile(cluster, testFile, CAP, (short) 1, 0); - - DFSTestUtil.waitReplication(fs, testFile, (short) 1); - DataNode dnNode = cluster.getDataNodes().get(0); - // Move data out of two volumes to make them empty. - try (FsDatasetSpi.FsVolumeReferences refs = - dnNode.getFSDataset().getFsVolumeReferences()) { - assertEquals(4, refs.size()); - for (int i = 0; i < refs.size(); i += 2) { - FsVolumeImpl source = (FsVolumeImpl) refs.get(i); - FsVolumeImpl dest = (FsVolumeImpl) refs.get(i + 1); - assertTrue(DiskBalancerTestUtil.getBlockCount(source) > 0); - DiskBalancerTestUtil.moveAllDataToDestVolume(dnNode.getFSDataset(), - source, dest); - assertTrue(DiskBalancerTestUtil.getBlockCount(source) == 0); - } - } - - cluster.restartDataNodes(); - cluster.waitActive(); - - // Start up a disk balancer and read the cluster info. - final DataNode dataNode = cluster.getDataNodes().get(0); - ClusterConnector nameNodeConnector = - ConnectorFactory.getCluster(cluster.getFileSystem(0).getUri(), conf); - - DiskBalancerCluster diskBalancerCluster = - new DiskBalancerCluster(nameNodeConnector); - diskBalancerCluster.readClusterInfo(); - List nodesToProcess = new LinkedList<>(); - // Rewrite the capacity in the model to show that disks need - // re-balancing. - setVolumeCapacity(diskBalancerCluster, CAP, "DISK"); - nodesToProcess.add(diskBalancerCluster.getNodeByUUID( - dataNode.getDatanodeUuid())); - diskBalancerCluster.setNodesToProcess(nodesToProcess); - - // Compute a plan. - List clusterPlan = diskBalancerCluster.computePlan(10.0f); - - NodePlan plan = clusterPlan.get(0); - assertEquals(2, plan.getVolumeSetPlans().size()); - plan.setNodeUUID(dnNode.getDatanodeUuid()); - plan.setTimeStamp(Time.now()); - String planJson = plan.toJson(); - String planID = DigestUtils.shaHex(planJson); - dataNode.submitDiskBalancerPlan(planID, 1, PLAN_FILE, planJson, false); - - GenericTestUtils.waitFor(new Supplier() { - @Override - public Boolean get() { - try { - return dataNode.queryDiskBalancerPlan().getResult() == - DiskBalancerWorkStatus.Result.PLAN_DONE; - } catch (IOException ex) { - return false; - } - } - }, 1000, 100000); - assertEquals(dataNode.queryDiskBalancerPlan().getResult(), + /** + * Verifies the Plan Execution has been done. + */ + public void verifyPlanExectionDone() throws IOException { + node = cluster.getDataNodes().get(dataNodeIndex); + assertEquals(node.queryDiskBalancerPlan().getResult(), DiskBalancerWorkStatus.Result.PLAN_DONE); + } + /** + * Once diskBalancer is run, all volumes mush has some data. + */ + public void verifyAllVolumesHaveData() throws IOException { + node = cluster.getDataNodes().get(dataNodeIndex); try (FsDatasetSpi.FsVolumeReferences refs = - dataNode.getFSDataset().getFsVolumeReferences()) { - for (FsVolumeSpi vol : refs) { - assertTrue(DiskBalancerTestUtil.getBlockCount(vol) > 0); + node.getFSDataset().getFsVolumeReferences()) { + for (FsVolumeSpi volume : refs) { + assertTrue(DiskBalancerTestUtil.getBlockCount(volume) > 0); + LOG.info(refs.toString() + " : Block Count : {}", + DiskBalancerTestUtil.getBlockCount(volume)); } } - } finally { - cluster.shutdown(); } - } - /** - * Sets alll Disks capacity to size specified. - * - * @param cluster - DiskBalancerCluster - * @param size - new size of the disk - */ - private void setVolumeCapacity(DiskBalancerCluster cluster, long size, - String diskType) { - Preconditions.checkNotNull(cluster); - for (DiskBalancerDataNode node : cluster.getNodes()) { - for (DiskBalancerVolume vol : - node.getVolumeSets().get(diskType).getVolumes()) { - vol.setCapacity(size); + /** + * Verifies that tolerance values are honored correctly. + */ + public void verifyTolerance(NodePlan plan, int planIndex, int + sourceDiskIndex, int tolerance) throws IOException { + // Tolerance + long delta = (plan.getVolumeSetPlans().get(planIndex).getBytesToMove() + * tolerance) / 100; + FsVolumeImpl volume = null; + try (FsDatasetSpi.FsVolumeReferences refs = + node.getFSDataset().getFsVolumeReferences()) { + volume = (FsVolumeImpl) refs.get(sourceDiskIndex); + assertTrue(DiskBalancerTestUtil.getBlockCount(volume) > 0); + + assertTrue( + (DiskBalancerTestUtil.getBlockCount(volume) * + (blockSize + delta)) >= + plan.getVolumeSetPlans().get(0).getBytesToMove()); } - node.getVolumeSets().get(diskType).computeVolumeDataDensity(); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/bee9f57f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java index c362f49..794a887 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/diskbalancer/TestDiskBalancerWithMockMover.java @@ -358,14 +358,13 @@ public class TestDiskBalancerWithMockMover { private AtomicBoolean shouldRun; private FsDatasetSpi dataset; - private Integer runCount; + private int runCount; private volatile boolean sleepInCopyBlocks; private long delay; public TestMover(FsDatasetSpi dataset) { this.dataset = dataset; this.shouldRun = new AtomicBoolean(false); - this.runCount = new Integer(0); } public void setSleep() { @@ -401,7 +400,7 @@ public class TestDiskBalancerWithMockMover { if (delay > 0) { Thread.sleep(delay); } - synchronized (runCount) { + synchronized (this) { if (shouldRun()) { runCount++; } @@ -461,9 +460,9 @@ public class TestDiskBalancerWithMockMover { } public int getRunCount() { - synchronized (runCount) { - LOG.info("Run count : " + runCount.intValue()); - return runCount.intValue(); + synchronized (this) { + LOG.info("Run count : " + runCount); + return runCount; } } } @@ -510,7 +509,7 @@ public class TestDiskBalancerWithMockMover { } } - private class DiskBalancerBuilder { + private static class DiskBalancerBuilder { private TestMover blockMover; private Configuration conf; private String nodeID; @@ -546,7 +545,7 @@ public class TestDiskBalancerWithMockMover { } } - private class DiskBalancerClusterBuilder { + private static class DiskBalancerClusterBuilder { private String jsonFilePath; private Configuration conf; @@ -573,7 +572,7 @@ public class TestDiskBalancerWithMockMover { } } - private class PlanBuilder { + private static class PlanBuilder { private String sourcePath; private String destPath; private String sourceUUID; --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org