From yq...@apache.org
Subject hadoop git commit: HDFS-12443. Ozone: Improve SCM block deletion throttling algorithm. Contributed by Yiqun Lin.
Date Mon, 06 Nov 2017 12:23:44 GMT
Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 013c36f3c -> f6b937810


HDFS-12443. Ozone: Improve SCM block deletion throttling algorithm. Contributed by Yiqun Lin.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f6b93781
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f6b93781
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f6b93781

Branch: refs/heads/HDFS-7240
Commit: f6b937810228a21b0695faccbfd4372dffaf2cc6
Parents: 013c36f
Author: Yiqun Lin <yqlin@apache.org>
Authored: Mon Nov 6 20:21:51 2017 +0800
Committer: Yiqun Lin <yqlin@apache.org>
Committed: Mon Nov 6 20:21:51 2017 +0800

----------------------------------------------------------------------
 .../DeleteBlocksCommandHandler.java             |   4 +-
 .../ozone/scm/StorageContainerManager.java      |   3 +-
 .../hadoop/ozone/scm/block/BlockManager.java    |   5 +
 .../ozone/scm/block/BlockManagerImpl.java       |   7 +-
 .../block/DatanodeDeletedBlockTransactions.java | 130 ++++++++++++
 .../hadoop/ozone/scm/block/DeletedBlockLog.java |  10 +
 .../ozone/scm/block/DeletedBlockLogImpl.java    |  22 +++
 .../scm/block/SCMBlockDeletingService.java      | 197 +++++++------------
 .../ozone/TestStorageContainerManager.java      | 161 +++++++++++----
 .../ozone/scm/block/TestDeletedBlockLog.java    | 102 ++++++++++
 10 files changed, 470 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6b93781/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
index f16a4ff..1c859ea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
@@ -165,7 +165,7 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
         try {
           containerDB.writeBatch(batch);
           newDeletionBlocks++;
-          LOG.info("Transited Block {} to DELETING state in container {}",
+          LOG.debug("Transited Block {} to DELETING state in container {}",
               blk, containerId);
         } catch (IOException e) {
           // if some blocks failed to delete, we fail this TX,
@@ -175,7 +175,7 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
               "Failed to delete blocks for TXID = " + delTX.getTxID(), e);
         }
       } else {
-        LOG.info("Block {} not found or already under deletion in"
+        LOG.debug("Block {} not found or already under deletion in"
                 + " container {}, skip deleting it.", blk, containerId);
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6b93781/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
index d341e2c..b6f5239 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
@@ -811,7 +811,8 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
                   + "success={}", result.getTxID(), result.getSuccess());
         }
         if (result.getSuccess()) {
-          LOG.info("Purging TXID={} from block deletion log", result.getTxID());
+          LOG.debug("Purging TXID={} from block deletion log",
+              result.getTxID());
           this.getScmBlockManager().getDeletedBlockLog()
               .commitTransactions(Collections.singletonList(result.getTxID()));
         } else {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6b93781/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java
index da13f4a..b0f4da6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java
@@ -78,4 +78,9 @@ public interface BlockManager extends Closeable {
    * @throws IOException
    */
   void stop() throws IOException;
+
+  /**
+   * @return the block deleting service running in SCM.
+   */
+  SCMBlockDeletingService getSCMBlockDeletingService();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6b93781/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
index a6b5a5f..ff138e6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
@@ -148,7 +148,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean
{
     blockDeletingService =
         new SCMBlockDeletingService(
             deletedBlockLog, containerManager, nodeManager, svcInterval,
-            serviceTimeout);
+            serviceTimeout, conf);
   }
 
   /**
@@ -525,4 +525,9 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean
{
     // factor. Hence returning 0 for now.
     // containers.get(OzoneProtos.LifeCycleState.OPEN).size();
   }
+
+  @Override
+  public SCMBlockDeletingService getSCMBlockDeletingService() {
+    return this.blockDeletingService;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6b93781/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DatanodeDeletedBlockTransactions.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DatanodeDeletedBlockTransactions.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DatanodeDeletedBlockTransactions.java
new file mode 100644
index 0000000..31c61dd
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DatanodeDeletedBlockTransactions.java
@@ -0,0 +1,130 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.scm.block;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.ozone.scm.container.Mapping;
+import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
+
+import com.google.common.collect.ArrayListMultimap;
+
+/**
+ * A wrapper class to hold info about a datanode and all the deleted
+ * block transactions that will be sent to that datanode.
+ */
+public class DatanodeDeletedBlockTransactions {
+  private int nodeNum;
+  // The throttle size for each datanode.
+  private int maximumAllowedTXNum;
+  // Current counter of inserted TX.
+  private int currentTXNum;
+  private Mapping mappingService;
+  // A list of TXs mapped to a certain datanode ID.
+  private final ArrayListMultimap<DatanodeID, DeletedBlocksTransaction>
+      transactions;
+
+  DatanodeDeletedBlockTransactions(Mapping mappingService,
+      int maximumAllowedTXNum, int nodeNum) {
+    this.transactions = ArrayListMultimap.create();
+    this.mappingService = mappingService;
+    this.maximumAllowedTXNum = maximumAllowedTXNum;
+    this.nodeNum = nodeNum;
+  }
+
+  public void addTransaction(DeletedBlocksTransaction tx) throws IOException {
+    ContainerInfo info = null;
+    try {
+      info = mappingService.getContainer(tx.getContainerName());
+    } catch (IOException e) {
+      SCMBlockDeletingService.LOG.warn("Failed to get container info.", e);
+    }
+
+    if (info == null) {
+      SCMBlockDeletingService.LOG.warn(
+          "Container {} not found, continuing with next",
+          tx.getContainerName());
+      return;
+    }
+
+    for (DatanodeID dnID : info.getPipeline().getMachines()) {
+      if (transactions.containsKey(dnID)) {
+        List<DeletedBlocksTransaction> txs = transactions.get(dnID);
+        if (txs != null && txs.size() < maximumAllowedTXNum) {
+          boolean hasContained = false;
+          for (DeletedBlocksTransaction t : txs) {
+            if (t.getContainerName().equals(tx.getContainerName())) {
+              hasContained = true;
+              break;
+            }
+          }
+
+          if (!hasContained) {
+            txs.add(tx);
+            currentTXNum++;
+          }
+        }
+      } else {
+        currentTXNum++;
+        transactions.put(dnID, tx);
+      }
+      SCMBlockDeletingService.LOG.debug("Transaction added: {} <- TX({})", dnID,
+          tx.getTxID());
+    }
+  }
+
+  Set<DatanodeID> getDatanodes() {
+    return transactions.keySet();
+  }
+
+  boolean isEmpty() {
+    return transactions.isEmpty();
+  }
+
+  boolean hasTransactions(DatanodeID dnID) {
+    return transactions.containsKey(dnID) && !transactions.get(dnID).isEmpty();
+  }
+
+  List<DeletedBlocksTransaction> getDatanodeTransactions(
+      DatanodeID dnID) {
+    return transactions.get(dnID);
+  }
+
+  List<String> getTransactionIDList(DatanodeID dnID) {
+    if (hasTransactions(dnID)) {
+      return transactions.get(dnID).stream()
+          .map(DeletedBlocksTransaction::getTxID).map(String::valueOf)
+          .collect(Collectors.toList());
+    } else {
+      return Collections.emptyList();
+    }
+  }
+
+  boolean isFull() {
+    return currentTXNum >= maximumAllowedTXNum * nodeNum;
+  }
+
+  int getTXNum() {
+    return currentTXNum;
+  }
+}
\ No newline at end of file
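
A usage sketch for readers of this new class (not part of the commit): the driver below mirrors how SCMBlockDeletingService consumes it further down in this diff. The helper class name is hypothetical, and since the constructor and accessors are package-private it would have to live in org.apache.hadoop.ozone.scm.block.

package org.apache.hadoop.ozone.scm.block;

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.ozone.scm.container.Mapping;

/** Hypothetical driver mirroring the service task's use of the class. */
class DeletedBlockTransactionsSketch {
  static void collect(Mapping mapping, DeletedBlockLog delLog,
      int maxTXPerNode, int nodeCount) throws IOException {
    DatanodeDeletedBlockTransactions txs =
        new DatanodeDeletedBlockTransactions(mapping, maxTXPerNode, nodeCount);
    // Fills txs from the log; the scan stops once txs.isFull().
    delLog.getTransactions(txs);
    for (DatanodeID dn : txs.getDatanodes()) {
      // Each per-datanode list holds at most maxTXPerNode transactions.
      List<DeletedBlocksTransaction> dnTXs = txs.getDatanodeTransactions(dn);
      System.out.println(dn + " -> " + dnTXs.size() + " TXs");
    }
  }
}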

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6b93781/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java
index bcbbe15..d8af853 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java
@@ -46,6 +46,16 @@ public interface DeletedBlockLog extends Closeable {
       throws IOException;
 
   /**
+   * Scans the entire log once and adds TXs to the given
+   * DatanodeDeletedBlockTransactions. Once the holder is full,
+   * the scan stops.
+   * @param transactions the object that scanned TXs are added to.
+   * @throws IOException
+   */
+  void getTransactions(DatanodeDeletedBlockTransactions transactions)
+      throws IOException;
+
+  /**
    * Return all failed transactions in the log. A transaction is considered
    * to be failed if it has been sent more than MAX_RETRY limit and its
    * count is reset to -1.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6b93781/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java
index e7e92d1..d14da62 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java
@@ -326,4 +326,26 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
       deletedStore.close();
     }
   }
+
+  @Override
+  public void getTransactions(DatanodeDeletedBlockTransactions transactions)
+      throws IOException {
+    lock.lock();
+    try {
+      deletedStore.iterate(null, (key, value) -> {
+        if (!Arrays.equals(LATEST_TXID, key)) {
+          DeletedBlocksTransaction block = DeletedBlocksTransaction
+              .parseFrom(value);
+
+          if (block.getCount() > -1 && block.getCount() <= maxRetry) {
+            transactions.addTransaction(block);
+          }
+          return !transactions.isFull();
+        }
+        return true;
+      });
+    } finally {
+      lock.unlock();
+    }
+  }
 }
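
One detail worth calling out in the implementation above: deletedStore.iterate stops as soon as the callback returns false, which is what `return !transactions.isFull()` relies on to end the scan early once every datanode has reached its limit. A minimal sketch of that assumed callback contract (the interface and method names here are illustrative, not copied from the utils package):

// Illustrative sketch of the iteration contract assumed above.
interface KeyValueConsumer {
  // Invoked once per entry in key order; return true to keep
  // scanning, false to stop (e.g. once the TX holder is full).
  boolean consume(byte[] key, byte[] value) throws java.io.IOException;
}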

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6b93781/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/SCMBlockDeletingService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/SCMBlockDeletingService.java
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/SCMBlockDeletingService.java
index 3058b1e..a723c2d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/SCMBlockDeletingService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/SCMBlockDeletingService.java
@@ -16,14 +16,16 @@
  */
 package org.apache.hadoop.ozone.scm.block;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 import org.apache.hadoop.ozone.scm.container.Mapping;
 import org.apache.hadoop.ozone.scm.node.NodeManager;
-import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.utils.BackgroundService;
 import org.apache.hadoop.utils.BackgroundTask;
@@ -32,13 +34,12 @@ import org.apache.hadoop.utils.BackgroundTaskResult.EmptyTaskResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT;
+
 import java.io.IOException;
-import java.util.Collections;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 /**
  * A background service running in SCM to delete blocks. This service scans
@@ -49,7 +50,7 @@ import java.util.stream.Collectors;
  */
 public class SCMBlockDeletingService extends BackgroundService {
 
-  private static final Logger LOG =
+  static final Logger LOG =
       LoggerFactory.getLogger(SCMBlockDeletingService.class);
 
   // ThreadPoolSize=2, 1 for scheduler and the other for the scanner.
@@ -58,28 +59,36 @@ public class SCMBlockDeletingService extends BackgroundService {
   private final Mapping mappingService;
   private final NodeManager nodeManager;
 
-  // Default container size is 5G and block size is 256MB, a full container
-  // at most contains 20 blocks. At most each TX contains 20 blocks.
-  // When SCM sends block deletion TXs to datanode, each command we allow
-  // at most 50 containers so that will limit number of to be deleted blocks
-  // less than 1000.
-  // TODO - a better throttle algorithm
-  // Note, this is not an accurate limit of blocks. When we scan
-  // the log, worst case we may get 50 TX for 50 different datanodes,
-  // that will cause the deletion message sent by SCM extremely small.
-  // As a result, the deletion will be slow. An improvement is to scan
-  // log multiple times until we get enough TXs for each datanode, or
-  // the entire log is scanned.
-  private static final int BLOCK_DELETE_TX_PER_REQUEST_LIMIT = 50;
+  // The block deletion limit is dynamically derived from the container
+  // deletion limit (ozone.block.deleting.container.limit.per.interval)
+  // configured for datanodes. To ensure datanodes do not sit idle
+  // waiting for delete commands, we multiply that value by a factor of
+  // 2 to get the final per-node TX limit.
+  // Currently we implement a throttling algorithm that limits delete
+  // blocks per datanode. First the current node info is fetched from
+  // the node manager, then the entire delLog is scanned from beginning
+  // to end. If a node has reached its maximum, its records are skipped;
+  // otherwise scanning continues until the maximum is reached. Once
+  // all nodes are full, the scan stops.
+  private int blockDeleteLimitSize;
 
   public SCMBlockDeletingService(DeletedBlockLog deletedBlockLog,
       Mapping mapper, NodeManager nodeManager,
-      int interval, long serviceTimeout) {
+      int interval, long serviceTimeout, Configuration conf) {
     super("SCMBlockDeletingService", interval, TimeUnit.MILLISECONDS,
         BLOCK_DELETING_SERVICE_CORE_POOL_SIZE, serviceTimeout);
     this.deletedBlockLog = deletedBlockLog;
     this.mappingService = mapper;
     this.nodeManager = nodeManager;
+
+    int containerLimit = conf.getInt(
+        OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL,
+        OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT);
+    Preconditions.checkArgument(containerLimit > 0,
+        "Container limit size should be positive.");
+    // Multiply the container limit by a factor of 2 to ensure
+    // datanodes do not wait for delete commands.
+    this.blockDeleteLimitSize = containerLimit * 2;
   }
 
   @Override
@@ -104,126 +113,60 @@ public class SCMBlockDeletingService extends BackgroundService {
       // Scan SCM DB in HB interval and collect a throttled list of
       // to delete blocks.
       LOG.debug("Running DeletedBlockTransactionScanner");
-      DatanodeDeletedBlockTransactions transactions =
-          getToDeleteContainerBlocks();
+      DatanodeDeletedBlockTransactions transactions = null;
+      List<DatanodeID> datanodes = nodeManager.getNodes(NodeState.HEALTHY);
+      if (datanodes != null) {
+        transactions = new DatanodeDeletedBlockTransactions(mappingService,
+            blockDeleteLimitSize, datanodes.size());
+        try {
+          deletedBlockLog.getTransactions(transactions);
+        } catch (IOException e) {
+          // We may tolerate a number of failures for some time,
+          // but if it continues to fail, at some point we need to raise
+          // an exception and probably fail the SCM? At present, it simply
+          // continues to retry the scanning.
+          LOG.error("Failed to get block deletion transactions from delTX log",
+              e);
+        }
+        LOG.debug("Scanned deleted blocks log and got {} delTX to process.",
+            transactions.getTXNum());
+      }
+
       if (transactions != null && !transactions.isEmpty()) {
         for (DatanodeID datanodeID : transactions.getDatanodes()) {
           List<DeletedBlocksTransaction> dnTXs = transactions
               .getDatanodeTransactions(datanodeID);
-          dnTxCount += dnTXs.size();
-          // TODO commandQueue needs a cap.
-          // We should stop caching new commands if num of un-processed
-          // command is bigger than a limit, e.g 50. In case datanode goes
-          // offline for sometime, the cached commands be flooded.
-          nodeManager.addDatanodeCommand(datanodeID,
-              new DeleteBlocksCommand(dnTXs));
-          LOG.debug(
-              "Added delete block command for datanode {} in the queue,"
-                  + " number of delete block transactions: {}, TxID list: {}",
-              datanodeID, dnTXs.size(),
-              String.join(",", transactions.getTransactionIDList(datanodeID)));
+          if (dnTXs != null && !dnTXs.isEmpty()) {
+            dnTxCount += dnTXs.size();
+            // TODO commandQueue needs a cap.
+            // We should stop caching new commands if num of un-processed
+            // command is bigger than a limit, e.g 50. In case datanode goes
+            // offline for sometime, the cached commands be flooded.
+            nodeManager.addDatanodeCommand(datanodeID,
+                new DeleteBlocksCommand(dnTXs));
+            LOG.debug(
+                "Added delete block command for datanode {} in the queue,"
+                    + " number of delete block transactions: {}, TxID list: {}",
+                datanodeID, dnTXs.size(), String.join(",",
+                    transactions.getTransactionIDList(datanodeID)));
+          }
         }
       }
 
       if (dnTxCount > 0) {
-        LOG.info("Totally added {} delete blocks command for"
-            + " {} datanodes, task elapsed time: {}ms",
+        LOG.info(
+            "Totally added {} delete blocks command for"
+                + " {} datanodes, task elapsed time: {}ms",
             dnTxCount, transactions.getDatanodes().size(),
             Time.monotonicNow() - startTime);
       }
 
       return EmptyTaskResult.newResult();
     }
-
-    // Scan deleteBlocks.db to get a number of to-delete blocks.
-    // this is going to be properly throttled.
-    private DatanodeDeletedBlockTransactions getToDeleteContainerBlocks() {
-      DatanodeDeletedBlockTransactions dnTXs =
-          new DatanodeDeletedBlockTransactions();
-      List<DeletedBlocksTransaction> txs = null;
-      try {
-        // Get a limited number of TXs to send via HB at a time.
-        txs = deletedBlockLog
-            .getTransactions(BLOCK_DELETE_TX_PER_REQUEST_LIMIT);
-        LOG.debug("Scanned deleted blocks log and got {} delTX to process",
-            txs.size());
-      } catch (IOException e) {
-        // We may tolerant a number of failures for sometime
-        // but if it continues to fail, at some point we need to raise
-        // an exception and probably fail the SCM ? At present, it simply
-        // continues to retry the scanning.
-        LOG.error("Failed to get block deletion transactions from delTX log",
-            e);
-      }
-
-      if (txs != null) {
-        for (DeletedBlocksTransaction tx : txs) {
-          try {
-            ContainerInfo info = mappingService
-                .getContainer(tx.getContainerName());
-            // Find out the datanode where this TX is supposed to send to.
-            info.getPipeline().getMachines()
-                .forEach(entry -> dnTXs.addTransaction(entry, tx));
-          } catch (IOException e) {
-            LOG.warn("Container {} not found, continue to process next",
-                tx.getContainerName(), e);
-          }
-        }
-      }
-      return dnTXs;
-    }
   }
 
-  /**
-   * A wrapper class to hold info about datanode and all deleted block
-   * transactions that will be sent to this datanode.
-   */
-  private static class DatanodeDeletedBlockTransactions {
-
-    // A list of TXs mapped to a certain datanode ID.
-    private final Map<DatanodeID, List<DeletedBlocksTransaction>> transactions;
-
-    DatanodeDeletedBlockTransactions() {
-      this.transactions = Maps.newHashMap();
-    }
-
-    void addTransaction(DatanodeID dnID, DeletedBlocksTransaction tx) {
-      if (transactions.containsKey(dnID)) {
-        transactions.get(dnID).add(tx);
-      } else {
-        List<DeletedBlocksTransaction> first = Lists.newArrayList();
-        first.add(tx);
-        transactions.put(dnID, first);
-      }
-      LOG.debug("Transaction added: {} <- TX({})", dnID, tx.getTxID());
-    }
-
-    Set<DatanodeID> getDatanodes() {
-      return transactions.keySet();
-    }
-
-    boolean isEmpty() {
-      return transactions.isEmpty();
-    }
-
-    boolean hasTransactions(DatanodeID dnID) {
-      return transactions.containsKey(dnID) &&
-          !transactions.get(dnID).isEmpty();
-    }
-
-    List<DeletedBlocksTransaction> getDatanodeTransactions(DatanodeID dnID) {
-      return transactions.get(dnID);
-    }
-
-    List<String> getTransactionIDList(DatanodeID dnID) {
-      if (hasTransactions(dnID)) {
-        return transactions.get(dnID).stream()
-            .map(DeletedBlocksTransaction::getTxID)
-            .map(String::valueOf)
-            .collect(Collectors.toList());
-      } else {
-        return Collections.emptyList();
-      }
-    }
+  @VisibleForTesting
+  public void setBlockDeleteTXNum(int numTXs) {
+    blockDeleteLimitSize = numTXs;
   }
 }
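
Putting the new throttle in numbers: from the constructor above and DatanodeDeletedBlockTransactions.isFull(), each datanode is capped at containerLimit * 2 transactions per interval, and a single scan of the log collects at most that per-node cap times the number of healthy nodes. A small arithmetic sketch with assumed, not committed, values:

// Assumed config value for illustration only:
int containerLimit = 10; // ozone.block.deleting.container.limit.per.interval
int perNodeLimit = containerLimit * 2;     // 20 TXs allowed per datanode
int healthyNodes = 4;                      // assumed healthy node count
int scanCap = perNodeLimit * healthyNodes; // isFull() trips at 80 TXs total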

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6b93781/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index 6e6cc8d..251b949 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -22,9 +22,17 @@ import java.io.IOException;
 
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.hadoop.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
 import org.apache.hadoop.ozone.scm.StorageContainerManager;
 import org.apache.hadoop.ozone.scm.block.DeletedBlockLog;
+import org.apache.hadoop.ozone.scm.block.SCMBlockDeletingService;
+import org.apache.hadoop.ozone.scm.node.NodeManager;
 import org.apache.hadoop.scm.XceiverClientManager;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.junit.Rule;
@@ -166,11 +174,17 @@ public class TestStorageContainerManager {
 
   @Test
   public void testBlockDeletionTransactions() throws Exception {
+    int numKeys = 5;
     OzoneConfiguration conf = new OzoneConfiguration();
     conf.setInt(ScmConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 5);
     conf.setInt(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 3000);
     conf.setInt(ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 5);
     conf.setInt(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS, 1000);
+    // Reset container provision size, otherwise only one container
+    // is created by default.
+    conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE,
+        numKeys);
+
     MiniOzoneCluster cluster =
         new MiniOzoneCluster.Builder(conf).numDataNodes(1)
             .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
@@ -180,48 +194,14 @@ public class TestStorageContainerManager {
           .getScmBlockManager().getDeletedBlockLog();
       Assert.assertEquals(0, delLog.getNumOfValidTransactions());
 
-      // Create 20 random names keys.
+      // Create {numKeys} randomly named keys.
       TestStorageContainerManagerHelper helper =
           new TestStorageContainerManagerHelper(cluster, conf);
-      Map<String, KsmKeyInfo> keyLocations = helper.createKeys(20, 4096);
-
-      // These keys will be written into a bunch of containers,
-      // gets a set of container names, verify container containerBlocks
-      // on datanodes.
-      Set<String> containerNames = new HashSet<>();
-      for (Map.Entry<String, KsmKeyInfo> entry : keyLocations.entrySet()) {
-        entry.getValue().getKeyLocationList()
-            .forEach(loc -> containerNames.add(loc.getContainerName()));
-      }
+      Map<String, KsmKeyInfo> keyLocations = helper.createKeys(numKeys, 4096);
 
-      // Total number of containerBlocks of these containers should be equal to
-      // total number of containerBlocks via creation call.
-      int totalCreatedBlocks = 0;
-      for (KsmKeyInfo info : keyLocations.values()) {
-        totalCreatedBlocks += info.getKeyLocationList().size();
-      }
-      Assert.assertTrue(totalCreatedBlocks > 0);
-      Assert.assertEquals(totalCreatedBlocks,
-          helper.getAllBlocks(containerNames).size());
-
-      // Create a deletion TX for each key.
-      Map<String, List<String>> containerBlocks = Maps.newHashMap();
-      for (KsmKeyInfo info : keyLocations.values()) {
-        List<KsmKeyLocationInfo> list = info.getKeyLocationList();
-        list.forEach(location -> {
-          if (containerBlocks.containsKey(location.getContainerName())) {
-            containerBlocks.get(location.getContainerName())
-                .add(location.getBlockID());
-          } else {
-            List<String> blks = Lists.newArrayList();
-            blks.add(location.getBlockID());
-            containerBlocks.put(location.getContainerName(), blks);
-          }
-        });
-      }
-      for (Map.Entry<String, List<String>> tx : containerBlocks.entrySet()) {
-        delLog.addTransaction(tx.getKey(), tx.getValue());
-      }
+      Map<String, List<String>> containerBlocks = createDeleteTXLog(delLog,
+          keyLocations, helper);
+      Set<String> containerNames = containerBlocks.keySet();
 
       // Verify a few TX gets created in the TX log.
       Assert.assertTrue(delLog.getNumOfValidTransactions() > 0);
@@ -268,4 +248,105 @@ public class TestStorageContainerManager {
       }
     }
   }
-}
+
+  @Test
+  public void testBlockDeletingThrottling() throws Exception {
+    int numKeys = 15;
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setInt(ScmConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 5);
+    conf.setInt(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 3000);
+    conf.setInt(ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 5);
+    conf.setInt(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS, 1000);
+    conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE,
+        numKeys);
+
+    MiniOzoneCluster cluster = new MiniOzoneCluster.Builder(conf)
+        .numDataNodes(1).setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED)
+        .build();
+
+    DeletedBlockLog delLog = cluster.getStorageContainerManager()
+        .getScmBlockManager().getDeletedBlockLog();
+    Assert.assertEquals(0, delLog.getNumOfValidTransactions());
+
+    int limitSize = 1;
+    // Reset the limit to 1, so that only one TX is processed per
+    // datanode.
+    SCMBlockDeletingService delService = cluster.getStorageContainerManager()
+        .getScmBlockManager().getSCMBlockDeletingService();
+    delService.setBlockDeleteTXNum(limitSize);
+
+    // Create {numKeys} randomly named keys.
+    TestStorageContainerManagerHelper helper =
+        new TestStorageContainerManagerHelper(cluster, conf);
+    Map<String, KsmKeyInfo> keyLocations = helper.createKeys(numKeys, 4096);
+
+    createDeleteTXLog(delLog, keyLocations, helper);
+    // Verify a few TX gets created in the TX log.
+    Assert.assertTrue(delLog.getNumOfValidTransactions() > 0);
+
+    // Verify that the size of the delete commands is as expected.
+    GenericTestUtils.waitFor(() -> {
+      NodeManager nodeManager = cluster.getStorageContainerManager()
+          .getScmNodeManager();
+      ReportState reportState = ReportState.newBuilder()
+          .setState(ReportState.states.noContainerReports).setCount(0).build();
+      List<SCMCommand> commands = nodeManager.sendHeartbeat(
+          nodeManager.getNodes(NodeState.HEALTHY).get(0), null, reportState);
+
+      if (commands != null) {
+        for (SCMCommand cmd : commands) {
+          if (cmd.getType() == Type.deleteBlocksCommand) {
+            List<DeletedBlocksTransaction> deletedTXs =
+                ((DeleteBlocksCommand) cmd).blocksTobeDeleted();
+            return deletedTXs != null && deletedTXs.size() == limitSize;
+          }
+        }
+      }
+      return false;
+    }, 500, 10000);
+  }
+
+  private Map<String, List<String>> createDeleteTXLog(DeletedBlockLog delLog,
+      Map<String, KsmKeyInfo> keyLocations,
+      TestStorageContainerManagerHelper helper) throws IOException {
+    // These keys will be written into a bunch of containers. Get the
+    // set of container names and verify the container blocks on the
+    // datanodes.
+    Set<String> containerNames = new HashSet<>();
+    for (Map.Entry<String, KsmKeyInfo> entry : keyLocations.entrySet()) {
+      entry.getValue().getKeyLocationList()
+          .forEach(loc -> containerNames.add(loc.getContainerName()));
+    }
+
+    // The total number of blocks in these containers should equal the
+    // total number of blocks reported via the creation call.
+    int totalCreatedBlocks = 0;
+    for (KsmKeyInfo info : keyLocations.values()) {
+      totalCreatedBlocks += info.getKeyLocationList().size();
+    }
+    Assert.assertTrue(totalCreatedBlocks > 0);
+    Assert.assertEquals(totalCreatedBlocks,
+        helper.getAllBlocks(containerNames).size());
+
+    // Create a deletion TX for each key.
+    Map<String, List<String>> containerBlocks = Maps.newHashMap();
+    for (KsmKeyInfo info : keyLocations.values()) {
+      List<KsmKeyLocationInfo> list = info.getKeyLocationList();
+      list.forEach(location -> {
+        if (containerBlocks.containsKey(location.getContainerName())) {
+          containerBlocks.get(location.getContainerName())
+              .add(location.getBlockID());
+        } else {
+          List<String> blks = Lists.newArrayList();
+          blks.add(location.getBlockID());
+          containerBlocks.put(location.getContainerName(), blks);
+        }
+      });
+    }
+    for (Map.Entry<String, List<String>> tx : containerBlocks.entrySet()) {
+      delLog.addTransaction(tx.getKey(), tx.getValue());
+    }
+
+    return containerBlocks;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f6b93781/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/block/TestDeletedBlockLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/block/TestDeletedBlockLog.java
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/block/TestDeletedBlockLog.java
index 0aea9b8..c4006e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/block/TestDeletedBlockLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/block/TestDeletedBlockLog.java
@@ -19,9 +19,14 @@ package org.apache.hadoop.ozone.scm.block;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.ozone.scm.container.ContainerMapping;
+import org.apache.hadoop.ozone.scm.container.Mapping;
+import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.utils.MetadataKeyFilters;
 import org.apache.hadoop.utils.MetadataStore;
@@ -29,11 +34,14 @@ import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
 
 import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
@@ -42,6 +50,7 @@ import java.util.stream.Collectors;
 
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_DIRS;
 import static org.apache.hadoop.scm.ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY;
+import static org.mockito.Mockito.mock;
 
 /**
  * Tests for DeletedBlockLog.
@@ -237,4 +246,97 @@ public class TestDeletedBlockLog {
     blocks = deletedBlockLog.getTransactions(10);
     Assert.assertEquals(10, blocks.size());
   }
+
+  @Test
+  public void testDeletedBlockTransactions() throws IOException {
+    int txNum = 10;
+    int maximumAllowedTXNum = 5;
+    List<DeletedBlocksTransaction> blocks = null;
+    List<String> containerNames = new LinkedList<>();
+
+    int count = 0;
+    String containerName = null;
+    DatanodeID dnID1 = new DatanodeID(null, null, "node1", 0, 0, 0, 0);
+    DatanodeID dnID2 = new DatanodeID(null, null, "node2", 0, 0, 0, 0);
+    Mapping mappingService = mock(ContainerMapping.class);
+    // Create {txNum} TXs in the log.
+    for (Map.Entry<String, List<String>> entry : generateData(txNum)
+        .entrySet()) {
+      count++;
+      containerName = entry.getKey();
+      containerNames.add(containerName);
+      deletedBlockLog.addTransaction(containerName, entry.getValue());
+
+      // Map TX[1-6] to datanode1 and TX[7-10] to datanode2.
+      if (count <= (maximumAllowedTXNum + 1)) {
+        mockContainerInfo(mappingService, containerName, dnID1);
+      } else {
+        mockContainerInfo(mappingService, containerName, dnID2);
+      }
+    }
+
+    DatanodeDeletedBlockTransactions transactions =
+        new DatanodeDeletedBlockTransactions(mappingService,
+            maximumAllowedTXNum, 2);
+    deletedBlockLog.getTransactions(transactions);
+
+    List<Long> txIDs = new LinkedList<>();
+    for (DatanodeID dnID : transactions.getDatanodes()) {
+      List<DeletedBlocksTransaction> txs = transactions
+          .getDatanodeTransactions(dnID);
+      for (DeletedBlocksTransaction tx : txs) {
+        txIDs.add(tx.getTxID());
+      }
+    }
+
+    // Commit (delete) the scanned TX IDs.
+    deletedBlockLog.commitTransactions(txIDs);
+    blocks = deletedBlockLog.getTransactions(txNum);
+    // There should be one TX remaining since dnID1 reached
+    // the maximum value (5).
+    Assert.assertEquals(1, blocks.size());
+
+    Assert.assertFalse(transactions.isFull());
+    // The number of TXs for dnID1 will not exceed the maximum value.
+    Assert.assertEquals(maximumAllowedTXNum,
+        transactions.getDatanodeTransactions(dnID1).size());
+
+    int size = transactions.getDatanodeTransactions(dnID2).size();
+    // Add a duplicated container TX for dnID2; this should fail.
+    DeletedBlocksTransaction.Builder builder =
+        DeletedBlocksTransaction.newBuilder();
+    builder.setTxID(11);
+    builder.setContainerName(containerName);
+    builder.setCount(0);
+    transactions.addTransaction(builder.build());
+
+    // The number of TXs for dnID2 should not change.
+    Assert.assertEquals(size,
+        transactions.getDatanodeTransactions(dnID2).size());
+
+    // Add a new TX for dnID2, so that dnID2 reaches the maximum value.
+    containerName = "newContainer";
+    builder = DeletedBlocksTransaction.newBuilder();
+    builder.setTxID(12);
+    builder.setContainerName(containerName);
+    builder.setCount(0);
+    mockContainerInfo(mappingService, containerName, dnID2);
+    transactions.addTransaction(builder.build());
+    // Since all nodes are full, transactions is full.
+    Assert.assertTrue(transactions.isFull());
+  }
+
+  private void mockContainerInfo(Mapping mappingService, String containerName,
+      DatanodeID dnID) throws IOException {
+    Pipeline pipeline = new Pipeline("fake");
+    pipeline.addMember(dnID);
+
+    ContainerInfo.Builder builder = new ContainerInfo.Builder();
+    builder.setPipeline(pipeline);
+    builder.setContainerName(containerName);
+
+    ContainerInfo containerInfo = builder.build();
+    Mockito.doReturn(containerInfo).when(mappingService)
+        .getContainer(containerName);
+  }
 }

