Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 0531A200C1E for ; Fri, 17 Feb 2017 15:49:56 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 03DC0160B73; Fri, 17 Feb 2017 14:49:56 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 83915160B3F for ; Fri, 17 Feb 2017 15:49:54 +0100 (CET) Received: (qmail 46099 invoked by uid 500); 17 Feb 2017 14:49:43 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 43763 invoked by uid 99); 17 Feb 2017 14:49:42 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 17 Feb 2017 14:49:42 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3AEC8E04F3; Fri, 17 Feb 2017 14:49:42 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: rakeshr@apache.org To: common-commits@hadoop.apache.org Date: Fri, 17 Feb 2017 14:50:24 -0000 Message-Id: <26d69e133f464ad1a9bf98a93bb36f0c@git.apache.org> In-Reply-To: <05a86cae06e545b3ac1a82a6f00b2924@git.apache.org> References: <05a86cae06e545b3ac1a82a6f00b2924@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [44/50] [abbrv] hadoop git commit: HDFS-11193 : [SPS]: Erasure coded files should be considered for satisfying storage policy. Contributed by Rakesh R archived-at: Fri, 17 Feb 2017 14:49:56 -0000 HDFS-11193 : [SPS]: Erasure coded files should be considered for satisfying storage policy. Contributed by Rakesh R Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3b1b32df Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3b1b32df Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3b1b32df Branch: refs/heads/HDFS-10285 Commit: 3b1b32dffd3bd5ed81ebed365b00d755c20801a8 Parents: 90fdaac Author: Uma Maheswara Rao G Authored: Thu Jan 5 09:30:39 2017 -0800 Committer: Rakesh Radhakrishnan Committed: Fri Feb 17 19:54:25 2017 +0530 ---------------------------------------------------------------------- .../blockmanagement/BlockInfoStriped.java | 10 + .../server/namenode/StoragePolicySatisfier.java | 76 ++- ...stStoragePolicySatisfierWithStripedFile.java | 469 +++++++++++++++++++ 3 files changed, 551 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b1b32df/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java index 790cd77..8bc63c1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStriped.java @@ -244,6 +244,10 @@ public class BlockInfoStriped extends BlockInfo { return true; } + /** + * This class contains datanode storage information and block index in the + * block group. + */ public static class StorageAndBlockIndex { private final DatanodeStorageInfo storage; private final byte blockIndex; @@ -253,10 +257,16 @@ public class BlockInfoStriped extends BlockInfo { this.blockIndex = blockIndex; } + /** + * @return storage in the datanode. + */ public DatanodeStorageInfo getStorage() { return storage; } + /** + * @return block index in the block group. + */ public byte getBlockIndex() { return blockIndex; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b1b32df/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java index 26e0775..a854bd7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java @@ -36,6 +36,8 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.server.balancer.Matcher; import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped.StorageAndBlockIndex; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo; @@ -43,6 +45,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo; import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult; import org.apache.hadoop.hdfs.server.protocol.StorageReport; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.util.Daemon; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -226,8 +229,26 @@ public class StoragePolicySatisfier implements Runnable { for (int i = 0; i < blocks.length; i++) { BlockInfo blockInfo = blocks[i]; - List expectedStorageTypes = existingStoragePolicy + List expectedStorageTypes; + if (blockInfo.isStriped()) { + if (ErasureCodingPolicyManager + .checkStoragePolicySuitableForECStripedMode( + existingStoragePolicyID)) { + expectedStorageTypes = existingStoragePolicy + .chooseStorageTypes((short) blockInfo.getCapacity()); + } else { + // Currently we support only limited policies (HOT, COLD, ALLSSD) + // for EC striped mode files. SPS will ignore to move the blocks if + // the storage policy is not in EC Striped mode supported policies + LOG.warn("The storage policy " + existingStoragePolicy.getName() + + " is not suitable for Striped EC files. " + + "So, ignoring to move the blocks"); + return false; + } + } else { + expectedStorageTypes = existingStoragePolicy .chooseStorageTypes(blockInfo.getReplication()); + } foundMatchingTargetNodesForAllBlocks |= computeBlockMovingInfos( blockMovingInfos, blockInfo, expectedStorageTypes); } @@ -439,12 +460,18 @@ public class StoragePolicySatisfier implements Runnable { if (sourceNodes.size() <= 0) { return blkMovingInfos; } - buildBlockMovingInfos(blockInfo, sourceNodes, sourceStorageTypes, - targetNodes, targetStorageTypes, blkMovingInfos); + + if (blockInfo.isStriped()) { + buildStripedBlockMovingInfos(blockInfo, sourceNodes, sourceStorageTypes, + targetNodes, targetStorageTypes, blkMovingInfos); + } else { + buildContinuousBlockMovingInfos(blockInfo, sourceNodes, + sourceStorageTypes, targetNodes, targetStorageTypes, blkMovingInfos); + } return blkMovingInfos; } - private void buildBlockMovingInfos(BlockInfo blockInfo, + private void buildContinuousBlockMovingInfos(BlockInfo blockInfo, List sourceNodes, List sourceStorageTypes, List targetNodes, List targetStorageTypes, List blkMovingInfos) { @@ -458,6 +485,47 @@ public class StoragePolicySatisfier implements Runnable { blkMovingInfos.add(blkMovingInfo); } + private void buildStripedBlockMovingInfos(BlockInfo blockInfo, + List sourceNodes, List sourceStorageTypes, + List targetNodes, List targetStorageTypes, + List blkMovingInfos) { + // For a striped block, it needs to construct internal block at the given + // index of a block group. Here it is iterating over all the block indices + // and construct internal blocks which can be then considered for block + // movement. + BlockInfoStriped sBlockInfo = (BlockInfoStriped) blockInfo; + for (StorageAndBlockIndex si : sBlockInfo.getStorageAndIndexInfos()) { + if (si.getBlockIndex() >= 0) { + DatanodeDescriptor dn = si.getStorage().getDatanodeDescriptor(); + DatanodeInfo[] srcNode = new DatanodeInfo[1]; + StorageType[] srcStorageType = new StorageType[1]; + DatanodeInfo[] targetNode = new DatanodeInfo[1]; + StorageType[] targetStorageType = new StorageType[1]; + for (int i = 0; i < sourceNodes.size(); i++) { + DatanodeInfo node = sourceNodes.get(i); + if (node.equals(dn)) { + srcNode[0] = node; + srcStorageType[0] = sourceStorageTypes.get(i); + targetNode[0] = targetNodes.get(i); + targetStorageType[0] = targetStorageTypes.get(i); + + // construct internal block + long blockId = blockInfo.getBlockId() + si.getBlockIndex(); + long numBytes = StripedBlockUtil.getInternalBlockLength( + sBlockInfo.getNumBytes(), sBlockInfo.getCellSize(), + sBlockInfo.getDataBlockNum(), si.getBlockIndex()); + Block blk = new Block(blockId, numBytes, + blockInfo.getGenerationStamp()); + BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blk, srcNode, + targetNode, srcStorageType, targetStorageType); + blkMovingInfos.add(blkMovingInfo); + break; // found matching source-target nodes + } + } + } + } + } + /** * Choose the target storage within same datanode if possible. * http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b1b32df/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java new file mode 100644 index 0000000..5f8639f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java @@ -0,0 +1,469 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import java.io.IOException; +import java.util.concurrent.TimeoutException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSTestUtil; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.NameNodeProxies; +import org.apache.hadoop.hdfs.StripedFileTestUtil; +import org.apache.hadoop.hdfs.client.HdfsAdmin; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Supplier; + +/** + * Tests that StoragePolicySatisfier daemon is able to check the striped blocks + * to be moved and finding its expected target locations in order to satisfy the + * storage policy. + */ +public class TestStoragePolicySatisfierWithStripedFile { + + private static final Logger LOG = LoggerFactory + .getLogger(TestStoragePolicySatisfierWithStripedFile.class); + + private final int stripesPerBlock = 2; + + private ErasureCodingPolicy ecPolicy; + private int dataBlocks; + private int parityBlocks; + private int cellSize; + private int defaultStripeBlockSize; + + private ErasureCodingPolicy getEcPolicy() { + return ErasureCodingPolicyManager.getSystemDefaultPolicy(); + } + + /** + * Initialize erasure coding policy. + */ + @Before + public void init(){ + ecPolicy = getEcPolicy(); + dataBlocks = ecPolicy.getNumDataUnits(); + parityBlocks = ecPolicy.getNumParityUnits(); + cellSize = ecPolicy.getCellSize(); + defaultStripeBlockSize = cellSize * stripesPerBlock; + } + + /** + * Tests to verify that all the striped blocks(data + parity blocks) are + * moving to satisfy the storage policy. + */ + @Test(timeout = 300000) + public void testMoverWithFullStripe() throws Exception { + // start 10 datanodes + int numOfDatanodes = 10; + int storagesPerDatanode = 2; + long capacity = 20 * defaultStripeBlockSize; + long[][] capacities = new long[numOfDatanodes][storagesPerDatanode]; + for (int i = 0; i < numOfDatanodes; i++) { + for (int j = 0; j < storagesPerDatanode; j++) { + capacities[i][j] = capacity; + } + } + + final Configuration conf = new HdfsConfiguration(); + initConfWithStripe(conf, defaultStripeBlockSize); + final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(numOfDatanodes) + .storagesPerDatanode(storagesPerDatanode) + .storageTypes(new StorageType[][]{ + {StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.ARCHIVE}, + {StorageType.DISK, StorageType.ARCHIVE}, + {StorageType.DISK, StorageType.ARCHIVE}, + {StorageType.DISK, StorageType.ARCHIVE}, + {StorageType.DISK, StorageType.ARCHIVE}}) + .storageCapacities(capacities) + .build(); + + HdfsAdmin hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf); + try { + cluster.waitActive(); + + // set "/bar" directory with HOT storage policy. + ClientProtocol client = NameNodeProxies.createProxy(conf, + cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy(); + String barDir = "/bar"; + client.mkdirs(barDir, new FsPermission((short) 777), true); + client.setStoragePolicy(barDir, HdfsConstants.HOT_STORAGE_POLICY_NAME); + // set an EC policy on "/bar" directory + client.setErasureCodingPolicy(barDir, null); + + // write file to barDir + final String fooFile = "/bar/foo"; + long fileLen = cellSize * dataBlocks; + DFSTestUtil.createFile(cluster.getFileSystem(), new Path(fooFile), + fileLen, (short) 3, 0); + + // verify storage types and locations + LocatedBlocks locatedBlocks = client.getBlockLocations(fooFile, 0, + fileLen); + for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) { + for (StorageType type : lb.getStorageTypes()) { + Assert.assertEquals(StorageType.DISK, type); + } + } + StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks, + dataBlocks + parityBlocks); + + // start 5 more datanodes + int numOfNewDatanodes = 5; + capacities = new long[numOfNewDatanodes][storagesPerDatanode]; + for (int i = 0; i < numOfNewDatanodes; i++) { + for (int j = 0; j < storagesPerDatanode; j++) { + capacities[i][j] = capacity; + } + } + cluster.startDataNodes(conf, 5, + new StorageType[][]{ + {StorageType.ARCHIVE, StorageType.ARCHIVE}, + {StorageType.ARCHIVE, StorageType.ARCHIVE}, + {StorageType.ARCHIVE, StorageType.ARCHIVE}, + {StorageType.ARCHIVE, StorageType.ARCHIVE}, + {StorageType.ARCHIVE, StorageType.ARCHIVE}}, + true, null, null, null, capacities, null, false, false, false, null); + cluster.triggerHeartbeats(); + + // move file to ARCHIVE + client.setStoragePolicy(barDir, "COLD"); + hdfsAdmin.satisfyStoragePolicy(new Path(fooFile)); + LOG.info("Sets storage policy to COLD and invoked satisfyStoragePolicy"); + cluster.triggerHeartbeats(); + + waitForBlocksMovementResult(cluster, 1, 60000); + // verify storage types and locations + waitExpectedStorageType(cluster, fooFile, fileLen, StorageType.ARCHIVE, 9, + 9, 60000); + } finally { + cluster.shutdown(); + } + } + + /** + * Tests to verify that only few datanodes are available and few striped + * blocks are able to move. Others are still trying to find available nodes. + * + * For example, we have 3 nodes A(disk, disk), B(disk, disk), C(disk, archive) + * + * Assume a block with storage locations A(disk), B(disk), C(disk). Now, set + * policy as COLD and invoked {@link HdfsAdmin#satisfyStoragePolicy(Path)}, + * while choosing the target node for A, it shouldn't choose C. For C, it + * should do local block movement as it has ARCHIVE storage type. + */ + @Test(timeout = 300000) + public void testWhenOnlyFewTargetNodesAreAvailableToSatisfyStoragePolicy() + throws Exception { + // start 10 datanodes + int numOfDatanodes = 10; + int storagesPerDatanode = 2; + long capacity = 20 * defaultStripeBlockSize; + long[][] capacities = new long[numOfDatanodes][storagesPerDatanode]; + for (int i = 0; i < numOfDatanodes; i++) { + for (int j = 0; j < storagesPerDatanode; j++) { + capacities[i][j] = capacity; + } + } + + final Configuration conf = new HdfsConfiguration(); + initConfWithStripe(conf, defaultStripeBlockSize); + final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(numOfDatanodes) + .storagesPerDatanode(storagesPerDatanode) + .storageTypes(new StorageType[][]{ + {StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.ARCHIVE}, + {StorageType.DISK, StorageType.ARCHIVE}, + {StorageType.DISK, StorageType.ARCHIVE}}) + .storageCapacities(capacities) + .build(); + + HdfsAdmin hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf); + try { + cluster.waitActive(); + + // set "/bar" directory with HOT storage policy. + ClientProtocol client = NameNodeProxies.createProxy(conf, + cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy(); + String barDir = "/bar"; + client.mkdirs(barDir, new FsPermission((short) 777), true); + client.setStoragePolicy(barDir, HdfsConstants.HOT_STORAGE_POLICY_NAME); + // set an EC policy on "/bar" directory + client.setErasureCodingPolicy(barDir, null); + + // write file to barDir + final String fooFile = "/bar/foo"; + long fileLen = cellSize * dataBlocks; + DFSTestUtil.createFile(cluster.getFileSystem(), new Path(fooFile), + fileLen, (short) 3, 0); + + // verify storage types and locations + LocatedBlocks locatedBlocks = client.getBlockLocations(fooFile, 0, + fileLen); + for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) { + for (StorageType type : lb.getStorageTypes()) { + Assert.assertEquals(StorageType.DISK, type); + } + } + StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks, + dataBlocks + parityBlocks); + + // start 2 more datanodes + int numOfNewDatanodes = 2; + capacities = new long[numOfNewDatanodes][storagesPerDatanode]; + for (int i = 0; i < numOfNewDatanodes; i++) { + for (int j = 0; j < storagesPerDatanode; j++) { + capacities[i][j] = capacity; + } + } + cluster.startDataNodes(conf, 2, + new StorageType[][]{ + {StorageType.ARCHIVE, StorageType.ARCHIVE}, + {StorageType.ARCHIVE, StorageType.ARCHIVE}}, + true, null, null, null, capacities, null, false, false, false, null); + cluster.triggerHeartbeats(); + + // Move file to ARCHIVE. Only 5 datanodes are available with ARCHIVE + // storage type. + client.setStoragePolicy(barDir, "COLD"); + hdfsAdmin.satisfyStoragePolicy(new Path(fooFile)); + LOG.info("Sets storage policy to COLD and invoked satisfyStoragePolicy"); + cluster.triggerHeartbeats(); + + waitForBlocksMovementResult(cluster, 1, 60000); + waitForAttemptedItems(cluster, 1, 30000); + // verify storage types and locations. + waitExpectedStorageType(cluster, fooFile, fileLen, StorageType.ARCHIVE, 5, + 9, 60000); + } finally { + cluster.shutdown(); + } + } + + /** + * Tests to verify that for the given path, no blocks under the given path + * will be scheduled for block movement as there are no available datanode + * with required storage type. + * + * For example, there are two block for a file: + * + * File1 => blk_1[locations=A(DISK),B(DISK),C(DISK)], + * blk_2[locations=A(DISK),B(DISK),C(DISK)]. Now, set storage policy to COLD. + * No datanode is available with storage type ARCHIVE. + * + * SPS won't schedule any block movement for this path. + */ + @Test(timeout = 300000) + public void testWhenNoTargetDatanodeToSatisfyStoragePolicy() + throws Exception { + // start 10 datanodes + int numOfDatanodes = 10; + int storagesPerDatanode = 2; + long capacity = 20 * defaultStripeBlockSize; + long[][] capacities = new long[numOfDatanodes][storagesPerDatanode]; + for (int i = 0; i < numOfDatanodes; i++) { + for (int j = 0; j < storagesPerDatanode; j++) { + capacities[i][j] = capacity; + } + } + + final Configuration conf = new HdfsConfiguration(); + initConfWithStripe(conf, defaultStripeBlockSize); + final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(numOfDatanodes) + .storagesPerDatanode(storagesPerDatanode) + .storageTypes(new StorageType[][]{ + {StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}}) + .storageCapacities(capacities) + .build(); + + HdfsAdmin hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf); + try { + cluster.waitActive(); + + // set "/bar" directory with HOT storage policy. + ClientProtocol client = NameNodeProxies.createProxy(conf, + cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy(); + String barDir = "/bar"; + client.mkdirs(barDir, new FsPermission((short) 777), true); + client.setStoragePolicy(barDir, HdfsConstants.HOT_STORAGE_POLICY_NAME); + // set an EC policy on "/bar" directory + client.setErasureCodingPolicy(barDir, null); + + // write file to barDir + final String fooFile = "/bar/foo"; + long fileLen = cellSize * dataBlocks; + DFSTestUtil.createFile(cluster.getFileSystem(), new Path(fooFile), + fileLen, (short) 3, 0); + + // verify storage types and locations + LocatedBlocks locatedBlocks = client.getBlockLocations(fooFile, 0, + fileLen); + for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) { + for (StorageType type : lb.getStorageTypes()) { + Assert.assertEquals(StorageType.DISK, type); + } + } + StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks, + dataBlocks + parityBlocks); + + // Move file to ARCHIVE. Only 5 datanodes are available with ARCHIVE + // storage type. + client.setStoragePolicy(barDir, "COLD"); + hdfsAdmin.satisfyStoragePolicy(new Path(fooFile)); + LOG.info("Sets storage policy to COLD and invoked satisfyStoragePolicy"); + cluster.triggerHeartbeats(); + + waitForAttemptedItems(cluster, 1, 30000); + // verify storage types and locations. + waitExpectedStorageType(cluster, fooFile, fileLen, StorageType.DISK, 9, 9, + 60000); + waitForAttemptedItems(cluster, 1, 30000); + } finally { + cluster.shutdown(); + } + } + + private void waitForAttemptedItems(MiniDFSCluster cluster, + long expectedBlkMovAttemptedCount, int timeout) + throws TimeoutException, InterruptedException { + BlockManager blockManager = cluster.getNamesystem().getBlockManager(); + final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier(); + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}", + expectedBlkMovAttemptedCount, + sps.getAttemptedItemsMonitor().getAttemptedItemsCount()); + return sps.getAttemptedItemsMonitor() + .getAttemptedItemsCount() == expectedBlkMovAttemptedCount; + } + }, 100, timeout); + } + + private static void initConfWithStripe(Configuration conf, + int stripeBlockSize) { + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, stripeBlockSize); + conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L); + conf.setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, + 1L); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY, + false); + } + + // Check whether the Block movement has been successfully completed to satisfy + // the storage policy for the given file. + private void waitExpectedStorageType(MiniDFSCluster cluster, + final String fileName, long fileLen, + final StorageType expectedStorageType, int expectedStorageCount, + int expectedBlkLocationCount, int timeout) throws Exception { + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + int actualStorageCount = 0; + try { + LocatedBlocks locatedBlocks = cluster.getFileSystem().getClient() + .getLocatedBlocks(fileName, 0, fileLen); + for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) { + LOG.info("LocatedBlocks => Size {}, locs {}", + lb.getLocations().length, lb); + if (lb.getLocations().length > expectedBlkLocationCount) { + return false; + } + for (StorageType storageType : lb.getStorageTypes()) { + if (expectedStorageType == storageType) { + actualStorageCount++; + } else { + LOG.info("Expected storage type {} and actual {}", + expectedStorageType, storageType); + } + } + } + LOG.info( + expectedStorageType + " replica count, expected={} and actual={}", + expectedStorageCount, actualStorageCount); + } catch (IOException e) { + LOG.error("Exception while getting located blocks", e); + return false; + } + return expectedStorageCount == actualStorageCount; + } + }, 100, timeout); + } + + // Check whether the block movement result has been arrived at the + // Namenode(SPS). + private void waitForBlocksMovementResult(MiniDFSCluster cluster, + long expectedBlkMovResultsCount, int timeout) + throws TimeoutException, InterruptedException { + BlockManager blockManager = cluster.getNamesystem().getBlockManager(); + final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier(); + Assert.assertNotNull("Failed to get SPS object reference!", sps); + + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + LOG.info("expectedResultsCount={} actualResultsCount={}", + expectedBlkMovResultsCount, + sps.getAttemptedItemsMonitor().resultsCount()); + return sps.getAttemptedItemsMonitor() + .resultsCount() == expectedBlkMovResultsCount; + } + }, 100, timeout); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org