hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aengin...@apache.org
Subject [5/7] hadoop git commit: HDDS-1. Remove SCM Block DB. Contributed by Xiaoyu Yao.
Date Mon, 07 May 2018 22:00:02 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java
index cf6bf12..f920ded 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/KeyManagerImpl.java
@@ -19,20 +19,18 @@
 package org.apache.hadoop.ozone.container.common.impl;
 
 import com.google.common.base.Preconditions;
+import com.google.common.primitives.Longs;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .StorageContainerException;
-import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdds.protocol.proto.ContainerProtos;
+import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
 import org.apache.hadoop.ozone.container.common.helpers.KeyData;
 import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
 import org.apache.hadoop.ozone.container.common.interfaces.KeyManager;
 import org.apache.hadoop.ozone.container.common.utils.ContainerCache;
-import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
-import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
 import org.apache.hadoop.utils.MetadataStore;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -73,22 +71,21 @@ public class KeyManagerImpl implements KeyManager {
    * {@inheritDoc}
    */
   @Override
-  public void putKey(Pipeline pipeline, KeyData data) throws IOException {
+  public void putKey(KeyData data) throws IOException {
+    Preconditions.checkNotNull(data, "KeyData cannot be null for put operation.");
+    Preconditions.checkState(data.getContainerID() >= 0, "Container ID cannot be negative");
     containerManager.readLock();
     try {
       // We are not locking the key manager since LevelDb serializes all actions
       // against a single DB. We rely on DB level locking to avoid conflicts.
-      Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
-      String containerName = pipeline.getContainerName();
-      Preconditions.checkNotNull(containerName,
-          "Container name cannot be null");
-      ContainerData cData = containerManager.readContainer(containerName);
+      ContainerData cData = containerManager.readContainer(
+          data.getContainerID());
       MetadataStore db = KeyUtils.getDB(cData, conf);
 
       // This is a post condition that acts as a hint to the user.
       // Should never fail.
       Preconditions.checkNotNull(db, "DB cannot be null here");
-      db.put(data.getKeyName().getBytes(KeyUtils.ENCODING), data
+      db.put(Longs.toByteArray(data.getLocalID()), data
           .getProtoBufMessage().toByteArray());
     } finally {
       containerManager.readUnlock();
@@ -103,17 +100,17 @@ public class KeyManagerImpl implements KeyManager {
     containerManager.readLock();
     try {
       Preconditions.checkNotNull(data, "Key data cannot be null");
-      Preconditions.checkNotNull(data.getContainerName(),
+      Preconditions.checkNotNull(data.getContainerID(),
           "Container name cannot be null");
       ContainerData cData = containerManager.readContainer(data
-          .getContainerName());
+          .getContainerID());
       MetadataStore db = KeyUtils.getDB(cData, conf);
 
       // This is a post condition that acts as a hint to the user.
       // Should never fail.
       Preconditions.checkNotNull(db, "DB cannot be null here");
 
-      byte[] kData = db.get(data.getKeyName().getBytes(KeyUtils.ENCODING));
+      byte[] kData = db.get(Longs.toByteArray(data.getLocalID()));
       if (kData == null) {
         throw new StorageContainerException("Unable to find the key.",
             NO_SUCH_KEY);
@@ -130,15 +127,19 @@ public class KeyManagerImpl implements KeyManager {
    * {@inheritDoc}
    */
   @Override
-  public void deleteKey(Pipeline pipeline, String keyName)
+  public void deleteKey(BlockID blockID)
       throws IOException {
+    Preconditions.checkNotNull(blockID, "block ID cannot be null.");
+    Preconditions.checkState(blockID.getContainerID() >= 0,
+        "Container ID cannot be negative.");
+    Preconditions.checkState(blockID.getLocalID() >= 0,
+        "Local ID cannot be negative.");
+
     containerManager.readLock();
     try {
-      Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
-      String containerName = pipeline.getContainerName();
-      Preconditions.checkNotNull(containerName,
-          "Container name cannot be null");
-      ContainerData cData = containerManager.readContainer(containerName);
+
+      ContainerData cData = containerManager
+          .readContainer(blockID.getContainerID());
       MetadataStore db = KeyUtils.getDB(cData, conf);
 
       // This is a post condition that acts as a hint to the user.
@@ -149,12 +150,13 @@ public class KeyManagerImpl implements KeyManager {
       // to delete a key which might have just gotten inserted after
       // the get check.
 
-      byte[] kData = db.get(keyName.getBytes(KeyUtils.ENCODING));
+      byte[] kKey = Longs.toByteArray(blockID.getLocalID());
+      byte[] kData = db.get(kKey);
       if (kData == null) {
         throw new StorageContainerException("Unable to find the key.",
             NO_SUCH_KEY);
       }
-      db.delete(keyName.getBytes(KeyUtils.ENCODING));
+      db.delete(kKey);
     } finally {
       containerManager.readUnlock();
     }
@@ -165,26 +167,22 @@ public class KeyManagerImpl implements KeyManager {
    */
   @Override
   public List<KeyData> listKey(
-      Pipeline pipeline, String prefix, String startKey, int count)
+      long containerID, long startLocalID, int count)
       throws IOException {
-    Preconditions.checkNotNull(pipeline,
-        "Pipeline cannot be null.");
+    Preconditions.checkState(containerID >= 0, "Container ID cannot be negative");
+    Preconditions.checkState(startLocalID >= 0, "startLocal ID cannot be negative");
     Preconditions.checkArgument(count > 0,
         "Count must be a positive number.");
-    ContainerData cData = containerManager.readContainer(pipeline
-        .getContainerName());
+    ContainerData cData = containerManager.readContainer(containerID);
     MetadataStore db = KeyUtils.getDB(cData, conf);
 
-    List<KeyData> result = new ArrayList<KeyData>();
-    byte[] startKeyInBytes = startKey == null ? null :
-        DFSUtil.string2Bytes(startKey);
-    MetadataKeyFilter prefixFilter = new KeyPrefixFilter(prefix);
+    List<KeyData> result = new ArrayList<>();
+    byte[] startKeyInBytes = Longs.toByteArray(startLocalID);
     List<Map.Entry<byte[], byte[]>> range =
-        db.getSequentialRangeKVs(startKeyInBytes, count, prefixFilter);
+        db.getSequentialRangeKVs(startKeyInBytes, count, null);
     for (Map.Entry<byte[], byte[]> entry : range) {
-      String keyName = KeyUtils.getKeyName(entry.getKey());
       KeyData value = KeyUtils.getKeyData(entry.getValue());
-      KeyData data = new KeyData(value.getContainerName(), keyName);
+      KeyData data = new KeyData(value.getBlockID());
       result.add(data);
     }
     return result;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RandomContainerDeletionChoosingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RandomContainerDeletionChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RandomContainerDeletionChoosingPolicy.java
index 3e267d2..06177cb 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RandomContainerDeletionChoosingPolicy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/RandomContainerDeletionChoosingPolicy.java
@@ -41,7 +41,7 @@ public class RandomContainerDeletionChoosingPolicy
 
   @Override
   public List<ContainerData> chooseContainerForBlockDeletion(int count,
-      Map<String, ContainerStatus> candidateContainers)
+      Map<Long, ContainerStatus> candidateContainers)
       throws StorageContainerException {
     Preconditions.checkNotNull(candidateContainers,
         "Internal assertion: candidate containers cannot be null");
@@ -58,7 +58,7 @@ public class RandomContainerDeletionChoosingPolicy
 
         LOG.debug("Select container {} for block deletion, "
             + "pending deletion blocks num: {}.",
-            entry.getContainer().getContainerName(),
+            entry.getContainer().getContainerID(),
             entry.getNumPendingDeletionBlocks());
       } else {
         break;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/TopNOrderedContainerDeletionChoosingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/TopNOrderedContainerDeletionChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/TopNOrderedContainerDeletionChoosingPolicy.java
index 0169a96..2463426 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/TopNOrderedContainerDeletionChoosingPolicy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/TopNOrderedContainerDeletionChoosingPolicy.java
@@ -53,7 +53,7 @@ public class TopNOrderedContainerDeletionChoosingPolicy
 
   @Override
   public List<ContainerData> chooseContainerForBlockDeletion(int count,
-      Map<String, ContainerStatus> candidateContainers)
+      Map<Long, ContainerStatus> candidateContainers)
       throws StorageContainerException {
     Preconditions.checkNotNull(candidateContainers,
         "Internal assertion: candidate containers cannot be null");
@@ -74,7 +74,7 @@ public class TopNOrderedContainerDeletionChoosingPolicy
           LOG.debug(
               "Select container {} for block deletion, "
                   + "pending deletion blocks num: {}.",
-              entry.getContainer().getContainerName(),
+              entry.getContainer().getContainerID(),
               entry.getNumPendingDeletionBlocks());
         } else {
           LOG.debug("Stop looking for next container, there is no"

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ChunkManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ChunkManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ChunkManager.java
index f55d74c..26dcf21 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ChunkManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ChunkManager.java
@@ -18,10 +18,10 @@
 
 package org.apache.hadoop.ozone.container.common.interfaces;
 
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .StorageContainerException;
 import org.apache.hadoop.hdds.protocol.proto.ContainerProtos;
+import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
 
 /**
@@ -32,20 +32,18 @@ public interface ChunkManager {
 
   /**
    * writes a given chunk.
-   * @param pipeline - Name and the set of machines that make this container.
-   * @param keyName - Name of the Key.
+   * @param blockID - ID of the block.
    * @param info - ChunkInfo.
    * @param stage - Chunk Stage write.
    * @throws StorageContainerException
    */
-  void writeChunk(Pipeline pipeline, String keyName,
-                  ChunkInfo info, byte[] data, ContainerProtos.Stage stage)
+  void writeChunk(BlockID blockID,
+      ChunkInfo info, byte[] data, ContainerProtos.Stage stage)
       throws StorageContainerException;
 
   /**
    * reads the data defined by a chunk.
-   * @param pipeline - container pipeline.
-   * @param keyName - Name of the Key
+   * @param blockID - ID of the block.
    * @param info - ChunkInfo.
    * @return  byte array
    * @throws StorageContainerException
@@ -53,17 +51,16 @@ public interface ChunkManager {
    * TODO: Right now we do not support partial reads and writes of chunks.
    * TODO: Explore if we need to do that for ozone.
    */
-  byte[] readChunk(Pipeline pipeline, String keyName, ChunkInfo info) throws
+  byte[] readChunk(BlockID blockID, ChunkInfo info) throws
       StorageContainerException;
 
   /**
    * Deletes a given chunk.
-   * @param pipeline  - Pipeline.
-   * @param keyName   - Key Name
+   * @param blockID - ID of the block.
    * @param info  - Chunk Info
    * @throws StorageContainerException
    */
-  void deleteChunk(Pipeline pipeline, String keyName, ChunkInfo info) throws
+  void deleteChunk(BlockID blockID, ChunkInfo info) throws
       StorageContainerException;
 
   // TODO : Support list operations.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDeletionChoosingPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDeletionChoosingPolicy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDeletionChoosingPolicy.java
index f7280e2..6b60c52 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDeletionChoosingPolicy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerDeletionChoosingPolicy.java
@@ -41,6 +41,6 @@ public interface ContainerDeletionChoosingPolicy {
    * @throws StorageContainerException
    */
   List<ContainerData> chooseContainerForBlockDeletion(int count,
-      Map<String, ContainerStatus> candidateContainers)
+      Map<Long, ContainerStatus> candidateContainers)
       throws StorageContainerException;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
index 2ff636e..84d95f8 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/ContainerManager.java
@@ -21,7 +21,6 @@ package org.apache.hadoop.ozone.container.common.interfaces;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .StorageContainerException;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
@@ -60,48 +59,43 @@ public interface ContainerManager extends RwLock {
   /**
    * Creates a container with the given name.
    *
-   * @param pipeline      -- Nodes which make up this container.
    * @param containerData - Container Name and metadata.
    * @throws StorageContainerException
    */
-  void createContainer(Pipeline pipeline, ContainerData containerData)
+  void createContainer(ContainerData containerData)
       throws StorageContainerException;
 
   /**
    * Deletes an existing container.
    *
-   * @param pipeline      - nodes that make this container.
-   * @param containerName - name of the container.
+   * @param containerID - ID of the container.
    * @param forceDelete   - whether this container should be deleted forcibly.
    * @throws StorageContainerException
    */
-  void deleteContainer(Pipeline pipeline, String containerName,
+  void deleteContainer(long containerID,
       boolean forceDelete) throws StorageContainerException;
 
   /**
    * Update an existing container.
    *
-   * @param pipeline container nodes
-   * @param containerName name of the container
+   * @param containerID ID of the container
    * @param data container data
    * @param forceUpdate if true, update container forcibly.
    * @throws StorageContainerException
    */
-  void updateContainer(Pipeline pipeline, String containerName,
-      ContainerData data, boolean forceUpdate) throws StorageContainerException;
+  void updateContainer(long containerID, ContainerData data,
+      boolean forceUpdate) throws StorageContainerException;
 
   /**
    * A simple interface for container iterations.
    *
-   * @param prefix - Return only values matching this prefix
-   * @param count   - how many to return
-   * @param prevKey - Previous key - Server returns results from this point.
-   * @param data    - Actual containerData
+   * @param startContainerID -  Return containers with ID >= startContainerID.
+   * @param count - how many to return
+   * @param data - Actual containerData
    * @throws StorageContainerException
    */
-  void listContainer(String prefix, long count, String prevKey,
-                     List<ContainerData> data)
-      throws StorageContainerException;
+  void listContainer(long startContainerID, long count,
+      List<ContainerData> data) throws StorageContainerException;
 
   /**
    * Choose containers for block deletion.
@@ -115,30 +109,30 @@ public interface ContainerManager extends RwLock {
   /**
    * Get metadata about a specific container.
    *
-   * @param containerName - Name of the container
+   * @param containerID - ID of the container.
    * @return ContainerData - Container Data.
    * @throws StorageContainerException
    */
-  ContainerData readContainer(String containerName)
+  ContainerData readContainer(long containerID)
       throws StorageContainerException;
 
   /**
    * Closes an open container; if it is already closed or does not exist, a
    * StorageContainerException is thrown.
-   * @param containerName - Name of the container.
+   * @param containerID - ID of the container.
    * @throws StorageContainerException
    */
-  void closeContainer(String containerName)
+  void closeContainer(long containerID)
       throws StorageContainerException, NoSuchAlgorithmException;
 
   /**
    * Checks if a container is open.
-   * @param containerName - Name of the container.
+   * @param containerID - ID of the container.
    * @return true if the container is open, false otherwise.
    * @throws StorageContainerException  - Throws Exception if we are not
    * able to find the container.
    */
-  boolean isOpen(String containerName) throws StorageContainerException;
+  boolean isOpen(long containerID) throws StorageContainerException;
 
   /**
    * Supports clean shutdown of container.
@@ -203,7 +197,7 @@ public interface ContainerManager extends RwLock {
    * @param containerId
    *          container id
    */
-  void incrPendingDeletionBlocks(int numBlocks, String containerId);
+  void incrPendingDeletionBlocks(int numBlocks, long containerId);
 
   /**
    * Decrease pending deletion blocks count number of specified container.
@@ -213,64 +207,64 @@ public interface ContainerManager extends RwLock {
    * @param containerId
    *          container id
    */
-  void decrPendingDeletionBlocks(int numBlocks, String containerId);
+  void decrPendingDeletionBlocks(int numBlocks, long containerId);
 
   /**
    * Increase the read count of the container.
-   * @param containerName - Name of the container.
+   * @param containerId - ID of the container.
    */
-  void incrReadCount(String containerName);
+  void incrReadCount(long containerId);
 
   /**
    * Increase the read counter for bytes read from the container.
-   * @param containerName - Name of the container.
+   * @param containerId - ID of the container.
    * @param readBytes - bytes read from the container.
    */
-  void incrReadBytes(String containerName, long readBytes);
+  void incrReadBytes(long containerId, long readBytes);
 
 
   /**
    * Increase the write count of the container.
-   * @param containerName - Name of the container.
+   * @param containerId - ID of the container.
    */
-  void incrWriteCount(String containerName);
+  void incrWriteCount(long containerId);
 
   /**
    * Increase the write counter for bytes written into the container.
-   * @param containerName - Name of the container.
+   * @param containerId - ID of the container.
    * @param writeBytes - bytes written into the container.
    */
-  void incrWriteBytes(String containerName, long writeBytes);
+  void incrWriteBytes(long containerId, long writeBytes);
 
   /**
    * Increase the bytes used by the container.
-   * @param containerName - Name of the container.
+   * @param containerId - ID of the container.
    * @param used - additional bytes used by the container.
    * @return the current bytes used.
    */
-  long incrBytesUsed(String containerName, long used);
+  long incrBytesUsed(long containerId, long used);
 
   /**
    * Decrease the bytes used by the container.
-   * @param containerName - Name of the container.
+   * @param containerId - ID of the container.
    * @param used - additional bytes reclaimed by the container.
    * @return the current bytes used.
    */
-  long decrBytesUsed(String containerName, long used);
+  long decrBytesUsed(long containerId, long used);
 
   /**
    * Get the bytes used by the container.
-   * @param containerName - Name of the container.
+   * @param containerId - ID of the container.
    * @return the current bytes used by the container.
    */
-  long getBytesUsed(String containerName);
+  long getBytesUsed(long containerId);
 
   /**
    * Get the number of keys in the container.
-   * @param containerName - Name of the container.
+   * @param containerId - ID of the container.
    * @return the current key count.
    */
-  long getNumKeys(String containerName);
+  long getNumKeys(long containerId);
 
   /**
    * Get the container report state to send via HB to SCM.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/KeyManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/KeyManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/KeyManager.java
index 8c27ba9..158ce38 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/KeyManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/KeyManager.java
@@ -17,9 +17,9 @@
  */
 package org.apache.hadoop.ozone.container.common.interfaces;
 
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.container.common.helpers
     .StorageContainerException;
+import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.ozone.container.common.helpers.KeyData;
 
 import java.io.IOException;
@@ -32,11 +32,10 @@ public interface KeyManager {
   /**
    * Puts or overwrites a key.
    *
-   * @param pipeline - Pipeline.
    * @param data     - Key Data.
    * @throws IOException
    */
-  void putKey(Pipeline pipeline, KeyData data) throws IOException;
+  void putKey(KeyData data) throws IOException;
 
   /**
    * Gets an existing key.
@@ -50,23 +49,21 @@ public interface KeyManager {
   /**
    * Deletes an existing Key.
    *
-   * @param pipeline - Pipeline.
-   * @param keyName  Key Data.
+   * @param blockID - ID of the block.
    * @throws StorageContainerException
    */
-  void deleteKey(Pipeline pipeline, String keyName)
+  void deleteKey(BlockID blockID)
       throws IOException;
 
   /**
    * List keys in a container.
    *
-   * @param pipeline - pipeline.
-   * @param prefix   - Prefix in needed.
-   * @param startKey  - Key to start from, EMPTY_STRING to begin.
+   * @param containerID - ID of the container.
+   * @param startLocalID  - Key to start from, 0 to begin.
    * @param count    - Number of keys to return.
    * @return List of Keys that match the criteria.
    */
-  List<KeyData> listKey(Pipeline pipeline, String prefix, String startKey,
+  List<KeyData> listKey(long containerID, long startLocalID,
       int count) throws IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java
index ac95b2a..7c3fa30 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/background/BlockDeletingService.java
@@ -180,12 +180,12 @@ public class BlockDeletingService extends BackgroundService{
           meta.getSequentialRangeKVs(null, blockLimitPerTask, filter);
       if (toDeleteBlocks.isEmpty()) {
         LOG.debug("No under deletion block found in container : {}",
-            containerData.getContainerName());
+            containerData.getContainerID());
       }
 
       List<String> succeedBlocks = new LinkedList<>();
       LOG.debug("Container : {}, To-Delete blocks : {}",
-          containerData.getContainerName(), toDeleteBlocks.size());
+          containerData.getContainerID(), toDeleteBlocks.size());
       File dataDir = ContainerUtils.getDataDirectory(containerData).toFile();
       if (!dataDir.exists() || !dataDir.isDirectory()) {
         LOG.error("Invalid container data dir {} : "
@@ -220,11 +220,11 @@ public class BlockDeletingService extends BackgroundService{
       meta.writeBatch(batch);
       // update count of pending deletion blocks in in-memory container status
       containerManager.decrPendingDeletionBlocks(succeedBlocks.size(),
-          containerData.getContainerName());
+          containerData.getContainerID());
 
       if (!succeedBlocks.isEmpty()) {
         LOG.info("Container: {}, deleted blocks: {}, task elapsed time: {}ms",
-            containerData.getContainerName(), succeedBlocks.size(),
+            containerData.getContainerID(), succeedBlocks.size(),
             Time.monotonicNow() - startTime);
       }
       crr.addAll(succeedBlocks);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java
index f7b49b7..d8adc7d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerHandler.java
@@ -58,19 +58,20 @@ public class CloseContainerHandler implements CommandHandler {
     LOG.debug("Processing Close Container command.");
     invocationCount++;
     long startTime = Time.monotonicNow();
-    String containerName = "UNKNOWN";
+    // TODO: define this as INVALID_CONTAINER_ID in HddsConsts.java (TBA)
+    long containerID = -1;
     try {
 
       SCMCloseContainerCmdResponseProto
           closeContainerProto =
           SCMCloseContainerCmdResponseProto
               .parseFrom(command.getProtoBufMessage());
-      containerName = closeContainerProto.getContainerName();
+      containerID = closeContainerProto.getContainerID();
 
-      container.getContainerManager().closeContainer(containerName);
+      container.getContainerManager().closeContainer(containerID);
 
     } catch (Exception e) {
-      LOG.error("Can't close container " + containerName, e);
+      LOG.error("Can't close container " + containerID, e);
     } finally {
       long endTime = Time.monotonicNow();
       totalTime += endTime - startTime;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
index f106e3d..5231660 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
@@ -16,6 +16,7 @@
  */
 package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
 
+import com.google.common.primitives.Longs;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdds.protocol.proto
@@ -108,7 +109,7 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
         txResultBuilder.setSuccess(true);
       } catch (IOException e) {
         LOG.warn("Failed to delete blocks for container={}, TXID={}",
-            entry.getContainerName(), entry.getTxID(), e);
+            entry.getContainerID(), entry.getTxID(), e);
         txResultBuilder.setSuccess(false);
       }
       resultBuilder.addResults(txResultBuilder.build());
@@ -150,7 +151,7 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
    */
   private void deleteContainerBlocks(DeletedBlocksTransaction delTX,
       Configuration config) throws IOException {
-    String containerId = delTX.getContainerName();
+    long containerId = delTX.getContainerID();
     ContainerData containerInfo = containerManager.readContainer(containerId);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Processing Container : {}, DB path : {}", containerId,
@@ -159,9 +160,9 @@ public class DeleteBlocksCommandHandler implements CommandHandler {
 
     int newDeletionBlocks = 0;
     MetadataStore containerDB = KeyUtils.getDB(containerInfo, config);
-    for (String blk : delTX.getBlockIDList()) {
+    for (Long blk : delTX.getLocalIDList()) {
       BatchOperation batch = new BatchOperation();
-      byte[] blkBytes = DFSUtil.string2Bytes(blk);
+      byte[] blkBytes = Longs.toByteArray(blk);
       byte[] blkInfo = containerDB.get(blkBytes);
       if (blkInfo != null) {
         // Found the block in container db,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index 5dee10f..eba565d 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -170,7 +170,7 @@ public class HeartbeatEndpointTask
                 commandResponseProto.getCloseContainerProto());
         if (LOG.isDebugEnabled()) {
           LOG.debug("Received SCM container close request for container {}",
-              closeContainer.getContainerName());
+              closeContainer.getContainerID());
         }
         this.context.addCommand(closeContainer);
         break;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 1a89e44..89eaace 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -94,7 +94,7 @@ public class ContainerStateMachine extends BaseStateMachine {
   private ThreadPoolExecutor writeChunkExecutor;
   private final ConcurrentHashMap<Long, CompletableFuture<Message>>
       writeChunkFutureMap;
-  private final ConcurrentHashMap<String, CompletableFuture<Message>>
+  private final ConcurrentHashMap<Long, CompletableFuture<Message>>
       createContainerFutureMap;
 
   ContainerStateMachine(ContainerDispatcher dispatcher,
@@ -146,8 +146,7 @@ public class ContainerStateMachine extends BaseStateMachine {
       // create the log entry proto
       final WriteChunkRequestProto commitWriteChunkProto =
           WriteChunkRequestProto.newBuilder()
-              .setPipeline(write.getPipeline())
-              .setKeyName(write.getKeyName())
+              .setBlockID(write.getBlockID())
               .setChunkData(write.getChunkData())
               // skipping the data field as it is
               // already set in statemachine data proto
@@ -196,9 +195,9 @@ public class ContainerStateMachine extends BaseStateMachine {
   private CompletableFuture<Message> handleWriteChunk(
       ContainerCommandRequestProto requestProto, long entryIndex) {
     final WriteChunkRequestProto write = requestProto.getWriteChunk();
-    String containerName = write.getPipeline().getContainerName();
+    long containerID = write.getBlockID().getContainerID();
     CompletableFuture<Message> future =
-        createContainerFutureMap.get(containerName);
+        createContainerFutureMap.get(containerID);
     CompletableFuture<Message> writeChunkFuture;
     if (future != null) {
       writeChunkFuture = future.thenApplyAsync(
@@ -213,10 +212,10 @@ public class ContainerStateMachine extends BaseStateMachine {
 
   private CompletableFuture<Message> handleCreateContainer(
       ContainerCommandRequestProto requestProto) {
-    String containerName =
-        requestProto.getCreateContainer().getContainerData().getName();
+    long containerID =
+        requestProto.getCreateContainer().getContainerData().getContainerID();
     createContainerFutureMap.
-        computeIfAbsent(containerName, k -> new CompletableFuture<>());
+        computeIfAbsent(containerID, k -> new CompletableFuture<>());
     return CompletableFuture.completedFuture(() -> ByteString.EMPTY);
   }
 
@@ -270,9 +269,9 @@ public class ContainerStateMachine extends BaseStateMachine {
       } else {
         Message message = runCommand(requestProto);
         if (cmdType == ContainerProtos.Type.CreateContainer) {
-          String containerName =
-              requestProto.getCreateContainer().getContainerData().getName();
-          createContainerFutureMap.remove(containerName).complete(message);
+          long containerID =
+              requestProto.getCreateContainer().getContainerData().getContainerID();
+          createContainerFutureMap.remove(containerID).complete(message);
         }
         return CompletableFuture.completedFuture(message);
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java
index 6ae45b6..4d9c690 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/utils/ContainerCache.java
@@ -69,15 +69,15 @@ public final class ContainerCache extends LRUMap {
   /**
    * Closes a db instance.
    *
-   * @param container - name of the container to be closed.
+   * @param containerID - ID of the container to be closed.
    * @param db - db instance to close.
    */
-  private void closeDB(String container, MetadataStore db) {
+  private void closeDB(long containerID, MetadataStore db) {
     if (db != null) {
       try {
         db.close();
       } catch (IOException e) {
-        LOG.error("Error closing DB. Container: " + container, e);
+        LOG.error("Error closing DB. Container: " + containerID, e);
       }
     }
   }
@@ -93,7 +93,7 @@ public final class ContainerCache extends LRUMap {
       while (iterator.hasNext()) {
         iterator.next();
         MetadataStore db = (MetadataStore) iterator.getValue();
-        closeDB(iterator.getKey().toString(), db);
+        closeDB(((Number)iterator.getKey()).longValue(), db);
       }
       // reset the cache
       cache.clear();
@@ -110,7 +110,7 @@ public final class ContainerCache extends LRUMap {
     lock.lock();
     try {
       MetadataStore db = (MetadataStore) entry.getValue();
-      closeDB(entry.getKey().toString(), db);
+      closeDB(((Number)entry.getKey()).longValue(), db);
     } finally {
       lock.unlock();
     }
@@ -120,28 +120,27 @@ public final class ContainerCache extends LRUMap {
   /**
    * Returns a DB handle if available, create the handler otherwise.
    *
-   * @param containerName - Name of the container.
+   * @param containerID - ID of the container.
    * @return MetadataStore.
    */
-  public MetadataStore getDB(String containerName, String containerDBPath)
+  public MetadataStore getDB(long containerID, String containerDBPath)
       throws IOException {
-    Preconditions.checkNotNull(containerName);
-    Preconditions.checkState(!containerName.isEmpty());
+    Preconditions.checkState(containerID >= 0, "Container ID cannot be negative.");
     lock.lock();
     try {
-      MetadataStore db = (MetadataStore) this.get(containerName);
+      MetadataStore db = (MetadataStore) this.get(containerID);
 
       if (db == null) {
         db = MetadataStoreBuilder.newBuilder()
             .setDbFile(new File(containerDBPath))
             .setCreateIfMissing(false)
             .build();
-        this.put(containerName, db);
+        this.put(containerID, db);
       }
       return db;
     } catch (Exception e) {
       LOG.error("Error opening DB. Container:{} ContainerPath:{}",
-          containerName, containerDBPath, e);
+          containerID, containerDBPath, e);
       throw e;
     } finally {
       lock.unlock();
@@ -151,16 +150,15 @@ public final class ContainerCache extends LRUMap {
   /**
    * Remove a DB handler from cache.
    *
-   * @param containerName - Name of the container.
+   * @param containerID - ID of the container.
    */
-  public void removeDB(String containerName) {
-    Preconditions.checkNotNull(containerName);
-    Preconditions.checkState(!containerName.isEmpty());
+  public void removeDB(long containerID) {
+    Preconditions.checkState(containerID >= 0, "Container ID cannot be negative.");
     lock.lock();
     try {
-      MetadataStore db = (MetadataStore)this.get(containerName);
-      closeDB(containerName, db);
-      this.remove(containerName);
+      MetadataStore db = (MetadataStore)this.get(containerID);
+      closeDB(containerID, db);
+      this.remove(containerID);
     } finally {
       lock.unlock();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
index b1cdbc4..d1d6488 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
@@ -32,10 +32,10 @@ import static org.apache.hadoop.hdds.protocol.proto
 public class CloseContainerCommand
     extends SCMCommand<SCMCloseContainerCmdResponseProto> {
 
-  private String containerName;
+  private long containerID;
 
-  public CloseContainerCommand(String containerName) {
-    this.containerName = containerName;
+  public CloseContainerCommand(long containerID) {
+    this.containerID = containerID;
   }
 
   /**
@@ -60,17 +60,17 @@ public class CloseContainerCommand
 
   public SCMCloseContainerCmdResponseProto getProto() {
     return SCMCloseContainerCmdResponseProto.newBuilder()
-        .setContainerName(containerName).build();
+        .setContainerID(containerID).build();
   }
 
   public static CloseContainerCommand getFromProtobuf(
       SCMCloseContainerCmdResponseProto closeContainerProto) {
     Preconditions.checkNotNull(closeContainerProto);
-    return new CloseContainerCommand(closeContainerProto.getContainerName());
+    return new CloseContainerCommand(closeContainerProto.getContainerID());
 
   }
 
-  public String getContainerName() {
-    return containerName;
+  public long getContainerID() {
+    return containerID;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
index 03b85e5..bc7fb7a 100644
--- a/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
+++ b/hadoop-hdds/container-service/src/main/proto/StorageContainerDatanodeProtocol.proto
@@ -87,7 +87,6 @@ message NodeContianerMapping {
 A container report contains the following information.
 */
 message ContainerInfo {
-  required string containerName = 1;
   optional string finalhash = 2;
   optional int64 size = 3;
   optional int64 used = 4;
@@ -102,10 +101,12 @@ message ContainerInfo {
 }
 
 // The deleted blocks which are stored in deletedBlock.db of scm.
+// We don't use BlockID because this only contains multiple localIDs
+// of the same containerID.
 message DeletedBlocksTransaction {
   required int64 txID = 1;
-  required string containerName = 2;
-  repeated string blockID = 3;
+  required int64 containerID = 2;
+  repeated int64 localID = 3;
   // the retry time of sending deleting command to datanode.
   required int32 count = 4;
 }
@@ -201,7 +202,7 @@ message SendContainerReportProto {
 This command asks the datanode to close a specific container.
 */
 message SCMCloseContainerCmdResponseProto {
-  required string containerName = 1;
+  required int64 containerID = 1;
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
index 41a8a80..764ccfd 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/ScmTestMock.java
@@ -239,7 +239,7 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
 
       for (StorageContainerDatanodeProtocolProtos.ContainerInfo report:
           reports.getReportsList()) {
-        containers.put(report.getContainerName(), report);
+        containers.put(report.getContainerID(), report);
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManager.java
index 4ab2516..f9aa0cd 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManager.java
@@ -18,8 +18,8 @@
 package org.apache.hadoop.hdds.scm.block;
 
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.client.BlockID;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -43,14 +43,6 @@ public interface BlockManager extends Closeable {
       HddsProtos.ReplicationFactor factor, String owner) throws IOException;
 
   /**
-   *  Give the key to the block, get the pipeline info.
-   * @param key - key to the block.
-   * @return - Pipeline that used to access the block.
-   * @throws IOException
-   */
-  Pipeline getBlock(String key) throws IOException;
-
-  /**
    * Deletes a list of blocks in an atomic operation. Internally, SCM
    * writes these blocks into a {@link DeletedBlockLog} and deletes them
    * from SCM DB. If this is successful, given blocks are entering pending
@@ -60,7 +52,7 @@ public interface BlockManager extends Closeable {
    *                 a particular object key.
    * @throws IOException if exception happens, non of the blocks is deleted.
    */
-  void deleteBlocks(List<String> blockIDs) throws IOException;
+  void deleteBlocks(List<BlockID> blockIDs) throws IOException;
 
   /**
    * @return the block deletion transaction log maintained by SCM.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index d966112..5a98e85 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -16,30 +16,25 @@
  */
 package org.apache.hadoop.hdds.scm.block;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.Mapping;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.hdds.client.BlockID;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.hadoop.utils.BatchOperation;
-import org.apache.hadoop.utils.MetadataStore;
-import org.apache.hadoop.utils.MetadataStoreBuilder;
+import org.apache.hadoop.util.Time;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import javax.management.ObjectName;
-import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -54,10 +49,7 @@ import java.util.concurrent.locks.ReentrantLock;
 import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
     .CHILL_MODE_EXCEPTION;
 import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
-    .FAILED_TO_FIND_BLOCK;
-import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
     .INVALID_BLOCK_SIZE;
-import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
 import static org.apache.hadoop.ozone.OzoneConfigKeys
     .OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
 import static org.apache.hadoop.ozone.OzoneConfigKeys
@@ -66,7 +58,6 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys
     .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys
     .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
-import static org.apache.hadoop.ozone.OzoneConsts.BLOCK_DB;
 
 /** Block Manager manages the block access for SCM. */
 public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
@@ -78,11 +69,9 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
 
   private final NodeManager nodeManager;
   private final Mapping containerManager;
-  private final MetadataStore blockStore;
 
   private final Lock lock;
   private final long containerSize;
-  private final long cacheSize;
 
   private final DeletedBlockLog deletedBlockLog;
   private final SCMBlockDeletingService blockDeletingService;
@@ -97,30 +86,17 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
    * @param conf - configuration.
    * @param nodeManager - node manager.
    * @param containerManager - container manager.
-   * @param cacheSizeMB - cache size for level db store.
    * @throws IOException
    */
   public BlockManagerImpl(final Configuration conf,
-      final NodeManager nodeManager, final Mapping containerManager,
-      final int cacheSizeMB) throws IOException {
+      final NodeManager nodeManager, final Mapping containerManager)
+      throws IOException {
     this.nodeManager = nodeManager;
     this.containerManager = containerManager;
-    this.cacheSize = cacheSizeMB;
 
     this.containerSize = OzoneConsts.GB * conf.getInt(
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB,
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT);
-    File metaDir = getOzoneMetaDirPath(conf);
-    String scmMetaDataDir = metaDir.getPath();
-
-    // Write the block key to container name mapping.
-    File blockContainerDbPath = new File(scmMetaDataDir, BLOCK_DB);
-    blockStore =
-        MetadataStoreBuilder.newBuilder()
-            .setConf(conf)
-            .setDbFile(blockContainerDbPath)
-            .setCacheSize(this.cacheSize * OzoneConsts.MB)
-            .build();
 
     this.containerProvisionBatchSize =
         conf.getInt(
@@ -181,12 +157,11 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
     lock.lock();
     try {
       for (int i = 0; i < count; i++) {
-        String containerName = UUID.randomUUID().toString();
         ContainerInfo containerInfo = null;
         try {
           // TODO: Fix this later when Ratis is made the Default.
           containerInfo = containerManager.allocateContainer(type, factor,
-              containerName, owner);
+              owner);
 
           if (containerInfo == null) {
             LOG.warn("Unable to allocate container.");
@@ -267,7 +242,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
                   size, owner, type, factor, HddsProtos.LifeCycleState
                       .ALLOCATED);
       if (containerInfo != null) {
-        containerManager.updateContainerState(containerInfo.getContainerName(),
+        containerManager.updateContainerState(containerInfo.getContainerID(),
             HddsProtos.LifeCycleEvent.CREATE);
         return newBlock(containerInfo, HddsProtos.LifeCycleState.ALLOCATED);
       }
@@ -297,7 +272,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
                   size, owner, type, factor, HddsProtos.LifeCycleState
                       .ALLOCATED);
       if (containerInfo != null) {
-        containerManager.updateContainerState(containerInfo.getContainerName(),
+        containerManager.updateContainerState(containerInfo.getContainerID(),
             HddsProtos.LifeCycleEvent.CREATE);
         return newBlock(containerInfo, HddsProtos.LifeCycleState.ALLOCATED);
       }
@@ -327,68 +302,27 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
       ContainerInfo containerInfo, HddsProtos.LifeCycleState state)
       throws IOException {
 
-    // TODO : Replace this with Block ID.
-    String blockKey = UUID.randomUUID().toString();
-    boolean createContainer = (state == HddsProtos.LifeCycleState.ALLOCATED);
-
-    AllocatedBlock.Builder abb =
-        new AllocatedBlock.Builder()
-            .setKey(blockKey)
-            // TODO : Use containerinfo instead of pipeline.
-            .setPipeline(containerInfo.getPipeline())
-            .setShouldCreateContainer(createContainer);
-    LOG.trace("New block allocated : {} Container ID: {}", blockKey,
-        containerInfo.toString());
-
     if (containerInfo.getPipeline().getMachines().size() == 0) {
       LOG.error("Pipeline Machine count is zero.");
       return null;
     }
 
-    // Persist this block info to the blockStore DB, so getBlock(key) can
-    // find which container the block lives.
-    // TODO : Remove this DB in future
-    // and make this a KSM operation. Category: SCALABILITY.
-    if (containerInfo.getPipeline().getMachines().size() > 0) {
-      blockStore.put(
-          DFSUtil.string2Bytes(blockKey),
-          DFSUtil.string2Bytes(containerInfo.getPipeline().getContainerName()));
-    }
-    return abb.build();
-  }
+    // TODO : Revisit this local ID allocation when HA is added.
+    // TODO: this does not work well if multiple allocations kick in within a
+    // tight loop.
+    long localID = Time.getUtcTime();
+    long containerID = containerInfo.getContainerID();
 
-  /**
-   * Given a block key, return the Pipeline information.
-   *
-   * @param key - block key assigned by SCM.
-   * @return Pipeline (list of DNs and leader) to access the block.
-   * @throws IOException
-   */
-  @Override
-  public Pipeline getBlock(final String key) throws IOException {
-    lock.lock();
-    try {
-      byte[] containerBytes = blockStore.get(DFSUtil.string2Bytes(key));
-      if (containerBytes == null) {
-        throw new SCMException(
-            "Specified block key does not exist. key : " + key,
-            FAILED_TO_FIND_BLOCK);
-      }
+    boolean createContainer = (state == HddsProtos.LifeCycleState.ALLOCATED);
 
-      String containerName = DFSUtil.bytes2String(containerBytes);
-      ContainerInfo containerInfo = containerManager.getContainer(
-          containerName);
-      if (containerInfo == null) {
-        LOG.debug("Container {} allocated by block service"
-            + "can't be found in SCM", containerName);
-        throw new SCMException(
-            "Unable to find container for the block",
-            SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
-      }
-      return containerInfo.getPipeline();
-    } finally {
-      lock.unlock();
-    }
+    AllocatedBlock.Builder abb =
+        new AllocatedBlock.Builder()
+            .setBlockID(new BlockID(containerID, localID))
+            .setPipeline(containerInfo.getPipeline())
+            .setShouldCreateContainer(createContainer);
+    LOG.trace("New block allocated : {} Container ID: {}", localID,
+        containerID);
+    return abb.build();
   }
 
   /**
@@ -403,40 +337,28 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
    * @throws IOException if exception happens, non of the blocks is deleted.
    */
   @Override
-  public void deleteBlocks(List<String> blockIDs) throws IOException {
+  public void deleteBlocks(List<BlockID> blockIDs) throws IOException {
     if (!nodeManager.isOutOfChillMode()) {
       throw new SCMException("Unable to delete block while in chill mode",
           CHILL_MODE_EXCEPTION);
     }
 
     lock.lock();
-    LOG.info("Deleting blocks {}", String.join(",", blockIDs));
-    Map<String, List<String>> containerBlocks = new HashMap<>();
-    BatchOperation batch = new BatchOperation();
-    BatchOperation rollbackBatch = new BatchOperation();
+    LOG.info("Deleting blocks {}", StringUtils.join(",", blockIDs));
+    Map<Long, List<Long>> containerBlocks = new HashMap<>();
     // TODO: track the block size info so that we can reclaim the container
     // TODO: used space when the block is deleted.
     try {
-      for (String blockKey : blockIDs) {
-        byte[] blockKeyBytes = DFSUtil.string2Bytes(blockKey);
-        byte[] containerBytes = blockStore.get(blockKeyBytes);
-        if (containerBytes == null) {
-          throw new SCMException(
-              "Specified block key does not exist. key : " + blockKey,
-              FAILED_TO_FIND_BLOCK);
-        }
-        batch.delete(blockKeyBytes);
-        rollbackBatch.put(blockKeyBytes, containerBytes);
-
+      for (BlockID block : blockIDs) {
         // Merge blocks to a container to blocks mapping,
         // prepare to persist this info to the deletedBlocksLog.
-        String containerName = DFSUtil.bytes2String(containerBytes);
-        if (containerBlocks.containsKey(containerName)) {
-          containerBlocks.get(containerName).add(blockKey);
+        long containerID = block.getContainerID();
+        if (containerBlocks.containsKey(containerID)) {
+          containerBlocks.get(containerID).add(block.getLocalID());
         } else {
-          List<String> item = new ArrayList<>();
-          item.add(blockKey);
-          containerBlocks.put(containerName, item);
+          List<Long> item = new ArrayList<>();
+          item.add(block.getLocalID());
+          containerBlocks.put(containerID, item);
         }
       }
 
@@ -445,34 +367,13 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
       // removed. If we write the log first, once log is written, the
       // async deleting service will start to scan and might be picking
       // up some blocks to do real deletions, that might cause data loss.
-      blockStore.writeBatch(batch);
       try {
         deletedBlockLog.addTransactions(containerBlocks);
       } catch (IOException e) {
-        try {
-          // If delLog update is failed, we need to rollback the changes.
-          blockStore.writeBatch(rollbackBatch);
-        } catch (IOException rollbackException) {
-          // This is a corner case. AddTX fails and rollback also fails,
-          // this will leave these blocks in inconsistent state. They were
-          // moved to pending deletion state in SCM DB but were not written
-          // into delLog so real deletions would not be done. Blocks become
-          // to be invisible from namespace but actual data are not removed.
-          // We log an error here so admin can manually check and fix such
-          // errors.
-          LOG.error(
-              "Blocks might be in inconsistent state because"
-                  + " they were moved to pending deletion state in SCM DB but"
-                  + " not written into delLog. Admin can manually add them"
-                  + " into delLog for deletions. Inconsistent block list: {}",
-              String.join(",", blockIDs),
-              e);
-          throw rollbackException;
-        }
         throw new IOException(
             "Skip writing the deleted blocks info to"
                 + " the delLog because addTransaction fails. Batch skipped: "
-                + String.join(",", blockIDs),
+                + StringUtils.join(",", blockIDs),
             e);
       }
       // TODO: Container report handling of the deleted blocks:
@@ -488,11 +389,6 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
     return this.deletedBlockLog;
   }
 
-  @VisibleForTesting
-  public String getDeletedKeyName(String key) {
-    return StringUtils.format(".Deleted/%s", key);
-  }
-
   /**
    * Close the resources for BlockManager.
    *
@@ -500,9 +396,6 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
    */
   @Override
   public void close() throws IOException {
-    if (blockStore != null) {
-      blockStore.close();
-    }
     if (deletedBlockLog != null) {
       deletedBlockLog.close();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
index 47074d2..32290cc 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
@@ -56,7 +56,7 @@ public class DatanodeDeletedBlockTransactions {
   public void addTransaction(DeletedBlocksTransaction tx) throws IOException {
     ContainerInfo info = null;
     try {
-      info = mappingService.getContainer(tx.getContainerName());
+      info = mappingService.getContainer(tx.getContainerID());
     } catch (IOException e) {
       SCMBlockDeletingService.LOG.warn("Got container info error.", e);
     }
@@ -64,7 +64,7 @@ public class DatanodeDeletedBlockTransactions {
     if (info == null) {
       SCMBlockDeletingService.LOG.warn(
           "Container {} not found, continue to process next",
-          tx.getContainerName());
+          tx.getContainerID());
       return;
     }
 
@@ -75,7 +75,7 @@ public class DatanodeDeletedBlockTransactions {
         if (txs != null && txs.size() < maximumAllowedTXNum) {
           boolean hasContained = false;
           for (DeletedBlocksTransaction t : txs) {
-            if (t.getContainerName().equals(tx.getContainerName())) {
+            if (t.getContainerID() == tx.getContainerID()) {
               hasContained = true;
               break;
             }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
index f7b770e..cc32b35 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
@@ -89,12 +89,12 @@ public interface DeletedBlockLog extends Closeable {
   /**
    * Creates a block deletion transaction and adds that into the log.
    *
-   * @param containerName - container name.
+   * @param containerID - container ID.
    * @param blocks - blocks that belong to the same container.
    *
    * @throws IOException
    */
-  void addTransaction(String containerName, List<String> blocks)
+  void addTransaction(long containerID, List<Long> blocks)
       throws IOException;
 
   /**
@@ -110,7 +110,7 @@ public interface DeletedBlockLog extends Closeable {
    * @param containerBlocksMap a map of containerBlocks.
    * @throws IOException
    */
-  void addTransactions(Map<String, List<String>> containerBlocksMap)
+  void addTransactions(Map<Long, List<Long>> containerBlocksMap)
       throws IOException;
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
index 0f4988a..cabcb46 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLogImpl.java
@@ -190,8 +190,14 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
     try {
       for(Long txID : txIDs) {
         try {
+          byte [] deleteBlockBytes =
+              deletedStore.get(Longs.toByteArray(txID));
+          if (deleteBlockBytes == null) {
+            LOG.warn("Delete txID {} not found", txID);
+            continue;
+          }
           DeletedBlocksTransaction block = DeletedBlocksTransaction
-              .parseFrom(deletedStore.get(Longs.toByteArray(txID)));
+              .parseFrom(deleteBlockBytes);
           DeletedBlocksTransaction.Builder builder = block.toBuilder();
           int currentCount = block.getCount();
           if (currentCount > -1) {
@@ -216,11 +222,11 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
   }
 
   private DeletedBlocksTransaction constructNewTransaction(long txID,
-      String containerName, List<String> blocks) {
+      long containerID, List<Long> blocks) {
     return DeletedBlocksTransaction.newBuilder()
         .setTxID(txID)
-        .setContainerName(containerName)
-        .addAllBlockID(blocks)
+        .setContainerID(containerID)
+        .addAllLocalID(blocks)
         .setCount(0)
         .build();
   }
@@ -250,18 +256,18 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
   /**
    * {@inheritDoc}
    *
-   * @param containerName - container name.
+   * @param containerID - container ID.
    * @param blocks - blocks that belong to the same container.
    * @throws IOException
    */
   @Override
-  public void addTransaction(String containerName, List<String> blocks)
+  public void addTransaction(long containerID, List<Long> blocks)
       throws IOException {
     BatchOperation batch = new BatchOperation();
     lock.lock();
     try {
       DeletedBlocksTransaction tx = constructNewTransaction(lastTxID + 1,
-          containerName, blocks);
+          containerID, blocks);
       byte[] key = Longs.toByteArray(lastTxID + 1);
 
       batch.put(key, tx.toByteArray());
@@ -303,13 +309,13 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
    * @throws IOException
    */
   @Override
-  public void addTransactions(Map<String, List<String>> containerBlocksMap)
+  public void addTransactions(Map<Long, List<Long>> containerBlocksMap)
       throws IOException {
     BatchOperation batch = new BatchOperation();
     lock.lock();
     try {
       long currentLatestID = lastTxID;
-      for (Map.Entry<String, List<String>> entry :
+      for (Map.Entry<Long, List<Long>> entry :
           containerBlocksMap.entrySet()) {
         currentLatestID += 1;
         byte[] key = Longs.toByteArray(currentLatestID);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
index 63cb3a3..e569874 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerMapping.java
@@ -18,6 +18,7 @@ package org.apache.hadoop.hdds.scm.container;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import com.google.common.primitives.Longs;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.closer.ContainerCloser;
@@ -26,7 +27,6 @@ import org.apache.hadoop.hdds.scm.container.replication.ContainerSupervisor;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 import org.apache.hadoop.hdds.scm.node.NodeManager;
 import org.apache.hadoop.hdds.scm.pipelines.PipelineSelector;
-import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
@@ -38,8 +38,6 @@ import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.lease.Lease;
 import org.apache.hadoop.ozone.lease.LeaseException;
 import org.apache.hadoop.ozone.lease.LeaseManager;
-import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
-import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
 import org.apache.hadoop.utils.MetadataStore;
 import org.apache.hadoop.utils.MetadataStoreBuilder;
 import org.slf4j.Logger;
@@ -149,16 +147,15 @@ public class ContainerMapping implements Mapping {
    * {@inheritDoc}
    */
   @Override
-  public ContainerInfo getContainer(final String containerName) throws
+  public ContainerInfo getContainer(final long containerID) throws
       IOException {
     ContainerInfo containerInfo;
     lock.lock();
     try {
-      byte[] containerBytes = containerStore.get(containerName.getBytes(
-          encoding));
+      byte[] containerBytes = containerStore.get(Longs.toByteArray(containerID));
       if (containerBytes == null) {
         throw new SCMException(
-            "Specified key does not exist. key : " + containerName,
+            "Specified key does not exist. key : " + containerID,
             SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
       }
 
@@ -175,19 +172,18 @@ public class ContainerMapping implements Mapping {
    * {@inheritDoc}
    */
   @Override
-  public List<ContainerInfo> listContainer(String startName,
-      String prefixName, int count) throws IOException {
+  public List<ContainerInfo> listContainer(long startContainerID,
+      int count) throws IOException {
     List<ContainerInfo> containerList = new ArrayList<>();
     lock.lock();
     try {
       if (containerStore.isEmpty()) {
         throw new IOException("No container exists in current db");
       }
-      MetadataKeyFilter prefixFilter = new KeyPrefixFilter(prefixName);
-      byte[] startKey = startName == null ? null : DFSUtil.string2Bytes(
-          startName);
+      byte[] startKey = startContainerID <= 0 ? null :
+          Longs.toByteArray(startContainerID);
       List<Map.Entry<byte[], byte[]>> range =
-          containerStore.getSequentialRangeKVs(startKey, count, prefixFilter);
+          containerStore.getSequentialRangeKVs(startKey, count, null);
 
       // Transform the values into the pipelines.
       // TODO: filter by container state
@@ -209,7 +205,6 @@ public class ContainerMapping implements Mapping {
    * Allocates a new container.
    *
    * @param replicationFactor - replication factor of the container.
-   * @param containerName - Name of the container.
    * @param owner - The string name of the Service that owns this container.
    * @return - Pipeline that makes up this container.
    * @throws IOException - Exception
@@ -218,11 +213,8 @@ public class ContainerMapping implements Mapping {
   public ContainerInfo allocateContainer(
       ReplicationType type,
       ReplicationFactor replicationFactor,
-      final String containerName,
       String owner)
       throws IOException {
-    Preconditions.checkNotNull(containerName);
-    Preconditions.checkState(!containerName.isEmpty());
 
     ContainerInfo containerInfo;
     if (!nodeManager.isOutOfChillMode()) {
@@ -233,19 +225,12 @@ public class ContainerMapping implements Mapping {
 
     lock.lock();
     try {
-      byte[] containerBytes = containerStore.get(containerName.getBytes(
-          encoding));
-      if (containerBytes != null) {
-        throw new SCMException(
-            "Specified container already exists. key : " + containerName,
-            SCMException.ResultCodes.CONTAINER_EXISTS);
-      }
       containerInfo =
           containerStateManager.allocateContainer(
-              pipelineSelector, type, replicationFactor, containerName,
-              owner);
-      containerStore.put(
-          containerName.getBytes(encoding), containerInfo.getProtobuf()
+              pipelineSelector, type, replicationFactor, owner);
+
+      byte[] containerIDBytes = Longs.toByteArray(containerInfo.getContainerID());
+      containerStore.put(containerIDBytes, containerInfo.getProtobuf()
               .toByteArray());
     } finally {
       lock.unlock();
@@ -256,20 +241,20 @@ public class ContainerMapping implements Mapping {
   /**
    * Deletes a container from SCM.
    *
-   * @param containerName - Container name
+   * @param containerID - Container ID
    * @throws IOException if container doesn't exist or container store failed
    *                     to delete the
    *                     specified key.
    */
   @Override
-  public void deleteContainer(String containerName) throws IOException {
+  public void deleteContainer(long containerID) throws IOException {
     lock.lock();
     try {
-      byte[] dbKey = containerName.getBytes(encoding);
+      byte[] dbKey = Longs.toByteArray(containerID);
       byte[] containerBytes = containerStore.get(dbKey);
       if (containerBytes == null) {
         throw new SCMException(
-            "Failed to delete container " + containerName + ", reason : " +
+            "Failed to delete container " + containerID + ", reason : " +
                 "container doesn't exist.",
             SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
       }
@@ -284,17 +269,17 @@ public class ContainerMapping implements Mapping {
    */
   @Override
   public HddsProtos.LifeCycleState updateContainerState(
-      String containerName, HddsProtos.LifeCycleEvent event) throws
+      long containerID, HddsProtos.LifeCycleEvent event) throws
       IOException {
     ContainerInfo containerInfo;
     lock.lock();
     try {
-      byte[] dbKey = containerName.getBytes(encoding);
+      byte[] dbKey = Longs.toByteArray(containerID);
       byte[] containerBytes = containerStore.get(dbKey);
       if (containerBytes == null) {
         throw new SCMException(
             "Failed to update container state"
-                + containerName
+                + containerID
                 + ", reason : container doesn't exist.",
             SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
       }
@@ -310,7 +295,7 @@ public class ContainerMapping implements Mapping {
             containerLeaseManager.acquire(containerInfo);
         // Register callback to be executed in case of timeout
         containerLease.registerCallBack(() -> {
-          updateContainerState(containerName,
+          updateContainerState(containerID,
               HddsProtos.LifeCycleEvent.TIMEOUT);
           return null;
         });
@@ -388,7 +373,7 @@ public class ContainerMapping implements Mapping {
     containerSupervisor.handleContainerReport(reports);
     for (StorageContainerDatanodeProtocolProtos.ContainerInfo datanodeState :
         containerInfos) {
-      byte[] dbKey = datanodeState.getContainerNameBytes().toByteArray();
+      byte[] dbKey = Longs.toByteArray(datanodeState.getContainerID());
       lock.lock();
       try {
         byte[] containerBytes = containerStore.get(dbKey);
@@ -409,14 +394,14 @@ public class ContainerMapping implements Mapping {
           // If the container is closed, then state is already written to SCM
           // DB.TODO: So can we can write only once to DB.
           if (closeContainerIfNeeded(newState)) {
-            LOG.info("Closing the Container: {}", newState.getContainerName());
+            LOG.info("Closing the Container: {}", newState.getContainerID());
           }
         } else {
           // Container not found in our container db.
           LOG.error("Error while processing container report from datanode :" +
                   " {}, for container: {}, reason: container doesn't exist in" +
                   "container database.", reports.getDatanodeDetails(),
-              datanodeState.getContainerName());
+              datanodeState.getContainerID());
         }
       } finally {
         lock.unlock();
@@ -436,7 +421,7 @@ public class ContainerMapping implements Mapping {
       HddsProtos.SCMContainerInfo knownState) {
     HddsProtos.SCMContainerInfo.Builder builder =
         HddsProtos.SCMContainerInfo.newBuilder();
-    builder.setContainerName(knownState.getContainerName());
+    builder.setContainerID(knownState.getContainerID());
     builder.setPipeline(knownState.getPipeline());
     // If used size is greater than allocated size, we will be updating
     // allocated size with used size. This update is done as a fallback
@@ -473,7 +458,7 @@ public class ContainerMapping implements Mapping {
     float containerUsedPercentage = 1.0f *
         newState.getUsedBytes() / this.size;
 
-    ContainerInfo scmInfo = getContainer(newState.getContainerName());
+    ContainerInfo scmInfo = getContainer(newState.getContainerID());
     if (containerUsedPercentage >= containerCloseThreshold
         && !isClosed(scmInfo)) {
       // We will call closer till get to the closed state.
@@ -488,13 +473,13 @@ public class ContainerMapping implements Mapping {
         // closed state from container reports. This state change should be
         // invoked once and only once.
         HddsProtos.LifeCycleState state = updateContainerState(
-            scmInfo.getContainerName(),
+            scmInfo.getContainerID(),
             HddsProtos.LifeCycleEvent.FINALIZE);
         if (state != HddsProtos.LifeCycleState.CLOSING) {
           LOG.error("Failed to close container {}, reason : Not able " +
                   "to " +
                   "update container state, current container state: {}.",
-              newState.getContainerName(), state);
+              newState.getContainerID(), state);
           return false;
         }
         return true;
@@ -561,11 +546,11 @@ public class ContainerMapping implements Mapping {
   @VisibleForTesting
   public void flushContainerInfo() throws IOException {
     List<ContainerInfo> containers = containerStateManager.getAllContainers();
-    List<String> failedContainers = new ArrayList<>();
+    List<Long> failedContainers = new ArrayList<>();
     for (ContainerInfo info : containers) {
       // even if some container updated failed, others can still proceed
       try {
-        byte[] dbKey = info.getContainerName().getBytes(encoding);
+        byte[] dbKey = Longs.toByteArray(info.getContainerID());
         byte[] containerBytes = containerStore.get(dbKey);
         // TODO : looks like when a container is deleted, the container is
         // removed from containerStore but not containerStateManager, so it can
@@ -577,7 +562,6 @@ public class ContainerMapping implements Mapping {
           ContainerInfo oldInfo = ContainerInfo.fromProtobuf(oldInfoProto);
           ContainerInfo newInfo = new ContainerInfo.Builder()
               .setAllocatedBytes(info.getAllocatedBytes())
-              .setContainerName(oldInfo.getContainerName())
               .setNumberOfKeys(oldInfo.getNumberOfKeys())
               .setOwner(oldInfo.getOwner())
               .setPipeline(oldInfo.getPipeline())
@@ -588,10 +572,10 @@ public class ContainerMapping implements Mapping {
         } else {
           LOG.debug("Container state manager has container {} but not found " +
                   "in container store, a deleted container?",
-              info.getContainerName());
+              info.getContainerID());
         }
       } catch (IOException ioe) {
-        failedContainers.add(info.getContainerName());
+        failedContainers.add(info.getContainerID());
       }
     }
     if (!failedContainers.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a43ac28/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
index 227eca0..f11a50c 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerStateManager.java
@@ -157,8 +157,7 @@ public class ContainerStateManager implements Closeable {
 
     List<ContainerInfo> containerList;
     try {
-      containerList = containerMapping.listContainer(null,
-          null, Integer.MAX_VALUE);
+      containerList = containerMapping.listContainer(0, Integer.MAX_VALUE);
 
       // if there are no container to load, let us return.
       if (containerList == null || containerList.size() == 0) {
@@ -280,24 +279,21 @@ public class ContainerStateManager implements Closeable {
    * @param selector -- Pipeline selector class.
    * @param type -- Replication type.
    * @param replicationFactor - Replication replicationFactor.
-   * @param containerName - Container Name.
    * @return Container Info.
    * @throws IOException  on Failure.
    */
   public ContainerInfo allocateContainer(PipelineSelector selector, HddsProtos
       .ReplicationType type, HddsProtos.ReplicationFactor replicationFactor,
-      final String containerName, String owner) throws
-      IOException {
+      String owner) throws IOException {
 
     Pipeline pipeline = selector.getReplicationPipeline(type,
-        replicationFactor, containerName);
+        replicationFactor);
 
     Preconditions.checkNotNull(pipeline, "Pipeline type=%s/"
         + "replication=%s couldn't be found for the new container. "
         + "Do you have enough nodes?", type, replicationFactor);
 
     ContainerInfo containerInfo = new ContainerInfo.Builder()
-        .setContainerName(containerName)
         .setState(HddsProtos.LifeCycleState.ALLOCATED)
         .setPipeline(pipeline)
         // This is bytes allocated for blocks inside container, not the
@@ -332,7 +328,7 @@ public class ContainerStateManager implements Closeable {
       String error = String.format("Failed to update container state %s, " +
               "reason: invalid state transition from state: %s upon " +
               "event: %s.",
-          info.getPipeline().getContainerName(), info.getState(), event);
+          info.getContainerID(), info.getState(), event);
       LOG.error(error);
       throw new SCMException(error, FAILED_TO_CHANGE_CONTAINER_STATE);
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message