hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rake...@apache.org
Subject [40/50] [abbrv] hadoop git commit: HDFS-11336: [SPS]: Remove xAttrs when movements done or SPS disabled. Contributed by Yuanbo Liu.
Date Tue, 11 Jul 2017 16:25:26 GMT
HDFS-11336: [SPS]: Remove xAttrs when movements done or SPS disabled. Contributed by Yuanbo
Liu.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8da2dc74
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8da2dc74
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8da2dc74

Branch: refs/heads/HDFS-10285
Commit: 8da2dc74a44c08bfd48beab824bdadcce8eb36ce
Parents: fa6a160
Author: Uma Maheswara Rao G <uma.gangumalla@intel.com>
Authored: Tue Mar 14 00:52:24 2017 -0700
Committer: Rakesh Radhakrishnan <rakeshr@apache.org>
Committed: Tue Jul 11 19:35:07 2017 +0530

----------------------------------------------------------------------
 .../BlockStorageMovementAttemptedItems.java     |  14 ++-
 .../hdfs/server/namenode/FSDirAttrOp.java       |   8 ++
 .../hdfs/server/namenode/FSDirectory.java       |  16 +++
 .../server/namenode/StoragePolicySatisfier.java |  45 ++++++--
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |   2 +-
 .../TestBlockStorageMovementAttemptedItems.java |   6 +-
 .../TestPersistentStoragePolicySatisfier.java   | 112 ++++++++++++++++++-
 7 files changed, 186 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/8da2dc74/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
index 042aca3..f15db73 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.apache.hadoop.util.Time.monotonicNow;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
@@ -54,6 +55,7 @@ public class BlockStorageMovementAttemptedItems {
   private final List<BlocksStorageMovementResult> storageMovementAttemptedResults;
   private volatile boolean monitorRunning = true;
   private Daemon timerThread = null;
+  private final StoragePolicySatisfier sps;
   //
   // It might take anywhere between 30 to 60 minutes before
   // a request is timed out.
@@ -69,7 +71,8 @@ public class BlockStorageMovementAttemptedItems {
 
   public BlockStorageMovementAttemptedItems(long recheckTimeout,
       long selfRetryTimeout,
-      BlockStorageMovementNeeded unsatisfiedStorageMovementFiles) {
+      BlockStorageMovementNeeded unsatisfiedStorageMovementFiles,
+      StoragePolicySatisfier sps) {
     if (recheckTimeout > 0) {
       this.minCheckTimeout = Math.min(minCheckTimeout, recheckTimeout);
     }
@@ -78,6 +81,7 @@ public class BlockStorageMovementAttemptedItems {
     this.blockStorageMovementNeeded = unsatisfiedStorageMovementFiles;
     storageMovementAttemptedItems = new HashMap<>();
     storageMovementAttemptedResults = new ArrayList<>();
+    this.sps = sps;
   }
 
   /**
@@ -200,6 +204,9 @@ public class BlockStorageMovementAttemptedItems {
         } catch (InterruptedException ie) {
           LOG.info("BlocksStorageMovementAttemptResultMonitor thread "
               + "is interrupted.", ie);
+        } catch (IOException ie) {
+          LOG.warn("BlocksStorageMovementAttemptResultMonitor thread "
+              + "received exception and exiting.", ie);
         }
       }
     }
@@ -248,7 +255,7 @@ public class BlockStorageMovementAttemptedItems {
   }
 
   @VisibleForTesting
-  void blockStorageMovementResultCheck() {
+  void blockStorageMovementResultCheck() throws IOException {
     synchronized (storageMovementAttemptedResults) {
       Iterator<BlocksStorageMovementResult> resultsIter =
           storageMovementAttemptedResults.iterator();
@@ -296,6 +303,9 @@ public class BlockStorageMovementAttemptedItems {
                   + " reported from co-ordinating datanode. But the trackID "
                   + "doesn't exists in storageMovementAttemptedItems list",
                   storageMovementAttemptedResult.getTrackId());
+              // Remove xattr for the track id.
+              this.sps.notifyBlkStorageMovementFinished(
+                  storageMovementAttemptedResult.getTrackId());
             }
           }
           // Remove trackID from the attempted list, if any.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8da2dc74/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
index 2a27df1..e604726 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java
@@ -541,6 +541,14 @@ public class FSDirAttrOp {
     return false;
   }
 
+  static void unprotectedRemoveSPSXAttr(INode inode, XAttr spsXAttr)
+      throws IOException{
+    List<XAttr> existingXAttrs = XAttrStorage.readINodeXAttrs(inode);
+    existingXAttrs.remove(spsXAttr);
+    XAttrStorage.updateINodeXAttrs(inode, existingXAttrs,
+        INodesInPath.fromINode(inode).getLatestSnapshotId());
+  }
+
   private static void setDirStoragePolicy(
       FSDirectory fsd, INodesInPath iip, byte policyId) throws IOException {
     INode inode = FSDirectory.resolveLastINode(iip);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8da2dc74/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
index 87030db9..cabc849 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
@@ -1355,6 +1355,22 @@ public class FSDirectory implements Closeable {
     getBlockManager().satisfyStoragePolicy(inode.getId());
   }
 
+  /**
+   * Remove the SPS xattr from the inode, retrieve the inode from the
+   * block collection id. This is a no-op when the inode cannot be found,
+   * when the inode carries no xattrs at all, or when the SPS xattr is not
+   * among them.
+   * @param id
+   *           - file block collection id.
+   * @throws IOException if updating the inode's xattrs fails.
+   */
+  public void removeSPSXattr(long id) throws IOException {
+    final INode inode = getInode(id);
+    if (inode == null) {
+      // Nothing to clean up for an id that no longer resolves to an inode
+      // (presumably the file was deleted in the meantime).
+      return;
+    }
+    final XAttrFeature xaf = inode.getXAttrFeature();
+    if (xaf == null) {
+      // The inode has no xattrs, hence no SPS xattr either.
+      return;
+    }
+    final XAttr spsXAttr = xaf.getXAttr(XATTR_SATISFY_STORAGE_POLICY);
+    if (spsXAttr != null) {
+      FSDirAttrOp.unprotectedRemoveSPSXAttr(inode, spsXAttr);
+    }
+  }
+
   private void addEncryptionZone(INodeWithAdditionalFields inode,
       XAttrFeature xaf) {
     if (xaf == null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8da2dc74/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
index 29c8a5d..337d5b5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -91,7 +92,8 @@ public class StoragePolicySatisfier implements Runnable {
         conf.getLong(
             DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY,
             DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT),
-        storageMovementNeeded);
+        storageMovementNeeded,
+        this);
   }
 
   /**
@@ -119,12 +121,6 @@ public class StoragePolicySatisfier implements Runnable {
    */
   public synchronized void stop(boolean reconfigStop) {
     isRunning = false;
-    if (reconfigStop) {
-      LOG.info("Stopping StoragePolicySatisfier, as admin requested to "
-          + "deactivate it.");
-    } else {
-      LOG.info("Stopping StoragePolicySatisfier.");
-    }
     if (storagePolicySatisfierThread == null) {
       return;
     }
@@ -135,8 +131,12 @@ public class StoragePolicySatisfier implements Runnable {
     }
     this.storageMovementsMonitor.stop();
     if (reconfigStop) {
-      this.clearQueues();
+      LOG.info("Stopping StoragePolicySatisfier, as admin requested to "
+          + "deactivate it.");
+      this.clearQueuesWithNotification();
       this.blockManager.getDatanodeManager().addDropSPSWorkCommandsToAllDNs();
+    } else {
+      LOG.info("Stopping StoragePolicySatisfier.");
     }
   }
 
@@ -717,4 +717,33 @@ public class StoragePolicySatisfier implements Runnable {
         + "user requests on satisfying block storages would be discarded.");
     storageMovementNeeded.clearAll();
   }
+
+  /**
+   * Repeatedly take block collection ids from storageMovementNeeded until
+   * it returns null and, for each id, call
+   * {@link #notifyBlkStorageMovementFinished(long)} so that the resources
+   * kept for that id (currently the SPS xattr) are cleaned up.
+   * An IOException raised for one id is logged at WARN level and the loop
+   * continues with the next id, so this method does not propagate it.
+   */
+  private void clearQueuesWithNotification() {
+    Long id;
+    while ((id = storageMovementNeeded.get()) != null) {
+      try {
+        notifyBlkStorageMovementFinished(id);
+      } catch (IOException ie) {
+        LOG.warn("Failed to remove SPS "
+            + "xattr for collection id " + id, ie);
+      }
+    }
+  }
+
+  /**
+   * Perform the follow-up work for a block storage movement request that
+   * no longer needs tracking. At present this only removes the SPS xattr
+   * of the file by delegating to {@code FSDirectory#removeSPSXattr}.
+   * Besides finished movements, this is also invoked for every queued id
+   * when the satisfier is stopped on admin request.
+   * @param trackId track id i.e., block collection id.
+   * @throws IOException if removing the SPS xattr fails.
+   */
+  public void notifyBlkStorageMovementFinished(long trackId)
+      throws IOException {
+    this.namesystem.getFSDirectory().removeSPSXattr(trackId);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8da2dc74/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 7e48cde..aea4dac 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -2320,6 +2320,6 @@ public class DFSTestUtil {
                 + expectedStorageCount + " and actual=" + actualStorageCount);
         return expectedStorageCount == actualStorageCount;
       }
-    }, 1000, timeout);
+    }, 500, timeout);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8da2dc74/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
index 6641134..95142d3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 /**
  * Tests that block storage movement attempt failures are reported from DN and
@@ -36,10 +37,11 @@ public class TestBlockStorageMovementAttemptedItems {
   private final int selfRetryTimeout = 500;
 
   @Before
-  public void setup() {
+  public void setup() throws Exception {
     unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded();
+    StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class);
     bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100,
-        selfRetryTimeout, unsatisfiedStorageMovementFiles);
+        selfRetryTimeout, unsatisfiedStorageMovementFiles, sps);
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8da2dc74/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
index e4b4290..8c3359a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java
@@ -20,16 +20,22 @@ package org.apache.hadoop.hdfs.server.namenode;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.fs.XAttr;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.XAttrHelper;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.List;
+
+import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
+import static org.junit.Assert.assertFalse;
 
 /**
  * Test persistence of satisfying files/directories.
@@ -72,7 +78,16 @@ public class TestPersistentStoragePolicySatisfier {
    * @throws IOException
    */
   public void clusterSetUp() throws Exception {
-    clusterSetUp(false);
+    clusterSetUp(false, new HdfsConfiguration());
+  }
+
+  /**
+   * Setup a non-HA environment for a test case, using the supplied
+   * configuration instead of a fresh default one.
+   * @param hdfsConf hdfs conf.
+   * @throws Exception if the cluster set-up fails.
+   */
+  public void clusterSetUp(Configuration hdfsConf) throws Exception {
+    clusterSetUp(false, hdfsConf);
   }
 
   /**
@@ -80,8 +95,9 @@ public class TestPersistentStoragePolicySatisfier {
    * @param isHAEnabled if true, enable simple HA.
    * @throws IOException
    */
-  private void clusterSetUp(boolean isHAEnabled) throws Exception {
-    conf = new HdfsConfiguration();
+  private void clusterSetUp(boolean isHAEnabled, Configuration newConf)
+      throws Exception {
+    conf = newConf;
     final int dnNumber = storageTypes.length;
     final short replication = 3;
     MiniDFSCluster.Builder clusterBuilder = new MiniDFSCluster.Builder(conf)
@@ -188,7 +204,7 @@ public class TestPersistentStoragePolicySatisfier {
   public void testWithHA() throws Exception {
     try {
       // Enable HA env for testing.
-      clusterSetUp(true);
+      clusterSetUp(true, new HdfsConfiguration());
 
       fs.setStoragePolicy(testFile, ALL_SSD);
       fs.satisfyStoragePolicy(testFile);
@@ -298,6 +314,94 @@ public class TestPersistentStoragePolicySatisfier {
   }
 
   /**
+   * Tests to verify SPS xattr will be removed if the satisfy work has
+   * been finished, expect that the method satisfyStoragePolicy can be
+   * invoked on the same file again after the block movement has been
+   * finished:
+   * 1. satisfy storage policy of file1.
+   * 2. wait until storage policy is satisfied.
+   * 3. wait until the SPS xattr has been removed and assert that it is.
+   * 4. satisfy storage policy of file1 again.
+   * 5. make sure step 4 works as expected.
+   * @throws Exception if the cluster or any file system call fails.
+   */
+  @Test(timeout = 300000)
+  public void testMultipleSatisfyStoragePolicy() throws Exception {
+    try {
+      // Lower block movement check for testing.
+      conf = new HdfsConfiguration();
+      final long minCheckTimeout = 500; // minimum value
+      conf.setLong(
+          DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
+          minCheckTimeout);
+      clusterSetUp(conf);
+      fs.setStoragePolicy(testFile, ONE_SSD);
+      fs.satisfyStoragePolicy(testFile);
+      DFSTestUtil.waitExpectedStorageType(
+          testFileName, StorageType.SSD, 1, timeout, fs);
+      DFSTestUtil.waitExpectedStorageType(
+          testFileName, StorageType.DISK, 2, timeout, fs);
+
+      // Poll until the SPS xattr has been removed, at most 30 times.
+      int retryTime = 0;
+      while (retryTime < 30) {
+        if (!fileContainsSPSXAttr(testFile)) {
+          break;
+        }
+        Thread.sleep(minCheckTimeout);
+        retryTime += 1;
+      }
+      // Without this check an exhausted retry loop would fall through
+      // silently and the test would never verify the removal it is about.
+      assertFalse("SPS xattr was not removed after block movement finished",
+          fileContainsSPSXAttr(testFile));
+
+      fs.setStoragePolicy(testFile, COLD);
+      fs.satisfyStoragePolicy(testFile);
+      DFSTestUtil.waitExpectedStorageType(
+          testFileName, StorageType.ARCHIVE, 3, timeout, fs);
+    } finally {
+      clusterShutdown();
+    }
+  }
+
+  /**
+   * Tests to verify that the SPS xattr is removed once SPS is
+   * deactivated:
+   * 1. set a storage policy on the test file and request that it be
+   *    satisfied.
+   * 2. deactivate SPS through the block manager.
+   * 3. assert that the test file no longer carries the SPS xattr.
+   * @throws Exception if the cluster or any file system call fails.
+   */
+  @Test(timeout = 300000)
+  public void testDropSPS() throws Exception {
+    try {
+      clusterSetUp();
+      fs.setStoragePolicy(testFile, ONE_SSD);
+      fs.satisfyStoragePolicy(testFile);
+
+      cluster.getNamesystem().getBlockManager().deactivateSPS();
+
+      // Make sure satisfy xattr has been removed.
+      assertFalse(fileContainsSPSXAttr(testFile));
+
+    } finally {
+      clusterShutdown();
+    }
+  }
+
+  /**
+   * Tell whether the inode behind the given path currently carries the
+   * SPS xattr.
+   * @param fileName path of the file to inspect.
+   * @return true if the SPS xattr is among the inode's xattrs.
+   * @throws IOException if the inode or its xattrs cannot be read.
+   */
+  private boolean fileContainsSPSXAttr(Path fileName) throws IOException {
+    final String src = fileName.toString();
+    final INode target =
+        cluster.getNamesystem().getFSDirectory().getINode(src);
+    final XAttr spsMarker =
+        XAttrHelper.buildXAttr(XATTR_SATISFY_STORAGE_POLICY);
+    return XAttrStorage.readINodeXAttrs(target).contains(spsMarker);
+  }
+
+  /**
    * Restart the hole env and trigger the DataNode's heart beats.
    * @throws Exception
    */


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message