hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject hadoop git commit: HDFS-12282. Ozone: DeleteKey-4: Block delete via HB between SCM and DN. Contributed by Weiwei Yang.
Date Mon, 28 Aug 2017 02:05:22 GMT
Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 1586f20fc -> 76a156ebc


HDFS-12282. Ozone: DeleteKey-4: Block delete via HB between SCM and DN. Contributed by Weiwei Yang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/76a156eb
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/76a156eb
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/76a156eb

Branch: refs/heads/HDFS-7240
Commit: 76a156ebce06291a4fbb4f2cf8e2a08528e1c2e2
Parents: 1586f20
Author: Weiwei Yang <wwei@apache.org>
Authored: Mon Aug 28 10:04:46 2017 +0800
Committer: Weiwei Yang <wwei@apache.org>
Committed: Mon Aug 28 10:04:46 2017 +0800

----------------------------------------------------------------------
 .../helpers/DeletedContainerBlocksSummary.java  | 103 +++++++++
 .../statemachine/DatanodeStateMachine.java      |  14 +-
 .../background/BlockDeletingService.java        |  30 ++-
 .../DeleteBlocksCommandHandler.java             | 198 +++++++++++++++++
 .../states/endpoint/HeartbeatEndpointTask.java  |  15 ++
 .../StorageContainerDatanodeProtocol.java       |  12 +-
 .../protocol/commands/DeleteBlocksCommand.java  |  63 ++++++
 ...rDatanodeProtocolClientSideTranslatorPB.java |  14 ++
 ...rDatanodeProtocolServerSideTranslatorPB.java |  13 ++
 .../ozone/scm/StorageContainerManager.java      |  66 +++++-
 .../hadoop/ozone/scm/block/BlockManager.java    |  17 ++
 .../ozone/scm/block/BlockManagerImpl.java       |  43 ++++
 .../hadoop/ozone/scm/block/DeletedBlockLog.java |  21 +-
 .../ozone/scm/block/DeletedBlockLogImpl.java    |  53 ++++-
 .../scm/block/SCMBlockDeletingService.java      | 217 +++++++++++++++++++
 .../hadoop/ozone/scm/node/NodeManager.java      |   9 +
 .../hadoop/ozone/scm/node/SCMNodeManager.java   |   5 +
 .../hadoop/utils/BackgroundTaskResult.java      |  17 +-
 .../StorageContainerDatanodeProtocol.proto      |  27 ++-
 .../ozone/TestStorageContainerManager.java      | 116 +++++++++-
 .../TestStorageContainerManagerHelper.java      | 180 +++++++++++++++
 .../ozone/container/common/ScmTestMock.java     |   9 +
 22 files changed, 1216 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DeletedContainerBlocksSummary.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DeletedContainerBlocksSummary.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DeletedContainerBlocksSummary.java
new file mode 100644
index 0000000..de5e2d0
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/DeletedContainerBlocksSummary.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.helpers;
+
+import com.google.common.collect.Maps;
+import org.apache.hadoop.ozone.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * A helper class that wraps information about container blocks under deletion.
+ */
+public final class DeletedContainerBlocksSummary {
+
+  private final List<DeletedBlocksTransaction> blocks;
+  // key : txID
+  // value : number of times this tx has been processed
+  private final Map<Long, Integer> txSummary;
+  // key : container name
+  // value : the number of blocks that need to be deleted in this container;
+  // if the message contains multiple entries for the same container,
+  // their block counts are summed
+  private final Map<String, Integer> blockSummary;
+  // total number of blocks in this message
+  private int numOfBlocks;
+
+  private DeletedContainerBlocksSummary(List<DeletedBlocksTransaction> blocks) {
+    this.blocks = blocks;
+    txSummary = Maps.newHashMap();
+    blockSummary = Maps.newHashMap();
+    blocks.forEach(entry -> {
+      txSummary.put(entry.getTxID(), entry.getCount());
+      if (blockSummary.containsKey(entry.getContainerName())) {
+        blockSummary.put(entry.getContainerName(),
+            blockSummary.get(entry.getContainerName())
+                + entry.getBlockIDCount());
+      } else {
+        blockSummary.put(entry.getContainerName(), entry.getBlockIDCount());
+      }
+      numOfBlocks += entry.getBlockIDCount();
+    });
+  }
+
+  public static DeletedContainerBlocksSummary getFrom(
+      List<DeletedBlocksTransaction> blocks) {
+    return new DeletedContainerBlocksSummary(blocks);
+  }
+
+  public int getNumOfBlocks() {
+    return numOfBlocks;
+  }
+
+  public int getNumOfContainers() {
+    return blockSummary.size();
+  }
+
+  public String getTXIDs() {
+    return String.join(",", txSummary.keySet()
+        .stream().map(String::valueOf).collect(Collectors.toList()));
+  }
+
+  public String getTxIDSummary() {
+    List<String> txSummaryEntry = txSummary.entrySet().stream()
+        .map(entry -> entry.getKey() + "(" + entry.getValue() + ")")
+        .collect(Collectors.toList());
+    return "[" + String.join(",", txSummaryEntry) + "]";
+  }
+
+  @Override public String toString() {
+    StringBuffer sb = new StringBuffer();
+    for (DeletedBlocksTransaction blks : blocks) {
+      sb.append(" ")
+          .append("TXID=")
+          .append(blks.getTxID())
+          .append(", ")
+          .append("TimesProceed=")
+          .append(blks.getCount())
+          .append(", ")
+          .append(blks.getContainerName())
+          .append(" : [")
+          .append(String.join(",", blks.getBlockIDList())).append("]")
+          .append("\n");
+    }
+    return sb.toString();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 66992af..45e47db 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.ozone.OzoneConfiguration;
 import org.apache.hadoop.ozone.client.OzoneClientUtils;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CommandDispatcher;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ContainerReportHandler;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteBlocksCommandHandler;
 import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.util.Time;
@@ -77,15 +78,16 @@ public class DatanodeStateMachine implements Closeable {
     this.datanodeID = datanodeID;
     nextHB = new AtomicLong(Time.monotonicNow());
 
-
      // When we add new handlers just adding a new handler here should do the
      // trick.
     commandDispatcher = CommandDispatcher.newBuilder()
-      .addHandler(new ContainerReportHandler())
-      .setConnectionManager(connectionManager)
-      .setContainer(container)
-      .setContext(context)
-      .build();
+        .addHandler(new ContainerReportHandler())
+        .addHandler(new DeleteBlocksCommandHandler(
+            container.getContainerManager(), conf))
+        .setConnectionManager(connectionManager)
+        .setContainer(container)
+        .setContext(context)
+        .build();
   }
 
   public void setDatanodeID(DatanodeID datanodeID) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java
index 498264e..48cf329 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
 import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
 import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
@@ -116,6 +117,11 @@ public class BlockDeletingService extends BackgroundService{
       LOG.warn("Failed to initiate block deleting tasks, "
           + "caused by unable to get containers info. "
           + "Retry in next interval. ", e);
+    } catch (Exception e) {
+      // In case listContainer call throws any uncaught RuntimeException.
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Unexpected error occurs during deleting blocks.", e);
+      }
     }
     return queue;
   }
@@ -159,6 +165,7 @@ public class BlockDeletingService extends BackgroundService{
 
     @Override
     public BackgroundTaskResult call() throws Exception {
+      ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
       long startTime = Time.monotonicNow();
       // Scan container's db and get list of under deletion blocks
       MetadataStore meta = KeyUtils.getDB(containerData, conf);
@@ -175,17 +182,24 @@ public class BlockDeletingService extends BackgroundService{
       List<String> succeedBlocks = new LinkedList<>();
       LOG.debug("Container : {}, To-Delete blocks : {}",
           containerData.getContainerName(), toDeleteBlocks.size());
+      File dataDir = ContainerUtils.getDataDirectory(containerData).toFile();
+      if (!dataDir.exists() || !dataDir.isDirectory()) {
+        LOG.error("Invalid container data dir {} : "
+            + "not exist or not a directory", dataDir.getAbsolutePath());
+        return crr;
+      }
+
       toDeleteBlocks.forEach(entry -> {
         String blockName = DFSUtil.bytes2String(entry.getKey());
         LOG.debug("Deleting block {}", blockName);
         try {
           ContainerProtos.KeyData data =
               ContainerProtos.KeyData.parseFrom(entry.getValue());
-
           for (ContainerProtos.ChunkInfo chunkInfo : data.getChunksList()) {
-            File chunkFile = new File(chunkInfo.getChunkName());
+            File chunkFile = dataDir.toPath()
+                .resolve(chunkInfo.getChunkName()).toFile();
             if (FileUtils.deleteQuietly(chunkFile)) {
-              LOG.debug("block {} chunk {} deleted", blockName,
+              LOG.info("block {} chunk {} deleted", blockName,
                   chunkFile.getAbsolutePath());
             }
           }
@@ -201,11 +215,11 @@ public class BlockDeletingService extends BackgroundService{
           batch.delete(DFSUtil.string2Bytes(entry)));
       meta.writeBatch(batch);
 
-      LOG.info("The elapsed time of task@{} for"
-          + " deleting blocks: {}ms.",
-          Integer.toHexString(this.hashCode()),
-          Time.monotonicNow() - startTime);
-      ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
+      if (!succeedBlocks.isEmpty()) {
+        LOG.info("Container: {}, deleted blocks: {}, task elapsed time: {}ms",
+            containerData.getContainerName(), succeedBlocks.size(),
+            Time.monotonicNow() - startTime);
+      }
       crr.addAll(succeedBlocks);
       return crr;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
new file mode 100644
index 0000000..a833cdf
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
+import org.apache.hadoop.ozone.container.common.helpers.DeletedContainerBlocksSummary;
+import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
+import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
+import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
+import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.utils.BatchOperation;
+import org.apache.hadoop.utils.MetadataStore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Handle block deletion commands.
+ */
+public class DeleteBlocksCommandHandler implements CommandHandler {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DeleteBlocksCommandHandler.class);
+
+  private ContainerManager containerManager;
+  private Configuration conf;
+  private int invocationCount;
+  private long totalTime;
+
+  public DeleteBlocksCommandHandler(ContainerManager containerManager,
+      Configuration conf) {
+    this.containerManager = containerManager;
+    this.conf = conf;
+  }
+
+  @Override
+  public void handle(SCMCommand command, OzoneContainer container,
+      StateContext context, SCMConnectionManager connectionManager) {
+    if (command.getType() != Type.deleteBlocksCommand) {
+      LOG.warn("Skipping handling command, expected command "
+              + "type {} but found {}",
+          Type.deleteBlocksCommand, command.getType());
+      return;
+    }
+    LOG.debug("Processing block deletion command.");
+    invocationCount++;
+    long startTime = Time.monotonicNow();
+
+    // move blocks to deleting state.
+    // this is a metadata update, the actual deletion happens in another
+    // recycling thread.
+    DeleteBlocksCommand cmd = (DeleteBlocksCommand) command;
+    List<DeletedBlocksTransaction> containerBlocks = cmd.blocksTobeDeleted();
+
+
+    DeletedContainerBlocksSummary summary =
+        DeletedContainerBlocksSummary.getFrom(containerBlocks);
+    LOG.info("Start to delete container blocks, TXIDs={}, "
+            + "numOfContainers={}, numOfBlocks={}",
+        summary.getTxIDSummary(),
+        summary.getNumOfContainers(),
+        summary.getNumOfBlocks());
+
+    ContainerBlocksDeletionACKProto.Builder resultBuilder =
+        ContainerBlocksDeletionACKProto.newBuilder();
+    containerBlocks.forEach(entry -> {
+      DeleteBlockTransactionResult.Builder txResultBuilder =
+          DeleteBlockTransactionResult.newBuilder();
+      txResultBuilder.setTxID(entry.getTxID());
+      try {
+        deleteContainerBlocks(entry, conf);
+        txResultBuilder.setSuccess(true);
+      } catch (IOException e) {
+        LOG.warn("Failed to delete blocks for container={}, TXID={}",
+            entry.getContainerName(), entry.getTxID(), e);
+        txResultBuilder.setSuccess(false);
+      }
+      resultBuilder.addResults(txResultBuilder.build());
+    });
+    ContainerBlocksDeletionACKProto blockDeletionACK = resultBuilder.build();
+
+    // Send ACK back to SCM as long as the metadata has been updated.
+    // TODO: Or should we wait until the blocks are actually deleted?
+    if (!containerBlocks.isEmpty()) {
+      for (EndpointStateMachine endPoint : connectionManager.getValues()) {
+        try {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Sending following block deletion ACK to SCM");
+            for (DeleteBlockTransactionResult result :
+                blockDeletionACK.getResultsList()) {
+              LOG.debug(result.getTxID() + " : " + result.getSuccess());
+            }
+          }
+          endPoint.getEndPoint()
+              .sendContainerBlocksDeletionACK(blockDeletionACK);
+        } catch (IOException e) {
+          LOG.error("Unable to send block deletion ACK to SCM {}",
+              endPoint.getAddress().toString(), e);
+        }
+      }
+    }
+
+    long endTime = Time.monotonicNow();
+    totalTime += endTime - startTime;
+  }
+
+  /**
+   * Move a bunch of blocks from a container to deleting state.
+   * This is a metadata update only; the actual deletes happen asynchronously.
+   *
+   * @param delTX a block deletion transaction.
+   * @param config configuration.
+   * @throws IOException if I/O error occurs.
+   */
+  private void deleteContainerBlocks(DeletedBlocksTransaction delTX,
+      Configuration config) throws IOException {
+    String containerId = delTX.getContainerName();
+    ContainerData containerInfo = containerManager.readContainer(containerId);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Processing Container : {}, DB path : {}", containerId,
+          containerInfo.getDBPath());
+    }
+    MetadataStore containerDB = KeyUtils.getDB(containerInfo, config);
+    for (String blk : delTX.getBlockIDList()) {
+      BatchOperation batch = new BatchOperation();
+      byte[] blkBytes = DFSUtil.string2Bytes(blk);
+      byte[] blkInfo = containerDB.get(blkBytes);
+      if (blkInfo != null) {
+        // Found the block in container db,
+        // use an atomic update to change its state to deleting.
+        batch.put(DFSUtil.string2Bytes(OzoneConsts.DELETING_KEY_PREFIX + blk),
+            blkInfo);
+        batch.delete(blkBytes);
+        try {
+          containerDB.writeBatch(batch);
+          LOG.info("Transited Block {} to DELETING state in container {}",
+              blk, containerId);
+        } catch (IOException e) {
+          // If some blocks fail to be deleted, we fail this TX. Without a
+          // successful ACK for it, SCM will resend the TX with a certain
+          // number of retries.
+          throw new IOException(
+              "Failed to delete blocks for TXID = " + delTX.getTxID(), e);
+        }
+      } else {
+        LOG.info("Block {} not found or already under deletion in"
+                + " container {}, skip deleting it.", blk, containerId);
+      }
+    }
+  }
+
+  @Override
+  public Type getCommandType() {
+    return Type.deleteBlocksCommand;
+  }
+
+  @Override
+  public int getInvocationCount() {
+    return this.invocationCount;
+  }
+
+  @Override
+  public long getAverageRunTime() {
+    if (invocationCount > 0) {
+      return totalTime / invocationCount;
+    }
+    return 0;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index 764e6d2..d68351c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -21,11 +21,14 @@ package org.apache.hadoop.ozone.container.common.states.endpoint;
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.container.common.helpers
+    .DeletedContainerBlocksSummary;
 import org.apache.hadoop.ozone.container.common.statemachine
     .EndpointStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine
     .EndpointStateMachine.EndPointStates;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto;
 import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand;
@@ -144,6 +147,18 @@ public class HeartbeatEndpointTask
           }
         }
         break;
+      case deleteBlocksCommand:
+        DeleteBlocksCommand db = DeleteBlocksCommand
+            .getFromProtobuf(commandResponseProto.getDeleteBlocksProto());
+        if (!db.blocksTobeDeleted().isEmpty()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(DeletedContainerBlocksSummary
+                .getFrom(db.blocksTobeDeleted())
+                .toString());
+          }
+          this.context.addCommand(db);
+        }
+        break;
       default:
         throw new IllegalArgumentException("Unknown response : "
             + commandResponseProto.getCmdType().name());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
index 75bd771..a7b8717 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerDatanodeProtocol.java
@@ -25,7 +25,8 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolPr
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
-
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKResponseProto;
 import java.io.IOException;
 
 /**
@@ -70,4 +71,13 @@ public interface StorageContainerDatanodeProtocol {
    */
   SCMHeartbeatResponseProto sendContainerReport(ContainerReportsProto reports)
       throws IOException;
+
+  /**
+   * Used by datanode to send block deletion ACK to SCM.
+   * @param request block deletion transactions.
+   * @return block deletion transaction response.
+   * @throws IOException
+   */
+  ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
+      ContainerBlocksDeletionACKProto request) throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java
new file mode 100644
index 0000000..8e3463d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.protocol.commands;
+
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMDeleteBlocksCmdResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
+
+import java.util.List;
+
+/**
+ * An SCM command that asks a datanode to delete a number of blocks.
+ */
+public class DeleteBlocksCommand extends
+    SCMCommand<SCMDeleteBlocksCmdResponseProto> {
+
+  private List<DeletedBlocksTransaction> blocksTobeDeleted;
+
+
+  public DeleteBlocksCommand(List<DeletedBlocksTransaction> blocks) {
+    this.blocksTobeDeleted = blocks;
+  }
+
+  public List<DeletedBlocksTransaction> blocksTobeDeleted() {
+    return this.blocksTobeDeleted;
+  }
+
+  @Override
+  public Type getType() {
+    return Type.deleteBlocksCommand;
+  }
+
+  @Override
+  public byte[] getProtoBufMessage() {
+    return getProto().toByteArray();
+  }
+
+  public static DeleteBlocksCommand getFromProtobuf(
+      SCMDeleteBlocksCmdResponseProto deleteBlocksProto) {
+    return new DeleteBlocksCommand(deleteBlocksProto
+        .getDeletedBlocksTransactionsList());
+  }
+
+  public SCMDeleteBlocksCmdResponseProto getProto() {
+    return SCMDeleteBlocksCmdResponseProto.newBuilder()
+        .addAllDeletedBlocksTransactions(blocksTobeDeleted).build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
index e0e3bee..033513d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolClientSideTranslatorPB.java
@@ -38,6 +38,8 @@ import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKResponseProto;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -177,4 +179,16 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB
     return resp;
   }
 
+  @Override
+  public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
+      ContainerBlocksDeletionACKProto deletedBlocks) throws IOException {
+    final ContainerBlocksDeletionACKResponseProto resp;
+    try {
+      resp = rpcProxy.sendContainerBlocksDeletionACK(NULL_RPC_CONTROLLER,
+          deletedBlocks);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+    return resp;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
index 116cc38..9808f3e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerDatanodeProtocolServerSideTranslatorPB.java
@@ -24,6 +24,8 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolPr
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKResponseProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
 
 import java.io.IOException;
 
@@ -97,4 +99,15 @@ public class StorageContainerDatanodeProtocolServerSideTranslatorPB
       throw new ServiceException(e);
     }
   }
+
+  @Override
+  public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
+      RpcController controller, ContainerBlocksDeletionACKProto request)
+      throws ServiceException {
+    try {
+      return impl.sendContainerBlocksDeletionACK(request);
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
index fa14ad0..5a97735 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.ozone.client.OzoneClientUtils;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConfiguration;
 import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
+import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
 import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
 import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
@@ -52,6 +53,9 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolPr
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SendContainerReportProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKResponseProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
 import org.apache.hadoop.ozone.protocolPB.ScmBlockLocationProtocolServerSideTranslatorPB;
@@ -91,6 +95,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.UUID;
+import java.util.Collections;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.ozone.protocol.proto
     .ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result;
@@ -322,8 +328,8 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
    * @throws InvalidProtocolBufferException
    */
   @VisibleForTesting
-  public static SCMCommandResponseProto getCommandResponse(SCMCommand cmd)
-      throws InvalidProtocolBufferException {
+  public SCMCommandResponseProto getCommandResponse(SCMCommand cmd)
+      throws IOException {
     Type type = cmd.getType();
     SCMCommandResponseProto.Builder builder =
         SCMCommandResponseProto.newBuilder();
@@ -346,6 +352,17 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
           .setReregisterProto(SCMReregisterCmdResponseProto
               .getDefaultInstance())
           .build();
+    case deleteBlocksCommand:
+      // Once SCM sends out the deletion message, increment the count.
+      // This is done here instead of when SCM receives the ACK, because
+      // the DN might not be able to send the ACK for some time. In case
+      // it times out, SCM needs to re-send the message some more times.
+      List<Long> txs = ((DeleteBlocksCommand) cmd).blocksTobeDeleted()
+          .stream().map(tx -> tx.getTxID()).collect(Collectors.toList());
+      this.getScmBlockManager().getDeletedBlockLog().incrementCount(txs);
+      return builder.setCmdType(Type.deleteBlocksCommand)
+          .setDeleteBlocksProto(((DeleteBlocksCommand) cmd).getProto())
+          .build();
     default:
       throw new IllegalArgumentException("Not implemented");
     }
@@ -591,6 +608,7 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
         datanodeRpcAddress));
     datanodeRpcServer.start();
     httpServer.start();
+    scmBlockManager.start();
 
     setStartTime();
 
@@ -628,6 +646,13 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
       LOG.error("Storage Container Manager HTTP server stop failed.", ex);
     }
 
+    try {
+      LOG.info("Stopping Block Manager Service.");
+      scmBlockManager.stop();
+    } catch (Exception ex) {
+      LOG.error("SCM block manager service stop failed.", ex);
+    }
+
     unregisterMXBean();
     IOUtils.cleanupWithLogger(LOG, scmContainerManager);
     IOUtils.cleanupWithLogger(LOG, scmBlockManager);
@@ -715,6 +740,38 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
   }
 
   /**
+   * Handles the block deletion ACKs sent by datanodes. Once ACKs are received,
+   * SCM considers the blocks deleted and updates the metadata in the SCM DB.
+   *
+   * @param acks
+   * @return
+   * @throws IOException
+   */
+  @Override
+  public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
+      ContainerBlocksDeletionACKProto acks) throws IOException {
+    if (acks.getResultsCount() > 0) {
+      List<DeleteBlockTransactionResult> resultList = acks.getResultsList();
+      for (DeleteBlockTransactionResult result : resultList) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Got block deletion ACK from datanode, TXIDs={}, "
+                  + "success={}", result.getTxID(), result.getSuccess());
+        }
+        if (result.getSuccess()) {
+          LOG.info("Purging TXID={} from block deletion log", result.getTxID());
+          this.getScmBlockManager().getDeletedBlockLog()
+              .commitTransactions(Collections.singletonList(result.getTxID()));
+        } else {
+          LOG.warn("Got failed ACK for TXID={}, prepare to resend the "
+              + "TX in next interval", result.getTxID());
+        }
+      }
+    }
+    return ContainerBlocksDeletionACKResponseProto.newBuilder()
+        .getDefaultInstanceForType();
+  }
+
+  /**
    * Returns the Number of Datanodes that are communicating with SCM.
    *
    * @param nodestate Healthy, Dead etc.
@@ -742,6 +799,11 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
     return scmNodeManager;
   }
 
+  @VisibleForTesting
+  public BlockManager getScmBlockManager() {
+    return scmBlockManager;
+  }
+
   /**
    * Get block locations.
    * @param keys batch of block keys to retrieve.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java
index d1487fc..4672b33 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java
@@ -51,4 +51,21 @@ public interface BlockManager extends Closeable {
    * @throws IOException
    */
   void deleteBlock(String key) throws IOException;
+
+  /**
+   * @return the block deletion transaction log maintained by SCM.
+   */
+  DeletedBlockLog getDeletedBlockLog();
+
+  /**
+   * Start block manager background services.
+   * @throws IOException
+   */
+  void start() throws IOException;
+
+  /**
+   * Shutdown block manager background services.
+   * @throws IOException
+   */
+  void stop() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
index 43ca21c..d920c42 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
@@ -46,6 +46,7 @@ import java.io.IOException;
 
 import java.util.ArrayList;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 import java.util.List;
@@ -70,6 +71,14 @@ import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.
     FAILED_TO_LOAD_OPEN_CONTAINER;
 import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.
     INVALID_BLOCK_SIZE;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
 
 /**
  * Block Manager manages the block access for SCM.
@@ -89,6 +98,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
   // Track all containers owned by block service.
   private final MetadataStore containerStore;
   private final DeletedBlockLog deletedBlockLog;
+  private final SCMBlockDeletingService blockDeletingService;
 
   private Map<OzoneProtos.LifeCycleState,
       Map<String, BlockContainerInfo>> containers;
@@ -143,7 +153,34 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
     this.lock = new ReentrantLock();
 
     mxBean = MBeans.register("BlockManager", "BlockManagerImpl", this);
+
+    // SCM block deleting transaction log and deleting service.
     deletedBlockLog = new DeletedBlockLogImpl(conf);
+    int svcInterval = conf.getInt(
+        OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS,
+        OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT);
+    long serviceTimeout = conf.getTimeDuration(
+        OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
+        OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
+    blockDeletingService = new SCMBlockDeletingService(deletedBlockLog,
+        containerManager, nodeManager, svcInterval, serviceTimeout);
+  }
+
+  /**
+   * Start block manager services.
+   * @throws IOException
+   */
+  public void start() throws IOException {
+    this.blockDeletingService.start();
+  }
+
+  /**
+   * Shutdown block manager services.
+   * @throws IOException
+   */
+  public void stop() throws IOException {
+    this.blockDeletingService.shutdown();
+    this.close();
   }
 
   // TODO: close full (or almost full) containers with a separate thread.
@@ -475,6 +512,11 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
     }
   }
 
+  @Override
+  public DeletedBlockLog getDeletedBlockLog() {
+    return this.deletedBlockLog;
+  }
+
   @VisibleForTesting
   public String getDeletedKeyName(String key) {
     return StringUtils.format(".Deleted/%s", key);
@@ -495,6 +537,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
     if (deletedBlockLog != null) {
       deletedBlockLog.close();
     }
+    blockDeletingService.shutdown();
     MBeans.unregister(mxBean);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java
index 60d53af..9e268a6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.ozone.scm.block;
 
-
 import org.apache.hadoop.ozone.protocol.proto
     .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
 
@@ -46,6 +45,17 @@ public interface DeletedBlockLog extends Closeable {
       throws IOException;
 
   /**
+   * Returns all failed transactions in the log. A transaction is considered
+   * to have failed if it has been sent more than MAX_RETRY times and its
+   * count has been reset to -1.
+   *
+   * @return a list of failed deleted block transactions.
+   * @throws IOException
+   */
+  List<DeletedBlocksTransaction> getFailedTransactions()
+      throws IOException;
+
+  /**
    * Increments count for given list of transactions by 1.
    * The log maintains a valid range of counts for each transaction
    * [0, MAX_RETRY]. If exceed this range, resets it to -1 to indicate
@@ -75,4 +85,13 @@ public interface DeletedBlockLog extends Closeable {
    */
   void addTransaction(String containerName, List<String> blocks)
       throws IOException;
+
+  /**
+   * Returns the total number of valid transactions. A transaction is
+   * considered to be valid as long as its count is in range [0, MAX_RETRY].
+   *
+   * @return the number of valid transactions.
+   * @throws IOException
+   */
+  int getNumOfValidTransactions() throws IOException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java
index ef1a515..738157d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.ozone.scm.block;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
 import com.google.common.primitives.Longs;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -36,6 +37,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
@@ -147,6 +149,28 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
     return result;
   }
 
+  @Override
+  public List<DeletedBlocksTransaction> getFailedTransactions()
+      throws IOException {
+    lock.lock();
+    try {
+      final List<DeletedBlocksTransaction> failedTXs = Lists.newArrayList();
+      deletedStore.iterate(null, (key, value) -> {
+        if (!Arrays.equals(LATEST_TXID, key)) {
+          DeletedBlocksTransaction delTX =
+              DeletedBlocksTransaction.parseFrom(value);
+          if (delTX.getCount() == -1) {
+            failedTXs.add(delTX);
+          }
+        }
+        return true;
+      });
+      return failedTXs;
+    } finally {
+      lock.unlock();
+    }
+  }
+
   /**
    * {@inheritDoc}
    *
@@ -163,13 +187,14 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
           DeletedBlocksTransaction block = DeletedBlocksTransaction
               .parseFrom(deletedStore.get(Longs.toByteArray(txID)));
           DeletedBlocksTransaction.Builder builder = block.toBuilder();
-          if (block.getCount() > -1) {
-            builder.setCount(block.getCount() + 1);
+          int currentCount = block.getCount();
+          if (currentCount > -1) {
+            builder.setCount(++currentCount);
           }
           // if the retry time exceeds the maxRetry value
           // then set the retry value to -1, stop retrying, admins can
           // analyze those blocks and purge them manually by SCMCli.
-          if (block.getCount() > maxRetry) {
+          if (currentCount > maxRetry) {
             builder.setCount(-1);
           }
           deletedStore.put(Longs.toByteArray(txID),
@@ -238,6 +263,28 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
   }
 
   @Override
+  public int getNumOfValidTransactions() throws IOException {
+    lock.lock();
+    try {
+      final AtomicInteger num = new AtomicInteger(0);
+      deletedStore.iterate(null, (key, value) -> {
+        // Exclude latest txid record
+        if (!Arrays.equals(LATEST_TXID, key)) {
+          DeletedBlocksTransaction delTX =
+              DeletedBlocksTransaction.parseFrom(value);
+          if (delTX.getCount() > -1) {
+            num.incrementAndGet();
+          }
+        }
+        return true;
+      });
+      return num.get();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
   public void close() throws IOException {
     if (deletedStore != null) {
       deletedStore.close();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/SCMBlockDeletingService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/SCMBlockDeletingService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/SCMBlockDeletingService.java
new file mode 100644
index 0000000..3ca1133
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/SCMBlockDeletingService.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone.scm.block;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.ozone.scm.container.Mapping;
+import org.apache.hadoop.ozone.scm.node.NodeManager;
+import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.utils.BackgroundService;
+import org.apache.hadoop.utils.BackgroundTask;
+import org.apache.hadoop.utils.BackgroundTaskQueue;
+import org.apache.hadoop.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+/**
+ * A background service running in SCM to delete blocks. This service scans
+ * block deletion log in certain interval and caches block deletion commands
+ * in {@link org.apache.hadoop.ozone.scm.node.CommandQueue}, asynchronously
+ * SCM HB thread polls cached commands and sends them to datanode for physical
+ * processing.
+ */
+public class SCMBlockDeletingService extends BackgroundService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SCMBlockDeletingService.class);
+
+  // ThreadPoolSize=2, 1 for scheduler and the other for the scanner.
+  private final static int BLOCK_DELETING_SERVICE_CORE_POOL_SIZE = 2;
+  private final DeletedBlockLog deletedBlockLog;
+  private final Mapping mappingService;
+  private final NodeManager nodeManager;
+
+  // Default container size is 5G and block size is 256MB, so a full
+  // container contains at most 20 blocks, and each TX at most 20 blocks.
+  // When SCM sends block deletion TXs to a datanode, each command is
+  // allowed at most 50 containers, which limits the number of blocks to be
+  // deleted to at most 1000.
+  // TODO - a better throttle algorithm
+  // Note, this is not an accurate limit of blocks. When we scan
+  // the log, in the worst case we may get 50 TXs for 50 different datanodes,
+  // which makes each deletion message sent by SCM extremely small.
+  // As a result, the deletion will be slow. An improvement is to scan
+  // the log multiple times until we get enough TXs for each datanode, or
+  // the entire log is scanned.
+  private static final int BLOCK_DELETE_TX_PER_REQUEST_LIMIT = 50;
+
+  public SCMBlockDeletingService(DeletedBlockLog deletedBlockLog,
+      Mapping mapper, NodeManager nodeManager,
+      int interval, long serviceTimeout) {
+    super("SCMBlockDeletingService", interval, TimeUnit.MILLISECONDS,
+        BLOCK_DELETING_SERVICE_CORE_POOL_SIZE, serviceTimeout);
+    this.deletedBlockLog = deletedBlockLog;
+    this.mappingService = mapper;
+    this.nodeManager = nodeManager;
+  }
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+    queue.add(new DeletedBlockTransactionScanner());
+    return queue;
+  }
+
+  private class DeletedBlockTransactionScanner
+      implements BackgroundTask<EmptyTaskResult> {
+
+    @Override
+    public int getPriority() {
+      return 1;
+    }
+
+    @Override
+    public EmptyTaskResult call() throws Exception {
+      // Scan SCM DB in HB interval and collect a throttled list of
+      // to delete blocks.
+      LOG.info("Running DeletedBlockTransactionScanner");
+      DatanodeDeletedBlockTransactions transactions =
+          getToDeleteContainerBlocks();
+      if (transactions != null && !transactions.isEmpty()) {
+        transactions.getDatanodes().forEach(datanodeID -> {
+          List<DeletedBlocksTransaction> dnTXs =
+              transactions.getDatanodeTransactions(datanodeID);
+          // TODO commandQueue needs a cap.
+          // We should stop caching new commands if the number of unprocessed
+          // commands exceeds a limit, e.g. 50. Otherwise, if a datanode goes
+          // offline for some time, the cached commands will pile up.
+          nodeManager.addDatanodeCommand(datanodeID,
+              new DeleteBlocksCommand(dnTXs));
+          LOG.info("Added delete block command for datanode {} in the queue,"
+                  + " number delete block transactions: {}, TxID list: {}",
+              datanodeID, dnTXs.size(),
+              String.join(",", transactions.getTransactionIDList(datanodeID)));
+
+        });
+      }
+      return EmptyTaskResult.newResult();
+    }
+
+    // Scan deleteBlocks.db to get a number of to-delete blocks.
+    // this is going to be properly throttled.
+    private DatanodeDeletedBlockTransactions getToDeleteContainerBlocks() {
+      DatanodeDeletedBlockTransactions dnTXs =
+          new DatanodeDeletedBlockTransactions();
+      List<DeletedBlocksTransaction> txs = null;
+      try {
+        // Get a limited number of TXs to send via HB at a time.
+        txs = deletedBlockLog
+            .getTransactions(BLOCK_DELETE_TX_PER_REQUEST_LIMIT);
+        LOG.info("Scanned deleted blocks log and got {} delTX to process",
+            txs.size());
+      } catch (IOException e) {
+        // We may tolerate a number of failures for some time,
+        // but if it continues to fail, at some point we need to raise
+        // an exception and probably fail the SCM? At present, it simply
+        // continues to retry the scanning.
+        LOG.error("Failed to get block deletion transactions from delTX log",
+            e);
+      }
+
+      if (txs != null) {
+        for (DeletedBlocksTransaction tx : txs) {
+          try {
+            ContainerInfo info = mappingService
+                .getContainer(tx.getContainerName());
+            // Find out the datanodes this TX is supposed to be sent to.
+            info.getPipeline().getMachines()
+                .forEach(entry -> dnTXs.addTransaction(entry, tx));
+          } catch (IOException e) {
+            LOG.warn("Container {} not found, continue to process next",
+                tx.getContainerName(), e);
+          }
+        }
+      }
+      return dnTXs;
+    }
+  }
+
+  /**
+   * A wrapper class to hold info about datanode and all deleted block
+   * transactions that will be sent to this datanode.
+   */
+  private static class DatanodeDeletedBlockTransactions {
+
+    // A list of TXs mapped to a certain datanode ID.
+    private final Map<DatanodeID, List<DeletedBlocksTransaction>> transactions;
+
+    DatanodeDeletedBlockTransactions() {
+      this.transactions = Maps.newHashMap();
+    }
+
+    void addTransaction(DatanodeID dnID, DeletedBlocksTransaction tx) {
+      if (transactions.containsKey(dnID)) {
+        transactions.get(dnID).add(tx);
+      } else {
+        List<DeletedBlocksTransaction> first = Lists.newArrayList();
+        first.add(tx);
+        transactions.put(dnID, first);
+      }
+      LOG.info("Transaction added: {} <- TX({})", dnID, tx.getTxID());
+    }
+
+    Set<DatanodeID> getDatanodes() {
+      return transactions.keySet();
+    }
+
+    boolean isEmpty() {
+      return transactions.isEmpty();
+    }
+
+    boolean hasTransactions(DatanodeID dnID) {
+      return transactions.containsKey(dnID) &&
+          !transactions.get(dnID).isEmpty();
+    }
+
+    List<DeletedBlocksTransaction> getDatanodeTransactions(DatanodeID dnID) {
+      return transactions.get(dnID);
+    }
+
+    List<String> getTransactionIDList(DatanodeID dnID) {
+      if (hasTransactions(dnID)) {
+        return transactions.get(dnID).stream()
+            .map(DeletedBlocksTransaction::getTxID)
+            .map(String::valueOf)
+            .collect(Collectors.toList());
+      } else {
+        return Collections.emptyList();
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java
index c21e62c..dec3776 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/NodeManager.java
@@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
 import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
 import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric;
 import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeStat;
@@ -134,4 +135,12 @@ public interface NodeManager extends StorageContainerNodeProtocol,
    * @return Healthy/Stale/Dead.
    */
   NodeState getNodeState(DatanodeID id);
+
+  /**
+   * Adds a {@link SCMCommand} to the command queue; queued commands are
+   * handled by the HB thread asynchronously.
+   * @param id the datanode the command is queued for.
+   * @param command the command to add to the queue.
+   */
+  default void addDatanodeCommand(DatanodeID id, SCMCommand command) {}
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java
index bcd3a02..6e2805a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/node/SCMNodeManager.java
@@ -836,4 +836,9 @@ public class SCMNodeManager
     }
     return nodeCountMap;
   }
+
+  @Override
+  public void addDatanodeCommand(DatanodeID id, SCMCommand command) {
+    this.commandQueue.addCommand(id, command);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java
index b37a5db..198300f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java
@@ -23,7 +23,22 @@ package org.apache.hadoop.utils;
 public interface BackgroundTaskResult {
 
   /**
-   *   Returns the size of entries included in this result.
+   * Returns the size of entries included in this result.
    */
   int getSize();
+
+  /**
+   * An empty task result implementation.
+   */
+  class EmptyTaskResult implements BackgroundTaskResult {
+
+    public static EmptyTaskResult newResult() {
+      return new EmptyTaskResult();
+    }
+
+    @Override
+    public int getSize() {
+      return 0;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
index a8cfa57..bb3d137 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -122,7 +122,6 @@ message ContainerReportsProto {
   required reportType type = 3;
 }
 
-
 /**
 * This message is send along with the heart beat to report datanode
 * storage utilization by SCM.
@@ -210,6 +209,7 @@ enum Type {
   registeredCommand = 3;
   sendContainerReport = 4;
   reregisterCommand = 5;
+  deleteBlocksCommand = 6;
 }
 
 /*
@@ -221,6 +221,7 @@ message SCMCommandResponseProto {
   optional SCMVersionResponseProto versionProto = 4;
   optional SendContainerReportProto sendReport = 5;
   optional SCMReregisterCmdResponseProto reregisterProto = 6;
+  optional SCMDeleteBlocksCmdResponseProto deleteBlocksProto = 7;
 }
 
 
@@ -231,6 +232,24 @@ message SCMHeartbeatResponseProto {
   repeated SCMCommandResponseProto commands = 1;
 }
 
+// HB response from SCM, contains a list of block deletion transactions.
+message SCMDeleteBlocksCmdResponseProto {
+  repeated DeletedBlocksTransaction deletedBlocksTransactions = 1;
+}
+
+// SendACK response returned by SCM to datanode, currently empty.
+message ContainerBlocksDeletionACKResponseProto {
+}
+
+// ACK message sent by datanode to SCM, contains the results of
+// block deletion transactions.
+message ContainerBlocksDeletionACKProto {
+  message DeleteBlockTransactionResult {
+    required int64 txID = 1;
+    required bool success = 2;
+  }
+  repeated DeleteBlockTransactionResult results = 1;
+}
 
 /**
  * Protocol used from a datanode to StorageContainerManager.
@@ -318,6 +337,10 @@ service StorageContainerDatanodeProtocolService {
     send container reports sends the container report to SCM. This will
     return a null command as response.
   */
-  rpc sendContainerReport(ContainerReportsProto)  returns (SCMHeartbeatResponseProto);
+  rpc sendContainerReport(ContainerReportsProto) returns (SCMHeartbeatResponseProto);
 
+  /**
+   * Sends the block deletion ACK to SCM.
+   */
+  rpc sendContainerBlocksDeletionACK (ContainerBlocksDeletionACKProto) returns (ContainerBlocksDeletionACKResponseProto);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index 5d8308e..d0c363d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -17,22 +17,35 @@
  */
 package org.apache.hadoop.ozone;
 
-import static org.junit.Assert.*;
-
+import static org.junit.Assert.fail;
 import java.io.IOException;
 
+import org.apache.commons.lang.RandomStringUtils;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 import org.apache.hadoop.ozone.scm.StorageContainerManager;
+import org.apache.hadoop.ozone.scm.block.DeletedBlockLog;
 import org.apache.hadoop.scm.XceiverClientManager;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.junit.Rule;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
+import java.util.List;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.Map;
+import java.util.Collections;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
+import org.apache.hadoop.scm.ScmConfigKeys;
 
 import org.apache.hadoop.io.IOUtils;
 import org.junit.rules.Timeout;
 import org.mockito.Mockito;
+import org.apache.hadoop.test.GenericTestUtils;
 
 /**
  * Test class that exercises the StorageContainerManager.
@@ -149,4 +162,103 @@ public class TestStorageContainerManager {
     Assert.assertTrue(e instanceof IOException);
     Assert.assertEquals(expectedErrorMessage, e.getMessage());
   }
+
+  @Test
+  public void testBlockDeletionTransactions() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setInt(ScmConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 5);
+    conf.setInt(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 3000);
+    conf.setInt(ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 5);
+    conf.setInt(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS, 1000);
+    MiniOzoneCluster cluster =
+        new MiniOzoneCluster.Builder(conf).numDataNodes(1)
+            .setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
+
+    DeletedBlockLog delLog = cluster.getStorageContainerManager()
+        .getScmBlockManager().getDeletedBlockLog();
+    Assert.assertEquals(0, delLog.getNumOfValidTransactions());
+
+    // Create 20 keys with random names.
+    TestStorageContainerManagerHelper helper =
+        new TestStorageContainerManagerHelper(cluster, conf);
+    Map<String, KsmKeyInfo> keyLocations = helper.createKeys(20, 4096);
+
+    // These keys will be written into a bunch of containers;
+    // get the set of container names, then verify the containerBlocks
+    // on datanodes.
+    Set<String> containerNames = new HashSet<>();
+    for (Map.Entry<String, KsmKeyInfo> entry : keyLocations.entrySet()) {
+      entry.getValue().getKeyLocationList()
+          .forEach(loc -> containerNames.add(loc.getContainerName()));
+    }
+
+    // Total number of containerBlocks of these containers should be equal to
+    // total number of containerBlocks via creation call.
+    int totalCreatedBlocks = 0;
+    for (KsmKeyInfo info : keyLocations.values()) {
+      totalCreatedBlocks += info.getKeyLocationList().size();
+    }
+    Assert.assertTrue(totalCreatedBlocks > 0);
+    Assert.assertEquals(totalCreatedBlocks,
+        helper.getAllBlocks(containerNames).size());
+
+    // Create a deletion TX for each key.
+    Map<String, List<String>> containerBlocks = Maps.newHashMap();
+    for (KsmKeyInfo info : keyLocations.values()) {
+      List<KsmKeyLocationInfo> list = info.getKeyLocationList();
+      list.forEach(location -> {
+        if (containerBlocks.containsKey(location.getContainerName())) {
+          containerBlocks.get(location.getContainerName())
+              .add(location.getBlockID());
+        } else {
+          List<String> blks = Lists.newArrayList();
+          blks.add(location.getBlockID());
+          containerBlocks.put(location.getContainerName(), blks);
+        }
+      });
+    }
+    for (Map.Entry<String, List<String>> tx : containerBlocks.entrySet()) {
+      delLog.addTransaction(tx.getKey(), tx.getValue());
+    }
+
+    // Verify that a few TXs get created in the TX log.
+    Assert.assertTrue(delLog.getNumOfValidTransactions() > 0);
+
+    // Once TXs are written into the log, SCM starts to fetch TX
+    // entries from the log and schedule block deletions in HB interval,
+    // after some time, all the TXs should be processed and by then
+    // the containerBlocks of all known containers will be
+    // empty again.
+    GenericTestUtils.waitFor(() -> {
+      try {
+        return delLog.getNumOfValidTransactions() == 0;
+      } catch (IOException e) {
+        return false;
+      }
+    }, 1000, 10000);
+    Assert.assertTrue(helper.getAllBlocks(containerNames).isEmpty());
+
+    // Continue the work, add some TXs with known container names,
+    // but unknown block IDs.
+    for (String containerName : containerBlocks.keySet()) {
+      // Add 2 TXs per container.
+      delLog.addTransaction(containerName,
+          Collections.singletonList(RandomStringUtils.randomAlphabetic(5)));
+      delLog.addTransaction(containerName,
+          Collections.singletonList(RandomStringUtils.randomAlphabetic(5)));
+    }
+
+    // Verify that a few TXs get created in the TX log.
+    Assert.assertTrue(delLog.getNumOfValidTransactions() > 0);
+
+    // These blocks cannot be found in the container, so deleting them is
+    // skipped; eventually these TXs will succeed.
+    GenericTestUtils.waitFor(() -> {
+      try {
+        return delLog.getFailedTransactions().size() == 0;
+      } catch (IOException e) {
+        return false;
+      }
+    }, 1000, 10000);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
new file mode 100644
index 0000000..2123d6f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManagerHelper.java
@@ -0,0 +1,180 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.ozone;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
+import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
+import org.apache.hadoop.ozone.web.handlers.BucketArgs;
+import org.apache.hadoop.ozone.web.handlers.KeyArgs;
+import org.apache.hadoop.ozone.web.handlers.UserArgs;
+import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
+import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
+import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
+import org.apache.hadoop.utils.MetadataStore;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.List;
+import java.util.Map;
+import java.util.LinkedList;
+import java.util.Set;
+
+/**
+ * A helper class used by {@link TestStorageContainerManager} to generate
+ * some keys and to help verify container and block locations.
+ */
+public class TestStorageContainerManagerHelper {
+
+  private final MiniOzoneCluster cluster;
+  private final Configuration conf;
+  private final StorageHandler storageHandler;
+
+  public TestStorageContainerManagerHelper(MiniOzoneCluster cluster,
+      Configuration conf) throws IOException {
+    this.cluster = cluster;
+    this.conf = conf;
+    storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
+  }
+
+  public Map<String, KsmKeyInfo> createKeys(int numOfKeys, int keySize)
+      throws Exception {
+    Map<String, KsmKeyInfo> keyLocationMap = Maps.newHashMap();
+    String volume = "volume" + RandomStringUtils.randomNumeric(5);
+    String bucket = "bucket" + RandomStringUtils.randomNumeric(5);
+    String userName = "user" + RandomStringUtils.randomNumeric(5);
+    String adminName = "admin" + RandomStringUtils.randomNumeric(5);
+    UserArgs userArgs = new UserArgs(null, OzoneUtils.getRequestID(),
+        null, null, null, null);
+
+    VolumeArgs createVolumeArgs = new VolumeArgs(volume, userArgs);
+    createVolumeArgs.setUserName(userName);
+    createVolumeArgs.setAdminName(adminName);
+    storageHandler.createVolume(createVolumeArgs);
+
+    BucketArgs bucketArgs = new BucketArgs(bucket, createVolumeArgs);
+    bucketArgs.setAddAcls(new LinkedList<>());
+    bucketArgs.setRemoveAcls(new LinkedList<>());
+    bucketArgs.setStorageType(StorageType.DISK);
+    storageHandler.createBucket(bucketArgs);
+
+    // Write numOfKeys keys in bucket.
+    Set<String> keyNames = Sets.newHashSet();
+    KeyArgs keyArgs;
+    for (int i = 0; i < numOfKeys; i++) {
+      String keyName = RandomStringUtils.randomAlphabetic(5) + i;
+      keyNames.add(keyName);
+      keyArgs = new KeyArgs(keyName, bucketArgs);
+      keyArgs.setSize(keySize);
+      // Just for testing list keys call, so no need to write real data.
+      OutputStream stream = storageHandler.newKeyWriter(keyArgs);
+      stream.write(DFSUtil.string2Bytes(
+          RandomStringUtils.randomAlphabetic(5)));
+      stream.close();
+    }
+
+    for (String key : keyNames) {
+      KsmKeyArgs arg = new KsmKeyArgs.Builder()
+          .setVolumeName(volume)
+          .setBucketName(bucket)
+          .setKeyName(key)
+          .build();
+      KsmKeyInfo location = cluster.getKeySpaceManager()
+          .lookupKey(arg);
+      keyLocationMap.put(key, location);
+    }
+    return keyLocationMap;
+  }
+
+  public List<String> getPendingDeletionBlocks(String containerName)
+      throws IOException {
+    List<String> pendingDeletionBlocks = Lists.newArrayList();
+    MetadataStore meta = getContainerMetadata(containerName);
+    KeyPrefixFilter filter =
+        new KeyPrefixFilter(OzoneConsts.DELETING_KEY_PREFIX);
+    List<Map.Entry<byte[], byte[]>> kvs = meta
+        .getRangeKVs(null, Integer.MAX_VALUE, filter);
+    kvs.forEach(entry -> {
+      String key = DFSUtil.bytes2String(entry.getKey());
+      pendingDeletionBlocks
+          .add(key.replace(OzoneConsts.DELETING_KEY_PREFIX, ""));
+    });
+    return pendingDeletionBlocks;
+  }
+
+  public List<String> getAllBlocks(Set<String> containerNames)
+      throws IOException {
+    List<String> allBlocks = Lists.newArrayList();
+    for (String containerName : containerNames) {
+      allBlocks.addAll(getAllBlocks(containerName));
+    }
+    return allBlocks;
+  }
+
+  public List<String> getAllBlocks(String containerName) throws IOException {
+    List<String> allBlocks = Lists.newArrayList();
+    MetadataStore meta = getContainerMetadata(containerName);
+    MetadataKeyFilter filter =
+        (preKey, currentKey, nextKey) -> !DFSUtil.bytes2String(currentKey)
+            .startsWith(OzoneConsts.DELETING_KEY_PREFIX);
+    List<Map.Entry<byte[], byte[]>> kvs =
+        meta.getRangeKVs(null, Integer.MAX_VALUE, filter);
+    kvs.forEach(entry -> {
+      String key = DFSUtil.bytes2String(entry.getKey());
+      allBlocks.add(key.replace(OzoneConsts.DELETING_KEY_PREFIX, ""));
+    });
+    return allBlocks;
+  }
+
+  private MetadataStore getContainerMetadata(String containerName)
+      throws IOException {
+    Pipeline pipeline = cluster.getStorageContainerManager()
+        .getContainer(containerName);
+    DatanodeID leadDN = pipeline.getLeader();
+    OzoneContainer containerServer =
+        getContainerServerByDatanodeID(leadDN.getDatanodeUuid());
+    ContainerData containerData = containerServer.getContainerManager()
+        .readContainer(containerName);
+    return KeyUtils.getDB(containerData, conf);
+  }
+
+  private OzoneContainer getContainerServerByDatanodeID(String dnUUID)
+      throws IOException {
+    for (DataNode dn : cluster.getDataNodes()) {
+      if (dn.getDatanodeId().getDatanodeUuid().equals(dnUUID)) {
+        return dn.getOzoneContainerManager();
+      }
+    }
+    throw new IOException("Unable to get the ozone container "
+        + "for given datanode ID " + dnUUID);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/76a156eb/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
index c0bdd9e..edef2b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
@@ -25,6 +25,8 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolPr
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
 import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
+import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKResponseProto;
 import org.apache.hadoop.ozone.scm.VersionInfo;
 
 import java.io.IOException;
@@ -188,6 +190,13 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
         .build();
   }
 
+  @Override
+  public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
+      ContainerBlocksDeletionACKProto request) throws IOException {
+    return ContainerBlocksDeletionACKResponseProto
+        .newBuilder().getDefaultInstanceForType();
+  }
+
   public ReportState getReportState() {
     return this.reportState;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message