hadoop-common-commits mailing list archives

From w...@apache.org
Subject hadoop git commit: HDFS-12235. Ozone: DeleteKey-3: KSM SCM block deletion message and ACK interactions. Contributed by Weiwei Yang and Yuanbo Liu.
Date Mon, 11 Sep 2017 06:32:15 GMT
Repository: hadoop
Updated Branches:
  refs/heads/HDFS-7240 a73531be1 -> 81f71b479


HDFS-12235. Ozone: DeleteKey-3: KSM SCM block deletion message and ACK interactions. Contributed by Weiwei Yang and Yuanbo Liu.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/81f71b47
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/81f71b47
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/81f71b47

Branch: refs/heads/HDFS-7240
Commit: 81f71b47957644ae80db401acd8560fda65a156b
Parents: a73531b
Author: Weiwei Yang <wwei@apache.org>
Authored: Mon Sep 11 14:28:20 2017 +0800
Committer: Weiwei Yang <wwei@apache.org>
Committed: Mon Sep 11 14:28:20 2017 +0800

----------------------------------------------------------------------
 .../apache/hadoop/ozone/common/BlockGroup.java  |  87 +++++
 .../ozone/common/DeleteBlockGroupResult.java    |  94 ++++++
 .../apache/hadoop/ozone/ksm/KSMConfigKeys.java  |   4 +
 .../scm/protocol/ScmBlockLocationProtocol.java  |  10 +-
 ...kLocationProtocolClientSideTranslatorPB.java |  45 +--
 .../main/proto/ScmBlockLocationProtocol.proto   |  41 ++-
 .../hadoop/ozone/ksm/KSMMetadataManager.java    |  13 +
 .../ozone/ksm/KSMMetadataManagerImpl.java       |  30 +-
 .../hadoop/ozone/ksm/KeyDeletingService.java    | 132 ++++++++
 .../org/apache/hadoop/ozone/ksm/KeyManager.java |  34 ++
 .../apache/hadoop/ozone/ksm/KeyManagerImpl.java |  64 +++-
 .../hadoop/ozone/ksm/KeySpaceManager.java       |   4 +-
 ...kLocationProtocolServerSideTranslatorPB.java |  38 ++-
 .../ozone/scm/StorageContainerManager.java      |  34 +-
 .../hadoop/ozone/scm/block/BlockManager.java    |  14 +-
 .../ozone/scm/block/BlockManagerImpl.java       |  90 +++--
 .../hadoop/ozone/scm/block/DeletedBlockLog.java |  17 +
 .../ozone/scm/block/DeletedBlockLogImpl.java    |  48 ++-
 .../src/main/resources/ozone-default.xml        |  11 +
 .../ozone/scm/block/TestBlockManager.java       |   3 +-
 .../hadoop/ozone/web/client/TestKeys.java       | 335 ++++++++++++++-----
 .../hadoop/ozone/web/client/TestKeysRatis.java  |  20 +-
 22 files changed, 979 insertions(+), 189 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/81f71b47/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java
new file mode 100644
index 0000000..5571f0a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/common/BlockGroup.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.common;
+
+import org.apache.hadoop.ozone.protocol.proto
+    .ScmBlockLocationProtocolProtos.KeyBlocks;
+
+import java.util.List;
+
+/**
+ * A group of related blocks, e.g. the blocks belonging to the same object key.
+ */
+public final class BlockGroup {
+
+  private String groupID;
+  private List<String> blockIDs;
+  private BlockGroup(String groupID, List<String> blockIDs) {
+    this.groupID = groupID;
+    this.blockIDs = blockIDs;
+  }
+
+  public List<String> getBlockIDList() {
+    return blockIDs;
+  }
+
+  public String getGroupID() {
+    return groupID;
+  }
+
+  public KeyBlocks getProto() {
+    return KeyBlocks.newBuilder().setKey(groupID)
+        .addAllBlocks(blockIDs).build();
+  }
+
+  /**
+   * Parses a KeyBlocks proto to a group of blocks.
+   * @param proto KeyBlocks proto.
+   * @return a group of blocks.
+   */
+  public static BlockGroup getFromProto(KeyBlocks proto) {
+    return BlockGroup.newBuilder().setKeyName(proto.getKey())
+        .addAllBlockIDs(proto.getBlocksList()).build();
+  }
+
+  public static Builder newBuilder() {
+    return new Builder();
+  }
+
+  /**
+   * BlockGroup instance builder.
+   */
+  public static class Builder {
+
+    private String groupID;
+    private List<String> blockIDs;
+
+    public Builder setKeyName(String blockGroupID) {
+      this.groupID = blockGroupID;
+      return this;
+    }
+
+    public Builder addAllBlockIDs(List<String> keyBlocks) {
+      this.blockIDs = keyBlocks;
+      return this;
+    }
+
+    public BlockGroup build() {
+      return new BlockGroup(groupID, blockIDs);
+    }
+  }
+
+}
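
For reference, a minimal usage sketch (not part of this commit) of the BlockGroup builder and its proto round trip, using only the API introduced above; the key name and block IDs are hypothetical values:

    import java.util.Arrays;
    import org.apache.hadoop.ozone.common.BlockGroup;
    import org.apache.hadoop.ozone.protocol.proto
        .ScmBlockLocationProtocolProtos.KeyBlocks;

    // Group the blocks that belong to one object key.
    BlockGroup group = BlockGroup.newBuilder()
        .setKeyName("/vol1/bucket1/key1")
        .addAllBlockIDs(Arrays.asList("blk-1", "blk-2"))
        .build();

    // Serialize to the KeyBlocks proto carried on the KSM -> SCM wire.
    KeyBlocks proto = group.getProto();

    // Parse it back; the group ID and block IDs survive the round trip.
    BlockGroup parsed = BlockGroup.getFromProto(proto);
    assert parsed.getGroupID().equals(group.getGroupID());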

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81f71b47/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/common/DeleteBlockGroupResult.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/common/DeleteBlockGroupResult.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/common/DeleteBlockGroupResult.java
new file mode 100644
index 0000000..f56f832
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/common/DeleteBlockGroupResult.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.common;
+
+import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmBlockResult;
+import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result;
+import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Result of deleting a group of blocks.
+ */
+public class DeleteBlockGroupResult {
+  private String objectKey;
+  private List<DeleteBlockResult> blockResultList;
+  public DeleteBlockGroupResult(String objectKey,
+      List<DeleteBlockResult> blockResultList) {
+    this.objectKey = objectKey;
+    this.blockResultList = blockResultList;
+  }
+
+  public String getObjectKey() {
+    return objectKey;
+  }
+
+  public List<DeleteBlockResult> getBlockResultList() {
+    return blockResultList;
+  }
+
+  public List<DeleteScmBlockResult> getBlockResultProtoList() {
+    List<DeleteScmBlockResult> resultProtoList =
+        new ArrayList<>(blockResultList.size());
+    for (DeleteBlockResult result : blockResultList) {
+      DeleteScmBlockResult proto = DeleteScmBlockResult.newBuilder()
+          .setKey(result.getKey())
+          .setResult(result.getResult()).build();
+      resultProtoList.add(proto);
+    }
+    return resultProtoList;
+  }
+
+  public static List<DeleteBlockResult> convertBlockResultProto(
+      List<DeleteScmBlockResult> results) {
+    List<DeleteBlockResult> protoResults = new ArrayList<>(results.size());
+    for (DeleteScmBlockResult result : results) {
+      protoResults.add(new DeleteBlockResult(result.getKey(),
+          result.getResult()));
+    }
+    return protoResults;
+  }
+
+  /**
+   * The deletion of this group is considered successful only if all
+   * of its blocks are successfully deleted.
+   *
+   * @return true if all blocks are successfully deleted, false otherwise.
+   */
+  public boolean isSuccess() {
+    for (DeleteBlockResult result : blockResultList) {
+      if (result.getResult() != Result.success) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  /**
+   * @return a list of block IDs whose deletion failed.
+   */
+  public List<String> getFailedBlocks() {
+    List<String> failedBlocks = blockResultList.stream()
+        .filter(result -> result.getResult() != Result.success)
+        .map(DeleteBlockResult::getKey).collect(Collectors.toList());
+    return failedBlocks;
+  }
+}
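
A minimal sketch (not part of this commit) of how a caller such as KSM is expected to consume these results; the handleResults helper is hypothetical:

    import java.util.List;
    import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;

    void handleResults(List<DeleteBlockGroupResult> results) {
      for (DeleteBlockGroupResult result : results) {
        if (result.isSuccess()) {
          // All blocks of this key were deleted; the key can be purged.
          System.out.println("Purge key " + result.getObjectKey());
        } else {
          // Retain the key for retry; these block IDs failed to delete.
          List<String> failed = result.getFailedBlocks();
          System.out.println("Retry key " + result.getObjectKey()
              + ", failed blocks: " + String.join(",", failed));
        }
      }
    }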

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81f71b47/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/KSMConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/KSMConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/KSMConfigKeys.java
index e69300c..6710b0a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/KSMConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/ozone/ksm/KSMConfigKeys.java
@@ -74,4 +74,8 @@ public final class KSMConfigKeys {
       "ozone.ksm.group.rights";
   public static final OzoneAcl.OzoneACLRights OZONE_KSM_GROUP_RIGHTS_DEFAULT =
       OzoneAcl.OzoneACLRights.READ_WRITE;
+
+  public static final String OZONE_KEY_DELETING_LIMIT_PER_TASK =
+      "ozone.key.deleting.limit.per.task";
+  public static final int OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT = 1000;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81f71b47/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/ScmBlockLocationProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/ScmBlockLocationProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/ScmBlockLocationProtocol.java
index e944375..77d040d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/ScmBlockLocationProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocol/ScmBlockLocationProtocol.java
@@ -22,8 +22,9 @@ import java.io.IOException;
 import java.util.List;
 import java.util.Set;
 
+import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
-import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult;
 
 /**
  * ScmBlockLocationProtocol is used by an HDFS node to find the set of nodes
@@ -53,13 +54,14 @@ public interface ScmBlockLocationProtocol {
   AllocatedBlock allocateBlock(long size) throws IOException;
 
   /**
-   * Delete the set of keys specified.
+   * Delete blocks for a set of object keys.
    *
-   * @param keys batch of block keys to delete.
+   * @param keyBlocksInfoList list of object keys and their blocks.
    * @return list of block deletion results.
    * @throws IOException if there is any failure.
    *
    */
-  List<DeleteBlockResult> deleteBlocks(Set<String> keys) throws IOException;
+  List<DeleteBlockGroupResult> deleteKeyBlocks(
+      List<BlockGroup> keyBlocksInfoList) throws IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81f71b47/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
index 4a0d50b..2e323e0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java
@@ -24,24 +24,24 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksRequestProto;
 import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
     .AllocateScmBlockRequestProto;
 import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
     .AllocateScmBlockResponseProto;
 import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
-    .DeleteScmBlocksRequestProto;
-import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
-    .DeleteScmBlocksResponseProto;
-import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
-    .DeleteScmBlockResult;
+    .DeleteScmKeyBlocksResponseProto;
 import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
     .GetScmBlockLocationsRequestProto;
 import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
     .GetScmBlockLocationsResponseProto;
 import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
     .ScmLocatedBlockProto;
+import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos
+    .KeyBlocks;
 import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
-import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
 
@@ -50,6 +50,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * This class is the client-side translator to translate the requests made on
@@ -147,30 +148,32 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB
   /**
    * Delete the set of keys specified.
    *
-   * @param keys batch of block keys to delete.
+   * @param keyBlocksInfoList batch of object keys and their blocks to delete.
    * @return list of block deletion results.
    * @throws IOException if there is any failure.
    *
    */
   @Override
-  public List<DeleteBlockResult> deleteBlocks(Set<String> keys)
-      throws IOException {
-    Preconditions.checkArgument(keys != null && !keys.isEmpty(),
-        "keys to be deleted cannot be null or empty");
-    DeleteScmBlocksRequestProto request = DeleteScmBlocksRequestProto
-        .newBuilder()
-        .addAllKeys(keys)
-        .build();
-    final DeleteScmBlocksResponseProto resp;
+  public List<DeleteBlockGroupResult> deleteKeyBlocks(
+      List<BlockGroup> keyBlocksInfoList) throws IOException {
+    List<KeyBlocks> keyBlocksProto = keyBlocksInfoList.stream()
+        .map(BlockGroup::getProto).collect(Collectors.toList());
+    DeleteScmKeyBlocksRequestProto request = DeleteScmKeyBlocksRequestProto
+        .newBuilder().addAllKeyBlocks(keyBlocksProto).build();
+
+    final DeleteScmKeyBlocksResponseProto resp;
     try {
-      resp = rpcProxy.deleteScmBlocks(NULL_RPC_CONTROLLER, request);
+      resp = rpcProxy.deleteScmKeyBlocks(NULL_RPC_CONTROLLER, request);
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }
-    List<DeleteBlockResult> results = new ArrayList(resp.getResultsCount());
-    for (DeleteScmBlockResult result : resp.getResultsList()) {
-      results.add(new DeleteBlockResult(result.getKey(), result.getResult()));
-    }
+    List<DeleteBlockGroupResult> results =
+        new ArrayList<>(resp.getResultsCount());
+    results.addAll(resp.getResultsList().stream().map(
+        result -> new DeleteBlockGroupResult(result.getObjectKey(),
+            DeleteBlockGroupResult
+                .convertBlockResultProto(result.getBlockResultsList())))
+        .collect(Collectors.toList()));
     return results;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81f71b47/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ScmBlockLocationProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ScmBlockLocationProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ScmBlockLocationProtocol.proto
index e9c3abf..51899f9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ScmBlockLocationProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/ScmBlockLocationProtocol.proto
@@ -64,17 +64,40 @@ message AllocateScmBlockRequestProto {
 }
 
 /**
- * keys - batch of block keys to deleted
+ * A delete key request sent by KSM to SCM; it contains
+ * one or more keys (and their blocks).
  */
-message DeleteScmBlocksRequestProto {
-  repeated string keys = 1;
+message DeleteScmKeyBlocksRequestProto {
+  repeated KeyBlocks keyBlocks = 1;
+}
+
+/**
+ * An object key and all its associated blocks.
+ * We need to encapsulate the object key name plus the blocks in this
+ * protocol because SCM needs to respond to KSM with the keys it has
+ * deleted. If the response contained only blocks, it would be very
+ * expensive for KSM to figure out which keys had been deleted.
+ */
+message KeyBlocks {
+  required string key = 1;
+  repeated string blocks = 2;
+}
+
+/**
+ * A delete key response from SCM to KSM; it contains multiple child
+ * results, one per key. A key's deletion is considered successful
+ * only if all of its blocks are successfully deleted.
+ */
+message DeleteScmKeyBlocksResponseProto {
+  repeated DeleteKeyBlocksResultProto results = 1;
 }
 
 /**
- * deletedKeys - keys that are deleted successfully
+ * A key deletion result. It contains all the block deletion results.
  */
-message DeleteScmBlocksResponseProto {
-  repeated DeleteScmBlockResult results = 1;
+message DeleteKeyBlocksResultProto {
+  required string objectKey = 1;
+  repeated DeleteScmBlockResult blockResults = 2;
 }
 
 message DeleteScmBlockResult {
@@ -126,8 +149,8 @@ service ScmBlockLocationProtocolService {
       returns (AllocateScmBlockResponseProto);
 
   /**
-   * Deletes one or multiple block keys from SCM.
+   * Deletes blocks for a set of object keys from SCM.
    */
-  rpc deleteScmBlocks(DeleteScmBlocksRequestProto)
-      returns (DeleteScmBlocksResponseProto);
+  rpc deleteScmKeyBlocks(DeleteScmKeyBlocksRequestProto)
+      returns (DeleteScmKeyBlocksResponseProto);
 }
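
With these messages in place, the KSM-to-SCM request wraps each key's blocks into one KeyBlocks entry. A minimal sketch (not part of this commit) of building the request, relying only on the standard generated protobuf builders; buildRequest is a hypothetical helper:

    import java.util.List;
    import java.util.stream.Collectors;
    import org.apache.hadoop.ozone.common.BlockGroup;
    import org.apache.hadoop.ozone.protocol.proto
        .ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksRequestProto;
    import org.apache.hadoop.ozone.protocol.proto
        .ScmBlockLocationProtocolProtos.KeyBlocks;

    DeleteScmKeyBlocksRequestProto buildRequest(List<BlockGroup> groups) {
      // One KeyBlocks entry per object key.
      List<KeyBlocks> keyBlocks = groups.stream()
          .map(BlockGroup::getProto)
          .collect(Collectors.toList());
      return DeleteScmKeyBlocksRequestProto.newBuilder()
          .addAllKeyBlocks(keyBlocks)
          .build();
    }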

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81f71b47/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManager.java
index 1027bd4..010de58 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManager.java
@@ -18,6 +18,7 @@ package org.apache.hadoop.ozone.ksm;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
+import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
 import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
 import org.apache.hadoop.utils.BatchOperation;
@@ -206,4 +207,16 @@ public interface KSMMetadataManager {
    */
   List<KsmVolumeArgs> listVolumes(String userName, String prefix,
       String startKey, int maxKeys) throws IOException;
+
+  /**
+   * Returns a list of pending deletion key info, up to the given count.
+   * Each entry is a {@link BlockGroup}, which contains the info about the
+   * key name and all its associated block IDs. A pending deletion key is
+   * stored with #deleting# prefix in KSM DB.
+   *
+   * @param count max number of keys to return.
+   * @return a list of {@link BlockGroup} representing keys and blocks.
+   * @throws IOException
+   */
+  List<BlockGroup> getPendingDeletionKeys(int count) throws IOException;
 }
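
A minimal sketch (not part of this commit) of draining the pending-deletion backlog through this API, assuming metadataManager is an available implementation:

    import java.util.List;
    import org.apache.hadoop.ozone.common.BlockGroup;

    // Fetch up to 100 keys stored with the #deleting# prefix in the KSM DB.
    List<BlockGroup> pending = metadataManager.getPendingDeletionKeys(100);
    for (BlockGroup group : pending) {
      System.out.println("Key " + group.getGroupID() + " has "
          + group.getBlockIDList().size() + " blocks pending deletion");
    }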

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81f71b47/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java
index e3b3db2..9413e1d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KSMMetadataManagerImpl.java
@@ -21,9 +21,8 @@ import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
-import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
-import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
+import org.apache.hadoop.ozone.ksm.helpers.*;
+import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.OzoneConfiguration;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.ksm.exceptions.KSMException;
@@ -47,6 +46,7 @@ import java.util.Map;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
 
 import static org.apache.hadoop.ozone.OzoneConsts.DELETING_KEY_PREFIX;
 import static org.apache.hadoop.ozone.OzoneConsts.KSM_DB_NAME;
@@ -440,4 +440,28 @@ public class KSMMetadataManagerImpl implements KSMMetadataManager {
 
     return builder.build();
   }
+
+  @Override
+  public List<BlockGroup> getPendingDeletionKeys(final int count)
+      throws IOException {
+    List<BlockGroup> keyBlocksList = Lists.newArrayList();
+    final MetadataKeyFilter deletingKeyFilter =
+        new KeyPrefixFilter(DELETING_KEY_PREFIX);
+    List<Map.Entry<byte[], byte[]>> rangeResult =
+        store.getRangeKVs(null, count, deletingKeyFilter);
+    for (Map.Entry<byte[], byte[]> entry : rangeResult) {
+      KsmKeyInfo info =
+          KsmKeyInfo.getFromProtobuf(KeyInfo.parseFrom(entry.getValue()));
+      // Get block keys as a list.
+      List<String> item = info.getKeyLocationList().stream()
+          .map(KsmKeyLocationInfo::getBlockID)
+          .collect(Collectors.toList());
+      BlockGroup keyBlocks = BlockGroup.newBuilder()
+          .setKeyName(DFSUtil.bytes2String(entry.getKey()))
+          .addAllBlockIDs(item)
+          .build();
+      keyBlocksList.add(keyBlocks);
+    }
+    return keyBlocksList;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81f71b47/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyDeletingService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyDeletingService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyDeletingService.java
new file mode 100644
index 0000000..293e12d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyDeletingService.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.ksm;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.utils.BackgroundService;
+import org.apache.hadoop.utils.BackgroundTask;
+import org.apache.hadoop.utils.BackgroundTaskQueue;
+import org.apache.hadoop.utils.BackgroundTaskResult;
+import org.apache.hadoop.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK;
+import static org.apache.hadoop.ozone.ksm.KSMConfigKeys.OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT;
+
+/**
+ * This is the background service to delete keys.
+ * It periodically scans the KSM metadata to find keys with the
+ * "#deleting" prefix and asks SCM to delete the corresponding
+ * blocks; if SCM returns success for a key, the service then
+ * cleans that key up.
+ */
+public class KeyDeletingService extends BackgroundService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(KeyDeletingService.class);
+
+  // The thread pool size for key deleting service.
+  private final static int KEY_DELETING_CORE_POOL_SIZE = 2;
+
+  private final ScmBlockLocationProtocol scmClient;
+  private final KeyManager manager;
+  private final int keyLimitPerTask;
+
+  public KeyDeletingService(ScmBlockLocationProtocol scmClient,
+      KeyManager manager, int serviceInterval,
+      long serviceTimeout, Configuration conf) {
+    super("KeyDeletingService", serviceInterval, TimeUnit.MILLISECONDS,
+        KEY_DELETING_CORE_POOL_SIZE, serviceTimeout);
+    this.scmClient = scmClient;
+    this.manager = manager;
+    this.keyLimitPerTask = conf.getInt(OZONE_KEY_DELETING_LIMIT_PER_TASK,
+        OZONE_KEY_DELETING_LIMIT_PER_TASK_DEFAULT);
+  }
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+    queue.add(new KeyDeletingTask());
+    return queue;
+  }
+
+  /**
+   * A key deleting task scans the KSM DB for a certain number of
+   * pending-deletion keys and sends them, along with their associated
+   * blocks, to SCM for deletion. Once SCM confirms the keys are deleted
+   * (i.e. SCM has persisted the block info in its deletedBlockLog), the
+   * task removes these keys from the DB.
+   */
+  private class KeyDeletingTask implements
+      BackgroundTask<BackgroundTaskResult> {
+
+    @Override
+    public int getPriority() {
+      return 0;
+    }
+
+    @Override
+    public BackgroundTaskResult call() throws Exception {
+      try {
+        List<BlockGroup> keyBlocksList = manager
+            .getPendingDeletionKeys(keyLimitPerTask);
+        if (keyBlocksList.size() > 0) {
+          LOG.info("Found {} to-delete keys in KSM", keyBlocksList.size());
+          List<DeleteBlockGroupResult> results =
+              scmClient.deleteKeyBlocks(keyBlocksList);
+          for (DeleteBlockGroupResult result : results) {
+            if (result.isSuccess()) {
+              try {
+                // Purge key from KSM DB.
+                manager.deletePendingDeletionKey(result.getObjectKey());
+                LOG.info("Key {} deleted from KSM DB", result.getObjectKey());
+              } catch (IOException e) {
+                // If deleting a pending-deletion key fails, log a
+                // warning here and retain it in this state, so that
+                // the deletion can be retried next time.
+                LOG.warn("Failed to delete pending-deletion key {}",
+                    result.getObjectKey(), e);
+              }
+            } else {
+              // Key deletion failed, retry in next interval.
+              LOG.warn("Key {} deletion failed because some of the blocks"
+                  + " were failed to delete, failed blocks: {}",
+                  result.getObjectKey(),
+                  String.join(",", result.getFailedBlocks()));
+            }
+          }
+          return results::size;
+        } else {
+          LOG.info("No pending deletion key found in KSM");
+        }
+      } catch (IOException e) {
+        LOG.error("Unable to get pending deletion keys, retry in"
+            + " next interval", e);
+      }
+      return EmptyTaskResult.newResult();
+    }
+  }
+}
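
A minimal construction sketch (not part of this commit), assuming scmClient and keyManager are available implementations of ScmBlockLocationProtocol and KeyManager; the interval and timeout values are arbitrary examples:

    import org.apache.hadoop.conf.Configuration;

    Configuration conf = new Configuration();
    // Cap how many pending-deletion keys one task fetches (default 1000).
    conf.setInt("ozone.key.deleting.limit.per.task", 500);

    KeyDeletingService service = new KeyDeletingService(
        scmClient,     // ScmBlockLocationProtocol (assumed available)
        keyManager,    // KeyManager (assumed available)
        60 * 1000,     // scan interval, in milliseconds
        300 * 1000L,   // per-task service timeout, in milliseconds
        conf);
    service.start();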

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81f71b47/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManager.java
index 753a564..5625cb7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManager.java
@@ -16,6 +16,7 @@
  */
 package org.apache.hadoop.ozone.ksm;
 
+import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
 
@@ -26,6 +27,17 @@ import java.util.List;
  * Handles key level commands.
  */
 public interface KeyManager {
+
+  /**
+   * Start key manager.
+   */
+  void start();
+
+  /**
+   * Stop key manager.
+   */
+  void stop() throws IOException;
+
   /**
    * Given the args of a key to put, return a pipeline for the key. Writes
    * the key to pipeline mapping to meta data.
@@ -89,4 +101,26 @@ public interface KeyManager {
   List<KsmKeyInfo> listKeys(String volumeName,
       String bucketName, String startKey, String keyPrefix, int maxKeys)
       throws IOException;
+
+  /**
+   * Returns a list of pending deletion key info, up to the given count.
+   * Each entry is a {@link BlockGroup}, which contains the info about the
+   * key name and all its associated block IDs. A pending deletion key is
+   * stored with #deleting# prefix in KSM DB.
+   *
+   * @param count max number of keys to return.
+   * @return a list of {@link BlockGroup} representing keys and blocks.
+   * @throws IOException
+   */
+  List<BlockGroup> getPendingDeletionKeys(int count) throws IOException;
+
+  /**
+   * Deletes a pending deletion key by its name. This is typically called
+   * when the key can be safely deleted from this layer. Once called, all
+   * footprints of the key are purged from the KSM DB.
+   *
+   * @param objectKeyName object key name with #deleting# prefix.
+   * @throws IOException if specified key doesn't exist or other I/O errors.
+   */
+  void deletePendingDeletionKey(String objectKeyName) throws IOException;
 }
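
Because of the prefix check, callers pass the renamed DB key rather than the user-visible key name. A minimal sketch (not part of this commit), assuming keyManager is an available implementation and that the DB key is simply the prefix concatenated with the key name:

    import org.apache.hadoop.ozone.OzoneConsts;

    // Passing a name without the prefix throws IllegalArgumentException.
    String pendingName =
        OzoneConsts.DELETING_KEY_PREFIX + "/vol1/bucket1/key1";
    keyManager.deletePendingDeletionKey(pendingName);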

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81f71b47/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
index aea7b31..2bf9a33 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeyManagerImpl.java
@@ -17,7 +17,10 @@
 package org.apache.hadoop.ozone.ksm;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
+import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
 import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
 import org.apache.hadoop.ozone.OzoneConfiguration;
@@ -27,6 +30,7 @@ import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.KeyI
 import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
 import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.util.Time;
+import org.apache.hadoop.utils.BackgroundService;
 import org.apache.hadoop.utils.BatchOperation;
 import org.iq80.leveldb.DBException;
 import org.slf4j.Logger;
@@ -35,7 +39,12 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_KEY;
 
@@ -52,6 +61,7 @@ public class KeyManagerImpl implements KeyManager {
   private final ScmBlockLocationProtocol scmBlockClient;
   private final KSMMetadataManager metadataManager;
   private final long scmBlockSize;
+  private final BackgroundService keyDeletingService;
 
   public KeyManagerImpl(ScmBlockLocationProtocol scmBlockClient,
       KSMMetadataManager metadataManager, OzoneConfiguration conf) {
@@ -59,6 +69,24 @@ public class KeyManagerImpl implements KeyManager {
     this.metadataManager = metadataManager;
     this.scmBlockSize = conf.getLong(OZONE_SCM_BLOCK_SIZE_KEY,
         OZONE_SCM_BLOCK_SIZE_DEFAULT);
+    int svcInterval = conf.getInt(
+        OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS,
+        OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT);
+    long serviceTimeout = conf.getTimeDuration(
+        OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
+        OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
+    keyDeletingService = new KeyDeletingService(
+        scmBlockClient, this, svcInterval, serviceTimeout, conf);
+  }
+
+  @Override
+  public void start() {
+    keyDeletingService.start();
+  }
+
+  @Override
+  public void stop() throws IOException {
+    keyDeletingService.shutdown();
   }
 
   @Override
@@ -181,7 +209,6 @@ public class KeyManagerImpl implements KeyManager {
   @Override
   public void deleteKey(KsmKeyArgs args) throws IOException {
     Preconditions.checkNotNull(args);
-
     metadataManager.writeLock().lock();
     String volumeName = args.getVolumeName();
     String bucketName = args.getBucketName();
@@ -223,4 +250,39 @@ public class KeyManagerImpl implements KeyManager {
       metadataManager.readLock().unlock();
     }
   }
+
+  @Override
+  public List<BlockGroup> getPendingDeletionKeys(final int count)
+      throws IOException {
+    metadataManager.readLock().lock();
+    try {
+      return metadataManager.getPendingDeletionKeys(count);
+    } finally {
+      metadataManager.readLock().unlock();
+    }
+  }
+
+  @Override
+  public void deletePendingDeletionKey(String objectKeyName)
+      throws IOException{
+    Preconditions.checkNotNull(objectKeyName);
+    if (!objectKeyName.startsWith(OzoneConsts.DELETING_KEY_PREFIX)) {
+      throw new IllegalArgumentException("Invalid key name,"
+          + " the name should be the key name with the deleting prefix");
+    }
+
+    // Simply removes the entry from KSM DB.
+    metadataManager.writeLock().lock();
+    try {
+      byte[] pendingDelKey = DFSUtil.string2Bytes(objectKeyName);
+      byte[] delKeyValue = metadataManager.get(pendingDelKey);
+      if (delKeyValue == null) {
+        throw new IOException("Failed to delete key " + objectKeyName
+            + " because it is not found in DB");
+      }
+      metadataManager.delete(pendingDelKey);
+    } finally {
+      metadataManager.writeLock().unlock();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81f71b47/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
index 3b76e5c..bdd97d1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/ksm/KeySpaceManager.java
@@ -217,6 +217,7 @@ public class KeySpaceManager extends ServiceRuntimeInfoImpl
         ksmRpcAddress));
     DefaultMetricsSystem.initialize("KeySpaceManager");
     metadataManager.start();
+    keyManager.start();
     ksmRpcServer.start();
     httpServer.start();
     registerMXBean();
@@ -228,8 +229,9 @@ public class KeySpaceManager extends ServiceRuntimeInfoImpl
    */
   public void stop() {
     try {
-      ksmRpcServer.stop();
       metadataManager.stop();
+      ksmRpcServer.stop();
+      keyManager.stop();
       httpServer.stop();
       metrics.unRegister();
       unregisterMXBean();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81f71b47/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java
index e8843fc..7ea06ad 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java
@@ -21,8 +21,10 @@ import com.google.common.collect.Sets;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.protocol.proto.ScmBlockLocationProtocolProtos.DeleteKeyBlocksResultProto;
 import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
-import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult;
+import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
 import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
 import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
 import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolPB;
@@ -32,11 +34,9 @@ import org.apache.hadoop.ozone.protocol.proto
 import org.apache.hadoop.ozone.protocol.proto
     .ScmBlockLocationProtocolProtos.AllocateScmBlockResponseProto;
 import org.apache.hadoop.ozone.protocol.proto
-    .ScmBlockLocationProtocolProtos.DeleteScmBlocksRequestProto;
+    .ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksRequestProto;
 import org.apache.hadoop.ozone.protocol.proto
-    .ScmBlockLocationProtocolProtos.DeleteScmBlocksResponseProto;
-import static org.apache.hadoop.ozone.protocol.proto
-    .ScmBlockLocationProtocolProtos.DeleteScmBlockResult;
+    .ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksResponseProto;
 import org.apache.hadoop.ozone.protocol.proto
     .ScmBlockLocationProtocolProtos.GetScmBlockLocationsRequestProto;
 import org.apache.hadoop.ozone.protocol.proto
@@ -47,6 +47,7 @@ import org.apache.hadoop.ozone.protocol.proto
 import java.io.IOException;
 import java.util.List;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * This class is the server-side translator that forwards requests received on
@@ -123,21 +124,22 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB
   }
 
   @Override
-  public DeleteScmBlocksResponseProto deleteScmBlocks(
-      RpcController controller, DeleteScmBlocksRequestProto req)
+  public DeleteScmKeyBlocksResponseProto deleteScmKeyBlocks(
+      RpcController controller, DeleteScmKeyBlocksRequestProto req)
       throws ServiceException {
-    Set<String> keys = Sets.newLinkedHashSetWithExpectedSize(
-        req.getKeysCount());
-    for (String key : req.getKeysList()) {
-      keys.add(key);
-    }
-    DeleteScmBlocksResponseProto.Builder resp =
-        DeleteScmBlocksResponseProto.newBuilder();
+    DeleteScmKeyBlocksResponseProto.Builder resp =
+        DeleteScmKeyBlocksResponseProto.newBuilder();
     try {
-      final List<DeleteBlockResult> results = impl.deleteBlocks(keys);
-      for (DeleteBlockResult result: results) {
-        DeleteScmBlockResult.Builder deleteResult = DeleteScmBlockResult
-            .newBuilder().setKey(result.getKey()).setResult(result.getResult());
+      List<BlockGroup> infoList = req.getKeyBlocksList().stream()
+          .map(BlockGroup::getFromProto).collect(Collectors.toList());
+      final List<DeleteBlockGroupResult> results =
+          impl.deleteKeyBlocks(infoList);
+      for (DeleteBlockGroupResult result: results) {
+        DeleteKeyBlocksResultProto.Builder deleteResult =
+            DeleteKeyBlocksResultProto
+            .newBuilder()
+            .setObjectKey(result.getObjectKey())
+            .addAllBlockResults(result.getBlockResultProtoList());
         resp.addResults(deleteResult.build());
       }
     } catch (IOException ex) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81f71b47/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
index 5a97735..6677b65 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/StorageContainerManager.java
@@ -33,6 +33,8 @@ import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.ozone.client.OzoneClientUtils;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConfiguration;
+import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.ozone.common.BlockGroup;
 import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
 import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
 import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
@@ -85,6 +87,7 @@ import org.slf4j.LoggerFactory;
 import javax.management.ObjectName;
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -838,19 +841,26 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
   }
 
   /**
-   * Delete blocks.
-   * @param keys batch of block keys to delete.
+   * Delete blocks for a set of object keys.
+   *
+   * @param keyBlocksInfoList list of object keys with their blocks to delete.
    * @return deletion results.
    */
-  public List<DeleteBlockResult> deleteBlocks(final Set<String> keys) {
-    List<DeleteBlockResult> results = new LinkedList<>();
-    for (String key: keys) {
+  public List<DeleteBlockGroupResult> deleteKeyBlocks(
+      List<BlockGroup> keyBlocksInfoList) throws IOException {
+    LOG.info("SCM is informed by KSM to delete {} blocks",
+        keyBlocksInfoList.size());
+    List<DeleteBlockGroupResult> results = new ArrayList<>();
+    for (BlockGroup keyBlocks : keyBlocksInfoList) {
       Result resultCode;
       try {
-        scmBlockManager.deleteBlock(key);
+        // We delete blocks in an atomic operation to prevent ending up
+        // in a state where only some of the blocks are deleted, which
+        // would leave the key in an inconsistent state.
+        scmBlockManager.deleteBlocks(keyBlocks.getBlockIDList());
         resultCode = Result.success;
       } catch (SCMException scmEx) {
-        LOG.warn("Fail to delete block: {}", key, scmEx);
+        LOG.warn("Fail to delete block: {}", keyBlocks.getGroupID(), scmEx);
         switch (scmEx.getResult()) {
         case CHILL_MODE_EXCEPTION:
           resultCode = Result.chillMode;
@@ -862,10 +872,16 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
           resultCode = Result.unknownFailure;
         }
       } catch (IOException ex) {
-        LOG.warn("Fail to delete block: {}", key, ex);
+        LOG.warn("Fail to delete blocks for object key: {}",
+            keyBlocks.getGroupID(), ex);
         resultCode = Result.unknownFailure;
       }
-      results.add(new DeleteBlockResult(key, resultCode));
+      List<DeleteBlockResult> blockResultList = new ArrayList<>();
+      for (String blockKey : keyBlocks.getBlockIDList()) {
+        blockResultList.add(new DeleteBlockResult(blockKey, resultCode));
+      }
+      results.add(new DeleteBlockGroupResult(keyBlocks.getGroupID(),
+          blockResultList));
     }
     return results;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81f71b47/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java
index 4672b33..2693c06 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManager.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.List;
 
 /**
  *
@@ -46,11 +47,16 @@ public interface BlockManager extends Closeable {
   Pipeline getBlock(String key) throws IOException;
 
   /**
-   * Given a key of the block, delete the block.
-   * @param key - key of the block.
-   * @throws IOException
+   * Deletes a list of blocks in an atomic operation. Internally, SCM
+   * writes these blocks into a {@link DeletedBlockLog} and deletes them
+   * from SCM DB. If this is successful, the given blocks enter the
+   * pending deletion state and become invisible in the SCM namespace.
+   *
+   * @param blockIDs block IDs. This is often the list of blocks of
+   *                 a particular object key.
+   * @throws IOException if an exception happens, none of the blocks is deleted.
    */
-  void deleteBlock(String key) throws IOException;
+  void deleteBlocks(List<String> blockIDs) throws IOException;
 
   /**
    * @return the block deletion transaction log maintained by SCM.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81f71b47/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
index d520246..1a2dc14 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
@@ -45,6 +45,7 @@ import java.io.File;
 import java.io.IOException;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
@@ -382,8 +383,8 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
         }
         // now we should have some candidates in ALLOCATE state
         if (candidates.size() == 0) {
-          throw new SCMException("Fail to find any container to allocate block " +
-              "of size " + size + ".", FAILED_TO_FIND_CONTAINER_WITH_SAPCE);
+          throw new SCMException("Fail to find any container to allocate block "
+              + "of size " + size + ".", FAILED_TO_FIND_CONTAINER_WITH_SAPCE);
         }
       }
 
@@ -475,35 +476,84 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
   }
 
   /**
-   * Given a block key, delete a block.
-   * @param key - block key assigned by SCM.
-   * @throws IOException
+   * Deletes a list of blocks in an atomic operation. Internally, SCM
+   * writes these blocks into a {@link DeletedBlockLog} and deletes them
+   * from SCM DB. If this is successful, the given blocks enter the
+   * pending deletion state and become invisible in the SCM namespace.
+   *
+   * @param blockIDs block IDs. This is often the list of blocks of
+   *                 a particular object key.
+   * @throws IOException if an exception happens, none of the blocks is deleted.
    */
   @Override
-  public void deleteBlock(final String key) throws IOException {
+  public void deleteBlocks(List<String> blockIDs) throws IOException {
     if (!nodeManager.isOutOfNodeChillMode()) {
       throw new SCMException("Unable to delete block while in chill mode",
           CHILL_MODE_EXCEPTION);
     }
 
     lock.lock();
+    LOG.info("Deleting blocks {}", String.join(",", blockIDs));
+    Map<String, List<String>> containerBlocks = new HashMap<>();
+    BatchOperation batch = new BatchOperation();
+    BatchOperation rollbackBatch = new BatchOperation();
+    // TODO: track the block size info so that we can reclaim the container
+    // TODO: used space when the block is deleted.
     try {
-      byte[] containerBytes = blockStore.get(DFSUtil.string2Bytes(key));
-      if (containerBytes == null) {
-        throw new SCMException("Specified block key does not exist. key : " +
-            key, FAILED_TO_FIND_BLOCK);
+      for (String blockKey : blockIDs) {
+        byte[] blockKeyBytes = DFSUtil.string2Bytes(blockKey);
+        byte[] containerBytes = blockStore.get(blockKeyBytes);
+        if (containerBytes == null) {
+          throw new SCMException(
+              "Specified block key does not exist. key : " + blockKey,
+              FAILED_TO_FIND_BLOCK);
+        }
+        batch.delete(blockKeyBytes);
+        rollbackBatch.put(blockKeyBytes, containerBytes);
+
+        // Merge into the container -> blocks mapping, which will be
+        // persisted to the deletedBlocksLog.
+        String containerName = DFSUtil.bytes2String(containerBytes);
+        containerBlocks.computeIfAbsent(containerName,
+            any -> new ArrayList<>()).add(blockKey);
       }
-      // TODO: track the block size info so that we can reclaim the container
-      // TODO: used space when the block is deleted.
-      BatchOperation batch = new BatchOperation();
-      String deletedKeyName = getDeletedKeyName(key);
-      // Add a tombstone for the deleted key
-      batch.put(DFSUtil.string2Bytes(deletedKeyName), containerBytes);
-      // Delete the block key
-      batch.delete(DFSUtil.string2Bytes(key));
+
+      // We update the SCM DB first; if this step fails, nothing gets
+      // into the delLog, so no blocks will be accidentally removed.
+      // If we wrote the log first, then once the log was written the
+      // async deleting service could start scanning and pick up blocks
+      // for real deletion, which might cause data loss.
       blockStore.writeBatch(batch);
-      // TODO: Add async tombstone clean thread to send delete command to
-      // datanodes in the pipeline to clean up the blocks from containers.
+      try {
+        deletedBlockLog.addTransactions(containerBlocks);
+      } catch (IOException e) {
+        try {
+          // If delLog update is failed, we need to rollback the changes.
+          blockStore.writeBatch(rollbackBatch);
+        } catch (IOException rollbackException) {
+          // This is a corner case: addTransactions fails and the rollback
+          // also fails, which leaves these blocks in an inconsistent state.
+          // They were moved to the pending deletion state in the SCM DB but
+          // were not written into the delLog, so the real deletions will
+          // never be done. The blocks become invisible in the namespace,
+          // but the actual data is not removed. We log an error here so an
+          // admin can manually check and fix such errors.
+          LOG.error("Blocks might be in inconsistent state because"
+                  + " they were moved to pending deletion state in SCM DB but"
+                  + " not written into delLog. Admin can manually add them"
+                  + " into delLog for deletions. Inconsistent block list: {}",
+              String.join(",", blockIDs), e);
+          throw rollbackException;
+        }
+        throw new IOException("Skip writing the deleted blocks info to"
+            + " the delLog because addTransaction fails. Batch skipped: "
+            + String.join(",", blockIDs), e);
+      }
       // TODO: Container report handling of the deleted blocks:
       // Remove tombstone and update open container usage.
       // We will revisit this when the closed container replication is done.
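
The ordering above (update the block store first, then append to the delLog, rolling the store back if the append fails) is a small compensating-transaction pattern. A generic sketch (not the actual implementation) of the control flow, using hypothetical Store/DelLog types:

    import java.io.IOException;

    interface Batch { }
    interface Store { void writeBatch(Batch b) throws IOException; }
    interface DelLog { void append() throws IOException; }

    // Step 1 hides the blocks from the namespace; step 2 records the
    // intent to physically delete them. If step 2 fails, compensate by
    // restoring the store so no block is silently lost.
    void deleteAtomically(Store store, DelLog delLog,
        Batch deletes, Batch rollback) throws IOException {
      store.writeBatch(deletes);
      try {
        delLog.append();
      } catch (IOException e) {
        store.writeBatch(rollback);   // best-effort compensation
        throw e;                      // surface the failure to the caller
      }
    }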

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81f71b47/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java
index 9e268a6..bcbbe15 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLog.java
@@ -23,6 +23,7 @@ import org.apache.hadoop.ozone.protocol.proto
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.List;
+import java.util.Map;
 
 /**
  * The DeletedBlockLog is a persisted log in SCM to keep tracking
@@ -87,6 +88,22 @@ public interface DeletedBlockLog extends Closeable {
       throws IOException;
 
   /**
+   * Creates block deletion transactions for a set of containers,
+   * adds them to the log, and persists them atomically. An object key
+   * might be stored across multiple containers and multiple blocks, so
+   * this API ensures the updates are applied atomically: if any of them
+   * fails, the entire operation fails without any update to the log.
+   * Note that this does not mean only one transaction is created; it
+   * creates multiple transactions (one per container), persisted
+   * together (on success) or not at all (on failure).
+   *
+   * @param containerBlocksMap a map from container name to blocks to delete.
+   * @throws IOException
+   */
+  void addTransactions(Map<String, List<String>> containerBlocksMap)
+      throws IOException;
+
+  /**
    * Returns the total number of valid transactions. A transaction is
    * considered to be valid as long as its count is in range [0, MAX_RETRY].
    *
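
A hedged usage sketch of the new interface method (the container and
block names below are made up for illustration; only addTransactions and
the DeletedBlockLog type come from this file):

    import java.io.IOException;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import org.apache.hadoop.ozone.scm.block.DeletedBlockLog;

    class AddTransactionsSketch {
      // Groups blocks by container and persists one transaction per
      // container in a single atomic batch: either every entry lands
      // in the log, or none does.
      static void logDeletions(DeletedBlockLog log) throws IOException {
        Map<String, List<String>> containerBlocks = new HashMap<>();
        containerBlocks.put("container-1", Arrays.asList("blk-1", "blk-2"));
        containerBlocks.put("container-2", Arrays.asList("blk-3"));
        log.addTransactions(containerBlocks);
      }
    }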

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81f71b47/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java
index 738157d..e7e92d1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/DeletedBlockLogImpl.java
@@ -37,6 +37,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -209,6 +210,16 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
     }
   }
 
+  private DeletedBlocksTransaction constructNewTransaction(long txID,
+      String containerName, List<String> blocks) {
+    return DeletedBlocksTransaction.newBuilder()
+        .setTxID(txID)
+        .setContainerName(containerName)
+        .addAllBlockID(blocks)
+        .setCount(0)
+        .build();
+  }
+
   /**
    * {@inheritDoc}
    *
@@ -244,12 +255,8 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
     BatchOperation batch = new BatchOperation();
     lock.lock();
     try {
-      DeletedBlocksTransaction tx = DeletedBlocksTransaction.newBuilder()
-          .setTxID(lastTxID + 1)
-          .setContainerName(containerName)
-          .addAllBlockID(blocks)
-          .setCount(0)
-          .build();
+      DeletedBlocksTransaction tx = constructNewTransaction(lastTxID + 1,
+          containerName, blocks);
       byte[] key = Longs.toByteArray(lastTxID + 1);
 
       batch.put(key, tx.toByteArray());
@@ -284,6 +291,35 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
     }
   }
 
+  /**
+   * {@inheritDoc}
+   *
+   * @param containerBlocksMap a map from container name to blocks to delete.
+   * @throws IOException
+   */
+  @Override
+  public void addTransactions(Map<String, List<String>> containerBlocksMap)
+      throws IOException {
+    BatchOperation batch = new BatchOperation();
+    lock.lock();
+    try {
+      long currentLatestID = lastTxID;
+      for (Map.Entry<String, List<String>> entry :
+          containerBlocksMap.entrySet()) {
+        currentLatestID += 1;
+        byte[] key = Longs.toByteArray(currentLatestID);
+        DeletedBlocksTransaction tx = constructNewTransaction(currentLatestID,
+            entry.getKey(), entry.getValue());
+        batch.put(key, tx.toByteArray());
+      }
+      lastTxID = currentLatestID;
+      batch.put(LATEST_TXID, Longs.toByteArray(lastTxID));
+      deletedStore.writeBatch(batch);
+    } finally {
+      lock.unlock();
+    }
+  }
+
   @Override
   public void close() throws IOException {
     if (deletedStore != null) {
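
The implementation assigns consecutive txIDs under the lock and writes
them, together with the LATEST_TXID pointer, in a single batch, so an
interrupted write can never leave a gap between the stored transactions
and the recorded latest ID. A small illustration of the key encoding
used above (only Guava's Longs is assumed, as in the code itself):

    import com.google.common.primitives.Longs;
    import java.util.Arrays;

    public class TxKeyEncodingSketch {
      public static void main(String[] args) {
        // Each transaction is keyed by its 8-byte big-endian txID, so
        // keys sort in transaction order in the underlying store.
        long txID = 42L;
        byte[] key = Longs.toByteArray(txID);
        System.out.println(Arrays.toString(key)); // [0, 0, 0, 0, 0, 0, 0, 42]
        System.out.println(Longs.fromByteArray(key)); // 42
      }
    }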

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81f71b47/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
index 1ec574a..dcbf82a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/ozone-default.xml
@@ -432,6 +432,17 @@
   </property>
 
   <property>
+    <name>ozone.key.deleting.limit.per.task</name>
+    <value>1000</value>
+    <description>
+      Maximum number of keys scanned by the key deleting service in KSM
+      per time interval. These keys are sent to SCM to have their
+      metadata deleted and transactions generated for the subsequent
+      async deletion between SCM and the DataNodes.
+    </description>
+  </property>
+
+  <property>
     <name>dfs.container.ipc</name>
     <value>50011</value>
     <description>
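
A hedged sketch of reading the new property from a Hadoop Configuration
(the literal key string and the default of 1000 come from the entry
above; the constant presumably added to KSMConfigKeys is not visible in
this hunk, so the string form is used):

    import org.apache.hadoop.conf.Configuration;

    public class KeyDeletingLimitSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Falls back to the ozone-default.xml value of 1000 when unset.
        int limit = conf.getInt("ozone.key.deleting.limit.per.task", 1000);
        System.out.println("keys per scan: " + limit);
      }
    }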

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81f71b47/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/block/TestBlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/block/TestBlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/block/TestBlockManager.java
index 78daad4..483b0c1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/block/TestBlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/scm/block/TestBlockManager.java
@@ -37,6 +37,7 @@ import org.junit.rules.ExpectedException;
 import java.io.File;
 import java.io.IOException;
 import java.nio.file.Paths;
+import java.util.Collections;
 import java.util.UUID;
 
 import static org.apache.hadoop.ozone.OzoneConsts.GB;
@@ -104,7 +105,7 @@ public class TestBlockManager {
   public void testDeleteBlock() throws Exception {
     AllocatedBlock block = blockManager.allocateBlock(DEFAULT_BLOCK_SIZE);
     Assert.assertNotNull(block);
-    blockManager.deleteBlock(block.getKey());
+    blockManager.deleteBlocks(Collections.singletonList(block.getKey()));
 
     // Deleted block can not be retrieved
     thrown.expectMessage("Specified block key does not exist.");
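
The test reflects the API change from deleteBlock(String) to
deleteBlocks(List&lt;String&gt;); a hedged sketch of batching several keys
into one call (the keys are made up, and the IOException is assumed to
match the rest of the BlockManager API):

    import java.io.IOException;
    import java.util.Arrays;
    import org.apache.hadoop.ozone.scm.block.BlockManager;

    class BatchDeleteSketch {
      // Several block keys can now be deleted in a single call.
      static void deleteThree(BlockManager manager) throws IOException {
        manager.deleteBlocks(Arrays.asList("key-1", "key-2", "key-3"));
      }
    }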

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81f71b47/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java
index 5e07fa5..81a89a7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/client/TestKeys.java
@@ -17,15 +17,31 @@
  */
 package org.apache.hadoop.ozone.web.client;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.commons.lang.math.RandomUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConfiguration;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.helpers.KeyData;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.ksm.KeySpaceManager;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
+import org.apache.hadoop.ozone.ksm.helpers.KsmVolumeArgs;
+import org.apache.hadoop.ozone.ksm.helpers.KsmBucketInfo;
+import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
 import org.apache.hadoop.ozone.protocol.proto.KeySpaceManagerProtocolProtos.Status;
 import org.apache.hadoop.ozone.web.exceptions.OzoneException;
 import org.apache.hadoop.ozone.web.utils.OzoneUtils;
@@ -48,8 +64,9 @@ import java.io.IOException;
 import java.net.URISyntaxException;
 import java.nio.file.Path;
 import java.nio.file.Paths;
-import java.text.ParseException;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.Random;
 
 import static org.junit.Assert.assertEquals;
@@ -77,6 +94,9 @@ public class TestKeys {
   public static void init() throws Exception {
     OzoneConfiguration conf = new OzoneConfiguration();
 
+    // Set short block deleting service interval to speed up deletions.
+    conf.setInt(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS, 1000);
+
     path = GenericTestUtils.getTempPath(TestKeys.class.getSimpleName());
     Logger.getLogger("log4j.logger.org.apache.http").setLevel(Level.DEBUG);
 
@@ -104,12 +124,12 @@ public class TestKeys {
    *
    * @return File.
    */
-  static File createRandomDataFile(String dir, String fileName, long size) {
+  static File createRandomDataFile(String dir, String fileName, long size)
+      throws IOException {
     File tmpDir = new File(dir);
-    tmpDir.mkdirs();
+    FileUtils.forceMkdir(tmpDir);
     File tmpFile = new File(tmpDir, fileName);
-    try {
-      FileOutputStream randFile = new FileOutputStream(tmpFile);
+    try (FileOutputStream randFile = new FileOutputStream(tmpFile)) {
       Random r = new Random();
       for (int x = 0; x < size; x++) {
         char c = (char) (r.nextInt(26) + 'a');
@@ -176,8 +196,7 @@ public class TestKeys {
      * @return Returns the name of the new key that was created.
      * @throws OzoneException
      */
-    private String putKey() throws
-        OzoneException {
+    private KsmKeyArgs putKey() throws Exception {
       String volumeName = OzoneUtils.getRequestID().toLowerCase();
       client.setUserAuth("hdfs");
 
@@ -188,16 +207,21 @@ public class TestKeys {
       bucket = vol.createBucket(bucketName, acls, StorageType.DEFAULT);
 
       String fileName = OzoneUtils.getRequestID().toLowerCase();
+
       file = createRandomDataFile(dir, fileName, 1024);
 
       bucket.putKey(keyName, file);
-      return keyName;
+      return new KsmKeyArgs.Builder()
+          .setKeyName(keyName)
+          .setVolumeName(volumeName)
+          .setBucketName(bucketName)
+          .setDataSize(1024)
+          .build();
     }
-
   }
 
   @Test
-  public void testPutKey() throws OzoneException {
+  public void testPutKey() throws Exception {
     // Test non-delimited keys
     runTestPutKey(new PutHelper(ozoneRestClient, path));
     // Test key delimited by a random delimiter
@@ -206,7 +230,7 @@ public class TestKeys {
         getMultiPartKey(delimiter)));
   }
 
-  static void runTestPutKey(PutHelper helper) throws OzoneException {
+  static void runTestPutKey(PutHelper helper) throws Exception {
     final OzoneRestClient client = helper.client;
     helper.putKey();
     assertNotNull(helper.getBucket());
@@ -254,8 +278,7 @@ public class TestKeys {
   }
 
   @Test
-  public void testPutAndGetKeyWithDnRestart()
-      throws OzoneException, IOException, URISyntaxException {
+  public void testPutAndGetKeyWithDnRestart() throws Exception {
     runTestPutAndGetKeyWithDnRestart(
         new PutHelper(ozoneRestClient, path), ozoneCluster);
     String delimiter = RandomStringUtils.randomAscii(1);
@@ -265,9 +288,8 @@ public class TestKeys {
   }
 
   static void runTestPutAndGetKeyWithDnRestart(
-      PutHelper helper, MiniOzoneCluster cluster)
-      throws OzoneException, IOException, URISyntaxException {
-    String keyName = helper.putKey();
+      PutHelper helper, MiniOzoneCluster cluster) throws Exception {
+    String keyName = helper.putKey().getKeyName();
     assertNotNull(helper.getBucket());
     assertNotNull(helper.getFile());
 
@@ -281,37 +303,35 @@ public class TestKeys {
 
     helper.getBucket().getKey(keyName, newPath);
 
-    FileInputStream original = new FileInputStream(helper.getFile());
-    FileInputStream downloaded = new FileInputStream(newPath.toFile());
-
-
-    String originalHash = DigestUtils.sha256Hex(original);
-    String downloadedHash = DigestUtils.sha256Hex(downloaded);
-
-    assertEquals(
-        "Sha256 does not match between original file and downloaded file.",
-        originalHash, downloadedHash);
+    try (
+        FileInputStream original = new FileInputStream(helper.getFile());
+        FileInputStream downloaded = new FileInputStream(newPath.toFile())) {
+      String originalHash = DigestUtils.sha256Hex(original);
+      String downloadedHash = DigestUtils.sha256Hex(downloaded);
+      assertEquals(
+          "Sha256 does not match between original file and downloaded file.",
+          originalHash, downloadedHash);
+    }
   }
 
   @Test
-  public void testPutAndGetKey() throws OzoneException, IOException {
+  public void testPutAndGetKey() throws Exception {
     runTestPutAndGetKey(new PutHelper(ozoneRestClient, path));
     String delimiter = RandomStringUtils.randomAscii(1);
     runTestPutAndGetKey(new PutHelper(ozoneRestClient, path,
         getMultiPartKey(delimiter)));
   }
 
-  static void runTestPutAndGetKey(PutHelper helper)
-      throws OzoneException, IOException {
+  static void runTestPutAndGetKey(PutHelper helper) throws Exception {
     final OzoneRestClient client = helper.client;
 
-    String keyName = helper.putKey();
+    String keyName = helper.putKey().getKeyName();
     assertNotNull(helper.getBucket());
     assertNotNull(helper.getFile());
 
-    final String newFileName1 =  helper.dir + "/"
+    final String newFileName1 = helper.dir + "/"
         + OzoneUtils.getRequestID().toLowerCase();
-    final String newFileName2 =  helper.dir + "/"
+    final String newFileName2 = helper.dir + "/"
         + OzoneUtils.getRequestID().toLowerCase();
 
     Path newPath1 = Paths.get(newFileName1);
@@ -322,54 +342,51 @@ public class TestKeys {
     client.getKey(helper.getVol().getVolumeName(),
         helper.getBucket().getBucketName(), keyName, newPath2);
 
-    FileInputStream original = new FileInputStream(helper.getFile());
-    FileInputStream downloaded1 = new FileInputStream(newPath1.toFile());
-    FileInputStream downloaded2 = new FileInputStream(newPath1.toFile());
-
-    String originalHash = DigestUtils.sha256Hex(original);
-    String downloadedHash1 = DigestUtils.sha256Hex(downloaded1);
-    String downloadedHash2 = DigestUtils.sha256Hex(downloaded2);
-
-    assertEquals(
-        "Sha256 does not match between original file and downloaded file.",
-        originalHash, downloadedHash1);
-    assertEquals(
-        "Sha256 does not match between original file and downloaded file.",
-        originalHash, downloadedHash2);
-
-    // test new get key with invalid volume/bucket name
-    try {
-      client.getKey("invalid-volume",
-          helper.getBucket().getBucketName(), keyName, newPath1);
-      fail("Get key should have thrown "
-          + "when using invalid volume name.");
-    } catch (OzoneException e) {
-      GenericTestUtils.assertExceptionContains(
-          Status.KEY_NOT_FOUND.toString(), e);
-    }
+    try (FileInputStream original = new FileInputStream(helper.getFile());
+        FileInputStream downloaded1 = new FileInputStream(newPath1.toFile());
+        FileInputStream downloaded2 = new FileInputStream(newPath2.toFile())) {
+      String originalHash = DigestUtils.sha256Hex(original);
+      String downloadedHash1 = DigestUtils.sha256Hex(downloaded1);
+      String downloadedHash2 = DigestUtils.sha256Hex(downloaded2);
+
+      assertEquals(
+          "Sha256 does not match between original file and downloaded file.",
+          originalHash, downloadedHash1);
+      assertEquals(
+          "Sha256 does not match between original file and downloaded file.",
+          originalHash, downloadedHash2);
+
+      // test new get key with invalid volume/bucket name
+      try {
+        client.getKey("invalid-volume", helper.getBucket().getBucketName(),
+            keyName, newPath1);
+        fail("Get key should have thrown " + "when using invalid volume name.");
+      } catch (OzoneException e) {
+        GenericTestUtils
+            .assertExceptionContains(Status.KEY_NOT_FOUND.toString(), e);
+      }
 
-    try {
-      client.getKey(helper.getVol().getVolumeName(),
-          "invalid-bucket", keyName, newPath1);
-      fail("Get key should have thrown "
-          + "when using invalid bucket name.");
-    } catch (OzoneException e) {
-      GenericTestUtils.assertExceptionContains(
-          Status.KEY_NOT_FOUND.toString(), e);
+      try {
+        client.getKey(helper.getVol().getVolumeName(), "invalid-bucket",
+            keyName, newPath1);
+        fail("Get key should have thrown " + "when using invalid bucket name.");
+      } catch (OzoneException e) {
+        GenericTestUtils.assertExceptionContains(
+            Status.KEY_NOT_FOUND.toString(), e);
+      }
     }
   }
 
   @Test
-  public void testPutAndDeleteKey() throws OzoneException, IOException {
+  public void testPutAndDeleteKey() throws Exception {
     runTestPutAndDeleteKey(new PutHelper(ozoneRestClient, path));
     String delimiter = RandomStringUtils.randomAscii(1);
     runTestPutAndDeleteKey(new PutHelper(ozoneRestClient, path,
         getMultiPartKey(delimiter)));
   }
 
-  static void runTestPutAndDeleteKey(PutHelper helper)
-      throws OzoneException, IOException {
-    String keyName = helper.putKey();
+  static void runTestPutAndDeleteKey(PutHelper helper) throws Exception {
+    String keyName = helper.putKey().getKeyName();
     assertNotNull(helper.getBucket());
     assertNotNull(helper.getFile());
     helper.getBucket().deleteKey(keyName);
@@ -384,16 +401,14 @@ public class TestKeys {
   }
 
   @Test
-  public void testPutAndListKey()
-      throws OzoneException, IOException, ParseException {
+  public void testPutAndListKey() throws Exception {
     runTestPutAndListKey(new PutHelper(ozoneRestClient, path));
     String delimiter = RandomStringUtils.randomAscii(1);
     runTestPutAndListKey(new PutHelper(ozoneRestClient, path,
         getMultiPartKey(delimiter)));
   }
 
-  static void runTestPutAndListKey(PutHelper helper)
-      throws OzoneException, IOException, ParseException {
+  static void runTestPutAndListKey(PutHelper helper) throws Exception {
     final OzoneRestClient client = helper.client;
     helper.putKey();
     assertNotNull(helper.getBucket());
@@ -473,17 +488,15 @@ public class TestKeys {
   }
 
   @Test
-  public void testGetKeyInfo()
-      throws OzoneException, IOException, ParseException {
+  public void testGetKeyInfo() throws Exception {
     runTestGetKeyInfo(new PutHelper(ozoneRestClient, path));
     String delimiter = RandomStringUtils.randomAscii(1);
     runTestGetKeyInfo(new PutHelper(ozoneRestClient, path,
         getMultiPartKey(delimiter)));
   }
 
-  static void runTestGetKeyInfo(PutHelper helper)
-      throws OzoneException, ParseException {
-    String keyName = helper.putKey();
+  static void runTestGetKeyInfo(PutHelper helper) throws Exception {
+    String keyName = helper.putKey().getKeyName();
     assertNotNull(helper.getBucket());
     assertNotNull(helper.getFile());
 
@@ -500,4 +513,170 @@ public class TestKeys {
         (OzoneUtils.formatDate(keyInfo.getObjectInfo().getModifiedOn())
             / 1000) >= (currentTime / 1000));
   }
+
+  // Volume, bucket, and key info used by the create/delete key tests.
+  private static class BucketKeys {
+
+    private Map<Pair<String, String>, List<String>> buckets;
+
+    BucketKeys() {
+      buckets = Maps.newHashMap();
+    }
+
+    void addKey(String volume, String bucket, String key) {
+      // check if this bucket exists
+      for (Map.Entry<Pair<String, String>, List<String>> entry :
+          buckets.entrySet()) {
+        if (entry.getKey().getValue().equals(bucket)) {
+          entry.getValue().add(key);
+          return;
+        }
+      }
+
+      // The bucket does not exist yet; create it.
+      Pair<String, String> newBucket = new ImmutablePair<>(volume, bucket);
+      List<String> keyList = Lists.newArrayList();
+      keyList.add(key);
+      buckets.put(newBucket, keyList);
+    }
+
+    Set<Pair<String, String>> getAllBuckets() {
+      return buckets.keySet();
+    }
+
+    List<String> getBucketKeys(String bucketName) {
+      for (Map.Entry<Pair<String, String>, List<String>> entry : buckets
+          .entrySet()) {
+        if (entry.getKey().getValue().equals(bucketName)) {
+          return entry.getValue();
+        }
+      }
+      return Lists.newArrayList();
+    }
+
+    int totalNumOfKeys() {
+      int count = 0;
+      for (Map.Entry<Pair<String, String>, List<String>> entry : buckets
+          .entrySet()) {
+        count += entry.getValue().size();
+      }
+      return count;
+    }
+  }
+
+  private int countKsmKeys(KeySpaceManager ksm) throws IOException {
+    int totalCount = 0;
+    List<KsmVolumeArgs> volumes =
+        ksm.listAllVolumes(null, null, Integer.MAX_VALUE);
+    for (KsmVolumeArgs volume : volumes) {
+      List<KsmBucketInfo> buckets =
+          ksm.listBuckets(volume.getVolume(), null, null, Integer.MAX_VALUE);
+      for (KsmBucketInfo bucket : buckets) {
+        List<KsmKeyInfo> keys = ksm.listKeys(bucket.getVolumeName(),
+            bucket.getBucketName(), null, null, Integer.MAX_VALUE);
+        totalCount += keys.size();
+      }
+    }
+    return totalCount;
+  }
+
+  @Test
+  public void testDeleteKey() throws Exception {
+    KeySpaceManager ksm = ozoneCluster.getKeySpaceManager();
+    // To avoid interference from other test cases,
+    // collect the number of pre-existing keys at the beginning.
+    int numOfExistedKeys = countKsmKeys(ksm);
+
+    // Keep track of bucket and key info while creating the keys.
+    PutHelper helper = new PutHelper(ozoneRestClient, path);
+    BucketKeys bucketKeys = new BucketKeys();
+    for (int i = 0; i < 20; i++) {
+      KsmKeyArgs keyArgs = helper.putKey();
+      bucketKeys.addKey(keyArgs.getVolumeName(), keyArgs.getBucketName(),
+          keyArgs.getKeyName());
+    }
+
+    // There should be 20 keys in the buckets we just created.
+    Assert.assertEquals(20, bucketKeys.totalNumOfKeys());
+
+    int numOfCreatedKeys = 0;
+    OzoneContainer cm = ozoneCluster.getDataNodes().get(0)
+        .getOzoneContainerManager();
+
+    // Chunk files that are expected to be deleted.
+    List<File> expectedChunkFiles = Lists.newArrayList();
+    // Iterate over all buckets, list all keys in each bucket, and
+    // count the total number of created keys.
+    Set<Pair<String, String>> buckets = bucketKeys.getAllBuckets();
+    for (Pair<String, String> buk : buckets) {
+      List<KsmKeyInfo> createdKeys =
+          ksm.listKeys(buk.getKey(), buk.getValue(), null, null, 20);
+
+      // Memorize the chunks that have been created,
+      // so we can verify the actual deletions on the DN side later.
+      for (KsmKeyInfo keyInfo : createdKeys) {
+        List<KsmKeyLocationInfo> locations = keyInfo.getKeyLocationList();
+        for (KsmKeyLocationInfo location : locations) {
+          String containerName = location.getContainerName();
+          KeyData keyData = new KeyData(containerName, location.getBlockID());
+          KeyData blockInfo = cm.getContainerManager()
+              .getKeyManager().getKey(keyData);
+          ContainerData containerData = cm.getContainerManager()
+              .readContainer(containerName);
+          File dataDir = ContainerUtils
+              .getDataDirectory(containerData).toFile();
+          for (ContainerProtos.ChunkInfo chunkInfo : blockInfo.getChunks()) {
+            File chunkFile = dataDir.toPath()
+                .resolve(chunkInfo.getChunkName()).toFile();
+            System.out.println("Chunk File created: "
+                + chunkFile.getAbsolutePath());
+            Assert.assertTrue(chunkFile.exists());
+            expectedChunkFiles.add(chunkFile);
+          }
+        }
+      }
+      numOfCreatedKeys += createdKeys.size();
+    }
+
+    // Ensure all keys are created.
+    Assert.assertEquals(20, numOfCreatedKeys);
+
+    // Ensure all keys are visible from KSM.
+    // Total number should be numOfCreated + numOfExisted
+    Assert.assertEquals(20 + numOfExistedKeys, countKsmKeys(ksm));
+
+    // Delete all 20 keys.
+    int delCount = 20;
+    Set<Pair<String, String>> allBuckets = bucketKeys.getAllBuckets();
+    for (Pair<String, String> bucketInfo : allBuckets) {
+      List<String> bks = bucketKeys.getBucketKeys(bucketInfo.getValue());
+      for (String keyName : bks) {
+        if (delCount > 0) {
+          KsmKeyArgs arg =
+              new KsmKeyArgs.Builder().setVolumeName(bucketInfo.getKey())
+                  .setBucketName(bucketInfo.getValue()).setKeyName(keyName)
+                  .build();
+          ksm.deleteKey(arg);
+          delCount--;
+        }
+      }
+    }
+
+    // Keys should disappear from the KSM namespace quickly, because
+    // the actual data deletion happens asynchronously.
+    GenericTestUtils.waitFor(() -> {
+      try {
+        int num = countKsmKeys(ksm);
+        return num == numOfExistedKeys;
+      } catch (IOException e) {
+        return false;
+      }
+    }, 1000, 10000);
+
+    // It might take a while until all blocks are actually deleted;
+    // verify that all chunk files created earlier are removed from disk.
+    GenericTestUtils.waitFor(
+        () -> expectedChunkFiles.stream().allMatch(file -> !file.exists()),
+        1000, 60000);
+  }
 }
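
testDeleteKey finishes with two polling waits because deletion is
asynchronous end to end: keys leave the KSM namespace quickly, while the
chunk files on the DataNode disappear later. GenericTestUtils.waitFor
takes a check, a poll interval, and a timeout (both in milliseconds); a
minimal standalone sketch of the same pattern:

    import java.util.concurrent.atomic.AtomicBoolean;
    import org.apache.hadoop.test.GenericTestUtils;

    class WaitForSketch {
      static void waitUntil(AtomicBoolean done) throws Exception {
        // Poll every second, fail with a timeout after 10 seconds;
        // transient errors belong inside the check, as in testDeleteKey.
        GenericTestUtils.waitFor(done::get, 1000, 10000);
      }
    }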

http://git-wip-us.apache.org/repos/asf/hadoop/blob/81f71b47/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/client/TestKeysRatis.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/client/TestKeysRatis.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/client/TestKeysRatis.java
index e50ce7d..2c4ac1c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/client/TestKeysRatis.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/ozone/web/client/TestKeysRatis.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.ozone.web.client;
 import org.apache.commons.lang.RandomStringUtils;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.RatisTestHelper;
-import org.apache.hadoop.ozone.web.exceptions.OzoneException;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
@@ -28,10 +27,6 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.Timeout;
 
-import java.io.IOException;
-import java.net.URISyntaxException;
-import java.text.ParseException;
-
 import static org.apache.hadoop.ozone.web.client.TestKeys.*;
 
 /** The same as {@link TestKeys} except that this test is Ratis enabled. */
@@ -59,7 +54,7 @@ public class TestKeysRatis {
   }
 
   @Test
-  public void testPutKey() throws OzoneException {
+  public void testPutKey() throws Exception {
     runTestPutKey(new PutHelper(ozoneRestClient, path));
     String delimiter = RandomStringUtils.randomAlphanumeric(1);
     runTestPutKey(new PutHelper(ozoneRestClient, path,
@@ -67,8 +62,7 @@ public class TestKeysRatis {
   }
 
   @Test
-  public void testPutAndGetKeyWithDnRestart()
-      throws OzoneException, IOException, URISyntaxException {
+  public void testPutAndGetKeyWithDnRestart() throws Exception {
     runTestPutAndGetKeyWithDnRestart(
         new PutHelper(ozoneRestClient, path), suite.getCluster());
     String delimiter = RandomStringUtils.randomAlphanumeric(1);
@@ -78,7 +72,7 @@ public class TestKeysRatis {
   }
 
   @Test
-  public void testPutAndGetKey() throws OzoneException, IOException {
+  public void testPutAndGetKey() throws Exception {
     runTestPutAndGetKey(new PutHelper(ozoneRestClient, path));
     String delimiter = RandomStringUtils.randomAlphanumeric(1);
     runTestPutAndGetKey(new PutHelper(ozoneRestClient, path,
@@ -86,7 +80,7 @@ public class TestKeysRatis {
   }
 
   @Test
-  public void testPutAndDeleteKey() throws OzoneException, IOException {
+  public void testPutAndDeleteKey() throws Exception  {
     runTestPutAndDeleteKey(new PutHelper(ozoneRestClient, path));
     String delimiter = RandomStringUtils.randomAlphanumeric(1);
     runTestPutAndDeleteKey(new PutHelper(ozoneRestClient, path,
@@ -94,8 +88,7 @@ public class TestKeysRatis {
   }
 
   @Test
-  public void testPutAndListKey()
-      throws OzoneException, IOException, ParseException {
+  public void testPutAndListKey() throws Exception {
     runTestPutAndListKey(new PutHelper(ozoneRestClient, path));
     String delimiter = RandomStringUtils.randomAlphanumeric(1);
     runTestPutAndListKey(new PutHelper(ozoneRestClient, path,
@@ -103,8 +96,7 @@ public class TestKeysRatis {
   }
 
   @Test
-  public void testGetKeyInfo()
-      throws OzoneException, IOException, ParseException {
+  public void testGetKeyInfo() throws Exception {
     runTestGetKeyInfo(new PutHelper(ozoneRestClient, path));
     String delimiter = RandomStringUtils.randomAlphanumeric(1);
     runTestGetKeyInfo(new PutHelper(ozoneRestClient, path,


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org

