From: umamahesh@apache.org
To: common-commits@hadoop.apache.org
Subject: hadoop git commit: HDFS-11150: [SPS]: Provide persistence when satisfying storage policy. Contributed by Yuanbo Liu
Date: Wed, 11 Jan 2017 21:49:29 +0000 (UTC)

Repository: hadoop
Updated Branches:
  refs/heads/HDFS-10285 402cf3513 -> d5ae4ed4f


HDFS-11150: [SPS]: Provide persistence when satisfying storage policy.
Contributed by Yuanbo Liu


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d5ae4ed4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d5ae4ed4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d5ae4ed4

Branch: refs/heads/HDFS-10285
Commit: d5ae4ed4fe305090852a2360cf431fe89727e527
Parents: 402cf35
Author: Uma Maheswara Rao G
Authored: Wed Jan 11 13:48:58 2017 -0800
Committer: Uma Maheswara Rao G
Committed: Wed Jan 11 13:48:58 2017 -0800

----------------------------------------------------------------------
 .../hadoop/hdfs/protocol/ClientProtocol.java    |   2 +-
 .../hdfs/server/common/HdfsServerConstants.java |   3 +
 .../hdfs/server/namenode/FSDirAttrOp.java       |  81 +++--
 .../hdfs/server/namenode/FSDirXAttrOp.java      |   8 +
 .../hdfs/server/namenode/FSDirectory.java       |  14 +
 .../hdfs/server/namenode/FSNamesystem.java      |   6 +-
 .../hdfs/server/namenode/NameNodeRpcServer.java |  13 +-
 .../server/namenode/StoragePolicySatisfier.java |  22 +-
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |  37 +++
 .../TestPersistentStoragePolicySatisfier.java   | 311 +++++++++++++++++++
 .../namenode/TestStoragePolicySatisfier.java    | 112 +++---
 11 files changed, 519 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ae4ed4/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
index 764ba52..44634a6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
@@ -1558,7 +1558,7 @@ public interface ClientProtocol {
    * @throws org.apache.hadoop.hdfs.server.namenode.SafeModeException append not
    *           allowed in safemode.
    */
-  @Idempotent
+  @AtMostOnce
   void satisfyStoragePolicy(String path) throws IOException;
 
   /**


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ae4ed4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
index d1f1d82..b028533 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
@@ -372,6 +372,9 @@ public interface HdfsServerConstants {
   String XATTR_ERASURECODING_POLICY =
       "system.hdfs.erasurecoding.policy";
 
+  String XATTR_SATISFY_STORAGE_POLICY =
+      "system.hdfs.satisfy.storage.policy";
+
   Path MOVER_ID_PATH = new Path("/system/mover.id");
 
   long BLOCK_GROUP_INDEX_MASK = 15;
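The new system xattr above is what makes a pending satisfy request durable: it is stamped on each candidate inode and replayed from the edit log/fsimage after a restart. As a minimal sketch of that marker pattern -- reusing the XAttrHelper/XAttrFeature calls that appear in the changes below, and assuming it runs inside the NameNode where those classes are visible -- the check for "already requested" might look like:

    import org.apache.hadoop.fs.XAttr;
    import org.apache.hadoop.hdfs.XAttrHelper;
    import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
    import org.apache.hadoop.hdfs.server.namenode.INode;
    import org.apache.hadoop.hdfs.server.namenode.XAttrFeature;

    /**
     * Sketch only (not part of this commit): build the system xattr once and
     * treat its presence on an inode as "a satisfy-storage-policy request is
     * already pending here".
     */
    class SatisfyMarkerSketch {
      static final XAttr MARKER = XAttrHelper.buildXAttr(
          HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY);

      static boolean isPending(INode inode) {
        XAttrFeature f = inode.getXAttrFeature();
        return f != null && f.getXAttr(
            HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY) != null;
      }
    }

This is exactly the test that FSDirAttrOp#inodeHasSatisfyXAttr performs in the hunk that follows.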
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ae4ed4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
index 9fa5b8c..fa7bb61 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -42,12 +43,14 @@ import com.google.common.collect.Lists;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.List;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_QUOTA_BY_STORAGETYPE_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
 
 public class FSDirAttrOp {
   static HdfsFileStatus setPermission(
@@ -197,10 +200,11 @@ public class FSDirAttrOp {
     return fsd.getAuditFileInfo(iip);
   }
 
-  static void satisfyStoragePolicy(FSDirectory fsd, BlockManager bm,
-      String src) throws IOException {
+  static HdfsFileStatus satisfyStoragePolicy(FSDirectory fsd, BlockManager bm,
+      String src, boolean logRetryCache) throws IOException {
     FSPermissionChecker pc = fsd.getPermissionChecker();
+    List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
     INodesInPath iip;
     fsd.writeLock();
     try {
@@ -210,10 +214,13 @@ public class FSDirAttrOp {
       if (fsd.isPermissionEnabled()) {
         fsd.checkPathAccess(pc, iip, FsAction.WRITE);
       }
-      unprotectedSatisfyStoragePolicy(bm, iip);
+      XAttr satisfyXAttr = unprotectedSatisfyStoragePolicy(iip, bm, fsd);
+      xAttrs.add(satisfyXAttr);
     } finally {
       fsd.writeUnlock();
     }
+    fsd.getEditLog().logSetXAttrs(src, xAttrs, logRetryCache);
+    return fsd.getAuditFileInfo(iip);
   }
 
   static BlockStoragePolicy[] getStoragePolicies(BlockManager bm)
@@ -477,33 +484,61 @@
     }
   }
 
-  static void unprotectedSatisfyStoragePolicy(BlockManager bm,
-      INodesInPath iip) throws IOException {
+  static XAttr unprotectedSatisfyStoragePolicy(INodesInPath iip,
+      BlockManager bm, FSDirectory fsd) throws IOException {
 
-    // check whether file exists.
-    INode inode = iip.getLastINode();
-    if (inode == null) {
-      throw new FileNotFoundException("File/Directory does not exist: "
-          + iip.getPath());
-    }
+    final INode inode = FSDirectory.resolveLastINode(iip);
+    final int snapshotId = iip.getLatestSnapshotId();
+    final List<INode> candidateNodes = new ArrayList<>();
 
-    // TODO: need to check whether inode's storage policy
-    // has been satisfied or inode exists in the satisfier
-    // list before calling satisfyStoragePolicy in BlockManager.
-    if (inode.isDirectory()) {
-      final int snapshotId = iip.getLatestSnapshotId();
+    // TODO: think about optimization here, label the dir instead
+    // of the sub-files of the dir.
+    if (inode.isFile()) {
+      candidateNodes.add(inode);
+    } else if (inode.isDirectory()) {
       for (INode node : inode.asDirectory().getChildrenList(snapshotId)) {
         if (node.isFile()) {
-          bm.satisfyStoragePolicy(node.getId());
-
+          candidateNodes.add(node);
         }
       }
-    } else if (inode.isFile()) {
-      bm.satisfyStoragePolicy(inode.getId());
-    } else {
-      throw new FileNotFoundException("File/Directory does not exist: "
-          + iip.getPath());
     }
+
+    // If node has satisfy xattr, then stop adding it
+    // to satisfy movement queue.
+    if (inodeHasSatisfyXAttr(candidateNodes)) {
+      throw new IOException(
+          "Cannot request to call satisfy storage policy on path "
+              + iip.getPath()
+              + ", as this file/dir was already called for satisfying "
+              + "storage policy.");
+    }
+
+    final List<XAttr> xattrs = Lists.newArrayListWithCapacity(1);
+    final XAttr satisfyXAttr =
+        XAttrHelper.buildXAttr(XATTR_SATISFY_STORAGE_POLICY);
+    xattrs.add(satisfyXAttr);
+
+    for (INode node : candidateNodes) {
+      bm.satisfyStoragePolicy(node.getId());
+      List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(node);
+      List<XAttr> newXAttrs = FSDirXAttrOp.setINodeXAttrs(
+          fsd, existingXAttrs, xattrs, EnumSet.of(XAttrSetFlag.CREATE));
+      XAttrStorage.updateINodeXAttrs(node, newXAttrs, snapshotId);
+    }
+    return satisfyXAttr;
+  }
+
+  private static boolean inodeHasSatisfyXAttr(List<INode> candidateNodes) {
+    // If the node is a directory and one of the child files
+    // has satisfy xattr, then return true for this directory.
+    for (INode inode : candidateNodes) {
+      final XAttrFeature f = inode.getXAttrFeature();
+      if (inode.isFile() &&
+          f != null && f.getXAttr(XATTR_SATISFY_STORAGE_POLICY) != null) {
+        return true;
+      }
+    }
+    return false;
   }
 
   private static void setDirStoragePolicy(


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ae4ed4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
index f676f36..91f34a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
@@ -41,6 +41,7 @@ import java.util.ListIterator;
 
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_ENCRYPTION_ZONE;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.SECURITY_XATTR_UNREADABLE_BY_SUPERUSER;
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
 
 class FSDirXAttrOp {
   private static final XAttr KEYID_XATTR =
@@ -278,6 +279,13 @@ class FSDirXAttrOp {
             ezProto.getKeyName());
       }
 
+      // Add inode id to movement queue if xattrs contain satisfy xattr.
+      if (XATTR_SATISFY_STORAGE_POLICY.equals(xaName)) {
+        FSDirAttrOp.unprotectedSatisfyStoragePolicy(iip,
+            fsd.getBlockManager(), fsd);
+        continue;
+      }
+
       if (!isFile && SECURITY_XATTR_UNREADABLE_BY_SUPERUSER.equals(xaName)) {
         throw new IOException("Can only set '" +
             SECURITY_XATTR_UNREADABLE_BY_SUPERUSER + "' on a file.");


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ae4ed4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index b21442d..277a06f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -87,6 +87,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DE
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_ENCRYPTION_ZONE;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.SECURITY_XATTR_UNREADABLE_BY_SUPERUSER;
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
 import static org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.CURRENT_STATE_ID;
 
 /**
@@ -1334,10 +1335,23 @@ public class FSDirectory implements Closeable {
       if (!inode.isSymlink()) {
         final XAttrFeature xaf = inode.getXAttrFeature();
         addEncryptionZone((INodeWithAdditionalFields) inode, xaf);
+        addStoragePolicySatisfier((INodeWithAdditionalFields) inode, xaf);
       }
     }
   }
 
+  private void addStoragePolicySatisfier(INodeWithAdditionalFields inode,
+      XAttrFeature xaf) {
+    if (xaf == null ||
+        inode.isDirectory()) {
+      return;
+    }
+    XAttr xattr = xaf.getXAttr(XATTR_SATISFY_STORAGE_POLICY);
+    if (xattr == null) {
+      return;
+    }
+    getBlockManager().satisfyStoragePolicy(inode.getId());
+  }
+
   private void addEncryptionZone(INodeWithAdditionalFields inode,
       XAttrFeature xaf) {
     if (xaf == null) {


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ae4ed4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 79e54cc..88538da 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -2059,7 +2059,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    *
    * @param src file/directory path
    */
-  void satisfyStoragePolicy(String src) throws IOException {
+  void satisfyStoragePolicy(String src, boolean logRetryCache)
+      throws IOException {
     checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
@@ -2081,8 +2082,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
             + " by admin. Seek for an admin help to activate it "
             + "or use Mover tool.");
       }
-      // TODO: need to update editlog for persistence.
-      FSDirAttrOp.satisfyStoragePolicy(dir, blockManager, src);
+      FSDirAttrOp.satisfyStoragePolicy(dir, blockManager, src, logRetryCache);
     } finally {
       writeUnlock();
     }


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ae4ed4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 1f5198c..4b6e8ca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -1321,7 +1321,18 @@ public class NameNodeRpcServer implements NamenodeProtocols {
   @Override // ClientProtocol
   public void satisfyStoragePolicy(String src) throws IOException {
     checkNNStartup();
-    namesystem.satisfyStoragePolicy(src);
+    namesystem.checkOperation(OperationCategory.WRITE);
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
+    boolean success = false;
+    try {
+      namesystem.satisfyStoragePolicy(src, cacheEntry != null);
+      success = true;
+    } finally {
+      RetryCache.setState(cacheEntry, success);
+    }
   }
 
   @Override // ClientProtocol
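With the RPC now annotated @AtMostOnce (see the ClientProtocol change above), a genuine client retry of the same call is answered from this retry cache, while an explicit second request on a path that is still queued is rejected by FSDirAttrOp with the "already called" message. A hedged, application-side sketch of calling the new API defensively -- the error-message matching below is an assumption based on the message introduced in this patch, and DistributedFileSystem#satisfyStoragePolicy(Path) is the call used by the new tests:

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;

    /**
     * Sketch only (not part of this commit): treat the "already called for
     * satisfying storage policy" rejection as success so the caller itself
     * stays effectively idempotent.
     */
    class SatisfyCallerSketch {
      static void requestSatisfy(DistributedFileSystem dfs, Path path)
          throws IOException {
        try {
          dfs.satisfyStoragePolicy(path);
        } catch (IOException e) {
          String msg = e.getMessage();
          if (msg == null || !msg.contains("already called for satisfying")) {
            throw e; // a different failure; propagate it
          }
          // else: a request is already queued for this path -- nothing to do
        }
      }
    }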
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ae4ed4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
index b1b1464..3b19833 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -168,16 +168,18 @@ public class StoragePolicySatisfier implements Runnable {
     }
     while (namesystem.isRunning() && isRunning) {
       try {
-        Long blockCollectionID = storageMovementNeeded.get();
-        if (blockCollectionID != null) {
-          BlockCollection blockCollection =
-              namesystem.getBlockCollection(blockCollectionID);
-          // Check blockCollectionId existence.
-          if (blockCollection != null) {
-            boolean allBlockLocsAttemptedToSatisfy =
-                computeAndAssignStorageMismatchedBlocksToDNs(blockCollection);
-            this.storageMovementsMonitor.add(blockCollectionID,
-                allBlockLocsAttemptedToSatisfy);
+        if (!namesystem.isInSafeMode()) {
+          Long blockCollectionID = storageMovementNeeded.get();
+          if (blockCollectionID != null) {
+            BlockCollection blockCollection =
+                namesystem.getBlockCollection(blockCollectionID);
+            // Check blockCollectionId existence.
+            if (blockCollection != null) {
+              boolean allBlockLocsAttemptedToSatisfy =
+                  computeAndAssignStorageMismatchedBlocksToDNs(blockCollection);
+              this.storageMovementsMonitor
+                  .add(blockCollectionID, allBlockLocsAttemptedToSatisfy);
+            }
           }
         }
         // TODO: We can think to make this as configurable later, how frequently


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ae4ed4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 1fbc1d9..159253a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -2114,4 +2114,41 @@ public class DFSTestUtil {
       assertFalse("File in trash : " + trashPath, fs.exists(trashPath));
     }
   }
+
+  /**
+   * Check whether the Block movement has been successfully
+   * completed to satisfy the storage policy for the given file.
+   * @param fileName file name.
+   * @param expectedStorageType storage type.
+   * @param expectedStorageCount expected storage count.
+   * @param timeout timeout.
+   * @param fs distributedFileSystem.
+   * @throws Exception
+   */
+  public static void waitExpectedStorageType(String fileName,
+      final StorageType expectedStorageType, int expectedStorageCount,
+      int timeout, DistributedFileSystem fs) throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        final LocatedBlock lb;
+        try {
+          lb = fs.getClient().getLocatedBlocks(fileName, 0).get(0);
+        } catch (IOException e) {
+          LOG.error("Exception while getting located blocks", e);
+          return false;
+        }
+        int actualStorageCount = 0;
+        for (StorageType type : lb.getStorageTypes()) {
+          if (expectedStorageType == type) {
+            actualStorageCount++;
+          }
+        }
+        LOG.info(
+            expectedStorageType + " replica count, expected="
+            + expectedStorageCount + " and actual=" + actualStorageCount);
+        return expectedStorageCount == actualStorageCount;
+      }
+    }, 1000, timeout);
+  }
 }


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ae4ed4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
new file mode 100644
index 0000000..e4b4290
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Test persistence of satisfying files/directories.
+ */
+public class TestPersistentStoragePolicySatisfier {
+
+  private static Configuration conf;
+
+  private static MiniDFSCluster cluster;
+  private static DistributedFileSystem fs;
+
+  private static Path testFile =
+      new Path("/testFile");
+  private static String testFileName = testFile.toString();
+
+  private static Path parentDir = new Path("/parentDir");
+  private static Path parentFile = new Path(parentDir, "parentFile");
+  private static String parentFileName = parentFile.toString();
+  private static Path childDir = new Path(parentDir, "childDir");
+  private static Path childFile = new Path(childDir, "childFile");
+  private static String childFileName = childFile.toString();
+
+  private static final String COLD = "COLD";
+  private static final String WARM = "WARM";
+  private static final String ONE_SSD = "ONE_SSD";
+  private static final String ALL_SSD = "ALL_SSD";
+
+  private static StorageType[][] storageTypes = new StorageType[][] {
+      {StorageType.ARCHIVE, StorageType.DISK},
+      {StorageType.DISK, StorageType.SSD},
+      {StorageType.SSD, StorageType.RAM_DISK},
+      {StorageType.ARCHIVE, StorageType.DISK},
+      {StorageType.ARCHIVE, StorageType.SSD}
+  };
+
+  private final int timeout = 300000;
+
+  /**
+   * Setup environment for every test case.
+   * @throws IOException
+   */
+  public void clusterSetUp() throws Exception {
+    clusterSetUp(false);
+  }
+
+  /**
+   * Setup cluster environment.
+   * @param isHAEnabled if true, enable simple HA.
+   * @throws IOException
+   */
+  private void clusterSetUp(boolean isHAEnabled) throws Exception {
+    conf = new HdfsConfiguration();
+    final int dnNumber = storageTypes.length;
+    final short replication = 3;
+    MiniDFSCluster.Builder clusterBuilder = new MiniDFSCluster.Builder(conf)
+        .storageTypes(storageTypes)
+        .numDataNodes(dnNumber);
+    if (isHAEnabled) {
+      clusterBuilder.nnTopology(MiniDFSNNTopology.simpleHATopology());
+    }
+    cluster = clusterBuilder.build();
+    cluster.waitActive();
+    if (isHAEnabled) {
+      cluster.transitionToActive(0);
+      fs = HATestUtil.configureFailoverFs(cluster, conf);
+    } else {
+      fs = cluster.getFileSystem();
+    }
+
+    createTestFiles(fs, replication);
+  }
+
+  /**
+   * Setup test files for testing.
+   * @param dfs
+   * @param replication
+   * @throws Exception
+   */
+  private void createTestFiles(DistributedFileSystem dfs,
+      short replication) throws Exception {
+    DFSTestUtil.createFile(dfs, testFile, 1024L, replication, 0L);
+    DFSTestUtil.createFile(dfs, parentFile, 1024L, replication, 0L);
+    DFSTestUtil.createFile(dfs, childFile, 1024L, replication, 0L);
+
+    DFSTestUtil.waitReplication(dfs, testFile, replication);
+    DFSTestUtil.waitReplication(dfs, parentFile, replication);
+    DFSTestUtil.waitReplication(dfs, childFile, replication);
+  }
+
+  /**
+   * Tear down environment for every test case.
+   * @throws IOException
+   */
+  private void clusterShutdown() throws IOException {
+    if (fs != null) {
+      fs.close();
+      fs = null;
+    }
+    if (cluster != null) {
+      cluster.shutdown(true);
+      cluster = null;
+    }
+  }
+
+  /**
+   * While satisfying file/directory, trigger the cluster's checkpoint to
+   * make sure satisfier persistence works as expected. This test case runs
+   * as below:
+   * 1. use satisfyStoragePolicy and add xAttr to the file.
+   * 2. do the checkpoint by secondary NameNode.
+   * 3. restart the cluster immediately.
+   * 4. make sure all the storage policies are satisfied.
+   * @throws Exception
+   */
+  @Test(timeout = 300000)
+  public void testWithCheckpoint() throws Exception {
+    try {
+      clusterSetUp();
+      fs.setStoragePolicy(testFile, WARM);
+      fs.satisfyStoragePolicy(testFile);
+
+      // Start the checkpoint.
+      conf.set(
+          DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY, "0.0.0.0:0");
+      SecondaryNameNode secondary = new SecondaryNameNode(conf);
+      secondary.doCheckpoint();
+      restartCluster();
+
+      DFSTestUtil.waitExpectedStorageType(
+          testFileName, StorageType.DISK, 1, timeout, fs);
+      DFSTestUtil.waitExpectedStorageType(
+          testFileName, StorageType.ARCHIVE, 2, timeout, fs);
+
+      fs.setStoragePolicy(parentDir, COLD);
+      fs.satisfyStoragePolicy(parentDir);
+
+      DFSTestUtil.waitExpectedStorageType(
+          parentFileName, StorageType.ARCHIVE, 3, timeout, fs);
+      DFSTestUtil.waitExpectedStorageType(
+          childFileName, StorageType.DEFAULT, 3, timeout, fs);
+
+    } finally {
+      clusterShutdown();
+    }
+  }
+
+  /**
+   * Tests to verify satisfier persistence working as expected
+   * in HA env. This test case runs as below:
+   * 1. setup HA cluster env with simple HA topology.
+   * 2. switch the active NameNode from nn0/nn1 to nn1/nn0.
+   * 3. make sure all the storage policies are satisfied.
+   * @throws Exception
+   */
+  @Test(timeout = 300000)
+  public void testWithHA() throws Exception {
+    try {
+      // Enable HA env for testing.
+      clusterSetUp(true);
+
+      fs.setStoragePolicy(testFile, ALL_SSD);
+      fs.satisfyStoragePolicy(testFile);
+
+      cluster.transitionToStandby(0);
+      cluster.transitionToActive(1);
+
+      DFSTestUtil.waitExpectedStorageType(
+          testFileName, StorageType.SSD, 3, timeout, fs);
+
+      // test directory
+      fs.setStoragePolicy(parentDir, WARM);
+      fs.satisfyStoragePolicy(parentDir);
+      cluster.transitionToStandby(1);
+      cluster.transitionToActive(0);
+
+      DFSTestUtil.waitExpectedStorageType(
+          parentFileName, StorageType.DISK, 1, timeout, fs);
+      DFSTestUtil.waitExpectedStorageType(
+          parentFileName, StorageType.ARCHIVE, 2, timeout, fs);
+      DFSTestUtil.waitExpectedStorageType(
+          childFileName, StorageType.DEFAULT, 3, timeout, fs);
+    } finally {
+      clusterShutdown();
+    }
+  }
+
+
+  /**
+   * Tests to verify satisfier persistence working well with multiple
+   * restart operations. This test case runs as below:
+   * 1. satisfy the storage policy of file1.
+   * 2. restart the cluster.
+   * 3. check whether all the blocks are satisfied.
+   * 4. satisfy the storage policy of file2.
+   * 5. restart the cluster.
+   * 6. check whether all the blocks are satisfied.
+   * @throws Exception
+   */
+  @Test(timeout = 300000)
+  public void testWithRestarts() throws Exception {
+    try {
+      clusterSetUp();
+      fs.setStoragePolicy(testFile, ONE_SSD);
+      fs.satisfyStoragePolicy(testFile);
+      restartCluster();
+      DFSTestUtil.waitExpectedStorageType(
+          testFileName, StorageType.SSD, 1, timeout, fs);
+      DFSTestUtil.waitExpectedStorageType(
+          testFileName, StorageType.DISK, 2, timeout, fs);
+
+      // test directory
+      fs.setStoragePolicy(parentDir, COLD);
+      fs.satisfyStoragePolicy(parentDir);
+      restartCluster();
+      DFSTestUtil.waitExpectedStorageType(
+          parentFileName, StorageType.ARCHIVE, 3, timeout, fs);
+      DFSTestUtil.waitExpectedStorageType(
+          childFileName, StorageType.DEFAULT, 3, timeout, fs);
+    } finally {
+      clusterShutdown();
+    }
+  }
+
+  /**
+   * Tests to verify satisfier persistence working well with
+   * federated HA env. This test case runs as below:
+   * 1. setup HA test environment with federated topology.
+   * 2. satisfy storage policy of file1.
+   * 3. switch active NameNode from nn0 to nn1.
+   * 4. switch active NameNode from nn2 to nn3.
+   * 5. check whether the storage policy of file1 is satisfied.
+   * @throws Exception
+   */
+  @Test(timeout = 300000)
+  public void testWithFederationHA() throws Exception {
+    try {
+      conf = new HdfsConfiguration();
+      final MiniDFSCluster haCluster = new MiniDFSCluster
+          .Builder(conf)
+          .nnTopology(MiniDFSNNTopology.simpleHAFederatedTopology(2))
+          .storageTypes(storageTypes)
+          .numDataNodes(storageTypes.length).build();
+      haCluster.waitActive();
+      haCluster.transitionToActive(1);
+      haCluster.transitionToActive(3);
+
+      fs = HATestUtil.configureFailoverFs(haCluster, conf);
+      createTestFiles(fs, (short) 3);
+
+      fs.setStoragePolicy(testFile, WARM);
+      fs.satisfyStoragePolicy(testFile);
+
+      haCluster.transitionToStandby(1);
+      haCluster.transitionToActive(0);
+      haCluster.transitionToStandby(3);
+      haCluster.transitionToActive(2);
+
+      DFSTestUtil.waitExpectedStorageType(
+          testFileName, StorageType.DISK, 1, timeout, fs);
+      DFSTestUtil.waitExpectedStorageType(
+          testFileName, StorageType.ARCHIVE, 2, timeout, fs);
+
+    } finally {
+      clusterShutdown();
+    }
+  }
+
+  /**
+   * Restart the whole env and trigger the DataNode's heart beats.
+   * @throws Exception
+   */
+  private void restartCluster() throws Exception {
+    cluster.restartDataNodes();
+    cluster.restartNameNodes();
+    cluster.waitActive();
+    cluster.triggerHeartbeats();
+  }
+
+}


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ae4ed4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
index 9abb78d..1c53894 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
@@ -108,7 +108,8 @@ public class TestStoragePolicySatisfier {
       hdfsCluster.triggerHeartbeats();
       // Wait till namenode notified about the block location details
-      waitExpectedStorageType(file, StorageType.ARCHIVE, 3, 30000);
+      DFSTestUtil.waitExpectedStorageType(
+          file, StorageType.ARCHIVE, 3, 30000, dfs);
     } finally {
       shutdownCluster();
     }
@@ -137,7 +138,8 @@ public class TestStoragePolicySatisfier {
       hdfsCluster.triggerHeartbeats();
       // Wait till StorgePolicySatisfier Identified that block to move to SSD
       // areas
-      waitExpectedStorageType(file, StorageType.SSD, 3, 30000);
+      DFSTestUtil.waitExpectedStorageType(
+          file, StorageType.SSD, 3, 30000, dfs);
     } finally {
       shutdownCluster();
     }
@@ -164,8 +166,10 @@ public class TestStoragePolicySatisfier {
       hdfsCluster.triggerHeartbeats();
       // Wait till StorgePolicySatisfier Identified that block to move to SSD
       // areas
-      waitExpectedStorageType(file, StorageType.SSD, 1, 30000);
-      waitExpectedStorageType(file, StorageType.DISK, 2, 30000);
+      DFSTestUtil.waitExpectedStorageType(
+          file, StorageType.SSD, 1, 30000, dfs);
+      DFSTestUtil.waitExpectedStorageType(
+          file, StorageType.DISK, 2, 30000, dfs);
     } finally {
       shutdownCluster();
     }
@@ -195,8 +199,10 @@ public class TestStoragePolicySatisfier {
       hdfsCluster.triggerHeartbeats();
       // Wait till the block is moved to SSD areas
-      waitExpectedStorageType(file, StorageType.SSD, 1, 30000);
-      waitExpectedStorageType(file, StorageType.DISK, 2, 30000);
+      DFSTestUtil.waitExpectedStorageType(
+          file, StorageType.SSD, 1, 30000, dfs);
+      DFSTestUtil.waitExpectedStorageType(
+          file, StorageType.DISK, 2, 30000, dfs);
 
       waitForBlocksMovementResult(1, 30000);
     } finally {
@@ -245,8 +251,10 @@ public class TestStoragePolicySatisfier {
 
       for (String fileName : files) {
         // Wait till the block is moved to SSD areas
-        waitExpectedStorageType(fileName, StorageType.SSD, 1, 30000);
-        waitExpectedStorageType(fileName, StorageType.DISK, 2, 30000);
+        DFSTestUtil.waitExpectedStorageType(
+            fileName, StorageType.SSD, 1, 30000, dfs);
+        DFSTestUtil.waitExpectedStorageType(
+            fileName, StorageType.DISK, 2, 30000, dfs);
       }
 
       waitForBlocksMovementResult(blockCollectionIds.size(), 30000);
@@ -279,7 +287,8 @@ public class TestStoragePolicySatisfier {
       hdfsCluster.triggerHeartbeats();
       // Wait till namenode notified about the block location details
-      waitExpectedStorageType(file, StorageType.ARCHIVE, 3, 30000);
+      DFSTestUtil.waitExpectedStorageType(
+          file, StorageType.ARCHIVE, 3, 30000, dfs);
     } finally {
       shutdownCluster();
     }
@@ -317,11 +326,14 @@ public class TestStoragePolicySatisfier {
       hdfsCluster.triggerHeartbeats();
 
       // take effect for the file in the directory.
-      waitExpectedStorageType(subFile1, StorageType.SSD, 1, 30000);
-      waitExpectedStorageType(subFile1, StorageType.DISK, 2, 30000);
+      DFSTestUtil.waitExpectedStorageType(
+          subFile1, StorageType.SSD, 1, 30000, dfs);
+      DFSTestUtil.waitExpectedStorageType(
+          subFile1, StorageType.DISK, 2, 30000, dfs);
 
       // take no effect for the sub-dir's file in the directory.
-      waitExpectedStorageType(subFile2, StorageType.DEFAULT, 3, 30000);
+      DFSTestUtil.waitExpectedStorageType(
+          subFile2, StorageType.DEFAULT, 3, 30000, dfs);
     } finally {
       shutdownCluster();
     }
@@ -367,6 +379,20 @@ public class TestStoragePolicySatisfier {
 
     } catch (FileNotFoundException e) {
 
     }
+
+    try {
+      hdfsAdmin.satisfyStoragePolicy(new Path(file));
+      hdfsAdmin.satisfyStoragePolicy(new Path(file));
+      Assert.fail(String.format(
+          "Should fail to satisfy storage policy "
+          + "for %s, since it has been "
+          + "added to satisfy movement queue.", file));
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains(
+          String.format("Cannot request to call satisfy storage policy "
+              + "on path %s, as this file/dir was already called for "
+              + "satisfying storage policy.", file), e);
+    }
   } finally {
     shutdownCluster();
   }
@@ -407,8 +433,10 @@ public class TestStoragePolicySatisfier {
       hdfsCluster.triggerHeartbeats();
       // Wait till StorgePolicySatisfier identified that block to move to
       // ARCHIVE area.
-      waitExpectedStorageType(file, StorageType.ARCHIVE, 1, 30000);
-      waitExpectedStorageType(file, StorageType.DISK, 2, 30000);
+      DFSTestUtil.waitExpectedStorageType(
+          file, StorageType.ARCHIVE, 1, 30000, dfs);
+      DFSTestUtil.waitExpectedStorageType(
+          file, StorageType.DISK, 2, 30000, dfs);
 
       waitForBlocksMovementResult(1, 30000);
     } finally {
@@ -451,7 +479,8 @@ public class TestStoragePolicySatisfier {
       // No block movement will be scheduled as there is no target node available
       // with the required storage type.
       waitForAttemptedItems(1, 30000);
-      waitExpectedStorageType(file, StorageType.DISK, 3, 30000);
+      DFSTestUtil.waitExpectedStorageType(
+          file, StorageType.DISK, 3, 30000, dfs);
       // Since there is no target node the item will get timed out and then
       // re-attempted.
       waitForAttemptedItems(1, 30000);
@@ -523,8 +552,10 @@ public class TestStoragePolicySatisfier {
     // with the required storage type.
     waitForAttemptedItems(1, 30000);
     waitForBlocksMovementResult(1, 30000);
-    waitExpectedStorageType(file1, StorageType.ARCHIVE, 1, 30000);
-    waitExpectedStorageType(file1, StorageType.DISK, 2, 30000);
+    DFSTestUtil.waitExpectedStorageType(
+        file1, StorageType.ARCHIVE, 1, 30000, dfs);
+    DFSTestUtil.waitExpectedStorageType(
+        file1, StorageType.DISK, 2, 30000, dfs);
   }
 
   /**
@@ -571,8 +602,10 @@ public class TestStoragePolicySatisfier {
       hdfsCluster.triggerHeartbeats();
       // Wait till StorgePolicySatisfier identified that block to move to
       // ARCHIVE area.
-      waitExpectedStorageType(file, StorageType.ARCHIVE, 2, 30000);
-      waitExpectedStorageType(file, StorageType.DISK, 3, 30000);
+      DFSTestUtil.waitExpectedStorageType(
+          file, StorageType.ARCHIVE, 2, 30000, dfs);
+      DFSTestUtil.waitExpectedStorageType(
+          file, StorageType.DISK, 3, 30000, dfs);
 
       waitForBlocksMovementResult(1, 30000);
     } finally {
@@ -606,8 +639,10 @@ public class TestStoragePolicySatisfier {
       namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
       hdfsCluster.triggerHeartbeats();
 
-      waitExpectedStorageType(file, StorageType.SSD, 1, 30000);
-      waitExpectedStorageType(file, StorageType.DISK, 2, 30000);
+      DFSTestUtil.waitExpectedStorageType(
+          file, StorageType.SSD, 1, 30000, dfs);
+      DFSTestUtil.waitExpectedStorageType(
+          file, StorageType.DISK, 2, 30000, dfs);
     } finally {
       shutdownCluster();
@@ -644,8 +679,10 @@ public class TestStoragePolicySatisfier {
       namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
       hdfsCluster.triggerHeartbeats();
 
-      waitExpectedStorageType(file, StorageType.DISK, 1, 30000);
-      waitExpectedStorageType(file, StorageType.ARCHIVE, 2, 30000);
+      DFSTestUtil.waitExpectedStorageType(
+          file, StorageType.DISK, 1, 30000, dfs);
+      DFSTestUtil.waitExpectedStorageType(
+          file, StorageType.ARCHIVE, 2, 30000, dfs);
     } finally {
       shutdownCluster();
     }
@@ -771,33 +808,4 @@ public class TestStoragePolicySatisfier {
     cluster.waitActive();
     return cluster;
   }
-
-  // Check whether the Block movement has been successfully completed to satisfy
-  // the storage policy for the given file.
-  private void waitExpectedStorageType(final String fileName,
-      final StorageType expectedStorageType, int expectedStorageCount,
-      int timeout) throws Exception {
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
-        LocatedBlock lb = null;
-        try {
-          lb = dfs.getClient().getLocatedBlocks(fileName, 0).get(0);
-        } catch (IOException e) {
-          LOG.error("Exception while getting located blocks", e);
-          return false;
-        }
-        int actualStorageCount = 0;
-        for (StorageType storageType : lb.getStorageTypes()) {
-          if (expectedStorageType == storageType) {
-            actualStorageCount++;
-          }
-        }
-        LOG.info(
-            expectedStorageType + " replica count, expected={} and actual={}",
-            expectedStorageType, actualStorageCount);
-        return expectedStorageCount == actualStorageCount;
-      }
-    }, 100, timeout);
-  }
 }
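Taken together, the flow the new tests exercise is: set a policy, request satisfaction, restart the NameNode and DataNodes, then wait for the expected replica placement -- the xattr persisted through the edit log re-queues the file after the restart. A condensed, hedged sketch of that flow (the method shape and the five-minute timeout are illustrative; the calls are the ones used by TestPersistentStoragePolicySatisfier above):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.StorageType;
    import org.apache.hadoop.hdfs.DFSTestUtil;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.MiniDFSCluster;

    /** Sketch only: condensed from testWithRestarts in the new test class. */
    class PersistenceFlowSketch {
      static void verifyOneSsdSurvivesRestart(MiniDFSCluster cluster,
          DistributedFileSystem fs, Path file) throws Exception {
        fs.setStoragePolicy(file, "ONE_SSD");
        fs.satisfyStoragePolicy(file);

        // Restart everything; the pending request must survive via the edit log.
        cluster.restartDataNodes();
        cluster.restartNameNodes();
        cluster.waitActive();
        cluster.triggerHeartbeats();

        // ONE_SSD => one SSD replica plus two DISK replicas.
        DFSTestUtil.waitExpectedStorageType(
            file.toString(), StorageType.SSD, 1, 300000, fs);
        DFSTestUtil.waitExpectedStorageType(
            file.toString(), StorageType.DISK, 2, 300000, fs);
      }
    }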