hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rake...@apache.org
Subject [50/50] [abbrv] hadoop git commit: HDFS-13050: [SPS]: Create start/stop script to start external SPS process. Contributed by Surendra Singh Lilhore.
Date Mon, 29 Jan 2018 04:12:55 GMT
HDFS-13050: [SPS]: Create start/stop script to start external SPS process. Contributed by Surendra Singh Lilhore.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3b0deb6b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3b0deb6b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3b0deb6b

Branch: refs/heads/HDFS-10285
Commit: 3b0deb6be60f5f7fa6fec590da1d2620db0fcb73
Parents: db594af
Author: Rakesh Radhakrishnan <rakeshr@apache.org>
Authored: Mon Jan 29 03:10:48 2018 +0530
Committer: Rakesh Radhakrishnan <rakeshr@apache.org>
Committed: Mon Jan 29 09:22:09 2018 +0530

----------------------------------------------------------------------
 .../hadoop-hdfs/src/main/bin/hdfs               |   5 +
 .../server/blockmanagement/BlockManager.java    |   9 ++
 .../apache/hadoop/hdfs/server/mover/Mover.java  |   2 +-
 .../hdfs/server/namenode/sps/Context.java       |   5 -
 .../namenode/sps/IntraSPSNameNodeContext.java   |   4 -
 .../sps/IntraSPSNameNodeFileIdCollector.java    |  12 +-
 .../hdfs/server/namenode/sps/SPSPathIds.java    |   1 +
 .../namenode/sps/StoragePolicySatisfier.java    |  83 +++++++-----
 .../sps/ExternalSPSBlockMoveTaskHandler.java    |   2 +-
 .../hdfs/server/sps/ExternalSPSContext.java     |  57 +-------
 .../server/sps/ExternalSPSFileIDCollector.java  |  12 +-
 .../sps/ExternalStoragePolicySatisfier.java     | 130 +++++++++++++++++++
 .../src/site/markdown/ArchivalStorage.md        |  10 +-
 .../sps/TestStoragePolicySatisfier.java         |  22 ++--
 .../sps/TestExternalStoragePolicySatisfier.java |  33 +++--
 15 files changed, 259 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b0deb6b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
index bc6e7a4..94426a5 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs
@@ -63,6 +63,7 @@ function hadoop_usage
   hadoop_add_subcommand "secondarynamenode" daemon "run the DFS secondary namenode"
   hadoop_add_subcommand "snapshotDiff" client "diff two snapshots of a directory or diff the current directory contents with a snapshot"
   hadoop_add_subcommand "storagepolicies" admin "list/get/set/satisfyStoragePolicy block storage policies"
+  hadoop_add_subcommand "sps" daemon "run external storagepolicysatisfier"
   hadoop_add_subcommand "version" client "print the version"
   hadoop_add_subcommand "zkfc" daemon "run the ZK Failover Controller daemon"
   hadoop_generate_usage "${HADOOP_SHELL_EXECNAME}" false
@@ -201,6 +202,10 @@ function hdfscmd_case
     storagepolicies)
       HADOOP_CLASSNAME=org.apache.hadoop.hdfs.tools.StoragePolicyAdmin
     ;;
+    sps)
+      HADOOP_SUBCMD_SUPPORTDAEMONIZATION="true"
+      HADOOP_CLASSNAME=org.apache.hadoop.hdfs.server.sps.ExternalStoragePolicySatisfier
+    ;;
     version)
       HADOOP_CLASSNAME=org.apache.hadoop.util.VersionInfo
     ;;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b0deb6b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index e012ae4..dd3ec43 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -94,6 +94,9 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.Namesystem;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
+import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeBlockMoveTaskHandler;
+import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeContext;
+import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeFileIdCollector;
 import org.apache.hadoop.hdfs.server.namenode.sps.SPSPathIds;
 import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
 import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
@@ -5099,9 +5102,15 @@ public class BlockManager implements BlockStatsMXBean {
       return;
     }
     updateSPSMode(StoragePolicySatisfierMode.INTERNAL);
+    sps.init(new IntraSPSNameNodeContext(this.namesystem, this, sps),
+        new IntraSPSNameNodeFileIdCollector(this.namesystem.getFSDirectory(),
+            sps),
+        new IntraSPSNameNodeBlockMoveTaskHandler(this, this.namesystem), null);
     sps.start(true, spsMode);
   }
 
+
+
   /**
    * Enable storage policy satisfier by starting its service.
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b0deb6b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
index b4e9716..2cc0e27 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
@@ -672,7 +672,7 @@ public class Mover {
           }
           if (spsRunning) {
             System.err.println("Mover failed due to StoragePolicySatisfier"
-                + " is running. Exiting with status "
+                + " service running inside namenode. Exiting with status "
                 + ExitStatus.SKIPPED_DUE_TO_SPS + "... ");
             return ExitStatus.SKIPPED_DUE_TO_SPS.getExitCode();
           }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b0deb6b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
index bddbc1b..ff4ad6b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/Context.java
@@ -175,9 +175,4 @@ public interface Context {
    */
   String getFilePath(Long inodeId);
 
-  /**
-   * Close the resources.
-   */
-  void close() throws IOException;
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b0deb6b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
index 191886c..ff6cc21 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeContext.java
@@ -196,8 +196,4 @@ public class IntraSPSNameNodeContext implements Context {
     return namesystem.getFilePath(inodeId);
   }
 
-  @Override
-  public void close() throws IOException {
-    // Nothing to clean.
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b0deb6b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
index f7cd754..7a44dd9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/IntraSPSNameNodeFileIdCollector.java
@@ -158,11 +158,15 @@ public class IntraSPSNameNodeFileIdCollector extends FSTreeTraverser
    */
   public synchronized int remainingCapacity() {
     int size = service.processingQueueSize();
-    if (size >= maxQueueLimitToScan) {
-      return 0;
-    } else {
-      return (maxQueueLimitToScan - size);
+    int remainingSize = 0;
+    if (size < maxQueueLimitToScan) {
+      remainingSize = maxQueueLimitToScan - size;
     }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("SPS processing Q -> maximum capacity:{}, current size:{},"
+          + " remaining size:{}", maxQueueLimitToScan, size, remainingSize);
+    }
+    return remainingSize;
   }
 
   class SPSTraverseInfo extends TraverseInfo {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b0deb6b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java
index cd6ad22..e0f4999 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/SPSPathIds.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 public class SPSPathIds {
 
   // List of pending dir to satisfy the policy
+  // TODO: Make this bounded queue.
   private final Queue<Long> spsDirsToBeTraveresed = new LinkedList<Long>();
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b0deb6b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
index 89799fc..4ddfe2e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/sps/StoragePolicySatisfier.java
@@ -55,6 +55,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -174,10 +175,11 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
       return;
     }
     if (reconfigStart) {
-      LOG.info("Starting StoragePolicySatisfier, as admin requested to "
-          + "start it.");
+      LOG.info("Starting {} StoragePolicySatisfier, as admin requested to "
+          + "start it.", StringUtils.toLowerCase(spsMode.toString()));
     } else {
-      LOG.info("Starting StoragePolicySatisfier.");
+      LOG.info("Starting {} StoragePolicySatisfier.",
+          StringUtils.toLowerCase(spsMode.toString()));
     }
 
     // Ensure that all the previously submitted block movements(if any) have to
@@ -243,7 +245,14 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
 
   @Override
   public void run() {
-    while (ctxt.isRunning()) {
+    while (isRunning) {
+      // Check if dependent service is running
+      if (!ctxt.isRunning()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Upstream service is down, skipping the sps work.");
+        }
+        continue;
+      }
       try {
         if (!ctxt.isInSafeMode()) {
           ItemInfo itemInfo = storageMovementNeeded.get();
@@ -284,33 +293,39 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
                 // Just add to monitor, so it will be tracked for report and
                 // be removed on storage movement attempt finished report.
               case BLOCKS_TARGETS_PAIRED:
+                if (LOG.isDebugEnabled()) {
+                  LOG.debug("Block analysis status:{} for the file path:{}."
+                      + " Adding to attempt monitor queue for the storage "
+                      + "movement attempt finished report",
+                      status.status, fileStatus.getPath());
+                }
                 this.storageMovementsMonitor.add(new AttemptedItemInfo(itemInfo
                     .getStartId(), itemInfo.getFileId(), monotonicNow(),
                     status.assignedBlocks, itemInfo.getRetryCount()));
                 break;
               case NO_BLOCKS_TARGETS_PAIRED:
                 if (LOG.isDebugEnabled()) {
-                  LOG.debug("Adding trackID " + trackId
-                      + " back to retry queue as none of the blocks"
-                      + " found its eligible targets.");
+                  LOG.debug("Adding trackID:{} for the file path:{} back to"
+                      + " retry queue as none of the blocks found its eligible"
+                      + " targets.", trackId, fileStatus.getPath());
                 }
                 itemInfo.increRetryCount();
                 this.storageMovementNeeded.add(itemInfo);
                 break;
               case FEW_LOW_REDUNDANCY_BLOCKS:
                 if (LOG.isDebugEnabled()) {
-                  LOG.debug("Adding trackID " + trackId
-                      + " back to retry queue as some of the blocks"
-                      + " are low redundant.");
+                  LOG.debug("Adding trackID:{} for the file path:{} back to "
+                      + "retry queue as some of the blocks are low redundant.",
+                      trackId, fileStatus.getPath());
                 }
                 itemInfo.increRetryCount();
                 this.storageMovementNeeded.add(itemInfo);
                 break;
               case BLOCKS_FAILED_TO_MOVE:
                 if (LOG.isDebugEnabled()) {
-                  LOG.debug("Adding trackID " + trackId
-                      + " back to retry queue as some of the blocks"
-                      + " movement failed.");
+                  LOG.debug("Adding trackID:{} for the file path:{} back to "
+                      + "retry queue as some of the blocks movement failed.",
+                      trackId, fileStatus.getPath());
                 }
                 this.storageMovementNeeded.add(itemInfo);
                 break;
@@ -318,8 +333,9 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
               case BLOCKS_TARGET_PAIRING_SKIPPED:
               case BLOCKS_ALREADY_SATISFIED:
               default:
-                LOG.info("Block analysis skipped or blocks already satisfied"
-                    + " with storages. So, Cleaning up the Xattrs.");
+                LOG.info("Block analysis status:{} for the file path:{}."
+                    + " So, Cleaning up the Xattrs.", status.status,
+                    fileStatus.getPath());
                 storageMovementNeeded.removeItemTrackInfo(itemInfo, true);
                 break;
               }
@@ -346,20 +362,20 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
     if (isRunning) {
       synchronized (this) {
         if (isRunning) {
-          isRunning = false;
-          // Stopping monitor thread and clearing queues as well
-          this.clearQueues();
-          this.storageMovementsMonitor.stopGracefully();
-          if (!(t instanceof InterruptedException)) {
-            LOG.info("StoragePolicySatisfier received an exception"
-                + " while shutting down.", t);
+          if (t instanceof InterruptedException) {
+            isRunning = false;
+            LOG.info("Stopping StoragePolicySatisfier.");
+            // Stopping monitor thread and clearing queues as well
+            this.clearQueues();
+            this.storageMovementsMonitor.stopGracefully();
+          } else {
+            LOG.error(
+                "StoragePolicySatisfier thread received runtime exception, "
+                    + "ignoring", t);
           }
-          LOG.info("Stopping StoragePolicySatisfier.");
         }
       }
     }
-    LOG.error("StoragePolicySatisfier thread received runtime exception. "
-        + "Stopping Storage policy satisfier work", t);
     return;
   }
 
@@ -374,9 +390,8 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
     final boolean lastBlkComplete = locatedBlocks.isLastBlockComplete();
     if (!lastBlkComplete) {
       // Postpone, currently file is under construction
-      // So, should we add back? or leave it to user
-      LOG.info("BlockCollectionID: {} file is under construction. So, postpone"
-          + " this to the next retry iteration", fileInfo.getFileId());
+      LOG.info("File: {} is under construction. So, postpone"
+          + " this to the next retry iteration", fileInfo.getPath());
       return new BlocksMovingAnalysis(
           BlocksMovingAnalysis.Status.ANALYSIS_SKIPPED_FOR_RETRY,
           new ArrayList<>());
@@ -384,8 +399,8 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
 
     List<LocatedBlock> blocks = locatedBlocks.getLocatedBlocks();
     if (blocks.size() == 0) {
-      LOG.info("BlockCollectionID: {} file is not having any blocks."
-          + " So, skipping the analysis.", fileInfo.getFileId());
+      LOG.info("File: {} is not having any blocks."
+          + " So, skipping the analysis.", fileInfo.getPath());
       return new BlocksMovingAnalysis(
           BlocksMovingAnalysis.Status.BLOCKS_TARGET_PAIRING_SKIPPED,
           new ArrayList<>());
@@ -970,4 +985,12 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
   public void markScanCompletedForPath(Long inodeId) {
     getStorageMovementQueue().markScanCompletedForDir(inodeId);
   }
+
+  /**
+   * Join main SPS thread.
+   */
+  public void join() throws InterruptedException {
+    //TODO Add join here on SPS rpc server also
+    storagePolicySatisfierThread.join();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b0deb6b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
index a1c8eec..4a762649 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSBlockMoveTaskHandler.java
@@ -110,7 +110,7 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler {
   /**
    * Initializes block movement tracker daemon and starts the thread.
    */
-  void init() {
+  public void init() {
     movementTrackerThread = new Daemon(this.blkMovementTracker);
     movementTrackerThread.setName("BlockStorageMovementTracker");
     movementTrackerThread.start();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b0deb6b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
index e5b04ba..e3b3bbb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSContext.java
@@ -19,19 +19,13 @@
 package org.apache.hadoop.hdfs.server.sps;
 
 import java.io.IOException;
-import java.net.URI;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.UnresolvedLinkException;
-import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
@@ -57,13 +51,12 @@ public class ExternalSPSContext implements Context {
       LoggerFactory.getLogger(ExternalSPSContext.class);
   private SPSService service;
   private NameNodeConnector nnc = null;
-  private Object nnConnectionLock = new Object();
   private BlockStoragePolicySuite createDefaultSuite =
       BlockStoragePolicySuite.createDefaultSuite();
 
-  public ExternalSPSContext(SPSService service) {
+  public ExternalSPSContext(SPSService service, NameNodeConnector nnc) {
     this.service = service;
-    initializeNamenodeConnector();
+    this.nnc = nnc;
   }
 
   @Override
@@ -73,7 +66,6 @@ public class ExternalSPSContext implements Context {
 
   @Override
   public boolean isInSafeMode() {
-    initializeNamenodeConnector();
     try {
       return nnc != null ? nnc.getDistributedFileSystem().isInSafeMode()
           : false;
@@ -85,7 +77,6 @@ public class ExternalSPSContext implements Context {
 
   @Override
   public boolean isMoverRunning() {
-    initializeNamenodeConnector();
     try {
       FSDataOutputStream out = nnc.getDistributedFileSystem()
           .append(HdfsServerConstants.MOVER_ID_PATH);
@@ -101,7 +92,6 @@ public class ExternalSPSContext implements Context {
   @Override
   public long getFileID(String path) throws UnresolvedLinkException,
       AccessControlException, ParentNotDirectoryException {
-    initializeNamenodeConnector();
     HdfsFileStatus fs = null;
     try {
       fs = (HdfsFileStatus) nnc.getDistributedFileSystem().getFileStatus(
@@ -121,7 +111,6 @@ public class ExternalSPSContext implements Context {
 
   @Override
   public boolean isFileExist(long inodeId) {
-    initializeNamenodeConnector();
     String filePath = null;
     try {
       filePath = getFilePath(inodeId);
@@ -145,14 +134,12 @@ public class ExternalSPSContext implements Context {
 
   @Override
   public void removeSPSHint(long inodeId) throws IOException {
-    initializeNamenodeConnector();
     nnc.getDistributedFileSystem().removeXAttr(new Path(getFilePath(inodeId)),
         HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY);
   }
 
   @Override
   public int getNumLiveDataNodes() {
-    initializeNamenodeConnector();
     try {
       return nnc.getDistributedFileSystem()
           .getDataNodeStats(DatanodeReportType.LIVE).length;
@@ -164,7 +151,6 @@ public class ExternalSPSContext implements Context {
 
   @Override
   public HdfsFileStatus getFileInfo(long inodeID) throws IOException {
-    initializeNamenodeConnector();
     return nnc.getDistributedFileSystem().getClient()
         .getLocatedFileInfo(getFilePath(inodeID), false);
   }
@@ -172,13 +158,11 @@ public class ExternalSPSContext implements Context {
   @Override
   public DatanodeStorageReport[] getLiveDatanodeStorageReport()
       throws IOException {
-    initializeNamenodeConnector();
     return nnc.getLiveDatanodeStorageReport();
   }
 
   @Override
   public boolean hasLowRedundancyBlocks(long inodeID) {
-    initializeNamenodeConnector();
     try {
       return nnc.getNNProtocolConnection().hasLowRedundancyBlocks(inodeID);
     } catch (IOException e) {
@@ -191,7 +175,6 @@ public class ExternalSPSContext implements Context {
   @Override
   public boolean checkDNSpaceForScheduling(DatanodeInfo dn, StorageType type,
       long estimatedSize) {
-    initializeNamenodeConnector();
     try {
       return nnc.getNNProtocolConnection().checkDNSpaceForScheduling(dn, type,
           estimatedSize);
@@ -204,7 +187,6 @@ public class ExternalSPSContext implements Context {
 
   @Override
   public Long getNextSPSPathId() {
-    initializeNamenodeConnector();
     try {
       return nnc.getNNProtocolConnection().getNextSPSPathId();
     } catch (IOException e) {
@@ -233,39 +215,4 @@ public class ExternalSPSContext implements Context {
       return null;
     }
   }
-
-  @Override
-  public void close() throws IOException {
-    synchronized (nnConnectionLock) {
-      if (nnc != null) {
-        nnc.close();
-      }
-    }
-  }
-
-  private void initializeNamenodeConnector() {
-    synchronized (nnConnectionLock) {
-      if (nnc == null) {
-        try {
-          nnc = getNameNodeConnector(service.getConf());
-        } catch (IOException e) {
-          LOG.warn("Exception while creating Namenode Connector.."
-              + "Namenode might not have started.", e);
-        }
-      }
-    }
-  }
-
-  public static NameNodeConnector getNameNodeConnector(Configuration conf)
-      throws IOException {
-    final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
-    List<NameNodeConnector> nncs = Collections.emptyList();
-    NameNodeConnector.checkOtherInstanceRunning(false);
-    nncs = NameNodeConnector.newNameNodeConnectors(namenodes,
-        ExternalSPSContext.class.getSimpleName(),
-        HdfsServerConstants.MOVER_ID_PATH, conf,
-        NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
-    return nncs.get(0);
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b0deb6b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java
index 964ee8c..ff277ba 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalSPSFileIDCollector.java
@@ -139,11 +139,15 @@ public class ExternalSPSFileIDCollector implements FileIdCollector {
    */
   public int remainingCapacity() {
     int size = service.processingQueueSize();
-    if (size >= maxQueueLimitToScan) {
-      return 0;
-    } else {
-      return (maxQueueLimitToScan - size);
+    int remainingSize = 0;
+    if (size < maxQueueLimitToScan) {
+      remainingSize = maxQueueLimitToScan - size;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("SPS processing Q -> maximum capacity:{}, current size:{},"
+          + " remaining size:{}", maxQueueLimitToScan, size, remainingSize);
     }
+    return remainingSize;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b0deb6b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
new file mode 100644
index 0000000..c64abc3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/sps/ExternalStoragePolicySatisfier.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.sps;
+
+import static org.apache.hadoop.util.ExitUtil.terminate;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
+import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Starts and runs the external StoragePolicySatisfier (SPS) service.
+ */
+@InterfaceAudience.Private
+public class ExternalStoragePolicySatisfier {
+  public static final Logger LOG = LoggerFactory
+      .getLogger(ExternalStoragePolicySatisfier.class);
+
+  /**
+   * Starts the external SPS service and then joins it. Startup is aborted if
+   * the Namenode reports that an SPS is already running inside it.
+   */
+  public static void main(String[] args) throws Exception {
+    NameNodeConnector nnc = null;
+    try {
+      StringUtils.startupShutdownMessage(StoragePolicySatisfier.class, args,
+          LOG);
+      HdfsConfiguration spsConf = new HdfsConfiguration();
+      //TODO : login with SPS keytab
+      StoragePolicySatisfier sps = new StoragePolicySatisfier(spsConf);
+      nnc = getNameNodeConnector(spsConf);
+
+      boolean spsRunning = nnc.getDistributedFileSystem().getClient()
+          .isStoragePolicySatisfierRunning();
+      if (spsRunning) {
+        throw new RuntimeException(
+            "Startup failed due to StoragePolicySatisfier"
+                + " running inside Namenode.");
+      }
+
+      ExternalSPSContext context = new ExternalSPSContext(sps, nnc);
+      ExternalBlockMovementListener blkMoveListener =
+          new ExternalBlockMovementListener();
+      ExternalSPSBlockMoveTaskHandler externalHandler =
+          new ExternalSPSBlockMoveTaskHandler(spsConf, nnc, sps);
+      externalHandler.init();
+      sps.init(context, new ExternalSPSFileIDCollector(context, sps),
+          externalHandler, blkMoveListener);
+      sps.start(true, StoragePolicySatisfierMode.EXTERNAL);
+      sps.join();
+    } catch (Throwable e) {
+      LOG.error("Failed to start storage policy satisfier.", e);
+      terminate(1, e);
+    } finally {
+      if (nnc != null) {
+        nnc.close();
+      }
+    }
+  }
+
+  /**
+   * Connects to the Namenode, retrying every 3 seconds after an IOException.
+   */
+  private static NameNodeConnector getNameNodeConnector(Configuration conf)
+      throws IOException, InterruptedException {
+    final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
+    final Path externalSPSPathId = HdfsServerConstants.MOVER_ID_PATH;
+    while (true) {
+      try {
+        final List<NameNodeConnector> nncs = NameNodeConnector
+            .newNameNodeConnectors(namenodes,
+                StoragePolicySatisfier.class.getSimpleName(),
+                externalSPSPathId, conf,
+                NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
+        return nncs.get(0);
+      } catch (IOException e) {
+        LOG.warn("Failed to connect with namenode", e);
+        Thread.sleep(3000); // retry the connection after few secs
+      }
+    }
+  }
+
+  /**
+   * BlockMovementListener that logs the blocks of each move-attempt report.
+   */
+  private static class ExternalBlockMovementListener
+      implements BlockMovementListener {
+    @Override
+    public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) {
+      // Log per notification; a retained list would grow without bound.
+      List<Block> attemptedBlocks = new ArrayList<>();
+      for (Block block : moveAttemptFinishedBlks) {
+        attemptedBlocks.add(block);
+      }
+      LOG.info("Movement attempted blocks: {}", attemptedBlocks);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b0deb6b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
index 6b52c8f..75f046e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
@@ -236,5 +236,13 @@ Check the running status of Storage Policy Satisfier service in namenode.
If it
 ### Enable(internal service inside NN or external service outside NN) or Disable SPS without
restarting Namenode
 If administrator wants to switch modes of SPS feature while Namenode is running, first he/she
needs to update the desired value(internal or external or none) for the configuration item
`dfs.storage.policy.satisfier.mode` in configuration file (`hdfs-site.xml`) and then run the
following Namenode reconfig command
 
-+       hdfs dfsadmin -reconfig namenode <host:ipc_port> start
+* Command:
+
+       hdfs dfsadmin -reconfig namenode <host:ipc_port> start
+
+### Start External SPS Service.
+If the administrator wants to start the external SPS, he/she first needs to set the property `dfs.storage.policy.satisfier.mode`
to `external` in the configuration file (`hdfs-site.xml`) and then run the Namenode reconfig
command. After this, start the external SPS service using the following command:
+
+* Command:
 
+      hdfs --daemon start sps

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b0deb6b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
index 935d4f2..135d996 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestStoragePolicySatisfier.java
@@ -603,7 +603,7 @@ public class TestStoragePolicySatisfier {
       if (out != null) {
         out.close();
       }
-      hdfsCluster.shutdown();
+      shutdownCluster();
     }
   }
 
@@ -626,9 +626,7 @@ public class TestStoragePolicySatisfier {
       Assert.assertTrue("SPS should be running as "
           + "no Mover really running", running);
     } finally {
-      if (hdfsCluster != null) {
-        hdfsCluster.shutdown();
-      }
+      shutdownCluster();
     }
   }
 
@@ -672,9 +670,7 @@ public class TestStoragePolicySatisfier {
       DFSTestUtil.waitExpectedStorageType(
           file1, StorageType.DISK, 2, 30000, dfs);
     } finally {
-      if (hdfsCluster != null) {
-        hdfsCluster.shutdown();
-      }
+      shutdownCluster();
     }
   }
 
@@ -1381,7 +1377,11 @@ public class TestStoragePolicySatisfier {
     // Remove 10 element and make queue free, So other traversing will start.
     for (int i = 0; i < 10; i++) {
       String path = expectedTraverseOrder.remove(0);
-      long trackId = sps.getStorageMovementQueue().get().getFileId();
+      ItemInfo itemInfo = sps.getStorageMovementQueue().get();
+      if (itemInfo == null) {
+        continue;
+      }
+      long trackId = itemInfo.getFileId();
       INode inode = fsDir.getInode(trackId);
       assertTrue("Failed to traverse tree, expected " + path + " but got "
           + inode.getFullPathName(), path.equals(inode.getFullPathName()));
@@ -1392,7 +1392,11 @@ public class TestStoragePolicySatisfier {
     // Check other element traversed in order and E, M, U, R, S should not be
     // added in queue which we already removed from expected list
     for (String path : expectedTraverseOrder) {
-      long trackId = sps.getStorageMovementQueue().get().getFileId();
+      ItemInfo itemInfo = sps.getStorageMovementQueue().get();
+      if (itemInfo == null) {
+        continue;
+      }
+      long trackId = itemInfo.getFileId();
       INode inode = fsDir.getInode(trackId);
       assertTrue("Failed to traverse tree, expected " + path + " but got "
           + inode.getFullPathName(), path.equals(inode.getFullPathName()));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3b0deb6b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
index fe08b8f..febc2ea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/sps/TestExternalStoragePolicySatisfier.java
@@ -22,7 +22,6 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
@@ -43,8 +42,6 @@ import org.apache.hadoop.hdfs.server.namenode.sps.TestStoragePolicySatisfier;
 import org.junit.Assert;
 import org.junit.Ignore;
 
-import com.google.common.collect.Maps;
-
 /**
  * Tests the external sps service plugins.
  */
@@ -95,7 +92,8 @@ public class TestExternalStoragePolicySatisfier
     SPSService spsService = blkMgr.getSPSService();
     spsService.stopGracefully();
 
-    ExternalSPSContext context = new ExternalSPSContext(spsService);
+    ExternalSPSContext context = new ExternalSPSContext(spsService,
+        getNameNodeConnector(conf));
 
     ExternalBlockMovementListener blkMoveListener =
         new ExternalBlockMovementListener();
@@ -124,7 +122,8 @@ public class TestExternalStoragePolicySatisfier
     spsService = blkMgr.getSPSService();
     spsService.stopGracefully();
 
-    ExternalSPSContext context = new ExternalSPSContext(spsService);
+    ExternalSPSContext context = new ExternalSPSContext(spsService,
+        getNameNodeConnector(getConf()));
     ExternalBlockMovementListener blkMoveListener =
         new ExternalBlockMovementListener();
     ExternalSPSBlockMoveTaskHandler externalHandler =
@@ -161,16 +160,22 @@ public class TestExternalStoragePolicySatisfier
       throws IOException {
     final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
     Assert.assertEquals(1, namenodes.size());
-    Map<URI, List<Path>> nnMap = Maps.newHashMap();
-    for (URI nn : namenodes) {
-      nnMap.put(nn, null);
-    }
     final Path externalSPSPathId = new Path("/system/tmp.id");
-    final List<NameNodeConnector> nncs = NameNodeConnector
-        .newNameNodeConnectors(nnMap,
-            StoragePolicySatisfier.class.getSimpleName(), externalSPSPathId,
-            conf, NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
-    return nncs.get(0);
+    NameNodeConnector.checkOtherInstanceRunning(false);
+    while (true) {
+      try {
+        final List<NameNodeConnector> nncs = NameNodeConnector
+            .newNameNodeConnectors(namenodes,
+                StoragePolicySatisfier.class.getSimpleName(),
+                externalSPSPathId, conf,
+                NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
+        return nncs.get(0);
+      } catch (IOException e) {
+        LOG.warn("Failed to connect with namenode", e);
+        // Ignore
+      }
+
+    }
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message