Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id AE069200C81 for ; Thu, 11 May 2017 18:58:47 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id ACB39160BCF; Thu, 11 May 2017 16:58:47 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B14A9160BCC for ; Thu, 11 May 2017 18:58:45 +0200 (CEST) Received: (qmail 13701 invoked by uid 500); 11 May 2017 16:58:43 -0000 Mailing-List: contact common-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Delivered-To: mailing list common-commits@hadoop.apache.org Received: (qmail 13371 invoked by uid 99); 11 May 2017 16:58:42 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 11 May 2017 16:58:42 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B59CAE7DA2; Thu, 11 May 2017 16:58:42 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: inigoiri@apache.org To: common-commits@hadoop.apache.org Date: Thu, 11 May 2017 16:58:46 -0000 Message-Id: In-Reply-To: <9a45bd10f8a94e45ad39713e44db6c8b@git.apache.org> References: <9a45bd10f8a94e45ad39713e44db6c8b@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [05/50] [abbrv] hadoop git commit: HDFS-9807. Add an optional StorageID to writes. Contributed by Ewan Higgs archived-at: Thu, 11 May 2017 16:58:47 -0000 http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java index e7f0228..75baf84 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java @@ -81,10 +81,11 @@ class FsVolumeList { return Collections.unmodifiableList(volumes); } - private FsVolumeReference chooseVolume(List list, long blockSize) - throws IOException { + private FsVolumeReference chooseVolume(List list, + long blockSize, String storageId) throws IOException { while (true) { - FsVolumeImpl volume = blockChooser.chooseVolume(list, blockSize); + FsVolumeImpl volume = blockChooser.chooseVolume(list, blockSize, + storageId); try { return volume.obtainReference(); } catch (ClosedChannelException e) { @@ -100,18 +101,20 @@ class FsVolumeList { * Get next volume. * * @param blockSize free space needed on the volume - * @param storageType the desired {@link StorageType} + * @param storageType the desired {@link StorageType} + * @param storageId the storage id which may or may not be used by + * the VolumeChoosingPolicy. * @return next volume to store the block in. */ - FsVolumeReference getNextVolume(StorageType storageType, long blockSize) - throws IOException { + FsVolumeReference getNextVolume(StorageType storageType, String storageId, + long blockSize) throws IOException { final List list = new ArrayList<>(volumes.size()); for(FsVolumeImpl v : volumes) { if (v.getStorageType() == storageType) { list.add(v); } } - return chooseVolume(list, blockSize); + return chooseVolume(list, blockSize, storageId); } /** @@ -129,7 +132,7 @@ class FsVolumeList { list.add(v); } } - return chooseVolume(list, blockSize); + return chooseVolume(list, blockSize, null); } long getDfsUsed() throws IOException { http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index 74cdeae..c98a336 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -1018,7 +1018,8 @@ public class DFSTestUtil { // send the request new Sender(out).transferBlock(b, new Token(), dfsClient.clientName, new DatanodeInfo[]{datanodes[1]}, - new StorageType[]{StorageType.DEFAULT}); + new StorageType[]{StorageType.DEFAULT}, + new String[0]); out.flush(); return BlockOpResponseProto.parseDelimitedFrom(in); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java index b6884da..3a8fb59 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java @@ -1448,12 +1448,33 @@ public class TestBlockStoragePolicy { testStorageTypeCheckAccessResult(new StorageType[]{StorageType.RAM_DISK, StorageType.SSD, StorageType.ARCHIVE}, new StorageType[]{StorageType.DISK}, false); + + testStorageTypeCheckAccessResult( + new StorageType[]{StorageType.DISK, StorageType.SSD}, + new StorageType[]{StorageType.SSD}, + true); + + testStorageTypeCheckAccessResult(new StorageType[]{StorageType.RAM_DISK}, + new StorageType[]{StorageType.DISK}, false); + + testStorageTypeCheckAccessResult( + new StorageType[]{StorageType.RAM_DISK, StorageType.SSD, + StorageType.ARCHIVE}, + new StorageType[]{StorageType.DISK}, + false); + + testStorageTypeCheckAccessResult( + new StorageType[]{StorageType.RAM_DISK, StorageType.SSD, + StorageType.ARCHIVE}, + new StorageType[]{StorageType.DISK}, + false); + } private void testStorageTypeCheckAccessResult(StorageType[] requested, StorageType[] allowed, boolean expAccess) { try { - BlockTokenSecretManager.checkAccess(requested, allowed); + BlockTokenSecretManager.checkAccess(requested, allowed, "StorageTypes"); if (!expAccess) { fail("No expected access with allowed StorageTypes " + Arrays.toString(allowed) + " and requested StorageTypes " @@ -1467,4 +1488,56 @@ public class TestBlockStoragePolicy { } } } + + @Test + public void testStorageIDCheckAccess() { + testStorageIDCheckAccessResult( + new String[]{"DN1-Storage1"}, + new String[]{"DN1-Storage1"}, true); + + testStorageIDCheckAccessResult(new String[]{"DN1-Storage1", "DN2-Storage1"}, + new String[]{"DN1-Storage1"}, + true); + + testStorageIDCheckAccessResult(new String[]{"DN1-Storage1", "DN2-Storage1"}, + new String[]{"DN1-Storage1", "DN1-Storage2"}, false); + + testStorageIDCheckAccessResult( + new String[]{"DN1-Storage1", "DN1-Storage2"}, + new String[]{"DN1-Storage1"}, true); + + testStorageIDCheckAccessResult( + new String[]{"DN1-Storage1", "DN1-Storage2"}, + new String[]{"DN2-Storage1"}, false); + + testStorageIDCheckAccessResult( + new String[]{"DN1-Storage2", "DN2-Storage2"}, + new String[]{"DN1-Storage1", "DN2-Storage1"}, false); + + testStorageIDCheckAccessResult(new String[0], new String[0], false); + + testStorageIDCheckAccessResult(new String[0], new String[]{"DN1-Storage1"}, + true); + + testStorageIDCheckAccessResult(new String[]{"DN1-Storage1"}, new String[0], + false); + } + + private void testStorageIDCheckAccessResult(String[] requested, + String[] allowed, boolean expAccess) { + try { + BlockTokenSecretManager.checkAccess(requested, allowed, "StorageIDs"); + if (!expAccess) { + fail("No expected access with allowed StorageIDs" + + Arrays.toString(allowed) + " and requested StorageIDs" + + Arrays.toString(requested)); + } + } catch (SecretManager.InvalidToken e) { + if (expAccess) { + fail("Expected access with allowed StorageIDs " + + Arrays.toString(allowed) + " and requested StorageIDs" + + Arrays.toString(requested)); + } + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java index 3f4fe28..7a2ac1b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java @@ -559,6 +559,7 @@ public class TestDataTransferProtocol { BlockTokenSecretManager.DUMMY_TOKEN, "cl", new DatanodeInfo[1], new StorageType[1], null, stage, 0, block.getNumBytes(), block.getNumBytes(), newGS, - checksum, CachingStrategy.newDefaultStrategy(), false, false, null); + checksum, CachingStrategy.newDefaultStrategy(), false, false, + null, null, new String[0]); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java index 5c1b38f..e159914 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestWriteBlockGetsBlockLengthHint.java @@ -98,11 +98,11 @@ public class TestWriteBlockGetsBlockLengthHint { * correctly propagate the hint to FsDatasetSpi. */ @Override - public synchronized ReplicaHandler createRbw( - StorageType storageType, ExtendedBlock b, boolean allowLazyPersist) + public synchronized ReplicaHandler createRbw(StorageType storageType, + String storageId, ExtendedBlock b, boolean allowLazyPersist) throws IOException { assertThat(b.getLocalBlock().getNumBytes(), is(EXPECTED_BLOCK_LENGTH)); - return super.createRbw(storageType, b, allowLazyPersist); + return super.createRbw(storageType, storageId, b, allowLazyPersist); } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java index e98207f..747f295 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/token/block/TestBlockToken.java @@ -151,7 +151,7 @@ public class TestBlockToken { assertTrue("Received BlockTokenIdentifier is wrong", ident.equals(id)); sm.checkAccess(id, null, PBHelperClient.convert(req.getBlock()), BlockTokenIdentifier.AccessMode.WRITE, - new StorageType[]{StorageType.DEFAULT}); + new StorageType[]{StorageType.DEFAULT}, null); result = id.getBlockId(); } return GetReplicaVisibleLengthResponseProto.newBuilder() @@ -160,11 +160,11 @@ public class TestBlockToken { } private BlockTokenIdentifier generateTokenId(BlockTokenSecretManager sm, - ExtendedBlock block, - EnumSet accessModes, - StorageType... storageTypes) throws IOException { + ExtendedBlock block, EnumSet accessModes, + StorageType[] storageTypes, String[] storageIds) + throws IOException { Token token = sm.generateToken(block, accessModes, - storageTypes); + storageTypes, storageIds); BlockTokenIdentifier id = sm.createIdentifier(); id.readFields(new DataInputStream(new ByteArrayInputStream(token .getIdentifier()))); @@ -178,29 +178,28 @@ public class TestBlockToken { enableProtobuf); TestWritable.testWritable(generateTokenId(sm, block3, EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class), - StorageType.DEFAULT)); + new StorageType[]{StorageType.DEFAULT}, null)); TestWritable.testWritable(generateTokenId(sm, block3, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE), - StorageType.DEFAULT)); + new StorageType[]{StorageType.DEFAULT}, null)); TestWritable.testWritable(generateTokenId(sm, block3, EnumSet.allOf(BlockTokenIdentifier.AccessMode.class), - StorageType.DEFAULT)); + new StorageType[]{StorageType.DEFAULT}, null)); TestWritable.testWritable(generateTokenId(sm, block1, EnumSet.allOf(BlockTokenIdentifier.AccessMode.class), - StorageType.DEFAULT)); + new StorageType[]{StorageType.DEFAULT}, null)); TestWritable.testWritable(generateTokenId(sm, block2, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE), - StorageType.DEFAULT)); + new StorageType[]{StorageType.DEFAULT}, null)); TestWritable.testWritable(generateTokenId(sm, block3, EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class), - StorageType.DEFAULT)); + new StorageType[]{StorageType.DEFAULT}, null)); // We must be backwards compatible when adding storageType TestWritable.testWritable(generateTokenId(sm, block3, - EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class), - (StorageType[]) null)); + EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class), null, null)); TestWritable.testWritable(generateTokenId(sm, block3, EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class), - StorageType.EMPTY_ARRAY)); + StorageType.EMPTY_ARRAY, null)); } @Test @@ -215,35 +214,36 @@ public class TestBlockToken { private static void checkAccess(BlockTokenSecretManager m, Token t, ExtendedBlock blk, - BlockTokenIdentifier.AccessMode mode) throws SecretManager.InvalidToken { - m.checkAccess(t, null, blk, mode, new StorageType[]{ StorageType.DEFAULT }); + BlockTokenIdentifier.AccessMode mode, StorageType[] storageTypes, + String[] storageIds) throws SecretManager.InvalidToken { + m.checkAccess(t, null, blk, mode, storageTypes, storageIds); } private void tokenGenerationAndVerification(BlockTokenSecretManager master, - BlockTokenSecretManager slave, StorageType... storageTypes) - throws Exception { + BlockTokenSecretManager slave, StorageType[] storageTypes, + String[] storageIds) throws Exception { // single-mode tokens for (BlockTokenIdentifier.AccessMode mode : BlockTokenIdentifier.AccessMode .values()) { // generated by master Token token1 = master.generateToken(block1, - EnumSet.of(mode), storageTypes); - checkAccess(master, token1, block1, mode); - checkAccess(slave, token1, block1, mode); + EnumSet.of(mode), storageTypes, storageIds); + checkAccess(master, token1, block1, mode, storageTypes, storageIds); + checkAccess(slave, token1, block1, mode, storageTypes, storageIds); // generated by slave Token token2 = slave.generateToken(block2, - EnumSet.of(mode), storageTypes); - checkAccess(master, token2, block2, mode); - checkAccess(slave, token2, block2, mode); + EnumSet.of(mode), storageTypes, storageIds); + checkAccess(master, token2, block2, mode, storageTypes, storageIds); + checkAccess(slave, token2, block2, mode, storageTypes, storageIds); } // multi-mode tokens Token mtoken = master.generateToken(block3, EnumSet.allOf(BlockTokenIdentifier.AccessMode.class), - storageTypes); + storageTypes, storageIds); for (BlockTokenIdentifier.AccessMode mode : BlockTokenIdentifier.AccessMode .values()) { - checkAccess(master, mtoken, block3, mode); - checkAccess(slave, mtoken, block3, mode); + checkAccess(master, mtoken, block3, mode, storageTypes, storageIds); + checkAccess(slave, mtoken, block3, mode, storageTypes, storageIds); } } @@ -259,18 +259,18 @@ public class TestBlockToken { ExportedBlockKeys keys = masterHandler.exportKeys(); slaveHandler.addKeys(keys); tokenGenerationAndVerification(masterHandler, slaveHandler, - StorageType.DEFAULT); - tokenGenerationAndVerification(masterHandler, slaveHandler, null); + new StorageType[]{StorageType.DEFAULT}, null); + tokenGenerationAndVerification(masterHandler, slaveHandler, null, null); // key updating masterHandler.updateKeys(); tokenGenerationAndVerification(masterHandler, slaveHandler, - StorageType.DEFAULT); - tokenGenerationAndVerification(masterHandler, slaveHandler, null); + new StorageType[]{StorageType.DEFAULT}, null); + tokenGenerationAndVerification(masterHandler, slaveHandler, null, null); keys = masterHandler.exportKeys(); slaveHandler.addKeys(keys); tokenGenerationAndVerification(masterHandler, slaveHandler, - StorageType.DEFAULT); - tokenGenerationAndVerification(masterHandler, slaveHandler, null); + new StorageType[]{StorageType.DEFAULT}, null); + tokenGenerationAndVerification(masterHandler, slaveHandler, null, null); } @Test @@ -315,7 +315,7 @@ public class TestBlockToken { enableProtobuf); Token token = sm.generateToken(block3, EnumSet.allOf(BlockTokenIdentifier.AccessMode.class), - new StorageType[]{StorageType.DEFAULT}); + new StorageType[]{StorageType.DEFAULT}, new String[0]); final Server server = createMockDatanode(sm, token, conf); @@ -365,7 +365,7 @@ public class TestBlockToken { enableProtobuf); Token token = sm.generateToken(block3, EnumSet.allOf(BlockTokenIdentifier.AccessMode.class), - new StorageType[]{StorageType.DEFAULT}); + new StorageType[]{StorageType.DEFAULT}, new String[0]); final Server server = createMockDatanode(sm, token, conf); server.start(); @@ -451,19 +451,23 @@ public class TestBlockToken { ExportedBlockKeys keys = masterHandler.exportKeys(); bpMgr.addKeys(bpid, keys); + String[] storageIds = new String[] {"DS-9001"}; tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), - StorageType.DEFAULT); - tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null); + new StorageType[]{StorageType.DEFAULT}, storageIds); + tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null, + null); // Test key updating masterHandler.updateKeys(); tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), - StorageType.DEFAULT); - tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null); + new StorageType[]{StorageType.DEFAULT}, storageIds); + tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null, + null); keys = masterHandler.exportKeys(); bpMgr.addKeys(bpid, keys); tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), - StorageType.DEFAULT); - tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null); + new StorageType[]{StorageType.DEFAULT}, new String[]{"DS-9001"}); + tokenGenerationAndVerification(masterHandler, bpMgr.get(bpid), null, + null); } } @@ -540,7 +544,7 @@ public class TestBlockToken { useProto); Token token = sm.generateToken(block1, EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class), - new StorageType[]{StorageType.DEFAULT}); + new StorageType[]{StorageType.DEFAULT}, new String[0]); final byte[] tokenBytes = token.getIdentifier(); BlockTokenIdentifier legacyToken = new BlockTokenIdentifier(); BlockTokenIdentifier protobufToken = new BlockTokenIdentifier(); @@ -605,7 +609,7 @@ public class TestBlockToken { useProto); Token token = sm.generateToken(block1, EnumSet.noneOf(BlockTokenIdentifier.AccessMode.class), - StorageType.EMPTY_ARRAY); + StorageType.EMPTY_ARRAY, new String[0]); final byte[] tokenBytes = token.getIdentifier(); BlockTokenIdentifier legacyToken = new BlockTokenIdentifier(); BlockTokenIdentifier protobufToken = new BlockTokenIdentifier(); @@ -699,7 +703,8 @@ public class TestBlockToken { */ BlockTokenIdentifier identifier = new BlockTokenIdentifier("user", "blockpool", 123, EnumSet.allOf(BlockTokenIdentifier.AccessMode.class), - new StorageType[]{StorageType.DISK, StorageType.ARCHIVE}, true); + new StorageType[]{StorageType.DISK, StorageType.ARCHIVE}, + new String[] {"fake-storage-id"}, true); Calendar cal = new GregorianCalendar(); cal.set(2017, 1, 9, 0, 12, 35); long datetime = cal.getTimeInMillis(); @@ -749,7 +754,8 @@ public class TestBlockToken { new StorageType[]{StorageType.RAM_DISK, StorageType.SSD, StorageType.DISK, StorageType.ARCHIVE}; BlockTokenIdentifier ident = new BlockTokenIdentifier("user", "bpool", - 123, accessModes, storageTypes, useProto); + 123, accessModes, storageTypes, new String[] {"fake-storage-id"}, + useProto); ident.setExpiryDate(1487080345L); BlockTokenIdentifier ret = writeAndReadBlockToken(ident); assertEquals(ret.getExpiryDate(), 1487080345L); @@ -760,6 +766,7 @@ public class TestBlockToken { assertEquals(ret.getAccessModes(), EnumSet.allOf(BlockTokenIdentifier.AccessMode.class)); assertArrayEquals(ret.getStorageTypes(), storageTypes); + assertArrayEquals(ret.getStorageIds(), new String[] {"fake-storage-id"}); } @Test @@ -767,5 +774,4 @@ public class TestBlockToken { testBlockTokenSerialization(false); testBlockTokenSerialization(true); } - } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java index 6810a0b..c9ff572 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java @@ -389,7 +389,7 @@ public abstract class BlockReportTestBase { // Create a bogus new block which will not be present on the namenode. ExtendedBlock b = new ExtendedBlock( poolId, rand.nextLong(), 1024L, rand.nextLong()); - dn.getFSDataset().createRbw(StorageType.DEFAULT, b, false); + dn.getFSDataset().createRbw(StorageType.DEFAULT, null, b, false); DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index cd3befd..18b4922 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -1023,21 +1023,22 @@ public class SimulatedFSDataset implements FsDatasetSpi { @Override // FsDatasetSpi public synchronized ReplicaHandler createRbw( - StorageType storageType, ExtendedBlock b, + StorageType storageType, String storageId, ExtendedBlock b, boolean allowLazyPersist) throws IOException { - return createTemporary(storageType, b); + return createTemporary(storageType, storageId, b); } @Override // FsDatasetSpi public synchronized ReplicaHandler createTemporary( - StorageType storageType, ExtendedBlock b) throws IOException { + StorageType storageType, String storageId, ExtendedBlock b) + throws IOException { if (isValidBlock(b)) { - throw new ReplicaAlreadyExistsException("Block " + b + - " is valid, and cannot be written to."); - } + throw new ReplicaAlreadyExistsException("Block " + b + + " is valid, and cannot be written to."); + } if (isValidRbw(b)) { - throw new ReplicaAlreadyExistsException("Block " + b + - " is being written, and cannot be written to."); + throw new ReplicaAlreadyExistsException("Block " + b + + " is being written, and cannot be written to."); } final Map map = getMap(b.getBlockPoolId()); BInfo binfo = new BInfo(b.getBlockPoolId(), b.getLocalBlock(), true); @@ -1419,7 +1420,7 @@ public class SimulatedFSDataset implements FsDatasetSpi { @Override public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block, - StorageType targetStorageType) throws IOException { + StorageType targetStorageType, String storageId) throws IOException { // TODO Auto-generated method stub return null; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java index 579252b..311d5a6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java @@ -647,7 +647,7 @@ public class TestBlockRecovery { if(LOG.isDebugEnabled()) { LOG.debug("Running " + GenericTestUtils.getMethodName()); } - dn.data.createRbw(StorageType.DEFAULT, block, false); + dn.data.createRbw(StorageType.DEFAULT, null, block, false); BlockRecoveryWorker.RecoveryTaskContiguous RecoveryTaskContiguous = recoveryWorker.new RecoveryTaskContiguous(rBlock); try { @@ -673,7 +673,7 @@ public class TestBlockRecovery { LOG.debug("Running " + GenericTestUtils.getMethodName()); } ReplicaInPipeline replicaInfo = dn.data.createRbw( - StorageType.DEFAULT, block, false).getReplica(); + StorageType.DEFAULT, null, block, false).getReplica(); ReplicaOutputStreams streams = null; try { streams = replicaInfo.createStreams(true, @@ -972,7 +972,7 @@ public class TestBlockRecovery { // Register this thread as the writer for the recoveringBlock. LOG.debug("slowWriter creating rbw"); ReplicaHandler replicaHandler = - spyDN.data.createRbw(StorageType.DISK, block, false); + spyDN.data.createRbw(StorageType.DISK, null, block, false); replicaHandler.close(); LOG.debug("slowWriter created rbw"); // Tell the parent thread to start progressing. http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java index f811bd8..8992d47 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReplacement.java @@ -394,7 +394,7 @@ public class TestBlockReplacement { DataOutputStream out = new DataOutputStream(sock.getOutputStream()); new Sender(out).replaceBlock(block, targetStorageType, BlockTokenSecretManager.DUMMY_TOKEN, source.getDatanodeUuid(), - sourceProxy); + sourceProxy, null); out.flush(); // receiveResponse DataInputStream reply = new DataInputStream(sock.getInputStream()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java index b2bfe49..8fda664 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataXceiverLazyPersistHint.java @@ -129,7 +129,7 @@ public class TestDataXceiverLazyPersistHint { DataChecksum.newDataChecksum(DataChecksum.Type.NULL, 0), CachingStrategy.newDefaultStrategy(), lazyPersist, - false, null); + false, null, null, new String[0]); } // Helper functions to setup the mock objects. @@ -151,7 +151,7 @@ public class TestDataXceiverLazyPersistHint { any(BlockConstructionStage.class), anyLong(), anyLong(), anyLong(), anyString(), any(DatanodeInfo.class), any(DataNode.class), any(DataChecksum.class), any(CachingStrategy.class), - captor.capture(), anyBoolean()); + captor.capture(), anyBoolean(), any(String.class)); doReturn(mock(DataOutputStream.class)).when(xceiverSpy) .getBufferedOutputStream(); return xceiverSpy; http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java index cd86720..38e4287 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java @@ -167,7 +167,8 @@ public class TestDiskError { BlockTokenSecretManager.DUMMY_TOKEN, "", new DatanodeInfo[0], new StorageType[0], null, BlockConstructionStage.PIPELINE_SETUP_CREATE, 1, 0L, 0L, 0L, - checksum, CachingStrategy.newDefaultStrategy(), false, false, null); + checksum, CachingStrategy.newDefaultStrategy(), false, false, + null, null, new String[0]); out.flush(); // close the connection before sending the content of the block @@ -274,7 +275,7 @@ public class TestDiskError { dn1.getDatanodeId()); dn0.transferBlock(block, new DatanodeInfo[]{dnd1}, - new StorageType[]{StorageType.DISK}); + new StorageType[]{StorageType.DISK}, new String[0]); // Sleep for 1 second so the DataTrasnfer daemon can start transfer. try { Thread.sleep(1000); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java index 4e724bc7..2e69595 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java @@ -81,7 +81,7 @@ public class TestSimulatedFSDataset { // we pass expected len as zero, - fsdataset should use the sizeof actual // data written ReplicaInPipeline bInfo = fsdataset.createRbw( - StorageType.DEFAULT, b, false).getReplica(); + StorageType.DEFAULT, null, b, false).getReplica(); ReplicaOutputStreams out = bInfo.createStreams(true, DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512)); try { @@ -368,7 +368,7 @@ public class TestSimulatedFSDataset { ExtendedBlock block = new ExtendedBlock(newbpid,1); try { // it will throw an exception if the block pool is not found - fsdataset.createTemporary(StorageType.DEFAULT, block); + fsdataset.createTemporary(StorageType.DEFAULT, null, block); } catch (IOException ioe) { // JUnit does not capture exception in non-main thread, // so cache it and then let main thread throw later. http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java index 62ef731..2e439d6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java @@ -138,14 +138,15 @@ public class ExternalDatasetImpl implements FsDatasetSpi { } @Override - public ReplicaHandler createTemporary(StorageType t, ExtendedBlock b) + public ReplicaHandler createTemporary(StorageType t, String i, + ExtendedBlock b) throws IOException { return new ReplicaHandler(new ExternalReplicaInPipeline(), null); } @Override - public ReplicaHandler createRbw(StorageType t, ExtendedBlock b, boolean tf) - throws IOException { + public ReplicaHandler createRbw(StorageType storageType, String id, + ExtendedBlock b, boolean tf) throws IOException { return new ReplicaHandler(new ExternalReplicaInPipeline(), null); } @@ -332,7 +333,8 @@ public class ExternalDatasetImpl implements FsDatasetSpi { } @Override - public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block, StorageType targetStorageType) throws IOException { + public ReplicaInfo moveBlockAcrossStorage(ExtendedBlock block, + StorageType targetStorageType, String storageId) throws IOException { return null; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java index 9414a0e..24a43e7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestAvailableSpaceVolumeChoosingPolicy.java @@ -89,10 +89,12 @@ public class TestAvailableSpaceVolumeChoosingPolicy { // than the threshold of 1MB. volumes.add(Mockito.mock(FsVolumeSpi.class)); Mockito.when(volumes.get(1).getAvailable()).thenReturn(1024L * 1024L * 3); - - Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100)); - Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100)); - Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100)); + Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100, + null)); + Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100, + null)); + Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100, + null)); } @Test(timeout=60000) @@ -115,21 +117,29 @@ public class TestAvailableSpaceVolumeChoosingPolicy { // Third volume, again with 3MB free space. volumes.add(Mockito.mock(FsVolumeSpi.class)); Mockito.when(volumes.get(2).getAvailable()).thenReturn(1024L * 1024L * 3); - + // We should alternate assigning between the two volumes with a lot of free // space. initPolicy(policy, 1.0f); - Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100)); - Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100)); - Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100)); - Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100)); + Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100, + null)); + Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100, + null)); + Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100, + null)); + Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100, + null)); // All writes should be assigned to the volume with the least free space. initPolicy(policy, 0.0f); - Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100)); - Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100)); - Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100)); - Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100)); + Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100, + null)); + Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100, + null)); + Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100, + null)); + Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100, + null)); } @Test(timeout=60000) @@ -156,22 +166,30 @@ public class TestAvailableSpaceVolumeChoosingPolicy { // Fourth volume, again with 3MB free space. volumes.add(Mockito.mock(FsVolumeSpi.class)); Mockito.when(volumes.get(3).getAvailable()).thenReturn(1024L * 1024L * 3); - + // We should alternate assigning between the two volumes with a lot of free // space. initPolicy(policy, 1.0f); - Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100)); - Assert.assertEquals(volumes.get(3), policy.chooseVolume(volumes, 100)); - Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100)); - Assert.assertEquals(volumes.get(3), policy.chooseVolume(volumes, 100)); + Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100, + null)); + Assert.assertEquals(volumes.get(3), policy.chooseVolume(volumes, 100, + null)); + Assert.assertEquals(volumes.get(2), policy.chooseVolume(volumes, 100, + null)); + Assert.assertEquals(volumes.get(3), policy.chooseVolume(volumes, 100, + null)); // We should alternate assigning between the two volumes with less free // space. initPolicy(policy, 0.0f); - Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100)); - Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100)); - Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100)); - Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100)); + Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100, + null)); + Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100, + null)); + Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 100, + null)); + Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100, + null)); } @Test(timeout=60000) @@ -190,13 +208,14 @@ public class TestAvailableSpaceVolumeChoosingPolicy { // than the threshold of 1MB. volumes.add(Mockito.mock(FsVolumeSpi.class)); Mockito.when(volumes.get(1).getAvailable()).thenReturn(1024L * 1024L * 3); - + // All writes should be assigned to the volume with the least free space. // However, if the volume with the least free space doesn't have enough // space to accept the replica size, and another volume does have enough // free space, that should be chosen instead. initPolicy(policy, 0.0f); - Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 1024L * 1024L * 2)); + Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, + 1024L * 1024L * 2, null)); } @Test(timeout=60000) @@ -220,10 +239,11 @@ public class TestAvailableSpaceVolumeChoosingPolicy { .thenReturn(1024L * 1024L * 3) .thenReturn(1024L * 1024L * 3) .thenReturn(1024L * 1024L * 1); // After the third check, return 1MB. - + // Should still be able to get a volume for the replica even though the // available space on the second volume changed. - Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 100)); + Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, + 100, null)); } @Test(timeout=60000) @@ -271,12 +291,12 @@ public class TestAvailableSpaceVolumeChoosingPolicy { Mockito.when(volume.getAvailable()).thenReturn(1024L * 1024L * 3); volumes.add(volume); } - + initPolicy(policy, preferencePercent); long lowAvailableSpaceVolumeSelected = 0; long highAvailableSpaceVolumeSelected = 0; for (int i = 0; i < RANDOMIZED_ITERATIONS; i++) { - FsVolumeSpi volume = policy.chooseVolume(volumes, 100); + FsVolumeSpi volume = policy.chooseVolume(volumes, 100, null); for (int j = 0; j < volumes.size(); j++) { // Note how many times the first low available volume was selected if (volume == volumes.get(j) && j == 0) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java index 9b3047f..44e2a30 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/TestRoundRobinVolumeChoosingPolicy.java @@ -50,20 +50,21 @@ public class TestRoundRobinVolumeChoosingPolicy { // Second volume, with 200 bytes of space. volumes.add(Mockito.mock(FsVolumeSpi.class)); Mockito.when(volumes.get(1).getAvailable()).thenReturn(200L); - + // Test two rounds of round-robin choosing - Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0)); - Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0)); - Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0)); - Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0)); + Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0, null)); + Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0, null)); + Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0, null)); + Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 0, null)); // The first volume has only 100L space, so the policy should // wisely choose the second one in case we ask for more. - Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 150)); + Assert.assertEquals(volumes.get(1), policy.chooseVolume(volumes, 150, + null)); // Fail if no volume can be chosen? try { - policy.chooseVolume(volumes, Long.MAX_VALUE); + policy.chooseVolume(volumes, Long.MAX_VALUE, null); Assert.fail(); } catch (IOException e) { // Passed. @@ -93,7 +94,7 @@ public class TestRoundRobinVolumeChoosingPolicy { int blockSize = 700; try { - policy.chooseVolume(volumes, blockSize); + policy.chooseVolume(volumes, blockSize, null); Assert.fail("expected to throw DiskOutOfSpaceException"); } catch(DiskOutOfSpaceException e) { Assert.assertEquals("Not returnig the expected message", @@ -137,21 +138,21 @@ public class TestRoundRobinVolumeChoosingPolicy { Mockito.when(ssdVolumes.get(1).getAvailable()).thenReturn(100L); Assert.assertEquals(diskVolumes.get(0), - policy.chooseVolume(diskVolumes, 0)); + policy.chooseVolume(diskVolumes, 0, null)); // Independent Round-Robin for different storage type Assert.assertEquals(ssdVolumes.get(0), - policy.chooseVolume(ssdVolumes, 0)); + policy.chooseVolume(ssdVolumes, 0, null)); // Take block size into consideration Assert.assertEquals(ssdVolumes.get(0), - policy.chooseVolume(ssdVolumes, 150L)); + policy.chooseVolume(ssdVolumes, 150L, null)); Assert.assertEquals(diskVolumes.get(1), - policy.chooseVolume(diskVolumes, 0)); + policy.chooseVolume(diskVolumes, 0, null)); Assert.assertEquals(diskVolumes.get(0), - policy.chooseVolume(diskVolumes, 50L)); + policy.chooseVolume(diskVolumes, 50L, null)); try { - policy.chooseVolume(diskVolumes, 200L); + policy.chooseVolume(diskVolumes, 200L, null); Assert.fail("Should throw an DiskOutOfSpaceException before this!"); } catch (DiskOutOfSpaceException e) { // Pass. http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java index 905c3f0..3293561 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java @@ -259,7 +259,7 @@ public class TestFsDatasetImpl { String bpid = BLOCK_POOL_IDS[NUM_BLOCKS % BLOCK_POOL_IDS.length]; ExtendedBlock eb = new ExtendedBlock(bpid, i); try (ReplicaHandler replica = - dataset.createRbw(StorageType.DEFAULT, eb, false)) { + dataset.createRbw(StorageType.DEFAULT, null, eb, false)) { } } final String[] dataDirs = @@ -566,7 +566,7 @@ public class TestFsDatasetImpl { class ResponderThread extends Thread { public void run() { try (ReplicaHandler replica = dataset - .createRbw(StorageType.DEFAULT, eb, false)) { + .createRbw(StorageType.DEFAULT, null, eb, false)) { LOG.info("CreateRbw finished"); startFinalizeLatch.countDown(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java index 83c15ca..ee3a79f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsVolumeList.java @@ -101,7 +101,7 @@ public class TestFsVolumeList { } for (int i = 0; i < 10; i++) { try (FsVolumeReference ref = - volumeList.getNextVolume(StorageType.DEFAULT, 128)) { + volumeList.getNextVolume(StorageType.DEFAULT, null, 128)) { // volume No.2 will not be chosen. assertNotEquals(ref.getVolume(), volumes.get(1)); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java index da53cae..11525ed 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java @@ -353,7 +353,7 @@ public class TestWriteToReplica { } try { - dataSet.createRbw(StorageType.DEFAULT, blocks[FINALIZED], false); + dataSet.createRbw(StorageType.DEFAULT, null, blocks[FINALIZED], false); Assert.fail("Should not have created a replica that's already " + "finalized " + blocks[FINALIZED]); } catch (ReplicaAlreadyExistsException e) { @@ -371,7 +371,7 @@ public class TestWriteToReplica { } try { - dataSet.createRbw(StorageType.DEFAULT, blocks[TEMPORARY], false); + dataSet.createRbw(StorageType.DEFAULT, null, blocks[TEMPORARY], false); Assert.fail("Should not have created a replica that had created as " + "temporary " + blocks[TEMPORARY]); } catch (ReplicaAlreadyExistsException e) { @@ -381,7 +381,7 @@ public class TestWriteToReplica { 0L, blocks[RBW].getNumBytes()); // expect to be successful try { - dataSet.createRbw(StorageType.DEFAULT, blocks[RBW], false); + dataSet.createRbw(StorageType.DEFAULT, null, blocks[RBW], false); Assert.fail("Should not have created a replica that had created as RBW " + blocks[RBW]); } catch (ReplicaAlreadyExistsException e) { @@ -397,7 +397,7 @@ public class TestWriteToReplica { } try { - dataSet.createRbw(StorageType.DEFAULT, blocks[RWR], false); + dataSet.createRbw(StorageType.DEFAULT, null, blocks[RWR], false); Assert.fail("Should not have created a replica that was waiting to be " + "recovered " + blocks[RWR]); } catch (ReplicaAlreadyExistsException e) { @@ -413,7 +413,7 @@ public class TestWriteToReplica { } try { - dataSet.createRbw(StorageType.DEFAULT, blocks[RUR], false); + dataSet.createRbw(StorageType.DEFAULT, null, blocks[RUR], false); Assert.fail("Should not have created a replica that was under recovery " + blocks[RUR]); } catch (ReplicaAlreadyExistsException e) { @@ -430,49 +430,49 @@ public class TestWriteToReplica { e.getMessage().contains(ReplicaNotFoundException.NON_EXISTENT_REPLICA)); } - dataSet.createRbw(StorageType.DEFAULT, blocks[NON_EXISTENT], false); + dataSet.createRbw(StorageType.DEFAULT, null, blocks[NON_EXISTENT], false); } private void testWriteToTemporary(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throws IOException { try { - dataSet.createTemporary(StorageType.DEFAULT, blocks[FINALIZED]); + dataSet.createTemporary(StorageType.DEFAULT, null, blocks[FINALIZED]); Assert.fail("Should not have created a temporary replica that was " + "finalized " + blocks[FINALIZED]); } catch (ReplicaAlreadyExistsException e) { } try { - dataSet.createTemporary(StorageType.DEFAULT, blocks[TEMPORARY]); + dataSet.createTemporary(StorageType.DEFAULT, null, blocks[TEMPORARY]); Assert.fail("Should not have created a replica that had created as" + "temporary " + blocks[TEMPORARY]); } catch (ReplicaAlreadyExistsException e) { } try { - dataSet.createTemporary(StorageType.DEFAULT, blocks[RBW]); + dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RBW]); Assert.fail("Should not have created a replica that had created as RBW " + blocks[RBW]); } catch (ReplicaAlreadyExistsException e) { } try { - dataSet.createTemporary(StorageType.DEFAULT, blocks[RWR]); + dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RWR]); Assert.fail("Should not have created a replica that was waiting to be " + "recovered " + blocks[RWR]); } catch (ReplicaAlreadyExistsException e) { } try { - dataSet.createTemporary(StorageType.DEFAULT, blocks[RUR]); + dataSet.createTemporary(StorageType.DEFAULT, null, blocks[RUR]); Assert.fail("Should not have created a replica that was under recovery " + blocks[RUR]); } catch (ReplicaAlreadyExistsException e) { } - dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]); + dataSet.createTemporary(StorageType.DEFAULT, null, blocks[NON_EXISTENT]); try { - dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]); + dataSet.createTemporary(StorageType.DEFAULT, null, blocks[NON_EXISTENT]); Assert.fail("Should not have created a replica that had already been " + "created " + blocks[NON_EXISTENT]); } catch (Exception e) { @@ -485,7 +485,8 @@ public class TestWriteToReplica { blocks[NON_EXISTENT].setGenerationStamp(newGenStamp); try { ReplicaInPipeline replicaInfo = - dataSet.createTemporary(StorageType.DEFAULT, blocks[NON_EXISTENT]).getReplica(); + dataSet.createTemporary(StorageType.DEFAULT, null, + blocks[NON_EXISTENT]).getReplica(); Assert.assertTrue(replicaInfo.getGenerationStamp() == newGenStamp); Assert.assertTrue( replicaInfo.getBlockId() == blocks[NON_EXISTENT].getBlockId()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/a3954cca/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeStorageDirectives.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeStorageDirectives.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeStorageDirectives.java new file mode 100644 index 0000000..e0f7426 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestNamenodeStorageDirectives.java @@ -0,0 +1,330 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.namenode; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.ReconfigurationException; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.*; +import org.apache.hadoop.hdfs.protocol.*; +import org.apache.hadoop.hdfs.server.blockmanagement.*; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy; +import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy; +import org.apache.hadoop.net.Node; +import org.junit.After; +import org.junit.Test; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +/** + * Test to ensure that the StorageType and StorageID sent from Namenode + * to DFSClient are respected. + */ +public class TestNamenodeStorageDirectives { + public static final Logger LOG = + LoggerFactory.getLogger(TestNamenodeStorageDirectives.class); + + private static final int BLOCK_SIZE = 512; + + private MiniDFSCluster cluster; + + @After + public void tearDown() { + shutdown(); + } + + private void startDFSCluster(int numNameNodes, int numDataNodes, + int storagePerDataNode, StorageType[][] storageTypes) + throws IOException { + startDFSCluster(numNameNodes, numDataNodes, storagePerDataNode, + storageTypes, RoundRobinVolumeChoosingPolicy.class, + BlockPlacementPolicyDefault.class); + } + + private void startDFSCluster(int numNameNodes, int numDataNodes, + int storagePerDataNode, StorageType[][] storageTypes, + Class volumeChoosingPolicy, + Class blockPlacementPolicy) throws + IOException { + shutdown(); + Configuration conf = new Configuration(); + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); + + /* + * Lower the DN heartbeat, DF rate, and recheck interval to one second + * so state about failures and datanode death propagates faster. + */ + conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1); + conf.setInt(DFSConfigKeys.DFS_DF_INTERVAL_KEY, 1000); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, + 1000); + /* Allow 1 volume failure */ + conf.setInt(DFSConfigKeys.DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY, 1); + conf.setTimeDuration(DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY, + 0, TimeUnit.MILLISECONDS); + conf.setClass( + DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY, + volumeChoosingPolicy, VolumeChoosingPolicy.class); + conf.setClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, + blockPlacementPolicy, BlockPlacementPolicy.class); + + MiniDFSNNTopology nnTopology = + MiniDFSNNTopology.simpleFederatedTopology(numNameNodes); + + cluster = new MiniDFSCluster.Builder(conf) + .nnTopology(nnTopology) + .numDataNodes(numDataNodes) + .storagesPerDatanode(storagePerDataNode) + .storageTypes(storageTypes) + .build(); + cluster.waitActive(); + } + + private void shutdown() { + if (cluster != null) { + cluster.shutdown(); + cluster = null; + } + } + + private void createFile(Path path, int numBlocks, short replicateFactor) + throws IOException, InterruptedException, TimeoutException { + createFile(0, path, numBlocks, replicateFactor); + } + + private void createFile(int fsIdx, Path path, int numBlocks, + short replicateFactor) + throws IOException, TimeoutException, InterruptedException { + final int seed = 0; + final DistributedFileSystem fs = cluster.getFileSystem(fsIdx); + DFSTestUtil.createFile(fs, path, BLOCK_SIZE * numBlocks, + replicateFactor, seed); + DFSTestUtil.waitReplication(fs, path, replicateFactor); + } + + private boolean verifyFileReplicasOnStorageType(Path path, int numBlocks, + StorageType storageType) throws IOException { + MiniDFSCluster.NameNodeInfo info = cluster.getNameNodeInfos()[0]; + InetSocketAddress addr = info.nameNode.getServiceRpcAddress(); + assert addr.getPort() != 0; + DFSClient client = new DFSClient(addr, cluster.getConfiguration(0)); + + FileSystem fs = cluster.getFileSystem(); + + if (!fs.exists(path)) { + LOG.info("verifyFileReplicasOnStorageType: file {} does not exist", path); + return false; + } + long fileLength = client.getFileInfo(path.toString()).getLen(); + int foundBlocks = 0; + LocatedBlocks locatedBlocks = + client.getLocatedBlocks(path.toString(), 0, fileLength); + for (LocatedBlock locatedBlock : locatedBlocks.getLocatedBlocks()) { + for (StorageType st : locatedBlock.getStorageTypes()) { + if (st == storageType) { + foundBlocks++; + } + } + } + + LOG.info("Found {}/{} blocks on StorageType {}", + foundBlocks, numBlocks, storageType); + final boolean isValid = foundBlocks >= numBlocks; + return isValid; + } + + private void testStorageTypes(StorageType[][] storageTypes, + String storagePolicy, StorageType[] expectedStorageTypes, + StorageType[] unexpectedStorageTypes) throws ReconfigurationException, + InterruptedException, TimeoutException, IOException { + final int numDataNodes = storageTypes.length; + final int storagePerDataNode = storageTypes[0].length; + startDFSCluster(1, numDataNodes, storagePerDataNode, storageTypes); + cluster.getFileSystem(0).setStoragePolicy(new Path("/"), storagePolicy); + Path testFile = new Path("/test"); + final short replFactor = 2; + final int numBlocks = 10; + createFile(testFile, numBlocks, replFactor); + + for (StorageType storageType: expectedStorageTypes) { + assertTrue(verifyFileReplicasOnStorageType(testFile, numBlocks, + storageType)); + } + + for (StorageType storageType: unexpectedStorageTypes) { + assertFalse(verifyFileReplicasOnStorageType(testFile, numBlocks, + storageType)); + } + } + + /** + * Verify that writing to SSD and DISK will write to the correct Storage + * Types. + * @throws IOException + */ + @Test(timeout=60000) + public void testTargetStorageTypes() throws ReconfigurationException, + InterruptedException, TimeoutException, IOException { + // DISK and not anything else. + testStorageTypes(new StorageType[][]{ + {StorageType.SSD, StorageType.DISK}, + {StorageType.SSD, StorageType.DISK}}, + "ONE_SSD", + new StorageType[]{StorageType.SSD, StorageType.DISK}, + new StorageType[]{StorageType.RAM_DISK, StorageType.ARCHIVE}); + // only on SSD. + testStorageTypes(new StorageType[][]{ + {StorageType.SSD, StorageType.DISK}, + {StorageType.SSD, StorageType.DISK}}, + "ALL_SSD", + new StorageType[]{StorageType.SSD}, + new StorageType[]{StorageType.RAM_DISK, StorageType.DISK, + StorageType.ARCHIVE}); + // only on SSD. + testStorageTypes(new StorageType[][]{ + {StorageType.SSD, StorageType.DISK, StorageType.DISK}, + {StorageType.SSD, StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK, StorageType.DISK}}, + "ALL_SSD", + new StorageType[]{StorageType.SSD}, + new StorageType[]{StorageType.RAM_DISK, StorageType.DISK, + StorageType.ARCHIVE}); + + // DISK and not anything else. + testStorageTypes(new StorageType[][] { + {StorageType.RAM_DISK, StorageType.SSD}, + {StorageType.SSD, StorageType.DISK}, + {StorageType.SSD, StorageType.DISK}}, + "HOT", + new StorageType[]{StorageType.DISK}, + new StorageType[] {StorageType.RAM_DISK, StorageType.SSD, + StorageType.ARCHIVE}); + + testStorageTypes(new StorageType[][] { + {StorageType.RAM_DISK, StorageType.SSD}, + {StorageType.SSD, StorageType.DISK}, + {StorageType.ARCHIVE, StorageType.ARCHIVE}, + {StorageType.ARCHIVE, StorageType.ARCHIVE}}, + "WARM", + new StorageType[]{StorageType.DISK, StorageType.ARCHIVE}, + new StorageType[]{StorageType.RAM_DISK, StorageType.SSD}); + + testStorageTypes(new StorageType[][] { + {StorageType.RAM_DISK, StorageType.SSD}, + {StorageType.SSD, StorageType.DISK}, + {StorageType.ARCHIVE, StorageType.ARCHIVE}, + {StorageType.ARCHIVE, StorageType.ARCHIVE}}, + "COLD", + new StorageType[]{StorageType.ARCHIVE}, + new StorageType[]{StorageType.RAM_DISK, StorageType.SSD, + StorageType.DISK}); + + // We wait for Lasy Persist to write to disk. + testStorageTypes(new StorageType[][] { + {StorageType.RAM_DISK, StorageType.SSD}, + {StorageType.SSD, StorageType.DISK}, + {StorageType.SSD, StorageType.DISK}}, + "LAZY_PERSIST", + new StorageType[]{StorageType.DISK}, + new StorageType[]{StorageType.RAM_DISK, StorageType.SSD, + StorageType.ARCHIVE}); + } + + /** + * A VolumeChoosingPolicy test stub used to verify that the storageId passed + * in is indeed in the list of volumes. + * @param + */ + private static class TestVolumeChoosingPolicy + extends RoundRobinVolumeChoosingPolicy { + static String expectedStorageId; + + @Override + public V chooseVolume(List volumes, long replicaSize, String storageId) + throws IOException { + assertEquals(expectedStorageId, storageId); + return super.chooseVolume(volumes, replicaSize, storageId); + } + } + + private static class TestBlockPlacementPolicy + extends BlockPlacementPolicyDefault { + static DatanodeStorageInfo[] dnStorageInfosToReturn; + + @Override + public DatanodeStorageInfo[] chooseTarget(String srcPath, int numOfReplicas, + Node writer, List chosenNodes, + boolean returnChosenNodes, Set excludedNodes, long blocksize, + final BlockStoragePolicy storagePolicy, EnumSet flags) { + return dnStorageInfosToReturn; + } + } + + private DatanodeStorageInfo getDatanodeStorageInfo(int dnIndex) + throws UnregisteredNodeException { + if (cluster == null) { + return null; + } + DatanodeID dnId = cluster.getDataNodes().get(dnIndex).getDatanodeId(); + DatanodeManager dnManager = cluster.getNamesystem() + .getBlockManager().getDatanodeManager(); + return dnManager.getDatanode(dnId).getStorageInfos()[0]; + } + + @Test(timeout=60000) + public void testStorageIDBlockPlacementSpecific() + throws ReconfigurationException, InterruptedException, TimeoutException, + IOException { + final StorageType[][] storageTypes = { + {StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}, + {StorageType.DISK, StorageType.DISK}, + }; + final int numDataNodes = storageTypes.length; + final int storagePerDataNode = storageTypes[0].length; + startDFSCluster(1, numDataNodes, storagePerDataNode, storageTypes, + TestVolumeChoosingPolicy.class, TestBlockPlacementPolicy.class); + Path testFile = new Path("/test"); + final short replFactor = 1; + final int numBlocks = 10; + DatanodeStorageInfo dnInfoToUse = getDatanodeStorageInfo(0); + TestBlockPlacementPolicy.dnStorageInfosToReturn = + new DatanodeStorageInfo[] {dnInfoToUse}; + TestVolumeChoosingPolicy.expectedStorageId = dnInfoToUse.getStorageID(); + //file creation invokes both BlockPlacementPolicy and VolumeChoosingPolicy, + //and will test that the storage ids match + createFile(testFile, numBlocks, replFactor); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org For additional commands, e-mail: common-commits-help@hadoop.apache.org