hadoop-common-commits mailing list archives

From omal...@apache.org
Subject [29/68] [abbrv] [partial] hadoop git commit: HDFS-13405. Ozone: Rename HDSL to HDDS. Contributed by Ajay Kumar, Elek Marton, Mukul Kumar Singh, Shashikant Banerjee and Anu Engineer.
Date Thu, 26 Apr 2018 21:00:58 GMT
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b832f3c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/StorageContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/StorageContainerManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/StorageContainerManager.java
new file mode 100644
index 0000000..1a78dee
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/StorageContainerManager.java
@@ -0,0 +1,1290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.protobuf.BlockingService;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.hdds.scm.block.BlockManager;
+import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
+import org.apache.hadoop.hdds.scm.container.ContainerMapping;
+import org.apache.hadoop.hdds.scm.container.Mapping;
+import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.DeleteBlockResult;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat;
+import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMMetrics;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
+import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto
+    .DeleteBlockTransactionResult;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos
+    .ContainerBlocksDeletionACKResponseProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ContainerReportsResponseProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.ReportState;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCmdType;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMNodeAddressList;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMReregisterCmdResponseProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.SendContainerReportProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerLocationProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto;
+import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+import org.apache.hadoop.ozone.common.Storage.StorageState;
+import org.apache.hadoop.ozone.common.StorageInfo;
+import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
+import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
+import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.ozone.protocolPB
+    .ScmBlockLocationProtocolServerSideTranslatorPB;
+import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolPB;
+import org.apache.hadoop.ozone.protocolPB
+    .StorageContainerDatanodeProtocolServerSideTranslatorPB;
+import org.apache.hadoop.ozone.protocolPB
+    .StorageContainerLocationProtocolServerSideTranslatorPB;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.GenericOptionsParser;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.ObjectName;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_CLIENT_ADDRESS_KEY;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_DATANODE_ADDRESS_KEY;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_DB_CACHE_SIZE_MB;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_HANDLER_COUNT_DEFAULT;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys
+    .OZONE_SCM_HANDLER_COUNT_KEY;
+import static org.apache.hadoop.hdds.protocol.proto
+    .ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result;
+import static org.apache.hadoop.hdds.server.ServerUtils.updateRPCListenAddress;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
+import static org.apache.hadoop.util.ExitUtil.terminate;
+
+/**
+ * StorageContainerManager is the main entry point for the service that
+ * provides information about which datanodes host containers.
+ *
+ * DataNodes report to StorageContainerManager using heartbeat
+ * messages. SCM allocates containers and returns a pipeline.
+ *
+ * Once a client gets a pipeline (a list of datanodes), it connects to the
+ * datanodes and creates a container, which can then be used to store data.
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS", "CBLOCK", "OZONE", "HBASE"})
+public class StorageContainerManager extends ServiceRuntimeInfoImpl
+    implements StorageContainerDatanodeProtocol,
+    StorageContainerLocationProtocol, ScmBlockLocationProtocol, SCMMXBean {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(StorageContainerManager.class);
+
+  /**
+   *  Startup options.
+   */
+  public enum StartupOption {
+    INIT("-init"),
+    CLUSTERID("-clusterid"),
+    GENCLUSTERID("-genclusterid"),
+    REGULAR("-regular"),
+    HELP("-help");
+
+    private final String name;
+    private String clusterId = null;
+
+    public void setClusterId(String cid) {
+      if(cid != null && !cid.isEmpty()) {
+        clusterId = cid;
+      }
+    }
+
+    public String getClusterId() {
+      return clusterId;
+    }
+
+    StartupOption(String arg) {
+      this.name = arg;
+    }
+
+    public String getName() {
+      return name;
+    }
+  }
+
+  /**
+   * NodeManager and container Managers for SCM.
+   */
+  private final NodeManager scmNodeManager;
+  private final Mapping scmContainerManager;
+  private final BlockManager scmBlockManager;
+  private final SCMStorage scmStorage;
+
+  /** The RPC server that listens to requests from DataNodes. */
+  private final RPC.Server datanodeRpcServer;
+  private final InetSocketAddress datanodeRpcAddress;
+
+  /** The RPC server that listens to requests from clients. */
+  private final RPC.Server clientRpcServer;
+  private final InetSocketAddress clientRpcAddress;
+
+  /** The RPC server that listens to requests from block service clients. */
+  private final RPC.Server blockRpcServer;
+  private final InetSocketAddress blockRpcAddress;
+
+  private final StorageContainerManagerHttpServer httpServer;
+
+  /** SCM mxbean. */
+  private ObjectName scmInfoBeanName;
+
+  /** SCM super user. */
+  private final String scmUsername;
+  private final Collection<String> scmAdminUsernames;
+
+  /** SCM metrics. */
+  private static SCMMetrics metrics;
+  /** Key = DatanodeUuid, value = ContainerStat. */
+  private Cache<String, ContainerStat> containerReportCache;
+
+  private static final String USAGE =
+      "Usage: \n oz scm [genericOptions] "
+          + "[ " + StartupOption.INIT.getName() + " [ "
+          + StartupOption.CLUSTERID.getName() + " <cid> ] ]\n "
+          + "oz scm [genericOptions] [ "
+          + StartupOption.GENCLUSTERID.getName() + " ]\n " +
+          "oz scm [ "
+          + StartupOption.HELP.getName() + " ]\n";
+  /**
+   * Creates a new StorageContainerManager.  Configuration will be updated with
+   * information on the actual listening addresses used for RPC servers.
+   *
+   * @param conf configuration
+   */
+  private StorageContainerManager(OzoneConfiguration conf)
+      throws IOException {
+
+    final int handlerCount = conf.getInt(
+        OZONE_SCM_HANDLER_COUNT_KEY, OZONE_SCM_HANDLER_COUNT_DEFAULT);
+    final int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
+        OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
+
+    StorageContainerManager.initMetrics();
+    initContainerReportCache(conf);
+
+    scmStorage = new SCMStorage(conf);
+    if (scmStorage.getState() != StorageState.INITIALIZED) {
+      throw new SCMException("SCM not initialized.",
+          ResultCodes.SCM_NOT_INITIALIZED);
+    }
+    scmNodeManager = new SCMNodeManager(conf, scmStorage.getClusterID(), this);
+    scmContainerManager = new ContainerMapping(conf, scmNodeManager, cacheSize);
+    scmBlockManager = new BlockManagerImpl(conf, scmNodeManager,
+        scmContainerManager, cacheSize);
+
+    scmAdminUsernames = conf.getTrimmedStringCollection(
+        OzoneConfigKeys.OZONE_ADMINISTRATORS);
+    scmUsername = UserGroupInformation.getCurrentUser().getUserName();
+    if (!scmAdminUsernames.contains(scmUsername)) {
+      scmAdminUsernames.add(scmUsername);
+    }
+
+    RPC.setProtocolEngine(conf, StorageContainerDatanodeProtocolPB.class,
+        ProtobufRpcEngine.class);
+    RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
+        ProtobufRpcEngine.class);
+    RPC.setProtocolEngine(conf, ScmBlockLocationProtocolPB.class,
+        ProtobufRpcEngine.class);
+
+    BlockingService dnProtoPbService = StorageContainerDatanodeProtocolProtos.
+        StorageContainerDatanodeProtocolService.newReflectiveBlockingService(
+        new StorageContainerDatanodeProtocolServerSideTranslatorPB(this));
+
+    final InetSocketAddress datanodeRpcAddr =
+        HddsServerUtil.getScmDataNodeBindAddress(conf);
+    datanodeRpcServer = startRpcServer(conf, datanodeRpcAddr,
+        StorageContainerDatanodeProtocolPB.class, dnProtoPbService,
+        handlerCount);
+    datanodeRpcAddress = updateRPCListenAddress(conf,
+        OZONE_SCM_DATANODE_ADDRESS_KEY, datanodeRpcAddr, datanodeRpcServer);
+
+    // SCM Container Service RPC
+    BlockingService storageProtoPbService =
+        StorageContainerLocationProtocolProtos
+            .StorageContainerLocationProtocolService
+            .newReflectiveBlockingService(
+            new StorageContainerLocationProtocolServerSideTranslatorPB(this));
+
+    final InetSocketAddress scmAddress =
+        HddsServerUtil.getScmClientBindAddress(conf);
+    clientRpcServer = startRpcServer(conf, scmAddress,
+        StorageContainerLocationProtocolPB.class, storageProtoPbService,
+        handlerCount);
+    clientRpcAddress = updateRPCListenAddress(conf,
+        OZONE_SCM_CLIENT_ADDRESS_KEY, scmAddress, clientRpcServer);
+
+    // SCM Block Service RPC
+    BlockingService blockProtoPbService =
+        ScmBlockLocationProtocolProtos
+            .ScmBlockLocationProtocolService
+            .newReflectiveBlockingService(
+            new ScmBlockLocationProtocolServerSideTranslatorPB(this));
+
+    final InetSocketAddress scmBlockAddress =
+        HddsServerUtil.getScmBlockClientBindAddress(conf);
+    blockRpcServer = startRpcServer(conf, scmBlockAddress,
+        ScmBlockLocationProtocolPB.class, blockProtoPbService,
+        handlerCount);
+    blockRpcAddress = updateRPCListenAddress(conf,
+        OZONE_SCM_BLOCK_CLIENT_ADDRESS_KEY, scmBlockAddress, blockRpcServer);
+
+    httpServer = new StorageContainerManagerHttpServer(conf);
+
+    registerMXBean();
+  }
+
+  /**
+   * Initializes the cache of container reports sent from datanodes.
+   *
+   * @param conf - configuration.
+   */
+  private void initContainerReportCache(OzoneConfiguration conf) {
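+    // Effectively unbounded cache: entries never expire on their own and
+    // are removed only when explicitly invalidated or replaced by a newer
+    // report, at which point the removal listener below adjusts metrics.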
+    containerReportCache = CacheBuilder.newBuilder()
+        .expireAfterAccess(Long.MAX_VALUE, TimeUnit.MILLISECONDS)
+        .maximumSize(Integer.MAX_VALUE)
+        .removalListener(new RemovalListener<String, ContainerStat>() {
+          @Override
+          public void onRemoval(
+              RemovalNotification<String, ContainerStat> removalNotification) {
+            synchronized (containerReportCache) {
+              ContainerStat stat = removalNotification.getValue();
+              // remove invalid container report
+              metrics.decrContainerStat(stat);
+              LOG.debug(
+                  "Remove expired container stat entry for datanode: {}.",
+                  removalNotification.getKey());
+            }
+          }
+        }).build();
+  }
+
+  /**
+   * Builds a message for logging startup information about an RPC server.
+   *
+   * @param description RPC server description
+   * @param addr RPC server listening address
+   * @return server startup message
+   */
+  private static String buildRpcServerStartMessage(String description,
+      InetSocketAddress addr) {
+    return addr != null ? String.format("%s is listening at %s",
+        description, addr.toString()) :
+        String.format("%s not started", description);
+  }
+
+  /**
+   * Starts an RPC server, if configured.
+   *
+   * @param conf configuration
+   * @param addr configured address of RPC server
+   * @param protocol RPC protocol provided by RPC server
+   * @param instance RPC protocol implementation instance
+   * @param handlerCount RPC server handler count
+   *
+   * @return RPC server
+   * @throws IOException if there is an I/O error while creating RPC server
+   */
+  private static RPC.Server startRpcServer(OzoneConfiguration conf,
+      InetSocketAddress addr, Class<?> protocol, BlockingService instance,
+      int handlerCount)
+      throws IOException {
+    RPC.Server rpcServer = new RPC.Builder(conf)
+        .setProtocol(protocol)
+        .setInstance(instance)
+        .setBindAddress(addr.getHostString())
+        .setPort(addr.getPort())
+        .setNumHandlers(handlerCount)
+        .setVerbose(false)
+        .setSecretManager(null)
+        .build();
+
+    DFSUtil.addPBProtocol(conf, protocol, instance, rpcServer);
+    return rpcServer;
+  }
+
+  private void registerMXBean() {
+    Map<String, String> jmxProperties = new HashMap<>();
+    jmxProperties.put("component", "ServerRuntime");
+    this.scmInfoBeanName =
+        MBeans.register("StorageContainerManager",
+            "StorageContainerManagerInfo",
+            jmxProperties,
+            this);
+  }
+
+  private void unregisterMXBean() {
+    if(this.scmInfoBeanName != null) {
+      MBeans.unregister(this.scmInfoBeanName);
+      this.scmInfoBeanName = null;
+    }
+  }
+
+  /**
+   * Main entry point for starting StorageContainerManager.
+   *
+   * @param argv arguments
+   * @throws IOException if startup fails due to I/O error
+   */
+  public static void main(String[] argv) throws IOException {
+    if (DFSUtil.parseHelpArgument(argv, USAGE,
+        System.out, true)) {
+      System.exit(0);
+    }
+    try {
+      OzoneConfiguration conf = new OzoneConfiguration();
+      GenericOptionsParser hParser = new GenericOptionsParser(conf, argv);
+      if (!hParser.isParseSuccessful()) {
+        System.err.println("USAGE: " + USAGE + "\n");
+        hParser.printGenericCommandUsage(System.err);
+        System.exit(1);
+      }
+      StringUtils.startupShutdownMessage(StorageContainerManager.class,
+          argv, LOG);
+      StorageContainerManager scm = createSCM(hParser.getRemainingArgs(), conf);
+      if (scm != null) {
+        scm.start();
+        scm.join();
+      }
+    } catch (Throwable t) {
+      LOG.error("Failed to start the StorageContainerManager.", t);
+      terminate(1, t);
+    }
+  }
+
+  private static void printUsage(PrintStream out) {
+    out.println(USAGE + "\n");
+  }
+
+  public static StorageContainerManager createSCM(String[] argv,
+      OzoneConfiguration conf) throws IOException {
+    if (!HddsUtils.isHddsEnabled(conf)) {
+      System.err.println("SCM cannot be started in secure mode or when " +
+          OZONE_ENABLED + " is set to false");
+      System.exit(1);
+    }
+    StartupOption startOpt = parseArguments(argv);
+    if (startOpt == null) {
+      printUsage(System.err);
+      terminate(1);
+      return null;
+    }
+    switch (startOpt) {
+    case INIT:
+      terminate(scmInit(conf) ? 0 : 1);
+      return null;
+    case GENCLUSTERID:
+      System.out.println("Generating new cluster id:");
+      System.out.println(StorageInfo.newClusterID());
+      terminate(0);
+      return null;
+    case HELP:
+      printUsage(System.err);
+      terminate(0);
+      return null;
+    default:
+      return new StorageContainerManager(conf);
+    }
+  }
+
+  /**
+   * Routine to set up the Version info for StorageContainerManager.
+   *
+   * @param conf OzoneConfiguration
+   * @return true if SCM initialization is successful, false otherwise.
+   * @throws IOException if init fails due to I/O error
+   */
+  public static boolean scmInit(OzoneConfiguration conf) throws IOException {
+    SCMStorage scmStorage = new SCMStorage(conf);
+    StorageState state = scmStorage.getState();
+    if (state != StorageState.INITIALIZED) {
+      try {
+        String clusterId = StartupOption.INIT.getClusterId();
+        if (clusterId != null && !clusterId.isEmpty()) {
+          scmStorage.setClusterId(clusterId);
+        }
+        scmStorage.initialize();
+        System.out.println("SCM initialization succeeded." +
+            "Current cluster id for sd=" + scmStorage.getStorageDir() + ";cid="
+                + scmStorage.getClusterID());
+        return true;
+      } catch (IOException ioe) {
+        LOG.error("Could not initialize SCM version file", ioe);
+        return false;
+      }
+    } else {
+      System.out.println("SCM already initialized. Reusing existing" +
+          " cluster id for sd=" + scmStorage.getStorageDir() + ";cid="
+              + scmStorage.getClusterID());
+      return true;
+    }
+  }
+
+  private static StartupOption parseArguments(String[] args) {
+    int argsLen = (args == null) ? 0 : args.length;
+    StartupOption startOpt = StartupOption.HELP;
+    if (argsLen == 0) {
+      startOpt = StartupOption.REGULAR;
+    }
+    for (int i = 0; i < argsLen; i++) {
+      String cmd = args[i];
+      if (StartupOption.INIT.getName().equalsIgnoreCase(cmd)) {
+        startOpt = StartupOption.INIT;
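+        // "-init" optionally takes "-clusterid <cid>" and nothing else,
+        // so more than three arguments in total is a usage error.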
+        if (argsLen > 3) {
+          return null;
+        }
+        for (i = i + 1; i < argsLen; i++) {
+          if (args[i].equalsIgnoreCase(StartupOption.CLUSTERID.getName())) {
+            i++;
+            if (i < argsLen && !args[i].isEmpty()) {
+              startOpt.setClusterId(args[i]);
+            } else {
+              // if no cluster id specified or is empty string, return null
+              LOG.error("Must specify a valid cluster ID after the "
+                  + StartupOption.CLUSTERID.getName() + " flag");
+              return null;
+            }
+          } else {
+            return null;
+          }
+        }
+      } else if (StartupOption.GENCLUSTERID.getName().equalsIgnoreCase(cmd)) {
+        if (argsLen > 1) {
+          return null;
+        }
+        startOpt = StartupOption.GENCLUSTERID;
+      }
+    }
+    return startOpt;
+  }
+
+  /**
+   * Returns a SCMCommandResponseProto for the given SCMCommand.
+   * @param cmd - SCM command.
+   * @param datanodeID - UUID of the datanode the command is addressed to.
+   * @return SCMCommandResponseProto
+   * @throws InvalidProtocolBufferException
+   */
+  @VisibleForTesting
+  public SCMCommandResponseProto getCommandResponse(SCMCommand cmd,
+      final String datanodeID)
+      throws IOException {
+    SCMCmdType type = cmd.getType();
+    SCMCommandResponseProto.Builder builder =
+        SCMCommandResponseProto.newBuilder()
+        .setDatanodeUUID(datanodeID);
+    switch (type) {
+    case registeredCommand:
+      return builder.setCmdType(SCMCmdType.registeredCommand)
+          .setRegisteredProto(
+              SCMRegisteredCmdResponseProto.getDefaultInstance())
+          .build();
+    case versionCommand:
+      return builder.setCmdType(SCMCmdType.versionCommand)
+          .setVersionProto(SCMVersionResponseProto.getDefaultInstance())
+          .build();
+    case sendContainerReport:
+      return builder.setCmdType(SCMCmdType.sendContainerReport)
+          .setSendReport(SendContainerReportProto.getDefaultInstance())
+          .build();
+    case reregisterCommand:
+      return builder.setCmdType(SCMCmdType.reregisterCommand)
+          .setReregisterProto(SCMReregisterCmdResponseProto
+              .getDefaultInstance())
+          .build();
+    case deleteBlocksCommand:
+      // Once SCM sends out the deletion message, increment the count.
+      // This is done here instead of when SCM receives the ACK, because
+      // the DN might not be able to respond with the ACK for some time.
+      // If it times out, SCM needs to re-send the message a few more times.
+      List<Long> txs = ((DeleteBlocksCommand) cmd).blocksTobeDeleted()
+          .stream().map(tx -> tx.getTxID()).collect(Collectors.toList());
+      this.getScmBlockManager().getDeletedBlockLog().incrementCount(txs);
+      return builder.setCmdType(SCMCmdType.deleteBlocksCommand)
+          .setDeleteBlocksProto(((DeleteBlocksCommand) cmd).getProto())
+          .build();
+    case closeContainerCommand:
+      return builder.setCmdType(SCMCmdType.closeContainerCommand)
+          .setCloseContainerProto(((CloseContainerCommand)cmd).getProto())
+          .build();
+    default:
+      throw new IllegalArgumentException("Not implemented");
+    }
+  }
+
+  @VisibleForTesting
+  public static SCMRegisteredCmdResponseProto getRegisteredResponse(
+      SCMCommand cmd, SCMNodeAddressList addressList) {
+    Preconditions.checkState(cmd.getClass() == RegisteredCommand.class);
+    RegisteredCommand rCmd = (RegisteredCommand) cmd;
+    SCMCmdType type = cmd.getType();
+    if (type != SCMCmdType.registeredCommand) {
+      throw new IllegalArgumentException("Registered command is not well " +
+          "formed. Internal Error.");
+    }
+    return SCMRegisteredCmdResponseProto.newBuilder()
+        //TODO : Fix this later when we have multiple SCM support.
+        //.setAddressList(addressList)
+        .setErrorCode(rCmd.getError())
+        .setClusterID(rCmd.getClusterID())
+        .setDatanodeUUID(rCmd.getDatanodeUUID()).build();
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public Pipeline getContainer(String containerName) throws IOException {
+    checkAdminAccess();
+    return scmContainerManager.getContainer(containerName).getPipeline();
+  }
+
+  @VisibleForTesting
+  public ContainerInfo getContainerInfo(String containerName)
+      throws IOException {
+    return scmContainerManager.getContainer(containerName);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public List<ContainerInfo> listContainer(String startName,
+      String prefixName, int count) throws IOException {
+    return scmContainerManager.listContainer(startName, prefixName, count);
+  }
+
+  /**
+   * {@inheritDoc}
+   */
+  @Override
+  public void deleteContainer(String containerName) throws IOException {
+    checkAdminAccess();
+    scmContainerManager.deleteContainer(containerName);
+  }
+
+  /**
+   * Queries a list of node statuses.
+   *
+   * @param nodeStatuses - set of node states to match.
+   * @param queryScope - scope of the query (cluster-wide or a single pool).
+   * @param poolName - name of the pool, when the scope is a pool.
+   * @return NodePool of matching datanodes.
+   */
+  @Override
+  public HddsProtos.NodePool queryNode(EnumSet<NodeState> nodeStatuses,
+      HddsProtos.QueryScope queryScope, String poolName) throws IOException {
+
+    if (queryScope == HddsProtos.QueryScope.POOL) {
+      throw new IllegalArgumentException("Not Supported yet");
+    }
+
+    List<DatanodeDetails> datanodes = queryNode(nodeStatuses);
+    HddsProtos.NodePool.Builder poolBuilder =
+        HddsProtos.NodePool.newBuilder();
+
+    for (DatanodeDetails datanode : datanodes) {
+      HddsProtos.Node node = HddsProtos.Node.newBuilder()
+          .setNodeID(datanode.getProtoBufMessage())
+          .addAllNodeStates(nodeStatuses)
+          .build();
+      poolBuilder.addNodes(node);
+    }
+
+    return poolBuilder.build();
+  }
+
+  /**
+   * Notification from a client that a begin/finish operation on a container
+   * or pipeline object has taken place on the datanodes.
+   * @param type - object type (container or pipeline).
+   * @param name - object name.
+   * @param op - operation (create or close).
+   * @param stage - stage of the operation (begin or complete).
+   */
+  @Override
+  public void notifyObjectStageChange(
+      ObjectStageChangeRequestProto.Type type, String name,
+      ObjectStageChangeRequestProto.Op op,
+      ObjectStageChangeRequestProto.Stage stage) throws IOException {
+
+    LOG.info("Object type {} name {} op {} new stage {}",
+        type, name, op, stage);
+    if (type == ObjectStageChangeRequestProto.Type.container) {
+      if (op == ObjectStageChangeRequestProto.Op.create) {
+        if (stage == ObjectStageChangeRequestProto.Stage.begin) {
+          scmContainerManager.updateContainerState(name,
+              HddsProtos.LifeCycleEvent.CREATE);
+        } else {
+          scmContainerManager.updateContainerState(name,
+              HddsProtos.LifeCycleEvent.CREATED);
+        }
+      } else if (op == ObjectStageChangeRequestProto.Op.close) {
+        if (stage == ObjectStageChangeRequestProto.Stage.begin) {
+          scmContainerManager.updateContainerState(name,
+              HddsProtos.LifeCycleEvent.FINALIZE);
+        } else {
+          scmContainerManager.updateContainerState(name,
+              HddsProtos.LifeCycleEvent.CLOSE);
+        }
+      }
+    } //else if (type == ObjectStageChangeRequestProto.Type.pipeline) {
+    // TODO: pipeline state update will be addressed in future patch.
+    //}
+  }
+
+  /**
+   * Creates a replication pipeline of a specified type.
+   */
+  @Override
+  public Pipeline createReplicationPipeline(
+      HddsProtos.ReplicationType replicationType,
+      HddsProtos.ReplicationFactor factor,
+      HddsProtos.NodePool nodePool)
+      throws IOException {
+     // TODO: will be addressed in future patch.
+    return null;
+  }
+
+  /**
+   * Queries a list of nodes that match a set of statuses.
+   * <p>
+   * For example, if the nodeStatuses is HEALTHY and RAFT_MEMBER, then this
+   * call will return all healthy nodes that are members of a Raft pipeline.
+   * <p>
+   * Right now we don't support boolean operators, so the query is treated
+   * as an AND over all the given states.
+   *
+   * @param nodeStatuses - A set of NodeStates.
+   * @return List of Datanodes.
+   */
+  public List<DatanodeDetails> queryNode(EnumSet<NodeState> nodeStatuses) {
+    Preconditions.checkNotNull(nodeStatuses, "Node Query set cannot be null");
+    Preconditions.checkState(nodeStatuses.size() > 0, "No valid arguments " +
+        "in the query set");
+    List<DatanodeDetails> resultList = new LinkedList<>();
+    Set<DatanodeDetails> currentSet = new TreeSet<>();
+
+    for (NodeState nodeState : nodeStatuses) {
+      Set<DatanodeDetails> nextSet = queryNodeState(nodeState);
+      if ((nextSet == null) || (nextSet.size() == 0)) {
+        // Right now we only support the AND operation, so the intersection
+        // with an empty set is empty; return an empty result immediately.
+        return resultList;
+      }
+      // First time we have to add all the elements, next time we have to
+      // do an intersection operation on the set.
+      if (currentSet.size() == 0) {
+        currentSet.addAll(nextSet);
+      } else {
+        currentSet.retainAll(nextSet);
+      }
+    }
+
+    resultList.addAll(currentSet);
+    return resultList;
+  }
+
+  /**
+   * Query the System for Nodes.
+   *
+   * @param nodeState - NodeState that we are interested in matching.
+   * @return Set of Datanodes that match the NodeState.
+   */
+  private Set<DatanodeDetails> queryNodeState(NodeState nodeState) {
+    if (nodeState == NodeState.RAFT_MEMBER ||
+        nodeState == NodeState.FREE_NODE) {
+      throw new IllegalStateException("Not implemented yet");
+    }
+    Set<DatanodeDetails> returnSet = new TreeSet<>();
+    List<DatanodeDetails> tmp = getScmNodeManager().getNodes(nodeState);
+    if ((tmp != null) && (tmp.size() > 0)) {
+      returnSet.addAll(tmp);
+    }
+    return returnSet;
+  }
+
+  /**
+   * Asks SCM where a container should be allocated. SCM responds with the
+   * set of datanodes that should be used to create this container.
+   *
+   * @param replicationType - replication type of the container.
+   * @param replicationFactor - replication factor.
+   * @param containerName - name of the container.
+   * @param owner - owner of the container.
+   * @return pipeline
+   * @throws IOException
+   */
+  @Override
+  public Pipeline allocateContainer(HddsProtos.ReplicationType replicationType,
+      HddsProtos.ReplicationFactor replicationFactor, String containerName,
+      String owner) throws IOException {
+
+    checkAdminAccess();
+    return scmContainerManager
+        .allocateContainer(replicationType, replicationFactor, containerName,
+            owner).getPipeline();
+  }
+
+  /**
+   * Returns the listening address of the StorageContainerLocationProtocol
+   * RPC server.
+   *
+   * @return listen address of the StorageContainerLocationProtocol RPC server.
+   */
+  @VisibleForTesting
+  public InetSocketAddress getClientRpcAddress() {
+    return clientRpcAddress;
+  }
+
+  @Override
+  public String getClientRpcPort() {
+    InetSocketAddress addr = getClientRpcAddress();
+    return addr == null ? "0" : Integer.toString(addr.getPort());
+  }
+
+  /**
+   * Returns the listening address of the StorageContainerDatanodeProtocol
+   * RPC server.
+   *
+   * @return address that the datanodes communicate with.
+   */
+  public InetSocketAddress getDatanodeRpcAddress() {
+    return datanodeRpcAddress;
+  }
+
+  @Override
+  public String getDatanodeRpcPort() {
+    InetSocketAddress addr = getDatanodeRpcAddress();
+    return addr == null ? "0" : Integer.toString(addr.getPort());
+  }
+
+  /**
+   * Start service.
+   */
+  public void start() throws IOException {
+    LOG.info(buildRpcServerStartMessage(
+        "StorageContainerLocationProtocol RPC server", clientRpcAddress));
+    DefaultMetricsSystem.initialize("StorageContainerManager");
+    clientRpcServer.start();
+    LOG.info(buildRpcServerStartMessage(
+        "ScmBlockLocationProtocol RPC server", blockRpcAddress));
+    blockRpcServer.start();
+    LOG.info(buildRpcServerStartMessage("RPC server for DataNodes",
+        datanodeRpcAddress));
+    datanodeRpcServer.start();
+    httpServer.start();
+    scmBlockManager.start();
+
+    setStartTime();
+  }
+
+  /**
+   * Stop service.
+   */
+  public void stop() {
+    try {
+      LOG.info("Stopping block service RPC server");
+      blockRpcServer.stop();
+    } catch (Exception ex) {
+      LOG.error("Storage Container Manager blockRpcServer stop failed.", ex);
+    }
+
+    try {
+      LOG.info("Stopping the StorageContainerLocationProtocol RPC server");
+      clientRpcServer.stop();
+    } catch (Exception ex) {
+      LOG.error("Storage Container Manager clientRpcServer stop failed.", ex);
+    }
+
+    try {
+      LOG.info("Stopping the RPC server for DataNodes");
+      datanodeRpcServer.stop();
+    } catch (Exception ex) {
+      LOG.error("Storage Container Manager datanodeRpcServer stop failed.", ex);
+    }
+
+    try {
+      LOG.info("Stopping Storage Container Manager HTTP server.");
+      httpServer.stop();
+    } catch (Exception ex) {
+      LOG.error("Storage Container Manager HTTP server stop failed.", ex);
+    }
+
+    try {
+      LOG.info("Stopping Block Manager Service.");
+      scmBlockManager.stop();
+    } catch (Exception ex) {
+      LOG.error("SCM block manager service stop failed.", ex);
+    }
+
+    if (containerReportCache != null) {
+      containerReportCache.invalidateAll();
+      containerReportCache.cleanUp();
+    }
+
+    if (metrics != null) {
+      metrics.unRegister();
+    }
+
+    unregisterMXBean();
+    IOUtils.cleanupWithLogger(LOG, scmContainerManager);
+    IOUtils.cleanupWithLogger(LOG, scmNodeManager);
+  }
+
+  /**
+   * Wait until service has completed shutdown.
+   */
+  public void join() {
+    try {
+      blockRpcServer.join();
+      clientRpcServer.join();
+      datanodeRpcServer.join();
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOG.info("Interrupted during StorageContainerManager join.");
+    }
+  }
+
+  /**
+   * Returns SCM version.
+   *
+   * @return Version info.
+   */
+  @Override
+  public SCMVersionResponseProto getVersion(
+      SCMVersionRequestProto versionRequest) throws IOException {
+    return getScmNodeManager().getVersion(versionRequest).getProtobufMessage();
+  }
+
+  /**
+   * Used by a datanode to send a heartbeat.
+   *
+   * @param datanodeDetails - Datanode Details.
+   * @param nodeReport - Node Report
+   * @param reportState - Container report ready info.
+   * @return - SCMHeartbeatResponseProto
+   * @throws IOException
+   */
+  @Override
+  public SCMHeartbeatResponseProto sendHeartbeat(
+      DatanodeDetailsProto datanodeDetails, SCMNodeReport nodeReport,
+      ReportState reportState) throws IOException {
+    List<SCMCommand> commands =
+        getScmNodeManager().sendHeartbeat(datanodeDetails, nodeReport,
+            reportState);
+    List<SCMCommandResponseProto> cmdResponses = new LinkedList<>();
+    for (SCMCommand cmd : commands) {
+      cmdResponses.add(getCommandResponse(cmd, datanodeDetails.getUuid()
+          .toString()));
+    }
+    return SCMHeartbeatResponseProto.newBuilder().addAllCommands(cmdResponses)
+        .build();
+  }
+
+  /**
+   * Register a datanode.
+   *
+   * @param datanodeDetails - Datanode details.
+   * @param scmAddresses - List of SCMs this datanode is configured to
+   * communicate with.
+   * @return SCM Command.
+   */
+  @Override
+  public StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto
+      register(DatanodeDetailsProto datanodeDetails, String[] scmAddresses) {
+    // TODO : Return the list of Nodes that forms the SCM HA.
+    return getRegisteredResponse(
+        scmNodeManager.register(datanodeDetails), null);
+  }
+
+  /**
+   * Send a container report.
+   *
+   * @param reports - container report.
+   * @return ContainerReportsResponseProto
+   * @throws IOException
+   */
+  @Override
+  public ContainerReportsResponseProto sendContainerReport(
+      ContainerReportsRequestProto reports) throws IOException {
+    updateContainerReportMetrics(reports);
+
+    // should we process container reports async?
+    scmContainerManager.processContainerReports(reports);
+    return ContainerReportsResponseProto.newBuilder().build();
+  }
+
+  private void updateContainerReportMetrics(
+      ContainerReportsRequestProto reports) {
+    ContainerStat newStat = null;
+    // TODO: We should update the logic once incremental container report
+    // type is supported.
+    if (reports
+        .getType() == ContainerReportsRequestProto.reportType.fullReport) {
+      newStat = new ContainerStat();
+      for (StorageContainerDatanodeProtocolProtos.ContainerInfo info : reports
+          .getReportsList()) {
+        newStat.add(new ContainerStat(info.getSize(), info.getUsed(),
+            info.getKeyCount(), info.getReadBytes(), info.getWriteBytes(),
+            info.getReadCount(), info.getWriteCount()));
+      }
+
+      // update container metrics
+      metrics.setLastContainerStat(newStat);
+    }
+
+    // Update container stat entry, this will trigger a removal operation if it
+    // exists in cache.
+    synchronized (containerReportCache) {
+      String datanodeUuid = reports.getDatanodeDetails().getUuid();
+      if (datanodeUuid != null && newStat != null) {
+        containerReportCache.put(datanodeUuid, newStat);
+        // update global view container metrics
+        metrics.incrContainerStat(newStat);
+      }
+    }
+  }
+
+  /**
+   * Handles the block deletion ACKs sent by datanodes. Once ACKs are
+   * received, SCM considers the blocks deleted and updates the metadata
+   * in the SCM DB.
+   *
+   * @param acks - deletion ACKs from a datanode.
+   * @return ContainerBlocksDeletionACKResponseProto
+   * @throws IOException
+   */
+  @Override
+  public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
+      ContainerBlocksDeletionACKProto acks) throws IOException {
+    if (acks.getResultsCount() > 0) {
+      List<DeleteBlockTransactionResult> resultList = acks.getResultsList();
+      for (DeleteBlockTransactionResult result : resultList) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Got block deletion ACK from datanode, TXIDs={}, "
+                  + "success={}", result.getTxID(), result.getSuccess());
+        }
+        if (result.getSuccess()) {
+          LOG.debug("Purging TXID={} from block deletion log",
+              result.getTxID());
+          this.getScmBlockManager().getDeletedBlockLog()
+              .commitTransactions(Collections.singletonList(result.getTxID()));
+        } else {
+          LOG.warn("Got failed ACK for TXID={}, prepare to resend the "
+              + "TX in next interval", result.getTxID());
+        }
+      }
+    }
+    return ContainerBlocksDeletionACKResponseProto.getDefaultInstance();
+  }
+
+  /**
+   * Returns the number of datanodes that are communicating with SCM.
+   *
+   * @param nodestate - Healthy, Dead etc.
+   * @return count of nodes in the given state.
+   */
+  public int getNodeCount(NodeState nodestate) {
+    return scmNodeManager.getNodeCount(nodestate);
+  }
+
+  /**
+   * Returns SCM container manager.
+   */
+  @VisibleForTesting
+  public Mapping getScmContainerManager() {
+    return scmContainerManager;
+  }
+
+  /**
+   * Returns node manager.
+   * @return - Node Manager
+   */
+  @VisibleForTesting
+  public NodeManager getScmNodeManager() {
+    return scmNodeManager;
+  }
+
+  @VisibleForTesting
+  public BlockManager getScmBlockManager() {
+    return scmBlockManager;
+  }
+
+  /**
+   * Get block locations.
+   * @param keys batch of block keys to retrieve.
+   * @return set of allocated blocks.
+   * @throws IOException
+   */
+  @Override
+  public Set<AllocatedBlock> getBlockLocations(final Set<String> keys)
+      throws IOException {
+    Set<AllocatedBlock> locatedBlocks = new HashSet<>();
+    for (String key: keys) {
+      Pipeline pipeline = scmBlockManager.getBlock(key);
+      AllocatedBlock block = new AllocatedBlock.Builder()
+          .setKey(key)
+          .setPipeline(pipeline).build();
+      locatedBlocks.add(block);
+    }
+    return locatedBlocks;
+  }
+
+  /**
+   * Asks SCM where a block should be allocated. SCM responds with the set
+   * of datanodes that should be used to create this block.
+   *
+   * @param size - size of the block.
+   * @param type - replication type.
+   * @param factor - replication factor.
+   * @param owner - owner of the block.
+   * @return allocated block accessing info (key, pipeline).
+   * @throws IOException
+   */
+  @Override
+  public AllocatedBlock allocateBlock(long size,
+      HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor,
+      String owner) throws IOException {
+    return scmBlockManager.allocateBlock(size, type, factor, owner);
+  }
+
+  /**
+   * Get the clusterId and SCM Id from the version file in SCM.
+   */
+  @Override
+  public ScmInfo getScmInfo() throws IOException {
+    ScmInfo.Builder builder = new ScmInfo.Builder()
+        .setClusterId(scmStorage.getClusterID())
+        .setScmId(scmStorage.getScmId());
+    return builder.build();
+  }
+
+  /**
+   * Delete blocks for a set of object keys.
+   *
+   * @param keyBlocksInfoList list of block keys with object keys to delete.
+   * @return deletion results.
+   */
+  public List<DeleteBlockGroupResult> deleteKeyBlocks(
+      List<BlockGroup> keyBlocksInfoList) throws IOException {
+    LOG.info("SCM is informed by KSM to delete {} blocks",
+        keyBlocksInfoList.size());
+    List<DeleteBlockGroupResult> results = new ArrayList<>();
+    for (BlockGroup keyBlocks : keyBlocksInfoList) {
+      Result resultCode;
+      try {
+        // We delete blocks in an atomic operation to avoid a state where
+        // only some of the blocks are deleted, which would leave the key
+        // in an inconsistent state.
+        scmBlockManager.deleteBlocks(keyBlocks.getBlockIDList());
+        resultCode = Result.success;
+      } catch (SCMException scmEx) {
+        LOG.warn("Fail to delete block: {}", keyBlocks.getGroupID(), scmEx);
+        switch (scmEx.getResult()) {
+        case CHILL_MODE_EXCEPTION:
+          resultCode = Result.chillMode;
+          break;
+        case FAILED_TO_FIND_BLOCK:
+          resultCode = Result.errorNotFound;
+          break;
+        default:
+          resultCode = Result.unknownFailure;
+        }
+      } catch (IOException ex) {
+        LOG.warn("Fail to delete blocks for object key: {}",
+            keyBlocks.getGroupID(), ex);
+        resultCode = Result.unknownFailure;
+      }
+      List<DeleteBlockResult> blockResultList = new ArrayList<>();
+      for (String blockKey : keyBlocks.getBlockIDList()) {
+        blockResultList.add(new DeleteBlockResult(blockKey, resultCode));
+      }
+      results.add(new DeleteBlockGroupResult(keyBlocks.getGroupID(),
+          blockResultList));
+    }
+    return results;
+  }
+
+  @VisibleForTesting
+  public String getRpcRemoteUsername() {
+    UserGroupInformation user = ProtobufRpcEngine.Server.getRemoteUser();
+    return user == null ? null : user.getUserName();
+  }
+
+  private void checkAdminAccess() throws IOException {
+    String remoteUser = getRpcRemoteUsername();
+    if(remoteUser != null) {
+      if (!scmAdminUsernames.contains(remoteUser)) {
+        throw new IOException(
+            "Access denied for user " + remoteUser
+                + ". Superuser privilege is required.");
+      }
+    }
+  }
+
+  /**
+   * Initialize SCM metrics.
+   */
+  public static void initMetrics() {
+    metrics = SCMMetrics.create();
+  }
+
+  /**
+   * Return SCM metrics instance.
+   */
+  public static SCMMetrics getMetrics() {
+    return metrics == null ? SCMMetrics.create() : metrics;
+  }
+
+  /**
+   * Invalidates the container stat entry for the given datanode.
+   *
+   * @param datanodeUuid - UUID of the datanode.
+   */
+  public void removeContainerReport(String datanodeUuid) {
+    synchronized (containerReportCache) {
+      containerReportCache.invalidate(datanodeUuid);
+    }
+  }
+
+  /**
+   * Get the container stat of the specified datanode.
+   *
+   * @param datanodeUuid - UUID of the datanode.
+   * @return container stat, or null if none is cached.
+   */
+  public ContainerStat getContainerReport(String datanodeUuid) {
+    ContainerStat stat = null;
+    synchronized (containerReportCache) {
+      stat = containerReportCache.getIfPresent(datanodeUuid);
+    }
+
+    return stat;
+  }
+
+  /**
+   * Returns a view of the container stat entries. Modifications made to the
+   * map will directly affect the cache.
+   *
+   * @return concurrent map view of the cache.
+   */
+  public ConcurrentMap<String, ContainerStat> getContainerReportCache() {
+    return containerReportCache.asMap();
+  }
+
+  @Override
+  public Map<String, String> getContainerReport() {
+    Map<String, String> id2StatMap = new HashMap<>();
+    synchronized (containerReportCache) {
+      ConcurrentMap<String, ContainerStat> map = containerReportCache.asMap();
+      for (Map.Entry<String, ContainerStat> entry : map.entrySet()) {
+        id2StatMap.put(entry.getKey(), entry.getValue().toJsonString());
+      }
+    }
+
+    return id2StatMap;
+  }
+}
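
A minimal usage sketch of the public surface above, not part of this patch:
it assumes SCM storage has already been initialized ("scm -init") and that
ozone.enabled is true. The owner string, the block size, and the
AllocatedBlock#getKey accessor are illustrative assumptions.

import java.util.EnumSet;
import java.util.List;

import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.StorageContainerManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;

/** Illustrative sketch only; not part of this commit. */
public class ScmLifecycleSketch {
  public static void main(String[] args) throws Exception {
    OzoneConfiguration conf = new OzoneConfiguration();
    // No startup option means StartupOption.REGULAR: a normal SCM instance.
    StorageContainerManager scm =
        StorageContainerManager.createSCM(new String[0], conf);
    scm.start();

    // queryNode ANDs the given states; here we ask only for HEALTHY nodes.
    List<DatanodeDetails> healthy =
        scm.queryNode(EnumSet.of(HddsProtos.NodeState.HEALTHY));
    System.out.println("Healthy datanodes: " + healthy.size());

    // The returned AllocatedBlock carries the pipeline (list of datanodes)
    // a client should write to.
    AllocatedBlock block = scm.allocateBlock(128L * 1024 * 1024,
        HddsProtos.ReplicationType.STAND_ALONE,
        HddsProtos.ReplicationFactor.ONE, "exampleOwner");
    System.out.println("Allocated block key: " + block.getKey());

    scm.stop();
    scm.join();
  }
}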

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b832f3c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/StorageContainerManagerHttpServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/StorageContainerManagerHttpServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/StorageContainerManagerHttpServer.java
new file mode 100644
index 0000000..1ca059c
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/StorageContainerManagerHttpServer.java
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.server.BaseHttpServer;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+
+import java.io.IOException;
+
+/**
+ * HttpServer2 wrapper for the Ozone Storage Container Manager.
+ */
+public class StorageContainerManagerHttpServer extends BaseHttpServer {
+
+  public StorageContainerManagerHttpServer(Configuration conf)
+      throws IOException {
+    super(conf, "scm");
+  }
+
+  @Override protected String getHttpAddressKey() {
+    return ScmConfigKeys.OZONE_SCM_HTTP_ADDRESS_KEY;
+  }
+
+  @Override protected String getHttpBindHostKey() {
+    return ScmConfigKeys.OZONE_SCM_HTTP_BIND_HOST_KEY;
+  }
+
+  @Override protected String getHttpsAddressKey() {
+    return ScmConfigKeys.OZONE_SCM_HTTPS_ADDRESS_KEY;
+  }
+
+  @Override protected String getHttpsBindHostKey() {
+    return ScmConfigKeys.OZONE_SCM_HTTPS_BIND_HOST_KEY;
+  }
+
+  @Override protected String getBindHostDefault() {
+    return ScmConfigKeys.OZONE_SCM_HTTP_BIND_HOST_DEFAULT;
+  }
+
+  @Override protected int getHttpBindPortDefault() {
+    return ScmConfigKeys.OZONE_SCM_HTTP_BIND_PORT_DEFAULT;
+  }
+
+  @Override protected int getHttpsBindPortDefault() {
+    return ScmConfigKeys.OZONE_SCM_HTTPS_BIND_PORT_DEFAULT;
+  }
+
+  @Override protected String getKeytabFile() {
+    return ScmConfigKeys.OZONE_SCM_KEYTAB_FILE;
+  }
+
+  @Override protected String getSpnegoPrincipal() {
+    return OzoneConfigKeys.OZONE_SCM_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL;
+  }
+
+  @Override protected String getEnabledKey() {
+    return ScmConfigKeys.OZONE_SCM_HTTP_ENABLED_KEY;
+  }
+
+}
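
A quick orientation for the file above (not part of the patch): the subclass
carries no server logic of its own; it only maps the SCM-specific
configuration keys onto BaseHttpServer, which does the actual work. Wiring
it up follows the pattern StorageContainerManager uses in this patch:

// Sketch assuming conf is an OzoneConfiguration already in scope, as in
// the StorageContainerManager constructor above.
StorageContainerManagerHttpServer httpServer =
    new StorageContainerManagerHttpServer(conf);
httpServer.start();   // serve the SCM web UI alongside the RPC servers
// ...
httpServer.stop();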

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b832f3c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManager.java
new file mode 100644
index 0000000..4ab2516
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManager.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.block;
+
+import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Block APIs.
+ * Containers are transparent to these APIs.
+ */
+public interface BlockManager extends Closeable {
+  /**
+   * Allocates a new block for a given size.
+   * @param size - Block size.
+   * @param type - Replication type.
+   * @param factor - Replication factor.
+   * @param owner - Owner of the block.
+   * @return AllocatedBlock
+   * @throws IOException
+   */
+  AllocatedBlock allocateBlock(long size, HddsProtos.ReplicationType type,
+      HddsProtos.ReplicationFactor factor, String owner) throws IOException;
+
+  /**
+   * Given the key of a block, returns the pipeline info.
+   * @param key - key of the block.
+   * @return - Pipeline used to access the block.
+   * @throws IOException
+   */
+  Pipeline getBlock(String key) throws IOException;
+
+  /**
+   * Deletes a list of blocks in an atomic operation. Internally, SCM
+   * writes these blocks into a {@link DeletedBlockLog} and deletes them
+   * from the SCM DB. If this is successful, the given blocks enter the
+   * pending-deletion state and become invisible in the SCM namespace.
+   *
+   * @param blockIDs block IDs. This is often the list of blocks of
+   *                 a particular object key.
+   * @throws IOException if an exception occurs, none of the blocks is deleted.
+   */
+  void deleteBlocks(List<String> blockIDs) throws IOException;
+
+  /**
+   * @return the block deletion transaction log maintained by SCM.
+   */
+  DeletedBlockLog getDeletedBlockLog();
+
+  /**
+   * Start block manager background services.
+   * @throws IOException
+   */
+  void start() throws IOException;
+
+  /**
+   * Shutdown block manager background services.
+   * @throws IOException
+   */
+  void stop() throws IOException;
+
+  /**
+   * @return the block deleting service executed in SCM.
+   */
+  SCMBlockDeletingService getSCMBlockDeletingService();
+}
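
A short sketch of the block lifecycle this contract describes (not part of
the patch). BlockManagerImpl, in the next file, is the implementation SCM
constructs; the sizes, the owner string, and the AllocatedBlock#getKey
accessor here are illustrative assumptions.

// Assumes conf, nodeManager, containerMapping and cacheSizeMB in scope,
// mirroring how StorageContainerManager builds its BlockManagerImpl above.
BlockManager blockManager =
    new BlockManagerImpl(conf, nodeManager, containerMapping, cacheSizeMB);
blockManager.start();

AllocatedBlock block = blockManager.allocateBlock(64L * 1024 * 1024,
    HddsProtos.ReplicationType.STAND_ALONE,
    HddsProtos.ReplicationFactor.ONE, "exampleOwner");

// The same key later resolves to the pipeline that serves the block.
Pipeline pipeline = blockManager.getBlock(block.getKey());

// Deletion is atomic: the blocks move into the DeletedBlockLog and become
// invisible in the SCM namespace while datanode-side deletion is pending.
blockManager.deleteBlocks(Collections.singletonList(block.getKey()));

blockManager.stop();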

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b832f3c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
new file mode 100644
index 0000000..d966112
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -0,0 +1,530 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.block;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.Mapping;
+import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.node.NodeManager;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
+import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.utils.BatchOperation;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.ObjectName;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
+    .CHILL_MODE_EXCEPTION;
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
+    .FAILED_TO_FIND_BLOCK;
+import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes
+    .INVALID_BLOCK_SIZE;
+import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_BLOCK_DELETING_SERVICE_INTERVAL;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConsts.BLOCK_DB;
+
+/** The Block Manager manages block access for SCM. */
+public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(BlockManagerImpl.class);
+  // TODO : FIX ME : Hard coding the owner.
+  // Currently the only user of the block service is Ozone; CBlock manages
+  // its own blocks and does not rely on the block service offered by SCM.
+
+  private final NodeManager nodeManager;
+  private final Mapping containerManager;
+  private final MetadataStore blockStore;
+
+  private final Lock lock;
+  private final long containerSize;
+  private final long cacheSize;
+
+  private final DeletedBlockLog deletedBlockLog;
+  private final SCMBlockDeletingService blockDeletingService;
+
+  private final int containerProvisionBatchSize;
+  private final Random rand;
+  private ObjectName mxBean;
+
+  /**
+   * Constructor.
+   *
+   * @param conf - configuration.
+   * @param nodeManager - node manager.
+   * @param containerManager - container manager.
+   * @param cacheSizeMB - cache size for level db store.
+   * @throws IOException
+   */
+  public BlockManagerImpl(final Configuration conf,
+      final NodeManager nodeManager, final Mapping containerManager,
+      final int cacheSizeMB) throws IOException {
+    this.nodeManager = nodeManager;
+    this.containerManager = containerManager;
+    this.cacheSize = cacheSizeMB;
+
+    this.containerSize = OzoneConsts.GB * conf.getInt(
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_GB,
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT);
+    File metaDir = getOzoneMetaDirPath(conf);
+    String scmMetaDataDir = metaDir.getPath();
+
+    // Write the block-key to container-name mapping.
+    File blockContainerDbPath = new File(scmMetaDataDir, BLOCK_DB);
+    blockStore =
+        MetadataStoreBuilder.newBuilder()
+            .setConf(conf)
+            .setDbFile(blockContainerDbPath)
+            .setCacheSize(this.cacheSize * OzoneConsts.MB)
+            .build();
+
+    this.containerProvisionBatchSize =
+        conf.getInt(
+            ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE,
+            ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE_DEFAULT);
+    rand = new Random();
+    this.lock = new ReentrantLock();
+
+    mxBean = MBeans.register("BlockManager", "BlockManagerImpl", this);
+
+    // SCM block deleting transaction log and deleting service.
+    deletedBlockLog = new DeletedBlockLogImpl(conf);
+    long svcInterval =
+        conf.getTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
+            OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT,
+            TimeUnit.MILLISECONDS);
+    long serviceTimeout =
+        conf.getTimeDuration(
+            OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
+            OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT,
+            TimeUnit.MILLISECONDS);
+    blockDeletingService =
+        new SCMBlockDeletingService(
+            deletedBlockLog, containerManager, nodeManager, svcInterval,
+            serviceTimeout, conf);
+  }
+
+  /**
+   * Start block manager services.
+   *
+   * @throws IOException
+   */
+  public void start() throws IOException {
+    this.blockDeletingService.start();
+  }
+
+  /**
+   * Shutdown block manager services.
+   *
+   * @throws IOException
+   */
+  public void stop() throws IOException {
+    this.blockDeletingService.shutdown();
+    this.close();
+  }
+
+  /**
+   * Pre-allocates the specified number of containers for block creation.
+   *
+   * @param count - number of containers to allocate.
+   * @param type - type of containers.
+   * @param factor - how many copies are needed for each container.
+   * @param owner - owner of the containers.
+   * @throws IOException
+   */
+  private void preAllocateContainers(int count, ReplicationType type,
+      ReplicationFactor factor, String owner)
+      throws IOException {
+    lock.lock();
+    try {
+      for (int i = 0; i < count; i++) {
+        String containerName = UUID.randomUUID().toString();
+        ContainerInfo containerInfo = null;
+        try {
+          // TODO: Fix this later when Ratis is made the Default.
+          containerInfo = containerManager.allocateContainer(type, factor,
+              containerName, owner);
+
+          if (containerInfo == null) {
+            LOG.warn("Unable to allocate container.");
+            continue;
+          }
+        } catch (IOException ex) {
+          LOG.warn("Unable to allocate container: {}", ex);
+          continue;
+        }
+      }
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Allocates a block in a container and returns that info.
+   *
+   * @param size - Block size.
+   * @param type - Replication type.
+   * @param factor - Replication factor.
+   * @param owner - owner of the block.
+   * @return Allocated block
+   * @throws IOException on failure.
+   */
+  @Override
+  public AllocatedBlock allocateBlock(final long size,
+      ReplicationType type, ReplicationFactor factor, String owner)
+      throws IOException {
+    LOG.trace("Size;{} , type : {}, factor : {} ", size, type, factor);
+
+    if (size < 0 || size > containerSize) {
+      LOG.warn("Invalid block size requested : {}", size);
+      throw new SCMException("Unsupported block size: " + size,
+          INVALID_BLOCK_SIZE);
+    }
+
+    if (!nodeManager.isOutOfChillMode()) {
+      LOG.warn("Not out of Chill mode.");
+      throw new SCMException("Unable to create block while in chill mode",
+          CHILL_MODE_EXCEPTION);
+    }
+
+    lock.lock();
+    try {
+      /*
+       Here is the high-level logic.
+
+       1. First we check if there are containers in the ALLOCATED state; that
+       is, SCM has allocated them in the SCM namespace but the corresponding
+       container has not been created on the Datanode yet. If we have any in
+       that state, we return one to the client, which allows the client to
+       finish creating those containers. This is a sort of greedy algorithm;
+       our primary purpose is to get as many containers created as possible.
+
+       2. If there are no ALLOCATED containers, we find an OPEN container
+       that matches the criteria.
+
+       3. If both of these fail, we pre-allocate a batch of containers in
+       SCM and try again.
+
+       TODO : Support random picking of two containers from the list, so we
+       can use different kinds of policies.
+      */
+
+      ContainerInfo containerInfo;
+
+      // Look for ALLOCATED container that matches all other parameters.
+      containerInfo =
+          containerManager
+              .getStateManager()
+              .getMatchingContainer(
+                  size, owner, type, factor, HddsProtos.LifeCycleState
+                      .ALLOCATED);
+      if (containerInfo != null) {
+        containerManager.updateContainerState(containerInfo.getContainerName(),
+            HddsProtos.LifeCycleEvent.CREATE);
+        return newBlock(containerInfo, HddsProtos.LifeCycleState.ALLOCATED);
+      }
+
+      // Since we found no allocated containers that match our criteria, let us
+      // look for OPEN containers that match the criteria.
+      containerInfo =
+          containerManager
+              .getStateManager()
+              .getMatchingContainer(size, owner, type, factor, HddsProtos
+                  .LifeCycleState.OPEN);
+      if (containerInfo != null) {
+        return newBlock(containerInfo, HddsProtos.LifeCycleState.OPEN);
+      }
+
+      // We found neither ALLOCATED nor OPEN containers. This generally means
+      // that most of our containers are full, or that we have not allocated
+      // containers of the requested type and replication factor. So let us
+      // go and allocate some.
+      preAllocateContainers(containerProvisionBatchSize, type, factor, owner);
+
+      // Since we just allocated a set of containers, this should work.
+      containerInfo =
+          containerManager
+              .getStateManager()
+              .getMatchingContainer(
+                  size, owner, type, factor, HddsProtos.LifeCycleState
+                      .ALLOCATED);
+      if (containerInfo != null) {
+        containerManager.updateContainerState(containerInfo.getContainerName(),
+            HddsProtos.LifeCycleEvent.CREATE);
+        return newBlock(containerInfo, HddsProtos.LifeCycleState.ALLOCATED);
+      }
+
+      // We have tried all the strategies we know, but somehow we are not
+      // able to get a container for this block. Log that info and return
+      // null.
+      LOG.error(
+          "Unable to allocate a block for the size: {}, type: {}, " +
+              "factor: {}",
+          size,
+          type,
+          factor);
+      return null;
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * newBlock - returns a new block assigned to a container.
+   *
+   * @param containerInfo - Container Info.
+   * @param state - Current state of the container.
+   * @return AllocatedBlock
+   */
+  private AllocatedBlock newBlock(
+      ContainerInfo containerInfo, HddsProtos.LifeCycleState state)
+      throws IOException {
+
+    // TODO : Replace this with Block ID.
+    String blockKey = UUID.randomUUID().toString();
+    boolean createContainer = (state == HddsProtos.LifeCycleState.ALLOCATED);
+
+    AllocatedBlock.Builder abb =
+        new AllocatedBlock.Builder()
+            .setKey(blockKey)
+            // TODO : Use containerinfo instead of pipeline.
+            .setPipeline(containerInfo.getPipeline())
+            .setShouldCreateContainer(createContainer);
+    LOG.trace("New block allocated : {} Container ID: {}", blockKey,
+        containerInfo.toString());
+
+    if (containerInfo.getPipeline().getMachines().size() == 0) {
+      LOG.error("Pipeline Machine count is zero.");
+      return null;
+    }
+
+    // Persist this block info to the blockStore DB, so getBlock(key) can
+    // find which container the block lives in.
+    // TODO : Remove this DB in future
+    // and make this a KSM operation. Category: SCALABILITY.
+    blockStore.put(
+        DFSUtil.string2Bytes(blockKey),
+        DFSUtil.string2Bytes(containerInfo.getPipeline().getContainerName()));
+    return abb.build();
+  }
+
+  /**
+   * Given a block key, return the Pipeline information.
+   *
+   * @param key - block key assigned by SCM.
+   * @return Pipeline (list of DNs and leader) to access the block.
+   * @throws IOException
+   */
+  @Override
+  public Pipeline getBlock(final String key) throws IOException {
+    lock.lock();
+    try {
+      byte[] containerBytes = blockStore.get(DFSUtil.string2Bytes(key));
+      if (containerBytes == null) {
+        throw new SCMException(
+            "Specified block key does not exist. key : " + key,
+            FAILED_TO_FIND_BLOCK);
+      }
+
+      String containerName = DFSUtil.bytes2String(containerBytes);
+      ContainerInfo containerInfo = containerManager.getContainer(
+          containerName);
+      if (containerInfo == null) {
+        LOG.debug("Container {} allocated by block service"
+            + "can't be found in SCM", containerName);
+        throw new SCMException(
+            "Unable to find container for the block",
+            SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
+      }
+      return containerInfo.getPipeline();
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Deletes a list of blocks in an atomic operation. Internally, SCM
+   * writes these blocks into a {@link DeletedBlockLog} and deletes them
+   * from the SCM DB. If this succeeds, the given blocks enter the
+   * pending-deletion state and become invisible in the SCM namespace.
+   *
+   * @param blockIDs block IDs. This is often the list of blocks of a
+   *                 particular object key.
+   * @throws IOException if an exception occurs; none of the blocks are
+   *                     deleted.
+   */
+  @Override
+  public void deleteBlocks(List<String> blockIDs) throws IOException {
+    if (!nodeManager.isOutOfChillMode()) {
+      throw new SCMException("Unable to delete block while in chill mode",
+          CHILL_MODE_EXCEPTION);
+    }
+
+    lock.lock();
+    LOG.info("Deleting blocks {}", String.join(",", blockIDs));
+    Map<String, List<String>> containerBlocks = new HashMap<>();
+    BatchOperation batch = new BatchOperation();
+    BatchOperation rollbackBatch = new BatchOperation();
+    // TODO: track the block size info so that we can reclaim the container
+    // TODO: used space when the block is deleted.
+    try {
+      for (String blockKey : blockIDs) {
+        byte[] blockKeyBytes = DFSUtil.string2Bytes(blockKey);
+        byte[] containerBytes = blockStore.get(blockKeyBytes);
+        if (containerBytes == null) {
+          throw new SCMException(
+              "Specified block key does not exist. key : " + blockKey,
+              FAILED_TO_FIND_BLOCK);
+        }
+        batch.delete(blockKeyBytes);
+        rollbackBatch.put(blockKeyBytes, containerBytes);
+
+        // Merge the blocks into a container-to-blocks mapping, in
+        // preparation for persisting this info to the deletedBlockLog.
+        String containerName = DFSUtil.bytes2String(containerBytes);
+        containerBlocks.computeIfAbsent(containerName,
+            k -> new ArrayList<>()).add(blockKey);
+      }
+
+      // We update the SCM DB first, so if this step fails we end up here and
+      // nothing gets into the delLog, meaning no blocks will be accidentally
+      // removed. If we wrote the log first, then once the log was written the
+      // async deleting service would start to scan, and it might pick up some
+      // blocks for real deletion, which could cause data loss.
+      blockStore.writeBatch(batch);
+      try {
+        deletedBlockLog.addTransactions(containerBlocks);
+      } catch (IOException e) {
+        try {
+          // If the delLog update failed, we need to roll back the changes.
+          blockStore.writeBatch(rollbackBatch);
+        } catch (IOException rollbackException) {
+          // This is a corner case: addTransactions fails and the rollback
+          // also fails, which leaves these blocks in an inconsistent state.
+          // They were moved to the pending-deletion state in the SCM DB but
+          // were not written into the delLog, so the real deletions will not
+          // be done. The blocks become invisible in the namespace but the
+          // actual data is not removed. We log an error here so an admin can
+          // manually check and fix such errors.
+          LOG.error(
+              "Blocks might be in an inconsistent state because"
+                  + " they were moved to the pending-deletion state in the SCM"
+                  + " DB but not written into the delLog. An admin can"
+                  + " manually add them to the delLog for deletion."
+                  + " Inconsistent block list: {}",
+              String.join(",", blockIDs),
+              e);
+          throw rollbackException;
+        }
+        throw new IOException(
+            "Skip writing the deleted blocks info to"
+                + " the delLog because addTransaction fails. Batch skipped: "
+                + String.join(",", blockIDs),
+            e);
+      }
+      // TODO: Container report handling of the deleted blocks:
+      // Remove tombstone and update open container usage.
+      // We will revisit this when the closed container replication is done.
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  public DeletedBlockLog getDeletedBlockLog() {
+    return this.deletedBlockLog;
+  }
+
+  @VisibleForTesting
+  public String getDeletedKeyName(String key) {
+    return StringUtils.format(".Deleted/%s", key);
+  }
+
+  /**
+   * Close the resources for BlockManager.
+   *
+   * @throws IOException
+   */
+  @Override
+  public void close() throws IOException {
+    if (blockStore != null) {
+      blockStore.close();
+    }
+    if (deletedBlockLog != null) {
+      deletedBlockLog.close();
+    }
+    blockDeletingService.shutdown();
+    if (mxBean != null) {
+      MBeans.unregister(mxBean);
+      mxBean = null;
+    }
+  }
+
+  @Override
+  public int getOpenContainersNo() {
+    return 0;
+    // TODO : FIX ME : Reporting the open container count as a single number
+    // does not make sense. We have to count open containers by replication
+    // type and replication factor. Hence returning 0 for now.
+    // containers.get(HddsProtos.LifeCycleState.OPEN).size();
+  }
+
+  @Override
+  public SCMBlockDeletingService getSCMBlockDeletingService() {
+    return this.blockDeletingService;
+  }
+}
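
As a hedged end-to-end sketch of the implementation above, from configuration
to block deletion: the nodeManager and containerMapping objects are assumed to
come from SCM bootstrap code, the "OZONE" owner string is illustrative, and
AllocatedBlock.getKey() is assumed to mirror the builder's setKey(). Imports
follow those of BlockManagerImpl, plus java.util.Collections.

    Configuration conf = new Configuration();
    // Scan the deletion log once a minute (see the constructor above).
    conf.setTimeDuration(OZONE_BLOCK_DELETING_SERVICE_INTERVAL,
        60, TimeUnit.SECONDS);
    BlockManager bm =
        new BlockManagerImpl(conf, nodeManager, containerMapping, 128);
    bm.start();

    // Exercises the ALLOCATED -> OPEN -> pre-allocate fallback in
    // allocateBlock(); a null return means all three strategies failed.
    AllocatedBlock block = bm.allocateBlock(256 * OzoneConsts.MB,
        ReplicationType.STAND_ALONE, ReplicationFactor.ONE, "OZONE");
    if (block != null) {
      Pipeline pipeline = bm.getBlock(block.getKey());  // key -> pipeline
      bm.deleteBlocks(Collections.singletonList(block.getKey()));
    }
    bm.stop();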

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b832f3c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockmanagerMXBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockmanagerMXBean.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockmanagerMXBean.java
new file mode 100644
index 0000000..23c6983
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockmanagerMXBean.java
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.hdds.scm.block;
+
+
+/**
+ * JMX interface for the block manager.
+ */
+public interface BlockmanagerMXBean {
+
+  /**
+   * Number of open containers managed by the block manager.
+   */
+  int getOpenContainersNo();
+}
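
Because BlockManagerImpl registers this bean via MBeans.register("BlockManager",
"BlockManagerImpl", this), the gauge can be read back over JMX. The sketch
below assumes Hadoop's usual "Hadoop:service=...,name=..." ObjectName
convention; it uses only the standard javax.management and
java.lang.management APIs.

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    static int readOpenContainers() throws Exception {
      MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
      // Assumed name, per Hadoop's MBeans.register(service, name, bean).
      ObjectName name =
          new ObjectName("Hadoop:service=BlockManager,name=BlockManagerImpl");
      // The attribute name derives from the getOpenContainersNo() getter.
      return (Integer) mbs.getAttribute(name, "OpenContainersNo");
    }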

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b832f3c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
new file mode 100644
index 0000000..47074d2
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DatanodeDeletedBlockTransactions.java
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.block;
+
+import com.google.common.collect.ArrayListMultimap;
+import org.apache.hadoop.hdds.scm.container.Mapping;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+/**
+ * A wrapper class that holds info about a datanode and all the deleted-block
+ * transactions that will be sent to that datanode.
+ */
+public class DatanodeDeletedBlockTransactions {
+  private int nodeNum;
+  // The throttle size for each datanode.
+  private int maximumAllowedTXNum;
+  // Current counter of inserted TX.
+  private int currentTXNum;
+  private Mapping mappingService;
+  // A list of TXs mapped to a certain datanode ID.
+  private final ArrayListMultimap<UUID, DeletedBlocksTransaction>
+      transactions;
+
+  DatanodeDeletedBlockTransactions(Mapping mappingService,
+      int maximumAllowedTXNum, int nodeNum) {
+    this.transactions = ArrayListMultimap.create();
+    this.mappingService = mappingService;
+    this.maximumAllowedTXNum = maximumAllowedTXNum;
+    this.nodeNum = nodeNum;
+  }
+
+  public void addTransaction(DeletedBlocksTransaction tx) throws IOException {
+    ContainerInfo info = null;
+    try {
+      info = mappingService.getContainer(tx.getContainerName());
+    } catch (IOException e) {
+      SCMBlockDeletingService.LOG.warn("Got container info error.", e);
+    }
+
+    if (info == null) {
+      SCMBlockDeletingService.LOG.warn(
+          "Container {} not found, continue to process next",
+          tx.getContainerName());
+      return;
+    }
+
+    for (DatanodeDetails dd : info.getPipeline().getMachines()) {
+      UUID dnID = dd.getUuid();
+      if (transactions.containsKey(dnID)) {
+        List<DeletedBlocksTransaction> txs = transactions.get(dnID);
+        if (txs != null && txs.size() < maximumAllowedTXNum) {
+          boolean hasContained = false;
+          for (DeletedBlocksTransaction t : txs) {
+            if (t.getContainerName().equals(tx.getContainerName())) {
+              hasContained = true;
+              break;
+            }
+          }
+
+          if (!hasContained) {
+            txs.add(tx);
+            currentTXNum++;
+          }
+        }
+      } else {
+        currentTXNum++;
+        transactions.put(dnID, tx);
+      }
+      SCMBlockDeletingService.LOG.debug("Transaction added: {} <- TX({})", dnID,
+          tx.getTxID());
+    }
+  }
+
+  Set<UUID> getDatanodeIDs() {
+    return transactions.keySet();
+  }
+
+  boolean isEmpty() {
+    return transactions.isEmpty();
+  }
+
+  boolean hasTransactions(UUID dnId) {
+    return transactions.containsKey(dnId) &&
+        !transactions.get(dnId).isEmpty();
+  }
+
+  List<DeletedBlocksTransaction> getDatanodeTransactions(UUID dnId) {
+    return transactions.get(dnId);
+  }
+
+  List<String> getTransactionIDList(UUID dnId) {
+    if (hasTransactions(dnId)) {
+      return transactions.get(dnId).stream()
+          .map(DeletedBlocksTransaction::getTxID).map(String::valueOf)
+          .collect(Collectors.toList());
+    } else {
+      return Collections.emptyList();
+    }
+  }
+
+  boolean isFull() {
+    return currentTXNum >= maximumAllowedTXNum * nodeNum;
+  }
+
+  int getTXNum() {
+    return currentTXNum;
+  }
+}
\ No newline at end of file
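
To show how this wrapper is meant to be filled and drained, here is a hedged
sketch of what a caller such as SCMBlockDeletingService might do. The throttle
values are arbitrary, and mapping, deletedBlockLog, and nodeCount are assumed
to be supplied by the caller.

    DatanodeDeletedBlockTransactions dnTXs =
        new DatanodeDeletedBlockTransactions(
            mapping, 50 /* max TXs per datanode */, nodeCount);
    // Scans the log and stops early once dnTXs.isFull() becomes true.
    deletedBlockLog.getTransactions(dnTXs);
    for (UUID dnId : dnTXs.getDatanodeIDs()) {
      List<DeletedBlocksTransaction> batch =
          dnTXs.getDatanodeTransactions(dnId);
      // Ship the batch to the datanode as a block-deletion command...
    }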

http://git-wip-us.apache.org/repos/asf/hadoop/blob/8b832f3c/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
new file mode 100644
index 0000000..f7b770e
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/DeletedBlockLog.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.block;
+
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * The DeletedBlockLog is a persisted log in SCM that keeps track of
+ * container blocks which are under deletion. It maintains info about
+ * under-deletion container blocks notified by KSM, and the state of
+ * how they are processed.
+ */
+public interface DeletedBlockLog extends Closeable {
+
+  /**
+   * Returns a limited-size list of transactions. Note that count is the
+   * maximum number of TXs to return; we might not always be able to return
+   * this many. The processCount of the returned transactions should be in
+   * [0, MAX_RETRY).
+   *
+   * @param count - maximum number of transactions to return.
+   * @return a list of BlockDeletionTransaction.
+   */
+  List<DeletedBlocksTransaction> getTransactions(int count)
+      throws IOException;
+
+  /**
+   * Scans the entire log once and feeds TXs into the given
+   * DatanodeDeletedBlockTransactions. Once the
+   * DatanodeDeletedBlockTransactions is full, the scan stops.
+   * @param transactions the DatanodeDeletedBlockTransactions into which
+   *                     TXs will be set.
+   * @throws IOException
+   */
+  void getTransactions(DatanodeDeletedBlockTransactions transactions)
+      throws IOException;
+
+  /**
+   * Returns all failed transactions in the log. A transaction is considered
+   * failed if it has been sent more than the MAX_RETRY limit; its count is
+   * then reset to -1.
+   *
+   * @return a list of failed deleted block transactions.
+   * @throws IOException
+   */
+  List<DeletedBlocksTransaction> getFailedTransactions()
+      throws IOException;
+
+  /**
+   * Increments the count for the given list of transactions by 1.
+   * The log maintains a valid range of counts for each transaction,
+   * [0, MAX_RETRY]. If the count exceeds this range, it is reset to -1
+   * to indicate the transaction is no longer valid.
+   *
+   * @param txIDs - transaction IDs.
+   */
+  void incrementCount(List<Long> txIDs)
+      throws IOException;
+
+  /**
+   * Committing a transaction means deleting all footprints of that
+   * transaction from the log. This method doesn't guarantee that all
+   * transactions can be successfully deleted; it tolerates failures and
+   * makes a best effort.
+   *
+   * @param txIDs - transaction IDs.
+   */
+  void commitTransactions(List<Long> txIDs) throws IOException;
+
+  /**
+   * Creates a block deletion transaction and adds that into the log.
+   *
+   * @param containerName - container name.
+   * @param blocks - blocks that belong to the same container.
+   *
+   * @throws IOException
+   */
+  void addTransaction(String containerName, List<String> blocks)
+      throws IOException;
+
+  /**
+   * Creates block deletion transactions for a set of containers, adds
+   * them to the log, and persists them atomically. An object key might
+   * be stored in multiple containers and multiple blocks, so this API
+   * ensures that these updates are done atomically: if any of them
+   * fails, the entire operation fails without any updates to the log.
+   * Note that this doesn't mean only one transaction is created; it
+   * creates multiple transactions (depending on the number of
+   * containers), either all together (on success) or none (on failure).
+   *
+   * @param containerBlocksMap a map of containerBlocks.
+   * @throws IOException
+   */
+  void addTransactions(Map<String, List<String>> containerBlocksMap)
+      throws IOException;
+
+  /**
+   * Returns the total number of valid transactions. A transaction is
+   * considered to be valid as long as its count is in range [0, MAX_RETRY].
+   *
+   * @return the number of valid transactions.
+   * @throws IOException
+   */
+  int getNumOfValidTransactions() throws IOException;
+}
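
Putting the retry rules together, a consumer of this log might follow the loop
below. This is a hedged sketch: sendToDatanodes() is a hypothetical transport
call, and MAX_RETRY enforcement happens inside the log itself; only the
getTransactions, commitTransactions, and incrementCount calls come from the
interface above.

    List<DeletedBlocksTransaction> txs = deletedBlockLog.getTransactions(100);
    List<Long> acked = new ArrayList<>();
    List<Long> failed = new ArrayList<>();
    for (DeletedBlocksTransaction tx : txs) {
      if (sendToDatanodes(tx)) {        // hypothetical transport call
        acked.add(tx.getTxID());
      } else {
        failed.add(tx.getTxID());
      }
    }
    deletedBlockLog.commitTransactions(acked);  // remove finished TXs
    deletedBlockLog.incrementCount(failed);     // retried until MAX_RETRY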

