hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rake...@apache.org
Subject hadoop git commit: HDFS-11243. [SPS]: Add a protocol command from NN to DN for dropping the SPS work and queues. Contributed by Uma Maheswara Rao G
Date Tue, 31 Jan 2017 18:15:34 GMT
Repository: hadoop
Updated Branches:
  refs/heads/HDFS-10285 bd419bbe1 -> 71c9da2ca


HDFS-11243. [SPS]: Add a protocol command from NN to DN for dropping the SPS work and queues.
Contributed by Uma Maheswara Rao G


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/71c9da2c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/71c9da2c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/71c9da2c

Branch: refs/heads/HDFS-10285
Commit: 71c9da2caeede5a93133543f945f09814cdfc7bb
Parents: bd419bb
Author: Rakesh Radhakrishnan <rakeshr@apache.org>
Authored: Tue Jan 31 23:44:01 2017 +0530
Committer: Rakesh Radhakrishnan <rakeshr@apache.org>
Committed: Tue Jan 31 23:44:01 2017 +0530

----------------------------------------------------------------------
 .../apache/hadoop/hdfs/protocolPB/PBHelper.java | 12 ++++
 .../server/blockmanagement/BlockManager.java    | 13 ++---
 .../blockmanagement/DatanodeDescriptor.java     | 18 ++++++
 .../server/blockmanagement/DatanodeManager.java | 19 +++++++
 .../hdfs/server/datanode/BPOfferService.java    |  4 ++
 .../datanode/BlockStorageMovementTracker.java   | 12 ++++
 .../datanode/StoragePolicySatisfyWorker.java    | 22 +++++++-
 .../server/namenode/StoragePolicySatisfier.java | 25 +++++++--
 .../hdfs/server/protocol/DatanodeProtocol.java  |  2 +
 .../server/protocol/DropSPSWorkCommand.java     | 36 ++++++++++++
 .../src/main/proto/DatanodeProtocol.proto       |  9 +++
 .../TestStoragePolicySatisfyWorker.java         | 59 ++++++++++++++++++++
 12 files changed, 216 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/71c9da2c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 8655809..4e4587f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdComma
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DropSPSWorkCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.FinalizeCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.KeyUpdateCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.NNHAStatusHeartbeatProto;
@@ -102,6 +103,7 @@ import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.hdfs.server.protocol.DropSPSWorkCommand;
 import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
 import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
 import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
@@ -131,6 +133,10 @@ public class PBHelper {
   private static final RegisterCommandProto REG_CMD_PROTO = 
       RegisterCommandProto.newBuilder().build();
   private static final RegisterCommand REG_CMD = new RegisterCommand();
+  private static final DropSPSWorkCommandProto DROP_SPS_WORK_CMD_PROTO =
+      DropSPSWorkCommandProto.newBuilder().build();
+  private static final DropSPSWorkCommand DROP_SPS_WORK_CMD =
+      new DropSPSWorkCommand();
 
   private PBHelper() {
     /** Hidden constructor */
@@ -465,6 +471,8 @@ public class PBHelper {
       return PBHelper.convert(proto.getBlkECReconstructionCmd());
     case BlockStorageMovementCommand:
       return PBHelper.convert(proto.getBlkStorageMovementCmd());
+    case DropSPSWorkCommand:
+      return DROP_SPS_WORK_CMD;
     default:
       return null;
     }
@@ -604,6 +612,10 @@ public class PBHelper {
           .setBlkStorageMovementCmd(
               convert((BlockStorageMovementCommand) datanodeCommand));
       break;
+    case DatanodeProtocol.DNA_DROP_SPS_WORK_COMMAND:
+      builder.setCmdType(DatanodeCommandProto.Type.DropSPSWorkCommand)
+          .setDropSPSWorkCmd(DROP_SPS_WORK_CMD_PROTO);
+      break;
     case DatanodeProtocol.DNA_UNKNOWN: //Not expected
     default:
       builder.setCmdType(DatanodeCommandProto.Type.NullDatanodeCommand);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71c9da2c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 0e850f6..e2a1d60 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -649,13 +649,13 @@ public class BlockManager implements BlockStatsMXBean {
     mxBeanName = MBeans.register("NameNode", "BlockStats", this);
     bmSafeMode.activate(blockTotal);
     if (sps != null && !haEnabled) {
-      sps.start();
+      sps.start(false);
     }
   }
 
   public void close() {
     if (sps != null) {
-      sps.stop();
+      sps.stop(false);
     }
     bmSafeMode.close();
     try {
@@ -4780,7 +4780,7 @@ public class BlockManager implements BlockStatsMXBean {
       return;
     }
 
-    sps.start();
+    sps.start(true);
   }
 
   /**
@@ -4794,12 +4794,7 @@ public class BlockManager implements BlockStatsMXBean {
       LOG.info("Storage policy satisfier is already stopped.");
       return;
     }
-    sps.stop();
-    // TODO: add command to DNs for stop in-progress processing SPS commands?
-    // to avoid confusions in cluster, I think sending commands from centralized
-    // place would be better to drop pending queues at DN. Anyway in progress
-    // work will be finished in a while, but this command can void starting
-    // fresh movements at DN.
+    sps.stop(true);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71c9da2c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 4339439..c58b81f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -211,6 +211,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
    */
   private final Queue<BlockStorageMovementInfosBatch> storageMovementBlocks =
       new LinkedList<>();
+  private volatile boolean dropSPSWork = false;
 
   /* Variables for maintaining number of blocks scheduled to be written to
    * this storage. This count is approximate and might be slightly bigger
@@ -967,4 +968,21 @@ public class DatanodeDescriptor extends DatanodeInfo {
       return storageMovementBlocks.poll();
     }
   }
+
+  /**
+   * Set whether to drop SPS related queues at DN side.
+   *
+   * @param dropSPSWork
+   *          - true if need to drop SPS queues, otherwise false.
+   */
+  public synchronized void setDropSPSWork(boolean dropSPSWork) {
+    this.dropSPSWork = dropSPSWork;
+  }
+
+  /**
+   * @return true if need to drop SPS queues at DN.
+   */
+  public synchronized boolean shouldDropSPSWork() {
+    return this.dropSPSWork;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71c9da2c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index 57de7f5..ff3c209 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -1644,6 +1644,13 @@ public class DatanodeManager {
           blkStorageMovementInfosBatch.getBlockMovingInfo()));
     }
 
+    if (nodeinfo.shouldDropSPSWork()) {
+      cmds.add(DropSPSWorkCommand.DNA_DROP_SPS_WORK_COMMAND);
+      // Set back to false to indicate that the new value has been sent to the
+      // datanode.
+      nodeinfo.setDropSPSWork(false);
+    }
+
     if (!cmds.isEmpty()) {
       return cmds.toArray(new DatanodeCommand[cmds.size()]);
     }
@@ -1846,5 +1853,17 @@ public class DatanodeManager {
     this.blockInvalidateLimit = Math.max(20 * (int) (intervalSeconds),
         DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT);
   }
+
+  /**
+   * Mark all DNs to drop SPS queues. A DNA_DROP_SPS_WORK_COMMAND will be added
+   * in heartbeat response, which will indicate DN to drop SPS queues
+   */
+  public void addDropSPSWorkCommandsToAllDNs() {
+    synchronized (this) {
+      for (DatanodeDescriptor dn : datanodeMap.values()) {
+        dn.setDropSPSWork(true);
+      }
+    }
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71c9da2c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index f5f6738..d2bad19 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -739,6 +739,10 @@ class BPOfferService {
           blkSPSCmd.getTrackID(), blkSPSCmd.getBlockPoolId(),
           blkSPSCmd.getBlockMovingTasks());
       break;
+    case DatanodeProtocol.DNA_DROP_SPS_WORK_COMMAND:
+      LOG.info("DatanodeCommand action: DNA_DROP_SPS_WORK_COMMAND");
+      dn.getStoragePolicySatisfyWorker().dropSPSWork();
+      break;
     default:
       LOG.warn("Unknown DatanodeCommand action: " + cmd.getAction());
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71c9da2c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
index bd35b09..e623cef 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
@@ -146,4 +146,16 @@ public class BlockStorageMovementTracker implements Runnable {
       moverTaskFutures.notify();
     }
   }
+
+  /**
+   * Clear the pending movement and movement result queues.
+   */
+  void removeAll() {
+    synchronized (moverTaskFutures) {
+      moverTaskFutures.clear();
+    }
+    synchronized (movementResults) {
+      movementResults.clear();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71c9da2c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
index 1648d8d..cebddfd 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
@@ -115,7 +115,6 @@ public class StoragePolicySatisfyWorker {
         TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
         new Daemon.DaemonFactory() {
           private final AtomicInteger threadIndex = new AtomicInteger(0);
-
           @Override
           public Thread newThread(Runnable r) {
             Thread t = super.newThread(r);
@@ -420,10 +419,31 @@ public class StoragePolicySatisfyWorker {
         }
       }
     }
+
+    /**
+     * Clear the trackID vs movement status tracking map.
+     */
+    void removeAll() {
+      synchronized (trackIdVsMovementStatus) {
+        trackIdVsMovementStatus.clear();
+      }
+    }
+
   }
 
   @VisibleForTesting
   BlocksMovementsCompletionHandler getBlocksMovementsCompletionHandler() {
     return handler;
   }
+
+  /**
+   * Drop the in-progress SPS work queues.
+   */
+  public void dropSPSWork() {
+    LOG.info("Received request to drop StoragePolicySatisfierWorker queues. "
+        + "So, none of the SPS Worker queued block movements will"
+        + " be scheduled.");
+    movementTracker.removeAll();
+    handler.removeAll();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71c9da2c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
index 1c48910..dc58294 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -99,9 +99,14 @@ public class StoragePolicySatisfier implements Runnable {
    * Start storage policy satisfier demon thread. Also start block storage
    * movements monitor for retry the attempts if needed.
    */
-  public synchronized void start() {
+  public synchronized void start(boolean reconfigStart) {
     isRunning = true;
-    LOG.info("Starting StoragePolicySatisfier.");
+    if (reconfigStart) {
+      LOG.info("Starting StoragePolicySatisfier, as admin requested to "
+          + "activate it.");
+    } else {
+      LOG.info("Starting StoragePolicySatisfier.");
+    }
     storagePolicySatisfierThread = new Daemon(this);
     storagePolicySatisfierThread.setName("StoragePolicySatisfier");
     storagePolicySatisfierThread.start();
@@ -110,10 +115,17 @@ public class StoragePolicySatisfier implements Runnable {
 
   /**
    * Stop storage policy satisfier demon thread.
+   *
+   * @param reconfigStop
    */
-  public synchronized void stop() {
+  public synchronized void stop(boolean reconfigStop) {
     isRunning = false;
-    LOG.info("Stopping StoragePolicySatisfier.");
+    if (reconfigStop) {
+      LOG.info("Stopping StoragePolicySatisfier, as admin requested to "
+          + "deactivate it.");
+    } else {
+      LOG.info("Stopping StoragePolicySatisfier.");
+    }
     if (storagePolicySatisfierThread == null) {
       return;
     }
@@ -123,7 +135,10 @@ public class StoragePolicySatisfier implements Runnable {
     } catch (InterruptedException ie) {
     }
     this.storageMovementsMonitor.stop();
-    this.clearQueues();
+    if (reconfigStop) {
+      this.clearQueues();
+      this.blockManager.getDatanodeManager().addDropSPSWorkCommandsToAllDNs();
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71c9da2c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
index 38d2680..f1bfe59 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
@@ -78,6 +78,8 @@ public interface DatanodeProtocol {
   final static int DNA_UNCACHE = 10;   // uncache blocks
   final static int DNA_ERASURE_CODING_RECONSTRUCTION = 11; // erasure coding reconstruction
command
   final static int DNA_BLOCK_STORAGE_MOVEMENT = 12; // block storage movement command
+  final static int DNA_DROP_SPS_WORK_COMMAND = 13; // block storage movement
+                                                   // command
 
   /** 
    * Register Datanode.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71c9da2c/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DropSPSWorkCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DropSPSWorkCommand.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DropSPSWorkCommand.java
new file mode 100644
index 0000000..806f713
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DropSPSWorkCommand.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+
+/**
+ * A DropSPSWorkCommand is an instruction to a datanode to drop the SPSWorker's
+ * pending block storage movement queues.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class DropSPSWorkCommand extends DatanodeCommand {
+  public static final DropSPSWorkCommand DNA_DROP_SPS_WORK_COMMAND =
+      new DropSPSWorkCommand();
+
+  public DropSPSWorkCommand() {
+    super(DatanodeProtocol.DNA_DROP_SPS_WORK_COMMAND);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71c9da2c/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
index cc379fa..08bc547 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
@@ -61,6 +61,7 @@ message DatanodeCommandProto {
     BlockIdCommand = 8;
     BlockECReconstructionCommand = 9;
     BlockStorageMovementCommand = 10;
+    DropSPSWorkCommand = 11;
   }
 
   required Type cmdType = 1;    // Type of the command
@@ -76,6 +77,7 @@ message DatanodeCommandProto {
   optional BlockIdCommandProto blkIdCmd = 8;
   optional BlockECReconstructionCommandProto blkECReconstructionCmd = 9;
   optional BlockStorageMovementCommandProto blkStorageMovementCmd = 10;
+  optional DropSPSWorkCommandProto dropSPSWorkCmd = 11;
 }
 
 /**
@@ -166,6 +168,13 @@ message BlockStorageMovementCommandProto {
 }
 
 /**
+ * Instruct datanode to drop SPS work queues
+ */
+message DropSPSWorkCommandProto {
+  // void
+}
+
+/**
  * Block storage movement information
  */
 message BlockStorageMovementProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/71c9da2c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
index 8e02d41..86b8b50 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import static org.junit.Assert.*;
+
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.ArrayList;
@@ -186,6 +188,63 @@ public class TestStoragePolicySatisfyWorker {
     waitForBlockMovementCompletion(worker, inode.getId(), 1, 30000);
   }
 
+  /**
+   * Tests that drop SPS work method clears all the queues.
+   *
+   * @throws Exception
+   */
+  @Test(timeout = 120000)
+  public void testDropSPSWork() throws Exception {
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(20).build();
+
+    cluster.waitActive();
+    final DistributedFileSystem dfs = cluster.getFileSystem();
+    final String file = "/testDropSPSWork";
+    DFSTestUtil.createFile(dfs, new Path(file), false, 1024, 50 * 100,
+        DEFAULT_BLOCK_SIZE, (short) 2, 0, false, null);
+
+    // move to ARCHIVE
+    dfs.setStoragePolicy(new Path(file), "COLD");
+
+    DataNode src = cluster.getDataNodes().get(2);
+    DatanodeInfo targetDnInfo =
+        DFSTestUtil.getLocalDatanodeInfo(src.getXferPort());
+
+    StoragePolicySatisfyWorker worker =
+        new StoragePolicySatisfyWorker(conf, src);
+    List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
+    List<LocatedBlock> locatedBlocks =
+        dfs.getClient().getLocatedBlocks(file, 0).getLocatedBlocks();
+    for (LocatedBlock locatedBlock : locatedBlocks) {
+      BlockMovingInfo blockMovingInfo =
+          prepareBlockMovingInfo(locatedBlock.getBlock().getLocalBlock(),
+              locatedBlock.getLocations()[0], targetDnInfo,
+              locatedBlock.getStorageTypes()[0], StorageType.ARCHIVE);
+      blockMovingInfos.add(blockMovingInfo);
+    }
+    INode inode = cluster.getNamesystem().getFSDirectory().getINode(file);
+    worker.processBlockMovingTasks(inode.getId(),
+        cluster.getNamesystem().getBlockPoolId(), blockMovingInfos);
+    // Wait till results queue build up
+    waitForBlockMovementResult(worker, inode.getId(), 30000);
+    worker.dropSPSWork();
+    assertTrue(worker.getBlocksMovementsCompletionHandler()
+        .getBlksMovementResults().size() == 0);
+  }
+
+  private void waitForBlockMovementResult(
+      final StoragePolicySatisfyWorker worker, final long inodeId, int timeout)
+          throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        List<BlocksStorageMovementResult> completedBlocks = worker
+            .getBlocksMovementsCompletionHandler().getBlksMovementResults();
+        return completedBlocks.size() > 0;
+      }
+    }, 100, timeout);
+  }
+
   private void waitForBlockMovementCompletion(
       final StoragePolicySatisfyWorker worker, final long inodeId,
       int expectedFailedItemsCount, int timeout) throws Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message