hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From umamah...@apache.org
Subject hadoop git commit: HDFS-11150: [SPS]: Provide persistence when satisfying storage policy. Contributed by Yuanbo Liu
Date Wed, 11 Jan 2017 21:49:29 GMT
Repository: hadoop
Updated Branches:
  refs/heads/HDFS-10285 402cf3513 -> d5ae4ed4f


HDFS-11150: [SPS]: Provide persistence when satisfying storage policy. Contributed by Yuanbo
Liu


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d5ae4ed4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d5ae4ed4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d5ae4ed4

Branch: refs/heads/HDFS-10285
Commit: d5ae4ed4fe305090852a2360cf431fe89727e527
Parents: 402cf35
Author: Uma Maheswara Rao G <uma.gangumalla@intel.com>
Authored: Wed Jan 11 13:48:58 2017 -0800
Committer: Uma Maheswara Rao G <uma.gangumalla@intel.com>
Committed: Wed Jan 11 13:48:58 2017 -0800

----------------------------------------------------------------------
 .../hadoop/hdfs/protocol/ClientProtocol.java    |   2 +-
 .../hdfs/server/common/HdfsServerConstants.java |   3 +
 .../hdfs/server/namenode/FSDirAttrOp.java       |  81 +++--
 .../hdfs/server/namenode/FSDirXAttrOp.java      |   8 +
 .../hdfs/server/namenode/FSDirectory.java       |  14 +
 .../hdfs/server/namenode/FSNamesystem.java      |   6 +-
 .../hdfs/server/namenode/NameNodeRpcServer.java |  13 +-
 .../server/namenode/StoragePolicySatisfier.java |  22 +-
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |  37 +++
 .../TestPersistentStoragePolicySatisfier.java   | 311 +++++++++++++++++++
 .../namenode/TestStoragePolicySatisfier.java    | 112 +++----
 11 files changed, 519 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ae4ed4/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
index 764ba52..44634a6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
@@ -1558,7 +1558,7 @@ public interface ClientProtocol {
    * @throws org.apache.hadoop.hdfs.server.namenode.SafeModeException append not
    *           allowed in safemode.
    */
-  @Idempotent
+  @AtMostOnce
   void satisfyStoragePolicy(String path) throws IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ae4ed4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
index d1f1d82..b028533 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
@@ -372,6 +372,9 @@ public interface HdfsServerConstants {
   String XATTR_ERASURECODING_POLICY =
       "system.hdfs.erasurecoding.policy";
 
+  String XATTR_SATISFY_STORAGE_POLICY =
+      "system.hdfs.satisfy.storage.policy";
+
   Path MOVER_ID_PATH = new Path("/system/mover.id");
 
   long BLOCK_GROUP_INDEX_MASK = 15;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ae4ed4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
index 9fa5b8c..fa7bb61 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.fs.XAttrSetFlag;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -42,12 +43,14 @@ import com.google.common.collect.Lists;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.List;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_QUOTA_BY_STORAGETYPE_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
 
 public class FSDirAttrOp {
   static HdfsFileStatus setPermission(
@@ -197,10 +200,11 @@ public class FSDirAttrOp {
     return fsd.getAuditFileInfo(iip);
   }
 
-  static void satisfyStoragePolicy(FSDirectory fsd, BlockManager bm,
-      String src) throws IOException {
+  static HdfsFileStatus satisfyStoragePolicy(FSDirectory fsd, BlockManager bm,
+      String src, boolean logRetryCache) throws IOException {
 
     FSPermissionChecker pc = fsd.getPermissionChecker();
+    List<XAttr> xAttrs = Lists.newArrayListWithCapacity(1);
     INodesInPath iip;
     fsd.writeLock();
     try {
@@ -210,10 +214,13 @@ public class FSDirAttrOp {
       if (fsd.isPermissionEnabled()) {
         fsd.checkPathAccess(pc, iip, FsAction.WRITE);
       }
-      unprotectedSatisfyStoragePolicy(bm, iip);
+      XAttr satisfyXAttr = unprotectedSatisfyStoragePolicy(iip, bm, fsd);
+      xAttrs.add(satisfyXAttr);
     } finally {
       fsd.writeUnlock();
     }
+    fsd.getEditLog().logSetXAttrs(src, xAttrs, logRetryCache);
+    return fsd.getAuditFileInfo(iip);
   }
 
   static BlockStoragePolicy[] getStoragePolicies(BlockManager bm)
@@ -477,33 +484,61 @@ public class FSDirAttrOp {
     }
   }
 
-  static void unprotectedSatisfyStoragePolicy(BlockManager bm,
-      INodesInPath iip) throws IOException {
+  static XAttr unprotectedSatisfyStoragePolicy(INodesInPath iip,
+      BlockManager bm, FSDirectory fsd) throws IOException {
 
-    // check whether file exists.
-    INode inode = iip.getLastINode();
-    if (inode == null) {
-      throw new FileNotFoundException("File/Directory does not exist: "
-          + iip.getPath());
-    }
+    final INode inode = FSDirectory.resolveLastINode(iip);
+    final int snapshotId = iip.getLatestSnapshotId();
+    final List<INode> candidateNodes = new ArrayList<>();
 
-    // TODO: need to check whether inode's storage policy
-    // has been satisfied or inode exists in the satisfier
-    // list before calling satisfyStoragePolicy in BlockManager.
-    if (inode.isDirectory()) {
-      final int snapshotId = iip.getLatestSnapshotId();
+    // TODO: think about optimization here, label the dir instead
+    // of the sub-files of the dir.
+    if (inode.isFile()) {
+      candidateNodes.add(inode);
+    } else if (inode.isDirectory()) {
       for (INode node : inode.asDirectory().getChildrenList(snapshotId)) {
         if (node.isFile()) {
-          bm.satisfyStoragePolicy(node.getId());
-
+          candidateNodes.add(node);
         }
       }
-    } else if (inode.isFile()) {
-      bm.satisfyStoragePolicy(inode.getId());
-    } else {
-      throw new FileNotFoundException("File/Directory does not exist: "
-          + iip.getPath());
     }
+
+    // If node has satisfy xattr, then stop adding it
+    // to satisfy movement queue.
+    if (inodeHasSatisfyXAttr(candidateNodes)) {
+      throw new IOException(
+          "Cannot request to call satisfy storage policy on path "
+          + iip.getPath()
+          + ", as this file/dir was already called for satisfying "
+          + "storage policy.");
+    }
+
+    final List<XAttr> xattrs = Lists.newArrayListWithCapacity(1);
+    final XAttr satisfyXAttr =
+        XAttrHelper.buildXAttr(XATTR_SATISFY_STORAGE_POLICY);
+    xattrs.add(satisfyXAttr);
+
+    for (INode node : candidateNodes) {
+      bm.satisfyStoragePolicy(node.getId());
+      List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(node);
+      List<XAttr> newXAttrs = FSDirXAttrOp.setINodeXAttrs(
+          fsd, existingXAttrs, xattrs, EnumSet.of(XAttrSetFlag.CREATE));
+      XAttrStorage.updateINodeXAttrs(node, newXAttrs, snapshotId);
+    }
+    return satisfyXAttr;
+  }
+
+  private static boolean inodeHasSatisfyXAttr(List<INode> candidateNodes) {
+    // If the node is a directory and one of the child files
+    // has satisfy xattr, then return true for this directory.
+    for (INode inode : candidateNodes) {
+      final XAttrFeature f = inode.getXAttrFeature();
+      if (inode.isFile() &&
+          f != null && f.getXAttr(XATTR_SATISFY_STORAGE_POLICY) != null) {
+        return true;
+      }
+    }
+    return false;
   }
 
   private static void setDirStoragePolicy(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ae4ed4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
index f676f36..91f34a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirXAttrOp.java
@@ -41,6 +41,7 @@ import java.util.ListIterator;
 
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_ENCRYPTION_ZONE;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.SECURITY_XATTR_UNREADABLE_BY_SUPERUSER;
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
 
 class FSDirXAttrOp {
   private static final XAttr KEYID_XATTR =
@@ -278,6 +279,13 @@ class FSDirXAttrOp {
             ezProto.getKeyName());
       }
 
+      // Add inode id to movement queue if xattrs contain satisfy xattr.
+      if (XATTR_SATISFY_STORAGE_POLICY.equals(xaName)) {
+        FSDirAttrOp.unprotectedSatisfyStoragePolicy(iip,
+            fsd.getBlockManager(), fsd);
+        continue;
+      }
+
       if (!isFile && SECURITY_XATTR_UNREADABLE_BY_SUPERUSER.equals(xaName)) {
         throw new IOException("Can only set '" +
             SECURITY_XATTR_UNREADABLE_BY_SUPERUSER + "' on a file.");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ae4ed4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index b21442d..277a06f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -87,6 +87,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_DE
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.CRYPTO_XATTR_ENCRYPTION_ZONE;
 import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.SECURITY_XATTR_UNREADABLE_BY_SUPERUSER;
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
 import static org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot.CURRENT_STATE_ID;
 
 /**
@@ -1334,10 +1335,23 @@ public class FSDirectory implements Closeable {
       if (!inode.isSymlink()) {
         final XAttrFeature xaf = inode.getXAttrFeature();
         addEncryptionZone((INodeWithAdditionalFields) inode, xaf);
+        addStoragePolicySatisfier((INodeWithAdditionalFields) inode, xaf);
       }
     }
   }
 
+  private void addStoragePolicySatisfier(INodeWithAdditionalFields inode,
+      XAttrFeature xaf) {
+    if (xaf == null || inode.isDirectory()) {
+      return;
+    }
+    XAttr xattr = xaf.getXAttr(XATTR_SATISFY_STORAGE_POLICY);
+    if (xattr == null) {
+      return;
+    }
+    getBlockManager().satisfyStoragePolicy(inode.getId());
+  }
+
   private void addEncryptionZone(INodeWithAdditionalFields inode,
       XAttrFeature xaf) {
     if (xaf == null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ae4ed4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 79e54cc..88538da 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -2059,7 +2059,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
    *
    * @param src file/directory path
    */
-  void satisfyStoragePolicy(String src) throws IOException {
+  void satisfyStoragePolicy(String src, boolean logRetryCache)
+      throws IOException {
     checkOperation(OperationCategory.WRITE);
     writeLock();
     try {
@@ -2081,8 +2082,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
                 + " by admin. Seek for an admin help to activate it "
                 + "or use Mover tool.");
       }
-      // TODO: need to update editlog for persistence.
-      FSDirAttrOp.satisfyStoragePolicy(dir, blockManager, src);
+      FSDirAttrOp.satisfyStoragePolicy(dir, blockManager, src, logRetryCache);
     } finally {
       writeUnlock();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ae4ed4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 1f5198c..4b6e8ca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -1321,7 +1321,18 @@ public class NameNodeRpcServer implements NamenodeProtocols {
   @Override // ClientProtocol
   public void satisfyStoragePolicy(String src) throws IOException {
     checkNNStartup();
-    namesystem.satisfyStoragePolicy(src);
+    namesystem.checkOperation(OperationCategory.WRITE);
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
+    boolean success = false;
+    try {
+      namesystem.satisfyStoragePolicy(src, cacheEntry != null);
+      success = true;
+    } finally {
+      RetryCache.setState(cacheEntry, success);
+    }
   }
 
   @Override // ClientProtocol

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ae4ed4/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
index b1b1464..3b19833 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -168,16 +168,18 @@ public class StoragePolicySatisfier implements Runnable {
     }
     while (namesystem.isRunning() && isRunning) {
       try {
-        Long blockCollectionID = storageMovementNeeded.get();
-        if (blockCollectionID != null) {
-          BlockCollection blockCollection =
-              namesystem.getBlockCollection(blockCollectionID);
-          // Check blockCollectionId existence.
-          if (blockCollection != null) {
-            boolean allBlockLocsAttemptedToSatisfy =
-                computeAndAssignStorageMismatchedBlocksToDNs(blockCollection);
-            this.storageMovementsMonitor.add(blockCollectionID,
-                allBlockLocsAttemptedToSatisfy);
+        if (!namesystem.isInSafeMode()) {
+          Long blockCollectionID = storageMovementNeeded.get();
+          if (blockCollectionID != null) {
+            BlockCollection blockCollection =
+                namesystem.getBlockCollection(blockCollectionID);
+            // Check blockCollectionId existence.
+            if (blockCollection != null) {
+              boolean allBlockLocsAttemptedToSatisfy =
+                  computeAndAssignStorageMismatchedBlocksToDNs(blockCollection);
+              this.storageMovementsMonitor
+                  .add(blockCollectionID, allBlockLocsAttemptedToSatisfy);
+            }
           }
         }
         // TODO: We can think to make this as configurable later, how frequently

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ae4ed4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 1fbc1d9..159253a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -2114,4 +2114,41 @@ public class DFSTestUtil {
       assertFalse("File in trash : " + trashPath, fs.exists(trashPath));
     }
   }
+
+  /**
+   * Check whether the Block movement has been successfully
+   * completed to satisfy the storage policy for the given file.
+   * @param fileName file name.
+   * @param expectedStorageType storage type.
+   * @param expectedStorageCount expected replica count on that storage type.
+   * @param timeout timeout.
+   * @param fs distributedFileSystem.
+   * @throws Exception
+   */
+  public static void waitExpectedStorageType(String fileName,
+      final StorageType expectedStorageType, int expectedStorageCount,
+      int timeout, DistributedFileSystem fs) throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        final LocatedBlock lb;
+        try {
+          lb = fs.getClient().getLocatedBlocks(fileName, 0).get(0);
+        } catch (IOException e) {
+          LOG.error("Exception while getting located blocks", e);
+          return false;
+        }
+        int actualStorageCount = 0;
+        for(StorageType type : lb.getStorageTypes()) {
+          if (expectedStorageType == type) {
+            actualStorageCount++;
+          }
+        }
+        LOG.info(
+            expectedStorageType + " replica count, expected="
+                + expectedStorageCount + " and actual=" + actualStorageCount);
+        return expectedStorageCount == actualStorageCount;
+      }
+    }, 1000, timeout);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ae4ed4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
new file mode 100644
index 0000000..e4b4290
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
@@ -0,0 +1,311 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
+import org.junit.Test;
+
+import java.io.IOException;
+
+/**
+ * Test persistence of satisfying files/directories.
+ */
+public class TestPersistentStoragePolicySatisfier {
+
+  private static Configuration conf;
+
+  private static MiniDFSCluster cluster;
+  private static DistributedFileSystem fs;
+
+  private static Path testFile =
+      new Path("/testFile");
+  private static String testFileName = testFile.toString();
+
+  private static Path parentDir = new Path("/parentDir");
+  private static Path parentFile = new Path(parentDir, "parentFile");
+  private static String parentFileName = parentFile.toString();
+  private static Path childDir = new Path(parentDir, "childDir");
+  private static Path childFile = new Path(childDir, "childFile");
+  private static String childFileName = childFile.toString();
+
+  private static final String COLD = "COLD";
+  private static final String WARM = "WARM";
+  private static final String ONE_SSD = "ONE_SSD";
+  private static final String ALL_SSD = "ALL_SSD";
+
+  private static StorageType[][] storageTypes = new StorageType[][] {
+      {StorageType.ARCHIVE, StorageType.DISK},
+      {StorageType.DISK, StorageType.SSD},
+      {StorageType.SSD, StorageType.RAM_DISK},
+      {StorageType.ARCHIVE, StorageType.DISK},
+      {StorageType.ARCHIVE, StorageType.SSD}
+  };
+
+  private final int timeout = 300000;
+
+  /**
+   * Setup environment for every test case.
+   * @throws Exception
+   */
+  public void clusterSetUp() throws Exception {
+    clusterSetUp(false);
+  }
+
+  /**
+   * Setup cluster environment.
+   * @param isHAEnabled if true, enable simple HA.
+   * @throws Exception
+   */
+  private void clusterSetUp(boolean isHAEnabled) throws Exception {
+    conf = new HdfsConfiguration();
+    final int dnNumber = storageTypes.length;
+    final short replication = 3;
+    MiniDFSCluster.Builder clusterBuilder = new MiniDFSCluster.Builder(conf)
+        .storageTypes(storageTypes)
+        .numDataNodes(dnNumber);
+    if (isHAEnabled) {
+      clusterBuilder.nnTopology(MiniDFSNNTopology.simpleHATopology());
+    }
+    cluster = clusterBuilder.build();
+    cluster.waitActive();
+    if (isHAEnabled) {
+      cluster.transitionToActive(0);
+      fs = HATestUtil.configureFailoverFs(cluster, conf);
+    } else {
+      fs = cluster.getFileSystem();
+    }
+
+    createTestFiles(fs, replication);
+  }
+
+  /**
+   * Setup test files for testing.
+   * @param dfs
+   * @param replication
+   * @throws Exception
+   */
+  private void createTestFiles(DistributedFileSystem dfs,
+      short replication) throws Exception {
+    DFSTestUtil.createFile(dfs, testFile, 1024L, replication, 0L);
+    DFSTestUtil.createFile(dfs, parentFile, 1024L, replication, 0L);
+    DFSTestUtil.createFile(dfs, childFile, 1024L, replication, 0L);
+
+    DFSTestUtil.waitReplication(dfs, testFile, replication);
+    DFSTestUtil.waitReplication(dfs, parentFile, replication);
+    DFSTestUtil.waitReplication(dfs, childFile, replication);
+  }
+
+  /**
+   * Tear down environment for every test case.
+   * @throws IOException
+   */
+  private void clusterShutdown() throws IOException{
+    if(fs != null) {
+      fs.close();
+      fs = null;
+    }
+    if(cluster != null) {
+      cluster.shutdown(true);
+      cluster = null;
+    }
+  }
+
+  /**
+   * While satisfying file/directory, trigger the cluster's checkpoint to
+   * make sure satisfier persistence works as expected. This test case runs
+   * as below:
+   * 1. use satisfyStoragePolicy and add xAttr to the file.
+   * 2. do the checkpoint by secondary NameNode.
+   * 3. restart the cluster immediately.
+   * 4. make sure all the storage policies are satisfied.
+   * @throws Exception
+   */
+  @Test(timeout = 300000)
+  public void testWithCheckpoint() throws Exception {
+    try {
+      clusterSetUp();
+      fs.setStoragePolicy(testFile, WARM);
+      fs.satisfyStoragePolicy(testFile);
+
+      // Start the checkpoint.
+      conf.set(
+          DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY, "0.0.0.0:0");
+      SecondaryNameNode secondary = new SecondaryNameNode(conf);
+      secondary.doCheckpoint();
+      restartCluster();
+
+      DFSTestUtil.waitExpectedStorageType(
+          testFileName, StorageType.DISK, 1, timeout, fs);
+      DFSTestUtil.waitExpectedStorageType(
+          testFileName, StorageType.ARCHIVE, 2, timeout, fs);
+
+      fs.setStoragePolicy(parentDir, COLD);
+      fs.satisfyStoragePolicy(parentDir);
+
+      DFSTestUtil.waitExpectedStorageType(
+          parentFileName, StorageType.ARCHIVE, 3, timeout, fs);
+      DFSTestUtil.waitExpectedStorageType(
+          childFileName, StorageType.DEFAULT, 3, timeout, fs);
+
+    } finally {
+      clusterShutdown();
+    }
+  }
+
+  /**
+   * Tests to verify satisfier persistence working as expected
+   * in HA env. This test case runs as below:
+   * 1. setup HA cluster env with simple HA topology.
+   * 2. switch the active NameNode from nn0/nn1 to nn1/nn0.
+   * 3. make sure all the storage policies are satisfied.
+   * @throws Exception
+   */
+  @Test(timeout = 300000)
+  public void testWithHA() throws Exception {
+    try {
+      // Enable HA env for testing.
+      clusterSetUp(true);
+
+      fs.setStoragePolicy(testFile, ALL_SSD);
+      fs.satisfyStoragePolicy(testFile);
+
+      cluster.transitionToStandby(0);
+      cluster.transitionToActive(1);
+
+      DFSTestUtil.waitExpectedStorageType(
+          testFileName, StorageType.SSD, 3, timeout, fs);
+
+      // test directory
+      fs.setStoragePolicy(parentDir, WARM);
+      fs.satisfyStoragePolicy(parentDir);
+      cluster.transitionToStandby(1);
+      cluster.transitionToActive(0);
+
+      DFSTestUtil.waitExpectedStorageType(
+          parentFileName, StorageType.DISK, 1, timeout, fs);
+      DFSTestUtil.waitExpectedStorageType(
+          parentFileName, StorageType.ARCHIVE, 2, timeout, fs);
+      DFSTestUtil.waitExpectedStorageType(
+          childFileName, StorageType.DEFAULT, 3, timeout, fs);
+    } finally {
+      clusterShutdown();
+    }
+  }
+
+
+  /**
+   * Tests to verify satisfier persistence working well with multiple
+   * restart operations. This test case runs as below:
+   * 1. satisfy the storage policy of file1.
+   * 2. restart the cluster.
+   * 3. check whether all the blocks are satisfied.
+   * 4. satisfy the storage policy of file2.
+   * 5. restart the cluster.
+   * 6. check whether all the blocks are satisfied.
+   * @throws Exception
+   */
+  @Test(timeout = 300000)
+  public void testWithRestarts() throws Exception {
+    try {
+      clusterSetUp();
+      fs.setStoragePolicy(testFile, ONE_SSD);
+      fs.satisfyStoragePolicy(testFile);
+      restartCluster();
+      DFSTestUtil.waitExpectedStorageType(
+          testFileName, StorageType.SSD, 1, timeout, fs);
+      DFSTestUtil.waitExpectedStorageType(
+          testFileName, StorageType.DISK, 2, timeout, fs);
+
+      // test directory
+      fs.setStoragePolicy(parentDir, COLD);
+      fs.satisfyStoragePolicy(parentDir);
+      restartCluster();
+      DFSTestUtil.waitExpectedStorageType(
+          parentFileName, StorageType.ARCHIVE, 3, timeout, fs);
+      DFSTestUtil.waitExpectedStorageType(
+          childFileName, StorageType.DEFAULT, 3, timeout, fs);
+    } finally {
+      clusterShutdown();
+    }
+  }
+
+  /**
+   * Tests to verify satisfier persistence working well with
+   * federated HA env. This test case runs as below:
+   * 1. setup HA test environment with federated topology.
+   * 2. satisfy storage policy of file1.
+   * 3. switch active NameNode from nn0 to nn1.
+   * 4. switch active NameNode from nn2 to nn3.
+   * 5. check whether the storage policy of file1 is satisfied.
+   * @throws Exception
+   */
+  @Test(timeout = 300000)
+  public void testWithFederationHA() throws Exception {
+    try {
+      conf = new HdfsConfiguration();
+      final MiniDFSCluster haCluster = new MiniDFSCluster
+          .Builder(conf)
+          .nnTopology(MiniDFSNNTopology.simpleHAFederatedTopology(2))
+          .storageTypes(storageTypes)
+          .numDataNodes(storageTypes.length).build();
+      haCluster.waitActive();
+      haCluster.transitionToActive(1);
+      haCluster.transitionToActive(3);
+
+      fs = HATestUtil.configureFailoverFs(haCluster, conf);
+      createTestFiles(fs, (short) 3);
+
+      fs.setStoragePolicy(testFile, WARM);
+      fs.satisfyStoragePolicy(testFile);
+
+      haCluster.transitionToStandby(1);
+      haCluster.transitionToActive(0);
+      haCluster.transitionToStandby(3);
+      haCluster.transitionToActive(2);
+
+      DFSTestUtil.waitExpectedStorageType(
+          testFileName, StorageType.DISK, 1, timeout, fs);
+      DFSTestUtil.waitExpectedStorageType(
+          testFileName, StorageType.ARCHIVE, 2, timeout, fs);
+
+    } finally {
+      clusterShutdown();
+    }
+  }
+
+  /**
+   * Restart the whole env and trigger the DataNodes' heartbeats.
+   * @throws Exception
+   */
+  private void restartCluster() throws Exception {
+    cluster.restartDataNodes();
+    cluster.restartNameNodes();
+    cluster.waitActive();
+    cluster.triggerHeartbeats();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d5ae4ed4/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
index 9abb78d..1c53894 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
@@ -108,7 +108,8 @@ public class TestStoragePolicySatisfier {
 
       hdfsCluster.triggerHeartbeats();
       // Wait till namenode notified about the block location details
-      waitExpectedStorageType(file, StorageType.ARCHIVE, 3, 30000);
+      DFSTestUtil.waitExpectedStorageType(
+          file, StorageType.ARCHIVE, 3, 30000, dfs);
     } finally {
       shutdownCluster();
     }
@@ -137,7 +138,8 @@ public class TestStoragePolicySatisfier {
       hdfsCluster.triggerHeartbeats();
       // Wait till StorgePolicySatisfier Identified that block to move to SSD
       // areas
-      waitExpectedStorageType(file, StorageType.SSD, 3, 30000);
+      DFSTestUtil.waitExpectedStorageType(
+          file, StorageType.SSD, 3, 30000, dfs);
     } finally {
       shutdownCluster();
     }
@@ -164,8 +166,10 @@ public class TestStoragePolicySatisfier {
       hdfsCluster.triggerHeartbeats();
       // Wait till StorgePolicySatisfier Identified that block to move to SSD
       // areas
-      waitExpectedStorageType(file, StorageType.SSD, 1, 30000);
-      waitExpectedStorageType(file, StorageType.DISK, 2, 30000);
+      DFSTestUtil.waitExpectedStorageType(
+          file, StorageType.SSD, 1, 30000, dfs);
+      DFSTestUtil.waitExpectedStorageType(
+          file, StorageType.DISK, 2, 30000, dfs);
     } finally {
       shutdownCluster();
     }
@@ -195,8 +199,10 @@ public class TestStoragePolicySatisfier {
       hdfsCluster.triggerHeartbeats();
 
       // Wait till the block is moved to SSD areas
-      waitExpectedStorageType(file, StorageType.SSD, 1, 30000);
-      waitExpectedStorageType(file, StorageType.DISK, 2, 30000);
+      DFSTestUtil.waitExpectedStorageType(
+          file, StorageType.SSD, 1, 30000, dfs);
+      DFSTestUtil.waitExpectedStorageType(
+          file, StorageType.DISK, 2, 30000, dfs);
 
       waitForBlocksMovementResult(1, 30000);
     } finally {
@@ -245,8 +251,10 @@ public class TestStoragePolicySatisfier {
 
       for (String fileName : files) {
         // Wait till the block is moved to SSD areas
-        waitExpectedStorageType(fileName, StorageType.SSD, 1, 30000);
-        waitExpectedStorageType(fileName, StorageType.DISK, 2, 30000);
+        DFSTestUtil.waitExpectedStorageType(
+            fileName, StorageType.SSD, 1, 30000, dfs);
+        DFSTestUtil.waitExpectedStorageType(
+            fileName, StorageType.DISK, 2, 30000, dfs);
       }
 
       waitForBlocksMovementResult(blockCollectionIds.size(), 30000);
@@ -279,7 +287,8 @@ public class TestStoragePolicySatisfier {
 
       hdfsCluster.triggerHeartbeats();
       // Wait till namenode notified about the block location details
-      waitExpectedStorageType(file, StorageType.ARCHIVE, 3, 30000);
+      DFSTestUtil.waitExpectedStorageType(
+          file, StorageType.ARCHIVE, 3, 30000, dfs);
     } finally {
       shutdownCluster();
     }
@@ -317,11 +326,14 @@ public class TestStoragePolicySatisfier {
       hdfsCluster.triggerHeartbeats();
 
       // take effect for the file in the directory.
-      waitExpectedStorageType(subFile1, StorageType.SSD, 1, 30000);
-      waitExpectedStorageType(subFile1, StorageType.DISK, 2, 30000);
+      DFSTestUtil.waitExpectedStorageType(
+          subFile1, StorageType.SSD, 1, 30000, dfs);
+      DFSTestUtil.waitExpectedStorageType(
+          subFile1, StorageType.DISK, 2, 30000, dfs);
 
       // take no effect for the sub-dir's file in the directory.
-      waitExpectedStorageType(subFile2, StorageType.DEFAULT, 3, 30000);
+      DFSTestUtil.waitExpectedStorageType(
+          subFile2, StorageType.DEFAULT, 3, 30000, dfs);
     } finally {
       shutdownCluster();
     }
@@ -367,6 +379,20 @@ public class TestStoragePolicySatisfier {
       } catch (FileNotFoundException e) {
 
       }
+
+      try {
+        hdfsAdmin.satisfyStoragePolicy(new Path(file));
+        hdfsAdmin.satisfyStoragePolicy(new Path(file));
+        Assert.fail(String.format(
+            "Should failed to satisfy storage policy "
+            + "for %s ,since it has been "
+            + "added to satisfy movement queue.", file));
+      } catch (IOException e) {
+        GenericTestUtils.assertExceptionContains(
+            String.format("Cannot request to call satisfy storage policy "
+                + "on path %s, as this file/dir was already called for "
+                + "satisfying storage policy.", file), e);
+      }
     } finally {
       shutdownCluster();
     }
@@ -407,8 +433,10 @@ public class TestStoragePolicySatisfier {
       hdfsCluster.triggerHeartbeats();
       // Wait till StorgePolicySatisfier identified that block to move to
       // ARCHIVE area.
-      waitExpectedStorageType(file, StorageType.ARCHIVE, 1, 30000);
-      waitExpectedStorageType(file, StorageType.DISK, 2, 30000);
+      DFSTestUtil.waitExpectedStorageType(
+          file, StorageType.ARCHIVE, 1, 30000, dfs);
+      DFSTestUtil.waitExpectedStorageType(
+          file, StorageType.DISK, 2, 30000, dfs);
 
       waitForBlocksMovementResult(1, 30000);
     } finally {
@@ -451,7 +479,8 @@ public class TestStoragePolicySatisfier {
       // No block movement will be scheduled as there is no target node available
       // with the required storage type.
       waitForAttemptedItems(1, 30000);
-      waitExpectedStorageType(file, StorageType.DISK, 3, 30000);
+      DFSTestUtil.waitExpectedStorageType(
+          file, StorageType.DISK, 3, 30000, dfs);
       // Since there is no target node the item will get timed out and then
       // re-attempted.
       waitForAttemptedItems(1, 30000);
@@ -523,8 +552,10 @@ public class TestStoragePolicySatisfier {
     // with the required storage type.
     waitForAttemptedItems(1, 30000);
     waitForBlocksMovementResult(1, 30000);
-    waitExpectedStorageType(file1, StorageType.ARCHIVE, 1, 30000);
-    waitExpectedStorageType(file1, StorageType.DISK, 2, 30000);
+    DFSTestUtil.waitExpectedStorageType(
+        file1, StorageType.ARCHIVE, 1, 30000, dfs);
+    DFSTestUtil.waitExpectedStorageType(
+        file1, StorageType.DISK, 2, 30000, dfs);
   }
 
   /**
@@ -571,8 +602,10 @@ public class TestStoragePolicySatisfier {
       hdfsCluster.triggerHeartbeats();
       // Wait till StorgePolicySatisfier identified that block to move to
       // ARCHIVE area.
-      waitExpectedStorageType(file, StorageType.ARCHIVE, 2, 30000);
-      waitExpectedStorageType(file, StorageType.DISK, 3, 30000);
+      DFSTestUtil.waitExpectedStorageType(
+          file, StorageType.ARCHIVE, 2, 30000, dfs);
+      DFSTestUtil.waitExpectedStorageType(
+          file, StorageType.DISK, 3, 30000, dfs);
 
       waitForBlocksMovementResult(1, 30000);
     } finally {
@@ -606,8 +639,10 @@ public class TestStoragePolicySatisfier {
 
       namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
       hdfsCluster.triggerHeartbeats();
-      waitExpectedStorageType(file, StorageType.SSD, 1, 30000);
-      waitExpectedStorageType(file, StorageType.DISK, 2, 30000);
+      DFSTestUtil.waitExpectedStorageType(
+          file, StorageType.SSD, 1, 30000, dfs);
+      DFSTestUtil.waitExpectedStorageType(
+          file, StorageType.DISK, 2, 30000, dfs);
 
     } finally {
       shutdownCluster();
@@ -644,8 +679,10 @@ public class TestStoragePolicySatisfier {
       namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
       hdfsCluster.triggerHeartbeats();
 
-      waitExpectedStorageType(file, StorageType.DISK, 1, 30000);
-      waitExpectedStorageType(file, StorageType.ARCHIVE, 2, 30000);
+      DFSTestUtil.waitExpectedStorageType(
+          file, StorageType.DISK, 1, 30000, dfs);
+      DFSTestUtil.waitExpectedStorageType(
+          file, StorageType.ARCHIVE, 2, 30000, dfs);
     } finally {
       shutdownCluster();
     }
@@ -771,33 +808,4 @@ public class TestStoragePolicySatisfier {
     cluster.waitActive();
     return cluster;
   }
-
-  // Check whether the Block movement has been successfully completed to satisfy
-  // the storage policy for the given file.
-  private void waitExpectedStorageType(final String fileName,
-      final StorageType expectedStorageType, int expectedStorageCount,
-      int timeout) throws Exception {
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
-        LocatedBlock lb = null;
-        try {
-          lb = dfs.getClient().getLocatedBlocks(fileName, 0).get(0);
-        } catch (IOException e) {
-          LOG.error("Exception while getting located blocks", e);
-          return false;
-        }
-        int actualStorageCount = 0;
-        for (StorageType storageType : lb.getStorageTypes()) {
-          if (expectedStorageType == storageType) {
-            actualStorageCount++;
-          }
-        }
-        LOG.info(
-            expectedStorageType + " replica count, expected={} and actual={}",
-            expectedStorageType, actualStorageCount);
-        return expectedStorageCount == actualStorageCount;
-      }
-    }, 100, timeout);
-  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message