hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aengin...@apache.org
Subject [2/3] hadoop git commit: HDFS-12387. Ozone: Support Ratis as a first class replication mechanism. Contributed by Anu Engineer.
Date Wed, 04 Oct 2017 17:29:01 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/019fb091/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
index 23faaf9..95d5cbc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/block/BlockManagerImpl.java
@@ -5,33 +5,34 @@
  * licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
- *
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  * License for the specific language governing permissions and limitations under
  * the License.
  */
-
 package org.apache.hadoop.ozone.scm.block;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.ozone.OzoneConsts;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.Owner;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
 import org.apache.hadoop.ozone.scm.container.Mapping;
 import org.apache.hadoop.ozone.scm.exceptions.SCMException;
 import org.apache.hadoop.ozone.scm.node.NodeManager;
 import org.apache.hadoop.ozone.web.utils.OzoneUtils;
 import org.apache.hadoop.scm.ScmConfigKeys;
 import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
-import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.scm.container.common.helpers.BlockContainerInfo;
+import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.utils.BatchOperation;
@@ -43,35 +44,16 @@ import org.slf4j.LoggerFactory;
 import javax.management.ObjectName;
 import java.io.File;
 import java.io.IOException;
-
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
-import java.util.stream.Collectors;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
 
-import static org.apache.hadoop.ozone.OzoneConsts.BLOCK_DB;
-import static org.apache.hadoop.ozone.OzoneConsts.OPEN_CONTAINERS_DB;
-import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.
-    CHILL_MODE_EXCEPTION;
-import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.
-    FAILED_TO_ALLOCATE_CONTAINER;
-import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.
-    FAILED_TO_FIND_CONTAINER;
-import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.
-    FAILED_TO_FIND_CONTAINER_WITH_SPACE;
-import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.
-    FAILED_TO_FIND_BLOCK;
-import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.
-    FAILED_TO_LOAD_OPEN_CONTAINER;
-import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.
-    INVALID_BLOCK_SIZE;
 import static org.apache.hadoop.ozone.OzoneConfigKeys
     .OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS;
 import static org.apache.hadoop.ozone.OzoneConfigKeys
@@ -80,13 +62,22 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys
     .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys
     .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
-
-/**
- * Block Manager manages the block access for SCM.
- */
+import static org.apache.hadoop.ozone.OzoneConsts.BLOCK_DB;
+import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes
+    .CHILL_MODE_EXCEPTION;
+import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes
+    .FAILED_TO_FIND_BLOCK;
+import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes
+    .INVALID_BLOCK_SIZE;
+
+/** Block Manager manages the block access for SCM. */
 public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
   private static final Logger LOG =
       LoggerFactory.getLogger(BlockManagerImpl.class);
+  // TODO : FIX ME : Hard coding the owner.
+  // Currently only user of the block service is Ozone, CBlock manages blocks
+  // by itself and does not rely on the Block service offered by SCM.
+  private final Owner owner = Owner.OZONE;
 
   private final NodeManager nodeManager;
   private final Mapping containerManager;
@@ -96,20 +87,16 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
   private final long containerSize;
   private final long cacheSize;
 
-  // Track all containers owned by block service.
-  private final MetadataStore containerStore;
   private final DeletedBlockLog deletedBlockLog;
   private final SCMBlockDeletingService blockDeletingService;
 
-  private Map<OzoneProtos.LifeCycleState,
-      Map<String, BlockContainerInfo>> containers;
   private final int containerProvisionBatchSize;
   private final Random rand;
   private ObjectName mxBean;
 
-
   /**
    * Constructor.
+   *
    * @param conf - configuration.
    * @param nodeManager - node manager.
    * @param containerManager - container manager.
@@ -122,34 +109,26 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
     this.nodeManager = nodeManager;
     this.containerManager = containerManager;
     this.cacheSize = cacheSizeMB;
-    File metaDir = OzoneUtils.getScmMetadirPath(conf);
-    String scmMetaDataDir = metaDir.getPath();
-
-    // Write the block key to container name mapping.
-    File blockContainerDbPath = new File(scmMetaDataDir, BLOCK_DB);
-    blockStore = MetadataStoreBuilder.newBuilder()
-        .setConf(conf)
-        .setDbFile(blockContainerDbPath)
-        .setCacheSize(this.cacheSize * OzoneConsts.MB)
-        .build();
 
     this.containerSize = OzoneConsts.GB * conf.getInt(
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB,
         ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT);
+    File metaDir = OzoneUtils.getScmMetadirPath(conf);
+    String scmMetaDataDir = metaDir.getPath();
 
-    // Load store of all open contains for block allocation
-    File openContainsDbPath = new File(scmMetaDataDir, OPEN_CONTAINERS_DB);
-    containerStore = MetadataStoreBuilder.newBuilder()
-        .setConf(conf)
-        .setDbFile(openContainsDbPath)
-        .setCacheSize(this.cacheSize * OzoneConsts.MB)
-        .build();
-
-    loadAllocatedContainers();
-
-    this.containerProvisionBatchSize = conf.getInt(
-        ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE,
-        ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT);
+    // Write the block key to container name mapping.
+    File blockContainerDbPath = new File(scmMetaDataDir, BLOCK_DB);
+    blockStore =
+        MetadataStoreBuilder.newBuilder()
+            .setConf(conf)
+            .setDbFile(blockContainerDbPath)
+            .setCacheSize(this.cacheSize * OzoneConsts.MB)
+            .build();
+
+    this.containerProvisionBatchSize =
+        conf.getInt(
+            ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE,
+            ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT);
     rand = new Random();
     this.lock = new ReentrantLock();
 
@@ -157,18 +136,24 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
 
     // SCM block deleting transaction log and deleting service.
     deletedBlockLog = new DeletedBlockLogImpl(conf);
-    int svcInterval = conf.getInt(
-        OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS,
-        OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT);
-    long serviceTimeout = conf.getTimeDuration(
-        OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
-        OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
-    blockDeletingService = new SCMBlockDeletingService(deletedBlockLog,
-        containerManager, nodeManager, svcInterval, serviceTimeout);
+    int svcInterval =
+        conf.getInt(
+            OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS,
+            OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT);
+    long serviceTimeout =
+        conf.getTimeDuration(
+            OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
+            OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT,
+            TimeUnit.MILLISECONDS);
+    blockDeletingService =
+        new SCMBlockDeletingService(
+            deletedBlockLog, containerManager, nodeManager, svcInterval,
+            serviceTimeout);
   }
 
   /**
    * Start block manager services.
+   *
    * @throws IOException
    */
   public void start() throws IOException {
@@ -177,6 +162,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
 
   /**
    * Shutdown block manager services.
+   *
    * @throws IOException
    */
   public void stop() throws IOException {
@@ -184,59 +170,17 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
     this.close();
   }
 
-  // TODO: close full (or almost full) containers with a separate thread.
-  /**
-   * Load allocated containers from persistent store.
-   * @throws IOException
-   */
-  private void loadAllocatedContainers() throws IOException {
-    // Pre-allocate empty map entry by state to avoid null check
-    containers = new ConcurrentHashMap<>();
-    for (OzoneProtos.LifeCycleState state :
-        OzoneProtos.LifeCycleState.values()) {
-      containers.put(state, new ConcurrentHashMap());
-    }
-    try {
-      containerStore.iterate(null, (key, value) -> {
-        try {
-          String containerName = DFSUtil.bytes2String(key);
-          Long containerUsed = Long.parseLong(DFSUtil.bytes2String(value));
-          ContainerInfo containerInfo =
-              containerManager.getContainer(containerName);
-          // TODO: remove the container from block manager's container DB
-          // Most likely the allocated container is timeout and cleaned up
-          // by SCM, we should clean up correspondingly instead of just skip it.
-          if (containerInfo == null) {
-            LOG.warn("Container {} allocated by block service" +
-                "can't be found in SCM", containerName);
-            return true;
-          }
-          Map<String, BlockContainerInfo> containersByState =
-              containers.get(containerInfo.getState());
-          containersByState.put(containerName,
-              new BlockContainerInfo(containerInfo, containerUsed));
-          LOG.debug("Loading allocated container: {} used : {} state: {}",
-              containerName, containerUsed, containerInfo.getState());
-        } catch (Exception e) {
-          LOG.warn("Failed loading allocated container, continue next...");
-        }
-        return true;
-      });
-    } catch (IOException e) {
-      LOG.error("Loading open container store failed." + e);
-      throw new SCMException("Failed to load open container store",
-          FAILED_TO_LOAD_OPEN_CONTAINER);
-    }
-  }
-
   /**
    * Pre allocate specified count of containers for block creation.
-   * @param count - number of containers to allocate.
-   * @return list of container names allocated.
+   *
+   * @param count - Number of containers to allocate.
+   * @param type - Type of containers
+   * @param factor - how many copies needed for this container.
    * @throws IOException
    */
-  private List<String> allocateContainers(int count) throws IOException {
-    List<String> results = new ArrayList();
+  private void preAllocateContainers(int count, ReplicationType type,
+      ReplicationFactor factor)
+      throws IOException {
     lock.lock();
     try {
       for (int i = 0; i < count; i++) {
@@ -244,210 +188,177 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
         ContainerInfo containerInfo = null;
         try {
           // TODO: Fix this later when Ratis is made the Default.
-          containerInfo = containerManager.allocateContainer(
-              OzoneProtos.ReplicationType.STAND_ALONE,
-              OzoneProtos.ReplicationFactor.ONE,
-              containerName);
+          containerInfo = containerManager.allocateContainer(type, factor,
+              containerName, owner);
 
           if (containerInfo == null) {
             LOG.warn("Unable to allocate container.");
             continue;
           }
         } catch (IOException ex) {
-          LOG.warn("Unable to allocate container: " + ex);
+          LOG.warn("Unable to allocate container: {}", ex);
           continue;
         }
-        Map<String, BlockContainerInfo> containersByState =
-            containers.get(OzoneProtos.LifeCycleState.ALLOCATED);
-        Preconditions.checkNotNull(containersByState);
-        containersByState.put(containerName,
-            new BlockContainerInfo(containerInfo, 0));
-        containerStore.put(DFSUtil.string2Bytes(containerName),
-            DFSUtil.string2Bytes(Long.toString(0L)));
-        results.add(containerName);
       }
     } finally {
       lock.unlock();
     }
-    return results;
   }
 
   /**
-   * Filter container by states and size.
-   * @param state the state of the container.
-   * @param size the minimal available size of the container
-   * @return allocated containers satisfy both state and size.
-   */
-  private List <String> filterContainers(OzoneProtos.LifeCycleState state,
-      long size) {
-    Map<String, BlockContainerInfo> containersByState =
-        this.containers.get(state);
-    return containersByState.entrySet().parallelStream()
-        .filter(e -> ((e.getValue().getAllocated() + size < containerSize)))
-        .map(e -> e.getKey())
-        .collect(Collectors.toList());
-  }
-
-  private BlockContainerInfo getContainer(OzoneProtos.LifeCycleState state,
-      String name) {
-    Map<String, BlockContainerInfo> containersByState =
-        this.containers.get(state);
-    return containersByState.get(name);
-  }
-
-  // Relies on the caller such as allocateBlock() to hold the lock
-  // to ensure containers map consistent.
-  private void updateContainer(OzoneProtos.LifeCycleState oldState, String name,
-      OzoneProtos.LifeCycleState newState) {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Update container {} from state {} to state {}",
-          name, oldState, newState);
-    }
-    Map<String, BlockContainerInfo> containersInOldState =
-        this.containers.get(oldState);
-    BlockContainerInfo containerInfo = containersInOldState.get(name);
-    Preconditions.checkNotNull(containerInfo);
-    containersInOldState.remove(name);
-    Map<String, BlockContainerInfo> containersInNewState =
-        this.containers.get(newState);
-    containersInNewState.put(name, containerInfo);
-  }
-
-  // Refresh containers that have been allocated.
-  // We may not need to track all the states, just the creating/open/close
-  // should be enough for now.
-  private void refreshContainers() {
-    Map<String, BlockContainerInfo> containersByState =
-        this.containers.get(OzoneProtos.LifeCycleState.CREATING);
-    for (String containerName : containersByState.keySet()) {
-      try {
-        ContainerInfo containerInfo =
-            containerManager.getContainer(containerName);
-        if (containerInfo == null) {
-          // TODO: clean up containers that has been deleted on SCM but
-          // TODO: still in ALLOCATED state in block manager.
-          LOG.debug("Container {} allocated by block service"
-              + "can't be found in SCM", containerName);
-          continue;
-        }
-        if (containerInfo.getState() == OzoneProtos.LifeCycleState.OPEN) {
-          updateContainer(OzoneProtos.LifeCycleState.CREATING, containerName,
-              containerInfo.getState());
-        }
-        // TODO: check containers in other state and refresh as needed.
-        // TODO: ALLOCATED container that is timeout and DELETED. (unit test)
-        // TODO: OPEN container that is CLOSE.
-      } catch (IOException ex) {
-        LOG.debug("Failed to get container info for: {}", containerName);
-      }
-    }
-  }
-
-  /**
-   * Allocates a new block for a given size.
+   * Allocates a block in a container and returns that info.
    *
-   * SCM choose one of the open containers and returns that as the location for
-   * the new block. An open container is a container that is actively written to
-   * via replicated log.
-   * @param size - size of the block to be allocated
-   * @return - the allocated pipeline and key for the block
-   * @throws IOException
+   * @param size - Block Size
+   * @param type Replication Type
+   * @param factor - Replication Factor
+   * @return Allocated block
+   * @throws IOException on failure.
    */
   @Override
-  public AllocatedBlock allocateBlock(final long size) throws IOException {
-    boolean createContainer = false;
+  public AllocatedBlock allocateBlock(
+      final long size, ReplicationType type, ReplicationFactor factor)
+      throws IOException {
+    LOG.trace("Size;{} , type : {}, factor : {} ", size, type, factor);
+
     if (size < 0 || size > containerSize) {
-      throw new SCMException("Unsupported block size", INVALID_BLOCK_SIZE);
+      LOG.warn("Invalid block size requested : {}", size);
+      throw new SCMException("Unsupported block size: " + size,
+          INVALID_BLOCK_SIZE);
     }
+
     if (!nodeManager.isOutOfNodeChillMode()) {
+      LOG.warn("Not out of Chill mode.");
       throw new SCMException("Unable to create block while in chill mode",
           CHILL_MODE_EXCEPTION);
     }
 
     lock.lock();
     try {
-      refreshContainers();
-      List<String> candidates;
-      candidates = filterContainers(OzoneProtos.LifeCycleState.OPEN, size);
-      if (candidates.size() == 0) {
-        candidates =
-            filterContainers(OzoneProtos.LifeCycleState.ALLOCATED, size);
-        if (candidates.size() == 0) {
-          try {
-            candidates = allocateContainers(containerProvisionBatchSize);
-          } catch (IOException ex) {
-            LOG.error("Unable to allocate container for the block.");
-            throw new SCMException("Unable to allocate container for the block",
-                FAILED_TO_ALLOCATE_CONTAINER);
-          }
-        }
-        // now we should have some candidates in ALLOCATE state
-        if (candidates.size() == 0) {
-          throw new SCMException(
-              "Fail to find any container to allocate block " + "of size "
-                  + size + ".", FAILED_TO_FIND_CONTAINER_WITH_SPACE);
-        }
+      /*
+               Here is the high level logic.
+
+               1. First we check if there are containers in ALLOCATED state,
+               that is
+                SCM has allocated them in the SCM namespace but the
+                corresponding
+                container has not been created in the Datanode yet. If we
+                have any
+                in that state, we will return that to the client, which allows
+                client to finish creating those containers. This is a sort of
+                 greedy
+                 algorithm, our primary purpose is to get as many containers as
+                 possible.
+
+                2. If there are no allocated containers -- Then we find a Open
+                container that matches that pattern.
+
+                3. If both of them fail, then we will pre-allocate a bunch of
+                containers in SCM and try again.
+
+               TODO : Support random picking of two containers from the list.
+                So we
+               can use different kind of policies.
+      */
+
+      BlockContainerInfo containerInfo = null;
+
+      // Look for ALLOCATED container that matches all other parameters.
+      containerInfo =
+          containerManager
+              .getStateManager()
+              .getMatchingContainer(
+                  size, owner, type, factor, OzoneProtos.LifeCycleState
+                      .ALLOCATED);
+      if (containerInfo != null) {
+        return newBlock(containerInfo, OzoneProtos.LifeCycleState.ALLOCATED);
       }
 
-      // Candidates list now should include only ALLOCATE or OPEN containers
-      int randomIdx = rand.nextInt(candidates.size());
-      String containerName = candidates.get(randomIdx);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Find {} candidates: {}, picking: {}", candidates.size(),
-            candidates.toString(), containerName);
+      // Since we found no allocated containers that match our criteria, let us
+      // look for OPEN containers that match the criteria.
+      containerInfo =
+          containerManager
+              .getStateManager()
+              .getMatchingContainer(size, owner, type, factor, OzoneProtos
+                  .LifeCycleState.OPEN);
+      if (containerInfo != null) {
+        return newBlock(containerInfo, OzoneProtos.LifeCycleState.OPEN);
       }
 
-      ContainerInfo containerInfo =
-          containerManager.getContainer(containerName);
-      if (containerInfo == null) {
-        LOG.debug("Unable to find container for the block");
-        throw new SCMException("Unable to find container to allocate block",
-            FAILED_TO_FIND_CONTAINER);
+      // We found neither ALLOCATED or OPEN Containers. This generally means
+      // that most of our containers are full or we have not allocated
+      // containers of the type and replication factor. So let us go and
+      // allocate some.
+      preAllocateContainers(containerProvisionBatchSize, type, factor);
+
+      // Since we just allocated a set of containers this should work
+      containerInfo =
+          containerManager
+              .getStateManager()
+              .getMatchingContainer(
+                  size, owner, type, factor, OzoneProtos.LifeCycleState
+                      .ALLOCATED);
+      if (containerInfo != null) {
+        return newBlock(containerInfo, OzoneProtos.LifeCycleState.ALLOCATED);
       }
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Candidate {} state {}", containerName,
-            containerInfo.getState());
-      }
-      // Container must be either OPEN or ALLOCATE state
-      if (containerInfo.getState() == OzoneProtos.LifeCycleState.ALLOCATED) {
-        createContainer = true;
-      }
-
-      // TODO: make block key easier to debug (e.g., seq no)
-      // Allocate key for the block
-      String blockKey = UUID.randomUUID().toString();
-      AllocatedBlock.Builder abb = new AllocatedBlock.Builder().setKey(blockKey)
-          .setPipeline(containerInfo.getPipeline())
-          .setShouldCreateContainer(createContainer);
-      if (containerInfo.getPipeline().getMachines().size() > 0) {
-        blockStore.put(DFSUtil.string2Bytes(blockKey),
-            DFSUtil.string2Bytes(containerName));
-
-        // update the container usage information
-        BlockContainerInfo containerInfoUpdate =
-            getContainer(containerInfo.getState(), containerName);
-        Preconditions.checkNotNull(containerInfoUpdate);
-        containerInfoUpdate.addAllocated(size);
-        containerStore.put(DFSUtil.string2Bytes(containerName), DFSUtil
-            .string2Bytes(Long.toString(containerInfoUpdate.getAllocated())));
-        if (createContainer) {
-          OzoneProtos.LifeCycleState newState = containerManager
-              .updateContainerState(containerName,
-                  OzoneProtos.LifeCycleEvent.BEGIN_CREATE);
-          updateContainer(containerInfo.getState(), containerName, newState);
-        }
-        return abb.build();
-      }
+      // we have tried all strategies we know and but somehow we are not able
+      // to get a container for this block. Log that info and return a null.
+      LOG.error(
+          "Unable to allocate a block for the size: {}, type: {}, " +
+              "factor: {}",
+          size,
+          type,
+          factor);
+      return null;
     } finally {
       lock.unlock();
     }
-    return null;
   }
 
   /**
+   * newBlock - returns a new block assigned to a container.
    *
+   * @param containerInfo - Container Info.
+   * @param state - Current state of the container.
+   * @return AllocatedBlock
+   */
+  private AllocatedBlock newBlock(
+      BlockContainerInfo containerInfo, OzoneProtos.LifeCycleState state)
+      throws IOException {
+
+    // TODO : Replace this with Block ID.
+    String blockKey = UUID.randomUUID().toString();
+    boolean createContainer = (state == OzoneProtos.LifeCycleState.ALLOCATED);
+
+    AllocatedBlock.Builder abb =
+        new AllocatedBlock.Builder()
+            .setKey(blockKey)
+            // TODO : Use containerinfo instead of pipeline.
+            .setPipeline(containerInfo.getPipeline())
+            .setShouldCreateContainer(createContainer);
+    LOG.trace("New block allocated : {} Container ID: {}", blockKey,
+        containerInfo.toString());
+
+    if (containerInfo.getPipeline().getMachines().size() == 0) {
+      LOG.error("Pipeline Machine count is zero.");
+      return null;
+    }
+
+    // Persist this block info to the blockStore DB, so getBlock(key) can
+    // find which container the block lives.
+    // TODO : Remove this DB in future
+    // and make this a KSM operation. Category: SCALABILITY.
+    if (containerInfo.getPipeline().getMachines().size() > 0) {
+      blockStore.put(
+          DFSUtil.string2Bytes(blockKey),
+          DFSUtil.string2Bytes(containerInfo.getPipeline().getContainerName()));
+    }
+    return abb.build();
+  }
+
+  /**
    * Given a block key, return the Pipeline information.
+   *
    * @param key - block key assigned by SCM.
    * @return Pipeline (list of DNs and leader) to access the block.
    * @throws IOException
@@ -462,14 +373,15 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
             "Specified block key does not exist. key : " + key,
             FAILED_TO_FIND_BLOCK);
       }
+
       String containerName = DFSUtil.bytes2String(containerBytes);
-      ContainerInfo containerInfo =
-          containerManager.getContainer(containerName);
+      ContainerInfo containerInfo = containerManager.getContainer(
+          containerName);
       if (containerInfo == null) {
-        LOG.debug(
-            "Container {} allocated by block service" + "can't be found in SCM",
-            containerName);
-        throw new SCMException("Unable to find container for the block",
+        LOG.debug("Container {} allocated by block service"
+            + "can't be found in SCM", containerName);
+        throw new SCMException(
+            "Unable to find container for the block",
             SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
       }
       return containerInfo.getPipeline();
@@ -479,13 +391,14 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
   }
 
   /**
-   * Deletes a list of blocks in an atomic operation. Internally, SCM
-   * writes these blocks into a {@link DeletedBlockLog} and deletes them
-   * from SCM DB. If this is successful, given blocks are entering pending
-   * deletion state and becomes invisible from SCM namespace.
+   * Deletes a list of blocks in an atomic operation. Internally, SCM writes
+   * these blocks into a
+   * {@link DeletedBlockLog} and deletes them from SCM DB. If this is
+   * successful, given blocks are
+   * entering pending deletion state and becomes invisible from SCM namespace.
    *
-   * @param blockIDs block IDs. This is often the list of blocks of
-   *                 a particular object key.
+   * @param blockIDs block IDs. This is often the list of blocks of a
+   * particular object key.
   * @throws IOException if exception happens, none of the blocks is deleted.
    */
   @Override
@@ -546,16 +459,20 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
           // to be invisible from namespace but actual data are not removed.
           // We log an error here so admin can manually check and fix such
           // errors.
-          LOG.error("Blocks might be in inconsistent state because"
+          LOG.error(
+              "Blocks might be in inconsistent state because"
                   + " they were moved to pending deletion state in SCM DB but"
                   + " not written into delLog. Admin can manually add them"
                   + " into delLog for deletions. Inconsistent block list: {}",
-              String.join(",", blockIDs), e);
+              String.join(",", blockIDs),
+              e);
           throw rollbackException;
         }
-        throw new IOException("Skip writing the deleted blocks info to"
-            + " the delLog because addTransaction fails. Batch skipped: "
-            + String.join(",", blockIDs), e);
+        throw new IOException(
+            "Skip writing the deleted blocks info to"
+                + " the delLog because addTransaction fails. Batch skipped: "
+                + String.join(",", blockIDs),
+            e);
       }
       // TODO: Container report handling of the deleted blocks:
       // Remove tombstone and update open container usage.
@@ -577,6 +494,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
 
   /**
    * Close the resources for BlockManager.
+   *
    * @throws IOException
    */
   @Override
@@ -584,9 +502,6 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
     if (blockStore != null) {
       blockStore.close();
     }
-    if (containerStore != null) {
-      containerStore.close();
-    }
     if (deletedBlockLog != null) {
       deletedBlockLog.close();
     }
@@ -599,6 +514,11 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
 
   @Override
   public int getOpenContainersNo() {
-    return containers.get(OzoneProtos.LifeCycleState.OPEN).size();
+    return 0;
+    // TODO : FIX ME : The open container being a single number does not make
+    // sense.
+    // We have to get open containers by Replication Type and Replication
+    // factor. Hence returning 0 for now.
+    // containers.get(OzoneProtos.LifeCycleState.OPEN).size();
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/019fb091/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
index 5a1af51..8c4782e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerMapping.java
@@ -1,36 +1,36 @@
 /**
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with this
- * work for additional information regarding copyright ownership.  The ASF
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
  * licenses this file to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
  * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>http://www.apache.org/licenses/LICENSE-2.0
  * <p>
- * Unless required by applicable law or agreed to in writing, software
+ * <p>Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  * License for the specific language governing permissions and limitations under
  * the License.
  */
-
 package org.apache.hadoop.ozone.scm.container;
 
 import com.google.common.base.Preconditions;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
-import org.apache.hadoop.ozone.common.statemachine.StateMachine;
-import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.Owner;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
 import org.apache.hadoop.ozone.scm.exceptions.SCMException;
 import org.apache.hadoop.ozone.scm.node.NodeManager;
 import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector;
 import org.apache.hadoop.ozone.web.utils.OzoneUtils;
+import org.apache.hadoop.scm.container.common.helpers.BlockContainerInfo;
 import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.util.Time;
 import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
 import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
 import org.apache.hadoop.utils.MetadataStore;
@@ -42,22 +42,21 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_DB;
 
 /**
- * Mapping class contains the mapping from a name to a pipeline mapping. This is
- * used by SCM when allocating new locations and when looking up a key.
+ * Mapping class contains the mapping from a name to a pipeline mapping. This
+ * is used by SCM when
+ * allocating new locations and when looking up a key.
  */
 public class ContainerMapping implements Mapping {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(ContainerMapping.class);
+  private static final Logger LOG = LoggerFactory.getLogger(ContainerMapping
+      .class);
 
   private final NodeManager nodeManager;
   private final long cacheSize;
@@ -65,24 +64,27 @@ public class ContainerMapping implements Mapping {
   private final Charset encoding = Charset.forName("UTF-8");
   private final MetadataStore containerStore;
   private final PipelineSelector pipelineSelector;
-
-  private final StateMachine<OzoneProtos.LifeCycleState,
-        OzoneProtos.LifeCycleEvent> stateMachine;
+  private final ContainerStateManager containerStateManager;
 
   /**
-   * Constructs a mapping class that creates mapping between container names and
-   * pipelines.
+   * Constructs a mapping class that creates mapping between container names
+   * and pipelines.
    *
    * @param nodeManager - NodeManager so that we can get the nodes that are
-   * healthy to place new containers.
+   * healthy to place new
+   *     containers.
    * @param cacheSizeMB - Amount of memory reserved for the LSM tree to cache
-   * its nodes. This is passed to LevelDB and this memory is allocated in Native
-   * code space. CacheSize is specified in MB.
+   * its nodes. This is
+   *     passed to LevelDB and this memory is allocated in Native code space.
+   *     CacheSize is specified
+   *     in MB.
    * @throws IOException
    */
   @SuppressWarnings("unchecked")
-  public ContainerMapping(final Configuration conf,
-      final NodeManager nodeManager, final int cacheSizeMB) throws IOException {
+  public ContainerMapping(
+      final Configuration conf, final NodeManager nodeManager, final int
+      cacheSizeMB)
+      throws IOException {
     this.nodeManager = nodeManager;
     this.cacheSize = cacheSizeMB;
 
@@ -90,100 +92,48 @@ public class ContainerMapping implements Mapping {
 
     // Write the container name to pipeline mapping.
     File containerDBPath = new File(metaDir, CONTAINER_DB);
-    containerStore = MetadataStoreBuilder.newBuilder()
-        .setConf(conf)
-        .setDbFile(containerDBPath)
-        .setCacheSize(this.cacheSize * OzoneConsts.MB)
-        .build();
+    containerStore =
+        MetadataStoreBuilder.newBuilder()
+            .setConf(conf)
+            .setDbFile(containerDBPath)
+            .setCacheSize(this.cacheSize * OzoneConsts.MB)
+            .build();
 
     this.lock = new ReentrantLock();
 
     this.pipelineSelector = new PipelineSelector(nodeManager, conf);
-
-    // Initialize the container state machine.
-    Set<OzoneProtos.LifeCycleState> finalStates = new HashSet();
-    finalStates.add(OzoneProtos.LifeCycleState.OPEN);
-    finalStates.add(OzoneProtos.LifeCycleState.CLOSED);
-    finalStates.add(OzoneProtos.LifeCycleState.DELETED);
-
-    this.stateMachine = new StateMachine<>(
-        OzoneProtos.LifeCycleState.ALLOCATED, finalStates);
-    initializeStateMachine();
-  }
-
-  // Client-driven Create State Machine
-  // States: <ALLOCATED>------------->CREATING----------------->[OPEN]
-  // Events:            (BEGIN_CREATE)    |    (COMPLETE_CREATE)
-  //                                      |
-  //                                      |(TIMEOUT)
-  //                                      V
-  //                                  DELETING----------------->[DELETED]
-  //                                           (CLEANUP)
-
-  // SCM Open/Close State Machine
-  // States: OPEN------------------>[CLOSED]
-  // Events:        (CLOSE)
-
-  // Delete State Machine
-  // States: OPEN------------------>DELETING------------------>[DELETED]
-  // Events:         (DELETE)                  (CLEANUP)
-  private void initializeStateMachine() {
-    stateMachine.addTransition(OzoneProtos.LifeCycleState.ALLOCATED,
-        OzoneProtos.LifeCycleState.CREATING,
-        OzoneProtos.LifeCycleEvent.BEGIN_CREATE);
-
-    stateMachine.addTransition(OzoneProtos.LifeCycleState.CREATING,
-        OzoneProtos.LifeCycleState.OPEN,
-        OzoneProtos.LifeCycleEvent.COMPLETE_CREATE);
-
-    stateMachine.addTransition(OzoneProtos.LifeCycleState.OPEN,
-        OzoneProtos.LifeCycleState.CLOSED,
-        OzoneProtos.LifeCycleEvent.CLOSE);
-
-    stateMachine.addTransition(OzoneProtos.LifeCycleState.OPEN,
-        OzoneProtos.LifeCycleState.DELETING,
-        OzoneProtos.LifeCycleEvent.DELETE);
-
-    stateMachine.addTransition(OzoneProtos.LifeCycleState.DELETING,
-        OzoneProtos.LifeCycleState.DELETED,
-        OzoneProtos.LifeCycleEvent.CLEANUP);
-
-    // Creating timeout -> Deleting
-    stateMachine.addTransition(OzoneProtos.LifeCycleState.CREATING,
-        OzoneProtos.LifeCycleState.DELETING,
-        OzoneProtos.LifeCycleEvent.TIMEOUT);
+    this.containerStateManager = new ContainerStateManager(conf, this
+        .cacheSize * OzoneConsts.MB);
+    LOG.trace("Container State Manager created.");
   }
 
-  /**
-   * {@inheritDoc}
-   */
+  /** {@inheritDoc} */
   @Override
-  public ContainerInfo getContainer(final String containerName)
-      throws IOException {
+  public ContainerInfo getContainer(final String containerName) throws
+      IOException {
     ContainerInfo containerInfo;
     lock.lock();
     try {
-      byte[] containerBytes =
-          containerStore.get(containerName.getBytes(encoding));
+      byte[] containerBytes = containerStore.get(containerName.getBytes(
+          encoding));
       if (containerBytes == null) {
         throw new SCMException(
             "Specified key does not exist. key : " + containerName,
             SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
       }
-      containerInfo = ContainerInfo.fromProtobuf(
-          OzoneProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes));
+      containerInfo =
+          ContainerInfo.fromProtobuf(OzoneProtos.SCMContainerInfo.PARSER
+              .parseFrom(containerBytes));
       return containerInfo;
     } finally {
       lock.unlock();
     }
   }
 
-  /**
-   * {@inheritDoc}
-   */
+  /** {@inheritDoc} */
   @Override
-  public List<Pipeline> listContainer(String startName,
-      String prefixName, int count)
+  public List<Pipeline> listContainer(String startName, String prefixName,
+      int count)
       throws IOException {
     List<Pipeline> pipelineList = new ArrayList<>();
     lock.lock();
@@ -192,8 +142,8 @@ public class ContainerMapping implements Mapping {
         throw new IOException("No container exists in current db");
       }
       MetadataKeyFilter prefixFilter = new KeyPrefixFilter(prefixName);
-      byte[] startKey = startName == null ?
-          null : DFSUtil.string2Bytes(startName);
+      byte[] startKey = startName == null ? null : DFSUtil.string2Bytes(
+          startName);
       List<Map.Entry<byte[], byte[]>> range =
           containerStore.getRangeKVs(startKey, count, prefixFilter);
 
@@ -201,8 +151,10 @@ public class ContainerMapping implements Mapping {
       // TODO: return list of ContainerInfo instead of pipelines.
       // TODO: filter by container state
       for (Map.Entry<byte[], byte[]> entry : range) {
-        ContainerInfo containerInfo =  ContainerInfo.fromProtobuf(
-            OzoneProtos.SCMContainerInfo.PARSER.parseFrom(entry.getValue()));
+        ContainerInfo containerInfo =
+            ContainerInfo.fromProtobuf(
+                OzoneProtos.SCMContainerInfo.PARSER.parseFrom(
+                    entry.getValue()));
         Preconditions.checkNotNull(containerInfo);
         pipelineList.add(containerInfo.getPipeline());
       }
@@ -215,40 +167,43 @@ public class ContainerMapping implements Mapping {
   /**
    * Allocates a new container.
    *
-   * @param containerName - Name of the container.
    * @param replicationFactor - replication factor of the container.
+   * @param containerName - Name of the container.
+   * @param owner
    * @return - Pipeline that makes up this container.
    * @throws IOException - Exception
    */
   @Override
-  public ContainerInfo allocateContainer(OzoneProtos.ReplicationType type,
-      OzoneProtos.ReplicationFactor replicationFactor,
-      final String containerName) throws IOException {
+  public ContainerInfo allocateContainer(
+      ReplicationType type,
+      ReplicationFactor replicationFactor,
+      final String containerName,
+      Owner owner)
+      throws IOException {
     Preconditions.checkNotNull(containerName);
     Preconditions.checkState(!containerName.isEmpty());
     ContainerInfo containerInfo = null;
     if (!nodeManager.isOutOfNodeChillMode()) {
-      throw new SCMException("Unable to create container while in chill mode",
+      throw new SCMException(
+          "Unable to create container while in chill mode",
           SCMException.ResultCodes.CHILL_MODE_EXCEPTION);
     }
 
     lock.lock();
     try {
-      byte[] containerBytes =
-          containerStore.get(containerName.getBytes(encoding));
+      byte[] containerBytes = containerStore.get(containerName.getBytes(
+          encoding));
       if (containerBytes != null) {
-        throw new SCMException("Specified container already exists. key : " +
-            containerName, SCMException.ResultCodes.CONTAINER_EXISTS);
+        throw new SCMException(
+            "Specified container already exists. key : " + containerName,
+            SCMException.ResultCodes.CONTAINER_EXISTS);
       }
-      Pipeline pipeline = pipelineSelector.getReplicationPipeline(type,
-          replicationFactor, containerName);
-      containerInfo = new ContainerInfo.Builder()
-          .setState(OzoneProtos.LifeCycleState.ALLOCATED)
-          .setPipeline(pipeline)
-          .setStateEnterTime(Time.monotonicNow())
-          .build();
-      containerStore.put(containerName.getBytes(encoding),
-          containerInfo.getProtobuf().toByteArray());
+      containerInfo =
+          containerStateManager.allocateContainer(
+              pipelineSelector, type, replicationFactor, containerName, owner);
+      containerStore.put(
+          containerName.getBytes(encoding), containerInfo.getProtobuf()
+              .toByteArray());
     } finally {
       lock.unlock();
     }
@@ -259,19 +214,20 @@ public class ContainerMapping implements Mapping {
    * Deletes a container from SCM.
    *
    * @param containerName - Container name
-   * @throws IOException if container doesn't exist or container store failed to
-   *                     delete the specified key.
+   * @throws IOException if container doesn't exist or container store failed
+   * to delete the
+   *     specified key.
    */
   @Override
   public void deleteContainer(String containerName) throws IOException {
     lock.lock();
     try {
       byte[] dbKey = containerName.getBytes(encoding);
-      byte[] containerBytes =
-          containerStore.get(dbKey);
-      if(containerBytes == null) {
-        throw new SCMException("Failed to delete container "
-            + containerName + ", reason : container doesn't exist.",
+      byte[] containerBytes = containerStore.get(dbKey);
+      if (containerBytes == null) {
+        throw new SCMException(
+            "Failed to delete container " + containerName + ", reason : " +
+                "container doesn't exist.",
             SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
       }
       containerStore.delete(dbKey);
@@ -287,8 +243,11 @@ public class ContainerMapping implements Mapping {
       OzoneProtos.LifeCycleState newState =
           updateContainerState(containerName, OzoneProtos.LifeCycleEvent.CLOSE);
       if (newState != OzoneProtos.LifeCycleState.CLOSED) {
-        throw new SCMException("Failed to close container " + containerName +
-            ", reason : container in state " + newState,
+        throw new SCMException(
+            "Failed to close container "
+                + containerName
+                + ", reason : container in state "
+                + newState,
             SCMException.ResultCodes.UNEXPECTED_CONTAINER_STATE);
       }
     } finally {
@@ -296,13 +255,11 @@ public class ContainerMapping implements Mapping {
     }
   }
 
-  /**
-   * {@inheritDoc}
-   * Used by client to update container state on SCM.
-   */
+  /** {@inheritDoc} Used by client to update container state on SCM. */
   @Override
-  public OzoneProtos.LifeCycleState updateContainerState(String containerName,
-      OzoneProtos.LifeCycleEvent event) throws IOException {
+  public OzoneProtos.LifeCycleState updateContainerState(
+      String containerName, OzoneProtos.LifeCycleEvent event) throws
+      IOException {
     ContainerInfo containerInfo;
     lock.lock();
     try {
@@ -310,39 +267,49 @@ public class ContainerMapping implements Mapping {
       byte[] containerBytes = containerStore.get(dbKey);
       if (containerBytes == null) {
         throw new SCMException(
-            "Failed to update container state" + containerName
+            "Failed to update container state"
+                + containerName
                 + ", reason : container doesn't exist.",
             SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
       }
-      containerInfo = ContainerInfo.fromProtobuf(
-          OzoneProtos.SCMContainerInfo.PARSER.parseFrom(containerBytes));
+      containerInfo =
+          ContainerInfo.fromProtobuf(OzoneProtos.SCMContainerInfo.PARSER
+              .parseFrom(containerBytes));
+
+      Preconditions.checkNotNull(containerInfo);
+
+      // TODO: Actual used will be updated via Container Reports later.
+      containerInfo.setState(
+          containerStateManager.updateContainerState(
+              new BlockContainerInfo(containerInfo, 0), event));
 
-      OzoneProtos.LifeCycleState newState;
-      try {
-        newState = stateMachine.getNextState(containerInfo.getState(), event);
-      } catch (InvalidStateTransitionException ex) {
-        throw new SCMException(
-            "Failed to update container state" + containerName
-                + ", reason : invalid state transition from state: "
-                + containerInfo.getState() + " upon event: " + event + ".",
-            SCMException.ResultCodes.FAILED_TO_CHANGE_CONTAINER_STATE);
-      }
-      containerInfo.setState(newState);
       containerStore.put(dbKey, containerInfo.getProtobuf().toByteArray());
-      return newState;
+      return containerInfo.getState();
     } finally {
       lock.unlock();
     }
   }
 
+  /**
+   * Returns the container State Manager.
+   *
+   * @return ContainerStateManager
+   */
+  @Override
+  public ContainerStateManager getStateManager() {
+    return containerStateManager;
+  }
+
   /**
-   * Closes this stream and releases any system resources associated with it. If
-   * the stream is already closed then invoking this method has no effect.
+   * Closes this stream and releases any system resources associated with it.
+   * If the stream is
+   * already closed then invoking this method has no effect.
+   *
    * <p>
-   * <p> As noted in {@link AutoCloseable#close()}, cases where the close may
-   * fail require careful attention. It is strongly advised to relinquish the
-   * underlying resources and to internally <em>mark</em> the {@code Closeable}
-   * as closed, prior to throwing the {@code IOException}.
+   *
+   * <p>As noted in {@link AutoCloseable#close()}, cases where the close may
+   * fail require careful
+   * attention. It is strongly advised to relinquish the underlying resources
+   * and to internally
+   * <em>mark</em> the {@code Closeable} as closed, prior to throwing the
+   * {@code IOException}.
    *
    * @throws IOException if an I/O error occurs
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/019fb091/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStateManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStateManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStateManager.java
new file mode 100644
index 0000000..12f0a9d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/ContainerStateManager.java
@@ -0,0 +1,472 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.scm.container;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
+import org.apache.hadoop.ozone.common.statemachine.StateMachine;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
+import org.apache.hadoop.ozone.scm.exceptions.SCMException;
+import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector;
+import org.apache.hadoop.scm.ScmConfigKeys;
+import org.apache.hadoop.scm.container.common.helpers.BlockContainerInfo;
+import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.PriorityQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationType;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.Owner;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleEvent;
+import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.ReplicationFactor;
+
+import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.FAILED_TO_CHANGE_CONTAINER_STATE;
+
+/**
+ * A container state manager keeps track of container states and returns
+ * containers that match various queries.
+ * <p>
+ * This state machine is driven by a combination of server and client actions.
+ * <p>
+ * This is how a create container happens: 1. When a container is created, the
+ * Server(or SCM) marks that Container as ALLOCATED state. In this state, SCM
+ * has chosen a pipeline for container to live on. However, the container is not
+ * created yet. This container along with the pipeline is returned to the
+ * client.
+ * <p>
+ * 2. The client when it sees the Container state as ALLOCATED understands that
+ * container needs to be created on the specified pipeline. The client lets the
+ * SCM know that it saw this flag and is initiating the create on the data nodes.
+ * <p>
+ * This is done by calling into notifyObjectCreation(ContainerName,
+ * BEGIN_CREATE) flag. When SCM gets this call, SCM puts the container state
+ * into CREATING. All this state means is that SCM told Client to create a
+ * container and client saw that request.
+ * <p>
+ * 3. Then client makes calls to datanodes directly, asking the datanodes to
+ * create the container. This is done with the help of pipeline that supports
+ * this container.
+ * <p>
+ * 4. Once the creation of the container is complete, the client will make
+ * another call to the SCM, this time specifying the containerName and the
+ * COMPLETE_CREATE as the Event.
+ * <p>
+ * 5. With COMPLETE_CREATE event, the container moves to an Open State. This is
+ * the state when clients can write to a container.
+ * <p>
+ * 6. If the client does not respond with the COMPLETE_CREATE event with a
+ * certain time, the state machine times out and triggers a delete operation of
+ * the container.
+ * <p>
+ * Please see the function initializeStateMachine below to see how this looks in
+ * code.
+ * <p>
+ * Reusing existing container :
+ * <p>
+ * The create container call is not made all the time, the system tries to use
+ * open containers as much as possible. So in those cases, it looks through the
+ * list of open containers and will return containers that match the specific
+ * signature.
+ * <p>
+ * Please note : Logically there are 3 separate state machines in the case of
+ * containers.
+ * <p>
+ * The Create State Machine -- Commented extensively above.
+ * <p>
+ * Open/Close State Machine - Once the container is in the Open State,
+ * eventually it will be closed, once sufficient data has been written to it.
+ * <p>
+ * TimeOut Delete Container State Machine - if the container creation times out,
+ * then Container State manager decides to delete the container.
+ */
+public class ContainerStateManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ContainerStateManager.class);
+
+  private final StateMachine<OzoneProtos.LifeCycleState,
+      OzoneProtos.LifeCycleEvent> stateMachine;
+
+  private final long containerSize;
+  private final long cacheSize;
+  private final long blockSize;
+
+  // A map that maintains the ContainerKey to Containers of that type ordered
+  // by last access time.
+  private final Lock writeLock;
+  private final Queue<BlockContainerInfo> containerCloseQueue;
+  private Map<ContainerKey, PriorityQueue<BlockContainerInfo>> containers;
+
+  /**
+   * Constructs a Container State Manager that tracks all containers owned by
+   * SCM for the purpose of allocation of blocks.
+   * <p>
+   * TODO : Add Container Tags so we know which containers are owned by SCM.
+   */
+  public ContainerStateManager(Configuration configuration, final long
+      cacheSize) throws IOException {
+    this.cacheSize = cacheSize;
+
+    // Initialize the container state machine.
+    Set<OzoneProtos.LifeCycleState> finalStates = new HashSet();
+
+    // These are the steady states of a container.
+    finalStates.add(LifeCycleState.OPEN);
+    finalStates.add(LifeCycleState.CLOSED);
+    finalStates.add(LifeCycleState.DELETED);
+
+    this.stateMachine = new StateMachine<>(LifeCycleState.ALLOCATED,
+        finalStates);
+    initializeStateMachine();
+
+    this.containerSize = OzoneConsts.GB * configuration.getInt(
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB,
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT);
+
+    this.blockSize = OzoneConsts.MB * configuration.getLong(
+        OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_IN_MB,
+        OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT);
+
+    writeLock = new ReentrantLock();
+    containers = new HashMap<>();
+    initializeContainerMaps(containers);
+    containerCloseQueue = new ConcurrentLinkedQueue<BlockContainerInfo>();
+  }
+
+  /**
+   * Creates containers maps of following types.
+   * <p>
+   * OZONE  of type {Ratis, StandAlone, Chained} for each of these {ALLOCATED,
+   * CREATING, OPEN, CLOSED, DELETING, DELETED}  container states
+   * <p>
+   * CBLOCK of type {Ratis, StandAlone, Chained} for each of these {ALLOCATED,
+   * CREATING, OPEN, CLOSED, DELETING, DELETED}  container states
+   * <p>
+   * Commented out for now: HDFS of type {Ratis, StandAlone, Chained} for each
+   * of these {ALLOCATED, CREATING, OPEN, CLOSED, DELETING, DELETED}  container
+   * states
+   */
+  private void initializeContainerMaps(Map containerMaps) {
+    // Called only from Ctor path, hence no lock is held.
+    Preconditions.checkNotNull(containerMaps);
+    for (OzoneProtos.Owner owner : OzoneProtos.Owner.values()) {
+      for (ReplicationType type : ReplicationType.values()) {
+        for (ReplicationFactor factor : ReplicationFactor.values()) {
+          for (LifeCycleState state : LifeCycleState.values()) {
+            ContainerKey key = new ContainerKey(owner, type, factor, state);
+            PriorityQueue<BlockContainerInfo> queue = new PriorityQueue<>();
+            containerMaps.put(key, queue);
+          }
+        }
+      }
+    }
+  }
+
+  // 1. Client -> SCM: Begin_create
+  // 2. Client -> Datanode: create
+  // 3. Client -> SCM: complete    {SCM:Creating ->OK}
+
+  // 3. Client -> SCM: complete    {SCM:DELETING -> INVALID}
+
+  // 4. Client->Datanode: write data.
+
+  // Client-driven Create State Machine
+  // States: <ALLOCATED>------------->CREATING----------------->[OPEN]
+  // Events:            (BEGIN_CREATE)    |    (COMPLETE_CREATE)
+  //                                      |
+  //                                      |(TIMEOUT)
+  //                                      V
+  //                                  DELETING----------------->[DELETED]
+  //                                           (CLEANUP)
+  // SCM Open/Close State Machine
+  // States: OPEN------------------>PENDING_CLOSE---------->[CLOSE]
+  // Events:        (FULL_CONTAINER)               (CLOSE)
+  // Delete State Machine
+  // States: OPEN------------------>DELETING------------------>[DELETED]
+  // Events:         (DELETE)                  (CLEANUP)
+  private void initializeStateMachine() {
+    stateMachine.addTransition(LifeCycleState.ALLOCATED,
+        LifeCycleState.CREATING,
+        LifeCycleEvent.BEGIN_CREATE);
+
+    stateMachine.addTransition(LifeCycleState.CREATING,
+        LifeCycleState.OPEN,
+        LifeCycleEvent.COMPLETE_CREATE);
+
+    stateMachine.addTransition(LifeCycleState.OPEN,
+        LifeCycleState.CLOSED,
+        LifeCycleEvent.CLOSE);
+
+    stateMachine.addTransition(LifeCycleState.OPEN,
+        LifeCycleState.DELETING,
+        LifeCycleEvent.DELETE);
+
+    stateMachine.addTransition(LifeCycleState.DELETING,
+        LifeCycleState.DELETED,
+        LifeCycleEvent.CLEANUP);
+
+    // Creating timeout -> Deleting
+    stateMachine.addTransition(LifeCycleState.CREATING,
+        LifeCycleState.DELETING,
+        LifeCycleEvent.TIMEOUT);
+  }
+
+  /**
+   * allocates a new container based on the type, replication etc.
+   *
+   * @param selector -- Pipeline selector class.
+   * @param type -- Replication type.
+   * @param replicationFactor - Replication replicationFactor.
+   * @param containerName - Container Name.
+   * @return Container Info.
+   * @throws IOException
+   */
+  public ContainerInfo allocateContainer(PipelineSelector selector, OzoneProtos
+      .ReplicationType type, OzoneProtos.ReplicationFactor replicationFactor,
+      final String containerName, OzoneProtos.Owner owner) throws
+      IOException {
+
+    Pipeline pipeline = selector.getReplicationPipeline(type,
+        replicationFactor, containerName);
+    ContainerInfo info = new ContainerInfo.Builder()
+        .setContainerName(containerName)
+        .setState(OzoneProtos.LifeCycleState.ALLOCATED)
+        .setPipeline(pipeline)
+        .setStateEnterTime(Time.monotonicNow())
+        .setOwner(owner)
+        .build();
+    Preconditions.checkNotNull(info);
+    BlockContainerInfo blockInfo = new BlockContainerInfo(info, 0);
+    blockInfo.setLastUsed(Time.monotonicNow());
+    writeLock.lock();
+    try {
+      ContainerKey key = new ContainerKey(owner, type, replicationFactor,
+          info.getState());
+      PriorityQueue<BlockContainerInfo> queue = containers.get(key);
+      Preconditions.checkNotNull(queue);
+      queue.add(blockInfo);
+      LOG.trace("New container allocated: {}", blockInfo);
+    } finally {
+      writeLock.unlock();
+    }
+    return info;
+  }
+
+  /**
+   * Moves a container to the next lifecycle state for the given event,
+   * re-filing it from the queue keyed by its old state into the queue
+   * keyed by the new state.
+   *
+   * @param info - Container to transition; its state is mutated in place.
+   * @param event - LifeCycle Event driving the transition.
+   * @return New state of the container.
+   * @throws SCMException if (current state, event) is not a valid
+   * transition in the state machine.
+   */
+  public OzoneProtos.LifeCycleState updateContainerState(BlockContainerInfo
+      info, OzoneProtos.LifeCycleEvent event) throws SCMException {
+    LifeCycleState newState = null;
+    // NOTE(review): shouldLease is computed but never read afterwards in
+    // this method; lease handling for CREATING containers looks
+    // unimplemented -- confirm against the follow-up JIRA.
+    boolean shouldLease = false;
+    try {
+      newState = this.stateMachine.getNextState(info.getState(), event);
+      if(newState == LifeCycleState.CREATING) {
+        // if we are moving into a Creating State, it is possible that clients
+        // could timeout therefore we need to use a lease.
+        shouldLease = true;
+      }
+    } catch (InvalidStateTransitionException ex) {
+      String error = String.format("Failed to update container state %s, " +
+              "reason: invalid state transition from state: %s upon event: %s.",
+          info.getPipeline().getContainerName(), info.getState(), event);
+      LOG.error(error);
+      throw new SCMException(error, FAILED_TO_CHANGE_CONTAINER_STATE);
+    }
+
+    // This is a post condition after executing getNextState.
+    Preconditions.checkNotNull(newState);
+    Pipeline pipeline = info.getPipeline();
+
+    // Keys for the queue the container is leaving and the one it enters.
+    ContainerKey oldKey = new ContainerKey(info.getOwner(), pipeline.getType(),
+        pipeline.getFactor(), info.getState());
+
+    ContainerKey newKey = new ContainerKey(info.getOwner(), pipeline.getType(),
+        pipeline.getFactor(), newState);
+    writeLock.lock();
+    try {
+
+      PriorityQueue<BlockContainerInfo> currentQueue = containers.get(oldKey);
+      // This should never happen, since we have initialized the map and
+      // queues to all possible states. No harm in asserting that info.
+      Preconditions.checkNotNull(currentQueue);
+
+      // TODO : Should we read this container info from the database if this
+      // is missing in the queue?. Right now we just add it into the queue.
+      // We also need a background thread that will remove unused containers
+      // from memory after 24 hours.  This is really a low priority work item
+      // since typical clusters will have less than 10's of millions of open
+      // containers at a given time, which we can easily keep in memory.
+
+      if (currentQueue.contains(info)) {
+        currentQueue.remove(info);
+      }
+
+      // Mutate the state only after the container has left the old queue.
+      info.setState(newState);
+      PriorityQueue<BlockContainerInfo> nextQueue = containers.get(newKey);
+      Preconditions.checkNotNull(nextQueue);
+
+      info.setLastUsed(Time.monotonicNow());
+      nextQueue.add(info);
+
+      return newState;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Return a container matching the attributes specified, reserving
+   * {@code size} bytes in it. The chosen entry is removed and re-added to
+   * push it to the back of the last-used ordering.
+   *
+   * @param size - Space needed in the Container.
+   * @param owner - Owner of the container {OZONE, CBLOCK}
+   * @param type - Replication Type {StandAlone, Ratis}
+   * @param factor - Replication Factor {ONE, THREE}
+   * @param state - State of the Container-- {Open, Allocated etc.}
+   * @return BlockContainerInfo with the bytes reserved, or null if no
+   * container in the requested state has room.
+   */
+  public BlockContainerInfo getMatchingContainer(final long size,
+      Owner owner, ReplicationType type, ReplicationFactor factor,
+      LifeCycleState state) {
+    ContainerKey key = new ContainerKey(owner, type, factor, state);
+    writeLock.lock();
+    try {
+      PriorityQueue<BlockContainerInfo> queue = containers.get(key);
+      if (queue.size() == 0) {
+        // We don't have any Containers of this type.
+        return null;
+      }
+      Iterator<BlockContainerInfo> iter = queue.iterator();
+      // Two assumptions here.
+      // 1. The iteration on the heap is ordered by the last used time.
+      //    NOTE(review): PriorityQueue.iterator() does NOT guarantee any
+      //    traversal order; if ordering matters here, this needs a
+      //    different structure or an explicit sort.
+      // 2. We remove and add the node back to push the node to the end of
+      //    the queue.
+
+      while (iter.hasNext()) {
+        BlockContainerInfo info = iter.next();
+        // Only reuse this container if the requested bytes still fit under
+        // the configured maximum container size. The previous check
+        // (allocated < containerSize + size) allowed a container to be
+        // over-allocated by up to 2 * size bytes.
+        if (info.getAllocated() + size <= this.containerSize) {
+
+          queue.remove(info);
+          info.addAllocated(size);
+          info.setLastUsed(Time.monotonicNow());
+          queue.add(info);
+
+          return info;
+        } else {
+          // This container is full; queue it for closing.
+          LOG.info("Moving {} to containerCloseQueue.", info.toString());
+          containerCloseQueue.add(info);
+          //TODO: Next JIRA will handle these containers to close.
+        }
+      }
+
+    } finally {
+      writeLock.unlock();
+    }
+    return null;
+  }
+
+  /**
+   * Immutable composite key used to index the container queues by
+   * (owner, replication type, replication factor, lifecycle state).
+   */
+  private static class ContainerKey {
+    private final LifeCycleState state;
+    private final ReplicationType type;
+    private final OzoneProtos.Owner owner;
+    private final ReplicationFactor replicationFactor;
+
+    /**
+     * Constructs a Container Key.
+     *
+     * @param owner - Container owner.
+     * @param type - Replication type.
+     * @param factor - Replication factor.
+     * @param state - LifeCycle state.
+     */
+    ContainerKey(Owner owner, ReplicationType type,
+        ReplicationFactor factor, LifeCycleState state) {
+      this.state = state;
+      this.type = type;
+      this.owner = owner;
+      this.replicationFactor = factor;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      ContainerKey other = (ContainerKey) o;
+      return new EqualsBuilder()
+          .append(state, other.state)
+          .append(type, other.type)
+          .append(owner, other.owner)
+          .append(replicationFactor, other.replicationFactor)
+          .isEquals();
+    }
+
+    @Override
+    public int hashCode() {
+      // Seeds and append order are kept stable so hash values do not change.
+      return new HashCodeBuilder(137, 757)
+          .append(state)
+          .append(type)
+          .append(owner)
+          .append(replicationFactor)
+          .toHashCode();
+    }
+
+    @Override
+    public String toString() {
+      return String.format(
+          "ContainerKey{state=%s, type=%s, owner=%s, replicationFactor=%s}",
+          state, type, owner, replicationFactor);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/019fb091/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java
index b795fa7..6c7bdde 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/container/Mapping.java
@@ -61,14 +61,15 @@ public interface Mapping extends Closeable {
   /**
    * Allocates a new container for a given keyName and replication factor.
    *
-   * @param containerName - Name.
    * @param replicationFactor - replication factor of the container.
+   * @param containerName - Name.
+   * @param owner
    * @return - Container Info.
    * @throws IOException
    */
   ContainerInfo allocateContainer(OzoneProtos.ReplicationType type,
       OzoneProtos.ReplicationFactor replicationFactor,
-      String containerName) throws IOException;
+      String containerName, OzoneProtos.Owner owner) throws IOException;
 
   /**
    * Deletes a container from SCM.
@@ -95,4 +96,10 @@ public interface Mapping extends Closeable {
    */
   OzoneProtos.LifeCycleState updateContainerState(String containerName,
       OzoneProtos.LifeCycleEvent event) throws IOException;
+
+  /**
+   * Returns the container State Manager.
+   * @return ContainerStateManager
+   */
+  ContainerStateManager getStateManager();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/019fb091/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineSelector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineSelector.java
index 2807da8..a36e67d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineSelector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/PipelineSelector.java
@@ -51,9 +51,9 @@ public class PipelineSelector {
   private final StandaloneManagerImpl standaloneManager;
   private final long containerSize;
 
-
   /**
    * Constructs a pipeline Selector.
+   *
    * @param nodeManager - node manager
    * @param conf - Ozone Config
    */
@@ -72,6 +72,30 @@ public class PipelineSelector {
   }
 
   /**
+   * Translates a list of nodes, ordered such that the first is the leader, into
+   * a corresponding {@link Pipeline} object.
+   *
+   * @param nodes - list of datanodes on which we will allocate the container.
+   * The first of the list will be the leader node.
+   * @return pipeline corresponding to nodes
+   */
+  public static Pipeline newPipelineFromNodes(final List<DatanodeID> nodes) {
+    Preconditions.checkNotNull(nodes);
+    Preconditions.checkArgument(nodes.size() > 0);
+    // By convention the first node in the list is the pipeline leader.
+    String leaderId = nodes.get(0).getDatanodeUuid();
+    Pipeline pipeline = new Pipeline(leaderId);
+    for (DatanodeID node : nodes) {
+      pipeline.addMember(node);
+    }
+
+    // A Standalone pipeline is always open, no action from the client
+    // is needed to make it open.
+    // NOTE(review): Ratis callers (RatisManagerImpl.allocateRatisPipeline)
+    // override these two defaults with RATIS/ALLOCATED after this returns.
+    pipeline.setType(ReplicationType.STAND_ALONE);
+    pipeline.setLifeCycleState(OzoneProtos.LifeCycleState.OPEN);
+    return pipeline;
+  }
+
+  /**
    * Create pluggable container placement policy implementation instance.
    *
    * @param nodeManager - SCM node manager.
@@ -106,13 +130,15 @@ public class PipelineSelector {
 
   /**
    * Return the pipeline manager from the replication type.
+   *
    * @param replicationType - Replication Type Enum.
    * @return pipeline Manager.
-   * @throws IllegalArgumentException
+   * @throws IllegalArgumentException if a new pipeline type is added
+   * without this function being updated to handle it.
    */
   private PipelineManager getPipelineManager(ReplicationType replicationType)
       throws IllegalArgumentException {
-    switch(replicationType){
+    switch (replicationType) {
     case RATIS:
       return this.ratisManager;
     case STAND_ALONE:
@@ -131,7 +157,6 @@ public class PipelineSelector {
    * container. The client specifies what kind of replication pipeline is needed
    * and based on the replication type in the request appropriate Interface is
    * invoked.
-   *
    */
 
   public Pipeline getReplicationPipeline(ReplicationType replicationType,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/019fb091/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/RatisManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/RatisManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/RatisManagerImpl.java
index 1d71d3b..a1dafa2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/RatisManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/ratis/RatisManagerImpl.java
@@ -16,22 +16,33 @@
  */
 package org.apache.hadoop.ozone.scm.pipelines.ratis;
 
-
+import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
-import org.apache.hadoop.ozone.scm.container.placement.algorithms
-    .ContainerPlacementPolicy;
+import org.apache.hadoop.ozone.scm.container.placement.algorithms.ContainerPlacementPolicy;
 import org.apache.hadoop.ozone.scm.node.NodeManager;
 import org.apache.hadoop.ozone.scm.pipelines.PipelineManager;
+import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState.ALLOCATED;
+import static org.apache.hadoop.ozone.protocol.proto.OzoneProtos.LifeCycleState.OPEN;
+
 
 /**
  * Implementation of {@link PipelineManager}.
+ *
+ * TODO : Introduce a state machine.
  */
 public class RatisManagerImpl implements PipelineManager {
   private static final Logger LOG =
@@ -39,9 +50,14 @@ public class RatisManagerImpl implements PipelineManager {
   private final NodeManager nodeManager;
   private final ContainerPlacementPolicy placementPolicy;
   private final long containerSize;
+  private final Set<DatanodeID> ratisMembers;
+  private final List<Pipeline> activePipelines;
+  private final AtomicInteger pipelineIndex;
+  private static final String PREFIX = "Ratis-";
 
   /**
    * Constructs a Ratis Pipeline Manager.
+   *
    * @param nodeManager
    */
   public RatisManagerImpl(NodeManager nodeManager,
@@ -49,6 +65,9 @@ public class RatisManagerImpl implements PipelineManager {
     this.nodeManager = nodeManager;
     this.placementPolicy = placementPolicy;
     this.containerSize = size;
+    ratisMembers = new HashSet<>();
+    activePipelines = new LinkedList<>();
+    pipelineIndex = new AtomicInteger(0);
   }
 
   /**
@@ -60,13 +79,141 @@ public class RatisManagerImpl implements PipelineManager {
    * @param containerName Name of the container
    * @param replicationFactor - Replication Factor
    * @return a Pipeline.
+   * <p>
+   * TODO: Evaluate if we really need this lock. Right now favoring safety over
+   * speed.
    */
   @Override
-  public Pipeline getPipeline(String containerName,
+  public synchronized Pipeline getPipeline(String containerName,
       OzoneProtos.ReplicationFactor replicationFactor) {
+    /**
+     * In the ratis world, we have a very simple policy.
+     *
+     * 1. Try to create a pipeline if there are enough free nodes.
+     *
+     * 2. This allows all nodes to be part of a pipeline quickly.
+     *
+     * 3. if there are not enough free nodes, return pipelines in a
+     * round-robin fashion.
+     *
+     * TODO: Might have to come up with a better algorithm than this.
+     * Create a new placement policy that returns pipelines in round robin
+     * fashion.
+     */
+    Pipeline pipeline = null;
+    List<DatanodeID> newNodes = allocatePipelineNodes(replicationFactor);
+    if (newNodes != null) {
+      Preconditions.checkState(newNodes.size() ==
+          getReplicationCount(replicationFactor), "Replication factor " +
+          "does not match the expected node count.");
+      pipeline = allocateRatisPipeline(newNodes, containerName);
+    } else {
+      pipeline = findOpenPipeline();
+    }
+    if (pipeline == null) {
+      LOG.error("Get pipeline call failed. We are not able to find free nodes" +
+          " or operational pipeline.");
+    }
+    return pipeline;
+  }
+
+  /**
+   * Find a pipeline that is operational, walking the active pipeline list
+   * in a circular fashion starting from the next round-robin index.
+   *
+   * @return - an OPEN Pipeline, or null if none is currently operational.
+   */
+  Pipeline findOpenPipeline() {
+    Pipeline pipeline = null;
+    // Marker meaning "first iteration"; getNextIndex() never returns -1.
+    final int sentinal = -1;
+    if (activePipelines.size() == 0) {
+      LOG.error("No Operational pipelines found. Returning null.");
+      return pipeline;
+    }
+    int startIndex = getNextIndex();
+    int nextIndex = sentinal;
+    // Stop once the round-robin index wraps back to where we started.
+    for (; startIndex != nextIndex; nextIndex = getNextIndex()) {
+      // Just walk the list in a circular way.
+      Pipeline temp =
+          activePipelines.get(nextIndex != sentinal ? nextIndex : startIndex);
+      // if we find an operational pipeline just return that.
+      if (temp.getLifeCycleState() == OPEN) {
+        pipeline = temp;
+        break;
+      }
+    }
+    return pipeline;
+  }
+
+  /**
+   * Allocate a new Ratis pipeline over the given nodes and register it in
+   * the active pipeline list.
+   *
+   * @param nodes - list of Nodes; the first is treated as the leader.
+   * @param containerName - container Name.
+   * @return - Pipeline in the ALLOCATED state, named "Ratis-<uuid-suffix>".
+   */
+  Pipeline allocateRatisPipeline(List<DatanodeID> nodes, String containerName) {
+    Preconditions.checkNotNull(nodes);
+    Pipeline pipeline = PipelineSelector.newPipelineFromNodes(nodes);
+    if (pipeline != null) {
+      // Start all pipeline names with "Ratis", easy to grep the logs.
+      String pipelineName = PREFIX +
+          UUID.randomUUID().toString().substring(PREFIX.length());
+      // Override the STAND_ALONE/OPEN defaults set by newPipelineFromNodes;
+      // presumably a later create step moves a Ratis pipeline to OPEN --
+      // TODO confirm against the pipeline-creation path.
+      pipeline.setType(OzoneProtos.ReplicationType.RATIS);
+      pipeline.setLifeCycleState(ALLOCATED);
+      pipeline.setPipelineName(pipelineName);
+      pipeline.setContainerName(containerName);
+      LOG.info("Creating new ratis pipeline: {}", pipeline.toString());
+      activePipelines.add(pipeline);
+    }
+    return pipeline;
+  }
+
+  /**
+   * gets the next index of in the pipelines to get.
+   * Callers must ensure activePipelines is non-empty; otherwise the modulo
+   * below divides by zero (findOpenPipeline checks size before calling).
+   *
+   * @return index in the link list to get.
+   */
+  private int getNextIndex() {
+    return pipelineIndex.incrementAndGet() % activePipelines.size();
+  }
+
+  /**
+   * Allocates a set of new nodes for the Ratis pipeline. Scans the healthy
+   * datanodes and picks the first 'count' nodes not already recorded as
+   * Ratis members.
+   *
+   * @param replicationFactor - One or Three
+   * @return List of Datanodes, or null if not enough free nodes exist.
+   */
+  private List<DatanodeID> allocatePipelineNodes(OzoneProtos.ReplicationFactor
+      replicationFactor) {
+    List<DatanodeID> newNodesList = new LinkedList<>();
+    List<DatanodeID> datanodes =
+        nodeManager.getNodes(OzoneProtos.NodeState.HEALTHY);
+    int count = getReplicationCount(replicationFactor);
+    //TODO: Add Raft State to the Nodes, so we can query and skip nodes from
+    // data from datanode instead of maintaining a set.
+    // NOTE(review): nothing in this method adds the chosen nodes to
+    // ratisMembers; presumably the pipeline-create path does -- verify,
+    // otherwise the same nodes can be handed out repeatedly.
+    for (DatanodeID datanode : datanodes) {
+      if (!ratisMembers.contains(datanode)) {
+        newNodesList.add(datanode);
+        if (newNodesList.size() == count) {
+          LOG.info("Allocating a new pipeline of size: {}", count);
+          return newNodesList;
+        }
+      }
+    }
     return null;
   }
 
+  private int getReplicationCount(OzoneProtos.ReplicationFactor factor) {
+    switch (factor) {
+    case ONE:
+      return 1;
+    case THREE:
+      return 3;
+    default:
+      throw new IllegalArgumentException("Unexpected replication count");
+    }
+  }
+
   /**
    * Creates a pipeline from a specified set of Nodes.
    *

http://git-wip-us.apache.org/repos/asf/hadoop/blob/019fb091/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/standalone/StandaloneManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/standalone/StandaloneManagerImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/standalone/StandaloneManagerImpl.java
index 63c45b9..99c0164 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/standalone/StandaloneManagerImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/scm/pipelines/standalone/StandaloneManagerImpl.java
@@ -16,22 +16,27 @@
  */
 package org.apache.hadoop.ozone.scm.pipelines.standalone;
 
-import com.google.common.base.Preconditions;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
 import org.apache.hadoop.ozone.scm.container.placement.algorithms.ContainerPlacementPolicy;
 import org.apache.hadoop.ozone.scm.node.NodeManager;
 import org.apache.hadoop.ozone.scm.pipelines.PipelineManager;
+import org.apache.hadoop.ozone.scm.pipelines.PipelineSelector;
 import org.apache.hadoop.scm.container.common.helpers.Pipeline;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.List;
+import java.util.UUID;
 
 /**
  * Standalone Manager Impl to prove that pluggable interface
  * works with current tests.
  */
 public class StandaloneManagerImpl implements PipelineManager {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StandaloneManagerImpl.class);
   private final NodeManager nodeManager;
   private final ContainerPlacementPolicy placementPolicy;
   private final long containerSize;
@@ -49,31 +54,6 @@ public class StandaloneManagerImpl implements PipelineManager {
     this.containerSize =  containerSize;
   }
 
-  /**
-   * Translates a list of nodes, ordered such that the first is the leader, into
-   * a corresponding {@link Pipeline} object.
-   *
-   * @param nodes - list of datanodes on which we will allocate the container.
-   * The first of the list will be the leader node.
-   * @param containerName container name
-   * @return pipeline corresponding to nodes
-   */
-  private static Pipeline newPipelineFromNodes(final List<DatanodeID> nodes,
-      final String containerName) {
-    Preconditions.checkNotNull(nodes);
-    Preconditions.checkArgument(nodes.size() > 0);
-    String leaderId = nodes.get(0).getDatanodeUuid();
-    Pipeline pipeline = new Pipeline(leaderId);
-    for (DatanodeID node : nodes) {
-      pipeline.addMember(node);
-    }
-
-    // The default state of a pipeline is operational, so not setting
-    // explicit state here.
-
-    pipeline.setContainerName(containerName);
-    return pipeline;
-  }
 
   /**
    * This function is called by the Container Manager while allocating a new
@@ -90,7 +70,12 @@ public class StandaloneManagerImpl implements PipelineManager {
       .ReplicationFactor replicationFactor) throws IOException {
     List<DatanodeID> datanodes = placementPolicy.chooseDatanodes(
         replicationFactor.getNumber(), containerSize);
-    return newPipelineFromNodes(datanodes, containerName);
+    Pipeline pipeline = PipelineSelector.newPipelineFromNodes(datanodes);
+    String pipelineName = "SA-" + UUID.randomUUID().toString().substring(3);
+    pipeline.setContainerName(containerName);
+    pipeline.setPipelineName(pipelineName);
+    LOG.info("Creating new standalone pipeline: {}", pipeline.toString());
+    return pipeline;
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message