hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From a...@apache.org
Subject [hadoop] branch trunk updated: HDDS-1119. DN get OM certificate from SCM CA for block token validation. Contributed by Ajay Kumar. (#601)
Date Tue, 19 Mar 2019 06:08:24 GMT
This is an automated email from the ASF dual-hosted git repository.

ajay pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new f10d493   HDDS-1119. DN get OM certificate from SCM CA for block token validation. Contributed by Ajay Kumar. (#601)
f10d493 is described below

commit f10d49332522beca7cb7342e68b2acdbe4c974f8
Author: Ajay Yadav <7813154+ajayydv@users.noreply.github.com>
AuthorDate: Mon Mar 18 23:08:17 2019 -0700

     HDDS-1119. DN get OM certificate from SCM CA for block token validation. Contributed by Ajay Kumar. (#601)
---
 .../hadoop/hdds/protocol/DatanodeDetails.java      |  44 ++++-
 .../hdds/security/token/BlockTokenVerifier.java    |  45 ++---
 .../x509/certificate/client/CertificateClient.java |  19 +-
 .../certificate/client/DNCertificateClient.java    |   7 +-
 .../client/DefaultCertificateClient.java           | 206 ++++++++++++++++----
 .../certificate/client/OMCertificateClient.java    |   7 +-
 .../x509/exceptions/CertificateException.java      |   3 +-
 .../org/apache/hadoop/ozone/common/Storage.java    |  12 ++
 .../org/apache/hadoop/utils/db/CodecRegistry.java  |   1 +
 .../java/org/apache/hadoop/utils/db/LongCodec.java |  46 +++++
 hadoop-hdds/common/src/main/proto/hdds.proto       |   1 +
 .../client/TestCertificateClientInit.java          |  26 ++-
 .../client/TestDefaultCertificateClient.java       | 212 ++++++++++++++++-----
 .../apache/hadoop/ozone/HddsDatanodeService.java   |  46 +++--
 .../common/statemachine/DatanodeStateMachine.java  |  12 +-
 .../common/transport/server/XceiverServer.java     |  14 +-
 .../common/transport/server/XceiverServerGrpc.java |   6 +-
 .../transport/server/ratis/XceiverServerRatis.java |  10 +-
 .../ozone/container/ozoneimpl/OzoneContainer.java  |  10 +-
 .../hadoop/ozone/TestHddsSecureDatanodeInit.java   |   8 +-
 .../container/common/TestDatanodeStateMachine.java |   8 +-
 .../TestCloseContainerCommandHandler.java          |   2 +-
 .../container/ozoneimpl/TestOzoneContainer.java    |   2 +-
 .../ozone/container/common/TestEndPoint.java       |  10 +-
 .../apache/hadoop/ozone/om/OMMetadataManager.java  |   8 +
 .../hadoop/ozone/om/S3SecretManagerImpl.java       |   5 +-
 .../ozone/om/codec/TokenIdentifierCodec.java       |  52 +++++
 .../security/OzoneBlockTokenSecretManager.java     |  17 +-
 .../OzoneDelegationTokenSecretManager.java         |  94 ++-------
 .../hadoop/ozone/security/OzoneSecretManager.java  |   5 -
 .../hadoop/ozone/security/OzoneSecretStore.java    | 198 ++++---------------
 .../main/compose/ozonesecure/docker-compose.yaml   |   2 +-
 .../src/main/smoketest/security/ozone-secure.robot |   3 +-
 .../apache/hadoop/ozone/MiniOzoneClusterImpl.java  |  16 +-
 .../apache/hadoop/ozone/TestMiniOzoneCluster.java  |  18 +-
 .../hadoop/ozone/TestSecureOzoneCluster.java       | 125 ++++++++----
 .../ozone/client/CertificateClientTestImpl.java    |  55 +++---
 .../ozone/client/rpc/TestSecureOzoneRpcClient.java |  20 +-
 .../ozone/container/ContainerTestHelper.java       |  14 ++
 .../transport/server/ratis/TestCSMMetrics.java     |   2 +-
 .../container/metrics/TestContainerMetrics.java    |   2 +-
 .../container/ozoneimpl/TestOzoneContainer.java    |   2 +-
 .../ozoneimpl/TestOzoneContainerWithTLS.java       |   2 +-
 .../ozoneimpl/TestSecureOzoneContainer.java        |  85 ++++-----
 .../container/server/TestContainerServer.java      |  11 +-
 .../server/TestSecureContainerServer.java          |  70 +++++--
 .../hadoop/ozone/om/TestSecureOzoneManager.java    |  12 +-
 .../java/org/apache/hadoop/ozone/om/OMStorage.java |  17 ++
 .../hadoop/ozone/om/OmMetadataManagerImpl.java     |  16 ++
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |  43 +++--
 .../security/TestOzoneBlockTokenSecretManager.java |   0
 .../TestOzoneDelegationTokenSecretManager.java     |  42 ++--
 52 files changed, 1093 insertions(+), 600 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
index bae22a2..1dfeecd 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
@@ -45,6 +45,7 @@ public class DatanodeDetails implements Comparable<DatanodeDetails> {
   private String ipAddress;
   private String hostName;
   private List<Port> ports;
+  private String certSerialId;
 
 
   /**
@@ -54,13 +55,15 @@ public class DatanodeDetails implements Comparable<DatanodeDetails> {
    * @param ipAddress IP Address of this DataNode
    * @param hostName DataNode's hostname
    * @param ports Ports used by the DataNode
+   * @param certSerialId serial id from SCM issued certificate.
    */
   private DatanodeDetails(String uuid, String ipAddress, String hostName,
-      List<Port> ports) {
+      List<Port> ports, String certSerialId) {
     this.uuid = UUID.fromString(uuid);
     this.ipAddress = ipAddress;
     this.hostName = hostName;
     this.ports = ports;
+    this.certSerialId = certSerialId;
   }
 
   protected DatanodeDetails(DatanodeDetails datanodeDetails) {
@@ -177,6 +180,9 @@ public class DatanodeDetails implements Comparable<DatanodeDetails> {
     if (datanodeDetailsProto.hasHostName()) {
       builder.setHostName(datanodeDetailsProto.getHostName());
     }
+    if (datanodeDetailsProto.hasCertSerialId()) {
+      builder.setCertSerialId(datanodeDetailsProto.getCertSerialId());
+    }
     for (HddsProtos.Port port : datanodeDetailsProto.getPortsList()) {
       builder.addPort(newPort(
           Port.Name.valueOf(port.getName().toUpperCase()), port.getValue()));
@@ -198,6 +204,9 @@ public class DatanodeDetails implements Comparable<DatanodeDetails> {
     if (hostName != null) {
       builder.setHostName(hostName);
     }
+    if (certSerialId != null) {
+      builder.setCertSerialId(certSerialId);
+    }
     for (Port port : ports) {
       builder.addPorts(HddsProtos.Port.newBuilder()
           .setName(port.getName().toString())
@@ -214,6 +223,7 @@ public class DatanodeDetails implements Comparable<DatanodeDetails> {
         ipAddress +
         ", host: " +
         hostName +
+        ", certSerialId: " + certSerialId +
         "}";
   }
 
@@ -250,6 +260,7 @@ public class DatanodeDetails implements Comparable<DatanodeDetails> {
     private String ipAddress;
     private String hostName;
     private List<Port> ports;
+    private String certSerialId;
 
     /**
      * Default private constructor. To create Builder instance use
@@ -305,13 +316,25 @@ public class DatanodeDetails implements Comparable<DatanodeDetails> {
     }
 
     /**
+     * Adds certificate serial id.
+     *
+     * @param certId Serial id of SCM issued certificate.
+     *
+     * @return DatanodeDetails.Builder
+     */
+    public Builder setCertSerialId(String certId) {
+      this.certSerialId = certId;
+      return this;
+    }
+
+    /**
      * Builds and returns DatanodeDetails instance.
      *
      * @return DatanodeDetails
      */
     public DatanodeDetails build() {
       Preconditions.checkNotNull(id);
-      return new DatanodeDetails(id, ipAddress, hostName, ports);
+      return new DatanodeDetails(id, ipAddress, hostName, ports, certSerialId);
     }
 
   }
@@ -398,4 +421,21 @@ public class DatanodeDetails implements Comparable<DatanodeDetails> {
     }
   }
 
+  /**
+   * Returns serial id of SCM issued certificate.
+   *
+   * @return certificate serial id
+   */
+  public String getCertSerialId() {
+    return certSerialId;
+  }
+
+  /**
+   * Set certificate serial id of SCM issued certificate.
+   *
+   */
+  public void setCertSerialId(String certSerialId) {
+    this.certSerialId = certSerialId;
+  }
+
 }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/BlockTokenVerifier.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/BlockTokenVerifier.java
index f76dac4..2742ace 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/BlockTokenVerifier.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/token/BlockTokenVerifier.java
@@ -60,9 +60,9 @@ public class BlockTokenVerifier implements TokenVerifier {
     if (conf.isBlockTokenEnabled()) {
       // TODO: add audit logs.
 
-      if (Strings.isNullOrEmpty(tokenStr) || isTestStub()) {
+      if (Strings.isNullOrEmpty(tokenStr)) {
         throw new BlockTokenException("Fail to find any token (empty or " +
-            "null.");
+            "null.)");
       }
       final Token<OzoneBlockTokenIdentifier> token = new Token();
       OzoneBlockTokenIdentifier tokenId = new OzoneBlockTokenIdentifier();
@@ -78,29 +78,26 @@ public class BlockTokenVerifier implements TokenVerifier {
         throw new BlockTokenException("Failed to decode token : " + tokenStr);
       }
 
-      // TODO: revisit this when caClient is ready, skip signature check now.
-      /**
-       * the final code should like
-       * if (caClient == null) {
-       *   throw new SCMSecurityException("Certificate client not available to
-       *       validate token");
-       * }
-       */
-      if (caClient != null) {
-        X509Certificate singerCert = caClient.queryCertificate(
-            "certId=" + tokenId.getOmCertSerialId());
-        if (singerCert == null) {
-          throw new BlockTokenException("Can't find signer certificate " +
-              "(OmCertSerialId: " + tokenId.getOmCertSerialId() +
-              ") of the block token for user: " + tokenId.getUser());
-        }
-        Boolean validToken = caClient.verifySignature(tokenId.getBytes(),
-            token.getPassword(), singerCert);
-        if (!validToken) {
-          throw new BlockTokenException("Invalid block token for user: " +
-              tokenId.getUser());
-        }
+      if (caClient == null) {
+        throw new SCMSecurityException("Certificate client not available " +
+            "to validate token");
       }
+
+      X509Certificate singerCert;
+      singerCert = caClient.getCertificate(tokenId.getOmCertSerialId());
+
+      if (singerCert == null) {
+        throw new BlockTokenException("Can't find signer certificate " +
+            "(OmCertSerialId: " + tokenId.getOmCertSerialId() +
+            ") of the block token for user: " + tokenId.getUser());
+      }
+      boolean validToken = caClient.verifySignature(tokenId.getBytes(),
+          token.getPassword(), singerCert);
+      if (!validToken) {
+        throw new BlockTokenException("Invalid block token for user: " +
+            tokenId.getUser());
+      }
+
       // check expiration
       if (isExpired(tokenId.getExpiryDate())) {
         UserGroupInformation tokenUser = tokenId.getUser();
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java
index 3d312462..480758b 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/CertificateClient.java
@@ -54,8 +54,17 @@ public interface CertificateClient {
   /**
    * Returns the certificate  of the specified component if it exists on the
    * local system.
+   * @param certSerialId
    *
+   * @return certificate or Null if there is no data.
+   */
+  X509Certificate getCertificate(String certSerialId)
+      throws CertificateException;
 
+  /**
+   * Returns the certificate  of the specified component if it exists on the
+   * local system.
+   *
    * @return certificate or Null if there is no data.
    */
   X509Certificate getCertificate();
@@ -121,13 +130,15 @@ public interface CertificateClient {
   X509Certificate queryCertificate(String query);
 
   /**
-   * Stores the Certificate.
+   * Stores the Certificate  for this client. Don't use this api to add
+   * trusted certificates of others.
    *
-   * @param certificate - X509 Certificate
-
+   * @param pemEncodedCert        - pem encoded X509 Certificate
+   * @param force                 - override any existing file
    * @throws CertificateException - on Error.
+   *
    */
-  void storeCertificate(X509Certificate certificate)
+  void storeCertificate(String pemEncodedCert, boolean force)
       throws CertificateException;
 
   /**
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DNCertificateClient.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DNCertificateClient.java
index ae678fe..7790d04 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DNCertificateClient.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DNCertificateClient.java
@@ -32,8 +32,13 @@ public class DNCertificateClient extends DefaultCertificateClient {
 
   private static final Logger LOG =
       LoggerFactory.getLogger(DNCertificateClient.class);
+  public DNCertificateClient(SecurityConfig securityConfig,
+      String certSerialId) {
+    super(securityConfig, LOG, certSerialId);
+  }
+
   public DNCertificateClient(SecurityConfig securityConfig) {
-    super(securityConfig, LOG);
+    super(securityConfig, LOG, null);
   }
 
   /**
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java
index 6d023b2..26be970 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/DefaultCertificateClient.java
@@ -28,13 +28,26 @@ import org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRe
 import org.apache.hadoop.hdds.security.x509.exceptions.CertificateException;
 import org.apache.hadoop.hdds.security.x509.keys.HDDSKeyGenerator;
 import org.apache.hadoop.hdds.security.x509.keys.KeyCodec;
+import org.apache.hadoop.hdds.HddsUtils;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.SCMSecurityProtocol;
+import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolClientSideTranslatorPB;
+import org.apache.hadoop.hdds.protocolPB.SCMSecurityProtocolPB;
+import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.ProtobufRpcEngine;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.ozone.OzoneSecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.bouncycastle.cert.X509CertificateHolder;
 import org.slf4j.Logger;
 
 import java.io.ByteArrayInputStream;
+import java.io.File;
 import java.io.IOException;
 import java.io.InputStream;
+import java.net.InetSocketAddress;
 import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -47,11 +60,12 @@ import java.security.PublicKey;
 import java.security.Signature;
 import java.security.SignatureException;
 import java.security.cert.CertStore;
-import java.security.cert.CertificateEncodingException;
 import java.security.cert.X509Certificate;
 import java.security.spec.InvalidKeySpecException;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
 
 import static org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient.InitResponse.FAILURE;
 import static org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient.InitResponse.GETCERT;
@@ -65,24 +79,75 @@ import static org.apache.hadoop.hdds.security.x509.exceptions.CertificateExcepti
  */
 public abstract class DefaultCertificateClient implements CertificateClient {
 
+  private static final String CERT_FILE_NAME_FORMAT = "%s.crt";
   private final Logger logger;
   private final SecurityConfig securityConfig;
   private final KeyCodec keyCodec;
   private PrivateKey privateKey;
   private PublicKey publicKey;
   private X509Certificate x509Certificate;
+  private Map<String, X509Certificate> certificateMap;
+  private String certSerialId;
 
 
-  DefaultCertificateClient(SecurityConfig securityConfig, Logger log) {
+  DefaultCertificateClient(SecurityConfig securityConfig, Logger log,
+      String certSerialId) {
     Objects.requireNonNull(securityConfig);
     this.securityConfig = securityConfig;
     keyCodec = new KeyCodec(securityConfig);
     this.logger = log;
+    this.certificateMap = new ConcurrentHashMap<>();
+    this.certSerialId = certSerialId;
+
+    loadAllCertificates();
+  }
+
+  /**
+   * Load all certificates from configured location.
+   * */
+  private void loadAllCertificates() {
+    // See if certs directory exists in file system.
+    Path certPath = securityConfig.getCertificateLocation();
+    if (Files.exists(certPath) && Files.isDirectory(certPath)) {
+      getLogger().info("Loading certificate from location:{}.",
+          certPath);
+      File[] certFiles = certPath.toFile().listFiles();
+
+      if (certFiles != null) {
+        CertificateCodec certificateCodec =
+            new CertificateCodec(securityConfig);
+        for (File file : certFiles) {
+          if (file.isFile()) {
+            try {
+              X509CertificateHolder x509CertificateHolder = certificateCodec
+                  .readCertificate(certPath, file.getName());
+              X509Certificate cert =
+                  CertificateCodec.getX509Certificate(x509CertificateHolder);
+              if (cert != null && cert.getSerialNumber() != null) {
+                if (cert.getSerialNumber().toString().equals(certSerialId)) {
+                  x509Certificate = cert;
+                }
+                certificateMap.putIfAbsent(cert.getSerialNumber().toString(),
+                    cert);
+                getLogger().info("Added certificate from file:{}.",
+                    file.getAbsolutePath());
+              } else {
+                getLogger().error("Error reading certificate from file:{}",
+                    file);
+              }
+            } catch (java.security.cert.CertificateException | IOException e) {
+              getLogger().error("Error reading certificate from file:{}.",
+                  file.getAbsolutePath(), e);
+            }
+          }
+        }
+      }
+    }
   }
 
   /**
-   * Returns the private key of the specified component if it exists on the
-   * local system.
+   * Returns the private key of the specified component if it exists on the
+   * local system.
    *
    * @return private key or Null if there is no data.
    */
@@ -106,8 +171,7 @@ public abstract class DefaultCertificateClient implements CertificateClient {
   }
 
   /**
-   * Returns the public key of the specified component if it exists on the
-   * local system.
+   * Returns the public key of this component if it exists on the local system.
    *
    * @return public key or Null if there is no data.
    */
@@ -131,35 +195,73 @@ public abstract class DefaultCertificateClient implements CertificateClient {
   }
 
   /**
-   * Returns the certificate  of the specified component if it exists on the
-   * local system.
+   * Returns the default certificate of given client if it exists.
    *
    * @return certificate or Null if there is no data.
    */
   @Override
   public X509Certificate getCertificate() {
-    if(x509Certificate != null){
+    if (x509Certificate != null) {
       return x509Certificate;
     }
 
-    Path certPath = securityConfig.getCertificateLocation();
-    if (OzoneSecurityUtil.checkIfFileExist(certPath,
-        securityConfig.getCertificateFileName())) {
-      CertificateCodec certificateCodec =
-          new CertificateCodec(securityConfig);
-      try {
-        X509CertificateHolder x509CertificateHolder =
-            certificateCodec.readCertificate();
-        x509Certificate =
-            CertificateCodec.getX509Certificate(x509CertificateHolder);
-      } catch (java.security.cert.CertificateException | IOException e) {
-        getLogger().error("Error reading certificate.", e);
-      }
+    if (certSerialId == null) {
+      getLogger().error("Default certificate serial id is not set. Can't " +
+          "locate the default certificate for this client.");
+      return null;
+    }
+    // Refresh the cache from file system.
+    loadAllCertificates();
+    if (certificateMap.containsKey(certSerialId)) {
+      x509Certificate = certificateMap.get(certSerialId);
     }
     return x509Certificate;
   }
 
   /**
+   * Returns the certificate  with the specified certificate serial id if it
+   * exists else try to get it from SCM.
+   * @param  certId
+   *
+   * @return certificate or Null if there is no data.
+   */
+  @Override
+  public X509Certificate getCertificate(String certId)
+      throws CertificateException {
+    // Check if it is in cache.
+    if (certificateMap.containsKey(certId)) {
+      return certificateMap.get(certId);
+    }
+    // Try to get it from SCM.
+    return this.getCertificateFromScm(certId);
+  }
+
+  /**
+   * Get certificate from SCM and store it in local file system.
+   * @param certId
+   * @return certificate
+   */
+  private X509Certificate getCertificateFromScm(String certId)
+      throws CertificateException {
+
+    getLogger().info("Getting certificate with certSerialId:{}.",
+        certId);
+    try {
+      SCMSecurityProtocol scmSecurityProtocolClient = getScmSecurityClient(
+          (OzoneConfiguration) securityConfig.getConfiguration());
+      String pemEncodedCert =
+          scmSecurityProtocolClient.getCertificate(certId);
+      this.storeCertificate(pemEncodedCert, true);
+      return CertificateCodec.getX509Certificate(pemEncodedCert);
+    } catch (Exception e) {
+      getLogger().error("Error while getting Certificate with " +
+          "certSerialId:{} from scm.", certId, e);
+      throw new CertificateException("Error while getting certificate for " +
+          "certSerialId:" + certId, e, CERTIFICATE_ERROR);
+    }
+  }
+
+  /**
    * Verifies if this certificate is part of a trusted chain.
    *
    * @param certificate - certificate.
@@ -171,8 +273,7 @@ public abstract class DefaultCertificateClient implements CertificateClient {
   }
 
   /**
-   * Creates digital signature over the data stream using the components
-   * private key.
+   * Creates digital signature over the data stream using client's private key.
    *
    * @param stream - Data stream to sign.
    * @throws CertificateException - on Error.
@@ -200,10 +301,9 @@ public abstract class DefaultCertificateClient implements CertificateClient {
   }
 
   /**
-   * Creates digital signature over the data stream using the components
-   * private key.
+   * Creates digital signature over the data stream using client's private key.
    *
-   * @param data        - Data to sign.
+   * @param data - Data to sign.
    * @throws CertificateException - on Error.
    */
   @Override
@@ -349,29 +449,39 @@ public abstract class DefaultCertificateClient implements CertificateClient {
   }
 
   /**
-   * Stores the Certificate  for this client. Don't use this api to add
-   * trusted certificates of other components.
+   * Stores the Certificate  for this client. Don't use this api to add trusted
+   * certificates of others.
    *
-   * @param certificate - X509 Certificate
+   * @param pemEncodedCert - pem encoded X509 Certificate
+   * @param force - override any existing file
    * @throws CertificateException - on Error.
+   *
    */
   @Override
-  public void storeCertificate(X509Certificate certificate)
+  public void storeCertificate(String pemEncodedCert, boolean force)
       throws CertificateException {
     CertificateCodec certificateCodec = new CertificateCodec(securityConfig);
     try {
-      certificateCodec.writeCertificate(
-          new X509CertificateHolder(certificate.getEncoded()));
-    } catch (IOException | CertificateEncodingException e) {
+      Path basePath = securityConfig.getCertificateLocation();
+
+      X509Certificate cert =
+          CertificateCodec.getX509Certificate(pemEncodedCert);
+      String certName = String.format(CERT_FILE_NAME_FORMAT,
+          cert.getSerialNumber().toString());
+
+      certificateCodec.writeCertificate(basePath, certName,
+          pemEncodedCert, force);
+      certificateMap.putIfAbsent(cert.getSerialNumber().toString(), cert);
+    } catch (IOException | java.security.cert.CertificateException e) {
       throw new CertificateException("Error while storing certificate.", e,
           CERTIFICATE_ERROR);
     }
   }
 
   /**
-   * Stores the trusted chain of certificates for a specific component.
+   * Stores the trusted chain of certificates for a specific component.
    *
-   * @param ks                    - Key Store.
+   * @param ks - Key Store.
    * @throws CertificateException - on Error.
    */
   @Override
@@ -382,7 +492,7 @@ public abstract class DefaultCertificateClient implements CertificateClient {
 
 
   /**
-   * Stores the trusted chain of certificates for a specific component.
+   * Stores the trusted chain of certificates for a specific component.
    *
    * @param certificates - List of Certificates.
    * @throws CertificateException - on Error.
@@ -640,4 +750,26 @@ public abstract class DefaultCertificateClient implements CertificateClient {
   public Logger getLogger() {
     return logger;
   }
+
+  /**
+   * Create a scm security client, used to get SCM signed certificate.
+   *
+   * @return {@link SCMSecurityProtocol}
+   */
+  private static SCMSecurityProtocol getScmSecurityClient(
+      OzoneConfiguration conf) throws IOException {
+    RPC.setProtocolEngine(conf, SCMSecurityProtocolPB.class,
+        ProtobufRpcEngine.class);
+    long scmVersion =
+        RPC.getProtocolVersion(ScmBlockLocationProtocolPB.class);
+    InetSocketAddress scmSecurityProtoAdd =
+        HddsUtils.getScmAddressForSecurityProtocol(conf);
+    SCMSecurityProtocolClientSideTranslatorPB scmSecurityClient =
+        new SCMSecurityProtocolClientSideTranslatorPB(
+            RPC.getProxy(SCMSecurityProtocolPB.class, scmVersion,
+                scmSecurityProtoAdd, UserGroupInformation.getCurrentUser(),
+                conf, NetUtils.getDefaultSocketFactory(conf),
+                Client.getRpcTimeout(conf)));
+    return scmSecurityClient;
+  }
 }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/OMCertificateClient.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/OMCertificateClient.java
index 5744130..b1f7504 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/OMCertificateClient.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/certificate/client/OMCertificateClient.java
@@ -39,8 +39,13 @@ public class OMCertificateClient extends DefaultCertificateClient {
   private static final Logger LOG =
       LoggerFactory.getLogger(OMCertificateClient.class);
 
+  public OMCertificateClient(SecurityConfig securityConfig,
+      String certSerialId) {
+    super(securityConfig, LOG, certSerialId);
+  }
+
   public OMCertificateClient(SecurityConfig securityConfig) {
-    super(securityConfig, LOG);
+    super(securityConfig, LOG, null);
   }
 
   protected InitResponse handleCase(InitCase init) throws
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/exceptions/CertificateException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/exceptions/CertificateException.java
index 49f8a18..b312128 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/exceptions/CertificateException.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/security/x509/exceptions/CertificateException.java
@@ -83,6 +83,7 @@ public class CertificateException extends SCMSecurityException {
     CERTIFICATE_ERROR,
     BOOTSTRAP_ERROR,
     CSR_ERROR,
-    CRYPTO_SIGNATURE_VERIFICATION_ERROR
+    CRYPTO_SIGNATURE_VERIFICATION_ERROR,
+    CERTIFICATE_NOT_FOUND_ERROR
   }
 }
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Storage.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Storage.java
index 1826a58..9ad87ae 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Storage.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/Storage.java
@@ -245,5 +245,17 @@ public abstract class Storage {
     storageInfo.writeTo(getVersionFile());
   }
 
+  /**
+   * Persists current StorageInfo to file system.
+   * @throws IOException
+   */
+  public void persistCurrentState() throws IOException {
+    if (!getCurrentDir().exists()) {
+      throw new IOException("Metadata dir doesn't exist, dir: " +
+          getCurrentDir());
+    }
+    storageInfo.writeTo(getVersionFile());
+  }
+
 }
 
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/CodecRegistry.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/CodecRegistry.java
index 1dd1842..104fd4b 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/CodecRegistry.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/CodecRegistry.java
@@ -34,6 +34,7 @@ public class CodecRegistry {
   public CodecRegistry() {
     valueCodecs = new HashMap<>();
     valueCodecs.put(String.class, new StringCodec());
+    valueCodecs.put(Long.class, new LongCodec());
   }
 
   /**
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/LongCodec.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/LongCodec.java
new file mode 100644
index 0000000..c7a249e
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/db/LongCodec.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.utils.db;
+
+import com.google.common.primitives.Longs;
+
+
+/**
+ * Codec to convert Long to/from byte array.
+ */
+public class LongCodec implements Codec<Long> {
+
+  @Override
+  public byte[] toPersistedFormat(Long object) {
+    if (object != null) {
+      return Longs.toByteArray(object);
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public Long fromPersistedFormat(byte[] rawData) {
+    if (rawData != null) {
+      return Longs.fromByteArray(rawData);
+    } else {
+      return null;
+    }
+  }
+}
diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto b/hadoop-hdds/common/src/main/proto/hdds.proto
index a7d7704..f557050 100644
--- a/hadoop-hdds/common/src/main/proto/hdds.proto
+++ b/hadoop-hdds/common/src/main/proto/hdds.proto
@@ -33,6 +33,7 @@ message DatanodeDetailsProto {
     required string ipAddress = 2;     // IP address
     required string hostName = 3;      // hostname
     repeated Port ports = 4;
+    optional string certSerialId = 5;   // Certificate serial id.
 }
 
 /**
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/client/TestCertificateClientInit.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/client/TestCertificateClientInit.java
index c877c30..61bcf21 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/client/TestCertificateClientInit.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/client/TestCertificateClientInit.java
@@ -59,12 +59,15 @@ import static org.junit.Assert.assertTrue;
 @SuppressWarnings("visibilitymodifier")
 public class TestCertificateClientInit {
 
+  private KeyPair keyPair;
+  private String certSerialId = "3284792342234";
   private CertificateClient dnCertificateClient;
   private CertificateClient omCertificateClient;
   private HDDSKeyGenerator keyGenerator;
   private Path metaDirPath;
   private SecurityConfig securityConfig;
   private KeyCodec keyCodec;
+  private X509Certificate x509Certificate;
 
   @Parameter
   public boolean pvtKeyPresent;
@@ -96,10 +99,16 @@ public class TestCertificateClientInit {
     metaDirPath = Paths.get(path, "test");
     config.set(HDDS_METADATA_DIR_NAME, metaDirPath.toString());
     securityConfig = new SecurityConfig(config);
-    dnCertificateClient = new DNCertificateClient(securityConfig);
-    omCertificateClient = new OMCertificateClient(securityConfig);
     keyGenerator = new HDDSKeyGenerator(securityConfig);
+    keyPair = keyGenerator.generateKey();
+    x509Certificate = getX509Certificate();
+    certSerialId = x509Certificate.getSerialNumber().toString();
+    dnCertificateClient = new DNCertificateClient(securityConfig,
+        certSerialId);
+    omCertificateClient = new OMCertificateClient(securityConfig,
+        certSerialId);
     keyCodec = new KeyCodec(securityConfig);
+
     Files.createDirectories(securityConfig.getKeyLocation());
   }
 
@@ -113,7 +122,6 @@ public class TestCertificateClientInit {
 
   @Test
   public void testInitDatanode() throws Exception {
-    KeyPair keyPair = keyGenerator.generateKey();
     if (pvtKeyPresent) {
       keyCodec.writePrivateKey(keyPair.getPrivate());
     } else {
@@ -131,9 +139,6 @@ public class TestCertificateClientInit {
     }
 
     if (certPresent) {
-      X509Certificate x509Certificate = KeyStoreTestUtil.generateCertificate(
-          "CN=Test", keyPair, 10, securityConfig.getSignatureAlgo());
-
       CertificateCodec codec = new CertificateCodec(securityConfig);
       codec.writeCertificate(new X509CertificateHolder(
           x509Certificate.getEncoded()));
@@ -157,7 +162,6 @@ public class TestCertificateClientInit {
 
   @Test
   public void testInitOzoneManager() throws Exception {
-    KeyPair keyPair = keyGenerator.generateKey();
     if (pvtKeyPresent) {
       keyCodec.writePrivateKey(keyPair.getPrivate());
     } else {
@@ -175,9 +179,6 @@ public class TestCertificateClientInit {
     }
 
     if (certPresent) {
-      X509Certificate x509Certificate = KeyStoreTestUtil.generateCertificate(
-          "CN=Test", keyPair, 10, securityConfig.getSignatureAlgo());
-
       CertificateCodec codec = new CertificateCodec(securityConfig);
       codec.writeCertificate(new X509CertificateHolder(
           x509Certificate.getEncoded()));
@@ -202,4 +203,9 @@ public class TestCertificateClientInit {
           securityConfig.getPublicKeyFileName()));
     }
   }
+
+  private X509Certificate getX509Certificate() throws Exception {
+    return KeyStoreTestUtil.generateCertificate(
+        "CN=Test", keyPair, 10, securityConfig.getSignatureAlgo());
+  }
 }
\ No newline at end of file
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/client/TestDefaultCertificateClient.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/client/TestDefaultCertificateClient.java
index 855c1cb..11be0de 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/client/TestDefaultCertificateClient.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/security/x509/certificate/client/TestDefaultCertificateClient.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hdds.security.x509.certificate.client;
 
 import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
+import org.apache.hadoop.hdds.security.x509.exceptions.CertificateException;
 import org.apache.hadoop.hdds.security.x509.keys.KeyCodec;
 import org.bouncycastle.cert.X509CertificateHolder;
 import org.junit.After;
@@ -49,8 +50,11 @@ import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.LambdaTestUtils;
 
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.*;
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_METADATA_DIR_NAME;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NAMES;
 import static org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient.InitResponse.FAILURE;
+import static org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec.getPEMEncodedString;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
@@ -62,37 +66,60 @@ import static org.junit.Assert.assertTrue;
  */
 public class TestDefaultCertificateClient {
 
+  private String certSerialId;
+  private X509Certificate x509Certificate;
   private OMCertificateClient omCertClient;
   private DNCertificateClient dnCertClient;
   private HDDSKeyGenerator keyGenerator;
-  private Path metaDirPath;
-  private SecurityConfig securityConfig;
+  private Path omMetaDirPath;
+  private Path dnMetaDirPath;
+  private SecurityConfig omSecurityConfig;
+  private SecurityConfig dnSecurityConfig;
   private final static String UTF = "UTF-8";
-  private KeyCodec keyCodec;
+  private KeyCodec omKeyCodec;
+  private KeyCodec dnKeyCodec;
 
   @Before
   public void setUp() throws Exception {
     OzoneConfiguration config = new OzoneConfiguration();
-    final String path = GenericTestUtils
+    config.setStrings(OZONE_SCM_NAMES, "localhost");
+    config.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 2);
+    final String omPath = GenericTestUtils
         .getTempPath(UUID.randomUUID().toString());
-    metaDirPath = Paths.get(path, "test");
-    config.set(HDDS_METADATA_DIR_NAME, metaDirPath.toString());
-    securityConfig = new SecurityConfig(config);
+    final String dnPath = GenericTestUtils
+        .getTempPath(UUID.randomUUID().toString());
+
+    omMetaDirPath = Paths.get(omPath, "test");
+    dnMetaDirPath = Paths.get(dnPath, "test");
+
+    config.set(HDDS_METADATA_DIR_NAME, omMetaDirPath.toString());
+    omSecurityConfig = new SecurityConfig(config);
+    config.set(HDDS_METADATA_DIR_NAME, dnMetaDirPath.toString());
+    dnSecurityConfig = new SecurityConfig(config);
+
+
+    keyGenerator = new HDDSKeyGenerator(omSecurityConfig);
+    omKeyCodec = new KeyCodec(omSecurityConfig);
+    dnKeyCodec = new KeyCodec(dnSecurityConfig);
+
+    Files.createDirectories(omSecurityConfig.getKeyLocation());
+    Files.createDirectories(dnSecurityConfig.getKeyLocation());
+    x509Certificate = generateX509Cert(null);
+    certSerialId = x509Certificate.getSerialNumber().toString();
     getCertClient();
-    keyGenerator = new HDDSKeyGenerator(securityConfig);
-    keyCodec = new KeyCodec(securityConfig);
-    Files.createDirectories(securityConfig.getKeyLocation());
   }
 
   private void getCertClient() {
-    omCertClient = new OMCertificateClient(securityConfig);
-    dnCertClient = new DNCertificateClient(securityConfig);
+    omCertClient = new OMCertificateClient(omSecurityConfig, certSerialId);
+    dnCertClient = new DNCertificateClient(dnSecurityConfig, certSerialId);
   }
 
   @After
   public void tearDown() {
     omCertClient = null;
-    FileUtils.deleteQuietly(metaDirPath.toFile());
+    dnCertClient = null;
+    FileUtils.deleteQuietly(omMetaDirPath.toFile());
+    FileUtils.deleteQuietly(dnMetaDirPath.toFile());
   }
 
   /**
@@ -101,6 +128,7 @@ public class TestDefaultCertificateClient {
    */
   @Test
   public void testKeyOperations() throws Exception {
+    cleanupOldKeyPair();
     PrivateKey pvtKey = omCertClient.getPrivateKey();
     PublicKey publicKey = omCertClient.getPublicKey();
     assertNull(publicKey);
@@ -111,18 +139,33 @@ public class TestDefaultCertificateClient {
     assertNotNull(pvtKey);
     assertEquals(pvtKey, keyPair.getPrivate());
 
-    publicKey = omCertClient.getPublicKey();
+    publicKey = dnCertClient.getPublicKey();
     assertNotNull(publicKey);
     assertEquals(publicKey, keyPair.getPublic());
   }
 
   private KeyPair generateKeyPairFiles() throws Exception {
+    cleanupOldKeyPair();
     KeyPair keyPair = keyGenerator.generateKey();
-    keyCodec.writePrivateKey(keyPair.getPrivate());
-    keyCodec.writePublicKey(keyPair.getPublic());
+    omKeyCodec.writePrivateKey(keyPair.getPrivate());
+    omKeyCodec.writePublicKey(keyPair.getPublic());
+
+    dnKeyCodec.writePrivateKey(keyPair.getPrivate());
+    dnKeyCodec.writePublicKey(keyPair.getPublic());
     return keyPair;
   }
 
+  private void cleanupOldKeyPair() {
+    FileUtils.deleteQuietly(Paths.get(omSecurityConfig.getKeyLocation()
+        .toString(), omSecurityConfig.getPrivateKeyFileName()).toFile());
+    FileUtils.deleteQuietly(Paths.get(omSecurityConfig.getKeyLocation()
+        .toString(), omSecurityConfig.getPublicKeyFileName()).toFile());
+    FileUtils.deleteQuietly(Paths.get(dnSecurityConfig.getKeyLocation()
+        .toString(), dnSecurityConfig.getPrivateKeyFileName()).toFile());
+    FileUtils.deleteQuietly(Paths.get(dnSecurityConfig.getKeyLocation()
+        .toString(), dnSecurityConfig.getPublicKeyFileName()).toFile());
+  }
+
   /**
    * Tests: 1. storeCertificate 2. getCertificate 3. verifyCertificate
    */
@@ -130,11 +173,11 @@ public class TestDefaultCertificateClient {
   public void testCertificateOps() throws Exception {
     X509Certificate cert = omCertClient.getCertificate();
     assertNull(cert);
+    omCertClient.storeCertificate(getPEMEncodedString(x509Certificate),
+        true);
 
-    X509Certificate x509Certificate = generateX509Cert(null);
-    omCertClient.storeCertificate(x509Certificate);
-
-    cert = omCertClient.getCertificate();
+    cert = omCertClient.getCertificate(
+        x509Certificate.getSerialNumber().toString());
     assertNotNull(cert);
     assertTrue(cert.getEncoded().length > 0);
     assertEquals(cert, x509Certificate);
@@ -147,12 +190,17 @@ public class TestDefaultCertificateClient {
       keyPair = generateKeyPairFiles();
     }
     return KeyStoreTestUtil.generateCertificate("CN=Test", keyPair, 30,
-        securityConfig.getSignatureAlgo());
+        omSecurityConfig.getSignatureAlgo());
   }
 
   @Test
   public void testSignDataStream() throws Exception {
     String data = RandomStringUtils.random(100, UTF);
+    FileUtils.deleteQuietly(Paths.get(omSecurityConfig.getKeyLocation()
+        .toString(), omSecurityConfig.getPrivateKeyFileName()).toFile());
+    FileUtils.deleteQuietly(Paths.get(omSecurityConfig.getKeyLocation()
+        .toString(), omSecurityConfig.getPublicKeyFileName()).toFile());
+
     // Expect error when there is no private key to sign.
     LambdaTestUtils.intercept(IOException.class, "Error while " +
             "signing the stream",
@@ -171,8 +219,8 @@ public class TestDefaultCertificateClient {
   private void validateHash(byte[] hash, byte[] data)
       throws Exception {
     Signature rsaSignature =
-        Signature.getInstance(securityConfig.getSignatureAlgo(),
-            securityConfig.getProvider());
+        Signature.getInstance(omSecurityConfig.getSignatureAlgo(),
+            omSecurityConfig.getProvider());
     rsaSignature.initVerify(omCertClient.getPublicKey());
     rsaSignature.update(data);
     Assert.assertTrue(rsaSignature.verify(hash));
@@ -184,8 +232,6 @@ public class TestDefaultCertificateClient {
   @Test
   public void verifySignatureStream() throws Exception {
     String data = RandomStringUtils.random(500, UTF);
-
-    X509Certificate x509Certificate = generateX509Cert(null);
     byte[] sign = omCertClient.signDataStream(IOUtils.toInputStream(data,
         UTF));
 
@@ -209,7 +255,6 @@ public class TestDefaultCertificateClient {
   @Test
   public void verifySignatureDataArray() throws Exception {
     String data = RandomStringUtils.random(500, UTF);
-    X509Certificate x509Certificate = generateX509Cert(null);
     byte[] sign = omCertClient.signData(data.getBytes());
 
     // Positive tests.
@@ -234,6 +279,67 @@ public class TestDefaultCertificateClient {
   }
 
   @Test
+  public void testCertificateLoadingOnInit() throws Exception {
+    KeyPair keyPair = keyGenerator.generateKey();
+    X509Certificate cert1 = generateX509Cert(keyPair);
+    X509Certificate cert2 = generateX509Cert(keyPair);
+    X509Certificate cert3 = generateX509Cert(keyPair);
+
+    Path certPath = dnSecurityConfig.getCertificateLocation();
+    CertificateCodec codec = new CertificateCodec(dnSecurityConfig);
+
+    // Certificate not found.
+    LambdaTestUtils.intercept(CertificateException.class, "Error while" +
+            " getting certificate",
+        () -> dnCertClient.getCertificate(cert1.getSerialNumber()
+            .toString()));
+    LambdaTestUtils.intercept(CertificateException.class, "Error while" +
+            " getting certificate",
+        () -> dnCertClient.getCertificate(cert2.getSerialNumber()
+            .toString()));
+    LambdaTestUtils.intercept(CertificateException.class, "Error while" +
+            " getting certificate",
+        () -> dnCertClient.getCertificate(cert3.getSerialNumber()
+            .toString()));
+    codec.writeCertificate(certPath, "1.crt",
+        getPEMEncodedString(cert1), true);
+    codec.writeCertificate(certPath, "2.crt",
+        getPEMEncodedString(cert2), true);
+    codec.writeCertificate(certPath, "3.crt",
+        getPEMEncodedString(cert3), true);
+
+    // Re-instantiate DN client which will load certificates from filesystem.
+    dnCertClient = new DNCertificateClient(dnSecurityConfig, certSerialId);
+
+    assertNotNull(dnCertClient.getCertificate(cert1.getSerialNumber()
+        .toString()));
+    assertNotNull(dnCertClient.getCertificate(cert2.getSerialNumber()
+        .toString()));
+    assertNotNull(dnCertClient.getCertificate(cert3.getSerialNumber()
+        .toString()));
+
+  }
+
+  @Test
+  public void testStoreCertificate() throws Exception {
+    KeyPair keyPair = keyGenerator.generateKey();
+    X509Certificate cert1 = generateX509Cert(keyPair);
+    X509Certificate cert2 = generateX509Cert(keyPair);
+    X509Certificate cert3 = generateX509Cert(keyPair);
+
+    dnCertClient.storeCertificate(getPEMEncodedString(cert1), true);
+    dnCertClient.storeCertificate(getPEMEncodedString(cert2), true);
+    dnCertClient.storeCertificate(getPEMEncodedString(cert3), true);
+
+    assertNotNull(dnCertClient.getCertificate(cert1.getSerialNumber()
+        .toString()));
+    assertNotNull(dnCertClient.getCertificate(cert2.getSerialNumber()
+        .toString()));
+    assertNotNull(dnCertClient.getCertificate(cert3.getSerialNumber()
+        .toString()));
+  }
+
+  @Test
   public void testInitCertAndKeypairValidationFailures() throws Exception {
 
     GenericTestUtils.LogCapturer dnClientLog = GenericTestUtils.LogCapturer
@@ -246,13 +352,23 @@ public class TestDefaultCertificateClient {
     omClientLog.clearOutput();
 
     // Case 1. Expect failure when keypair validation fails.
-    FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation()
-        .toString(), securityConfig.getPrivateKeyFileName()).toFile());
-    keyCodec.writePrivateKey(keyPair.getPrivate());
+    FileUtils.deleteQuietly(Paths.get(omSecurityConfig.getKeyLocation()
+        .toString(), omSecurityConfig.getPrivateKeyFileName()).toFile());
+    FileUtils.deleteQuietly(Paths.get(omSecurityConfig.getKeyLocation()
+        .toString(), omSecurityConfig.getPublicKeyFileName()).toFile());
+
+
+    FileUtils.deleteQuietly(Paths.get(dnSecurityConfig.getKeyLocation()
+        .toString(), dnSecurityConfig.getPrivateKeyFileName()).toFile());
+    FileUtils.deleteQuietly(Paths.get(dnSecurityConfig.getKeyLocation()
+        .toString(), dnSecurityConfig.getPublicKeyFileName()).toFile());
+
+    omKeyCodec.writePrivateKey(keyPair.getPrivate());
+    omKeyCodec.writePublicKey(keyPair2.getPublic());
+
+    dnKeyCodec.writePrivateKey(keyPair.getPrivate());
+    dnKeyCodec.writePublicKey(keyPair2.getPublic());
 
-    FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation()
-        .toString(), securityConfig.getPublicKeyFileName()).toFile());
-    keyCodec.writePublicKey(keyPair2.getPublic());
 
     // Check for DN.
     assertEquals(dnCertClient.init(), FAILURE);
@@ -271,15 +387,18 @@ public class TestDefaultCertificateClient {
     // Case 2. Expect failure when certificate is generated from different
     // private key and keypair validation fails.
     getCertClient();
-    FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation()
-        .toString(), securityConfig.getCertificateFileName()).toFile());
-    X509Certificate x509Certificate = KeyStoreTestUtil.generateCertificate(
-        "CN=Test", keyGenerator.generateKey(), 10,
-        securityConfig.getSignatureAlgo());
-    CertificateCodec codec = new CertificateCodec(securityConfig);
-    codec.writeCertificate(new X509CertificateHolder(
+    FileUtils.deleteQuietly(Paths.get(omSecurityConfig.getKeyLocation()
+        .toString(), omSecurityConfig.getCertificateFileName()).toFile());
+    FileUtils.deleteQuietly(Paths.get(dnSecurityConfig.getKeyLocation()
+        .toString(), dnSecurityConfig.getCertificateFileName()).toFile());
+
+    CertificateCodec omCertCodec = new CertificateCodec(omSecurityConfig);
+    omCertCodec.writeCertificate(new X509CertificateHolder(
         x509Certificate.getEncoded()));
 
+    CertificateCodec dnCertCodec = new CertificateCodec(dnSecurityConfig);
+    dnCertCodec.writeCertificate(new X509CertificateHolder(
+        x509Certificate.getEncoded()));
     // Check for DN.
     assertEquals(dnCertClient.init(), FAILURE);
     assertTrue(dnClientLog.getOutput().contains("Keypair validation " +
@@ -297,10 +416,13 @@ public class TestDefaultCertificateClient {
     // private key and certificate validation fails.
 
     // Re write the correct public key.
+    FileUtils.deleteQuietly(Paths.get(omSecurityConfig.getKeyLocation()
+        .toString(), omSecurityConfig.getPublicKeyFileName()).toFile());
+    FileUtils.deleteQuietly(Paths.get(dnSecurityConfig.getKeyLocation()
+        .toString(), dnSecurityConfig.getPublicKeyFileName()).toFile());
     getCertClient();
-    FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation()
-        .toString(), securityConfig.getPublicKeyFileName()).toFile());
-    keyCodec.writePublicKey(keyPair.getPublic());
+    omKeyCodec.writePublicKey(keyPair.getPublic());
+    dnKeyCodec.writePublicKey(keyPair.getPublic());
 
     // Check for DN.
     assertEquals(dnCertClient.init(), FAILURE);
@@ -318,8 +440,10 @@ public class TestDefaultCertificateClient {
 
     // Case 4. Failure when public key recovery fails.
     getCertClient();
-    FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation()
-        .toString(), securityConfig.getPublicKeyFileName()).toFile());
+    FileUtils.deleteQuietly(Paths.get(omSecurityConfig.getKeyLocation()
+        .toString(), omSecurityConfig.getPublicKeyFileName()).toFile());
+    FileUtils.deleteQuietly(Paths.get(dnSecurityConfig.getKeyLocation()
+        .toString(), dnSecurityConfig.getPublicKeyFileName()).toFile());
 
     // Check for DN.
     assertEquals(dnCertClient.init(), FAILURE);
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
index 260f348..dbff549 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/HddsDatanodeService.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.security.x509.certificate.client.DNCertificateClient;
-import org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec;
 import org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -53,10 +52,10 @@ import java.io.IOException;
 import java.net.InetAddress;
 import java.security.KeyPair;
 import java.security.cert.CertificateException;
-import java.security.cert.X509Certificate;
 import java.util.List;
 import java.util.UUID;
 
+import static org.apache.hadoop.hdds.security.x509.certificate.utils.CertificateCodec.getX509Certificate;
 import static org.apache.hadoop.hdds.security.x509.certificates.utils.CertificateSignRequest.getEncodedString;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.HDDS_DATANODE_PLUGINS_KEY;
 import static org.apache.hadoop.util.ExitUtil.terminate;
@@ -179,7 +178,8 @@ public class HddsDatanodeService extends GenericCli implements ServicePlugin {
         if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
           component = "dn-" + datanodeDetails.getUuidString();
 
-          dnCertClient = new DNCertificateClient(new SecurityConfig(conf));
+          dnCertClient = new DNCertificateClient(new SecurityConfig(conf),
+              datanodeDetails.getCertSerialId());
 
           if (SecurityUtil.getAuthenticationMethod(conf).equals(
               UserGroupInformation.AuthenticationMethod.KERBEROS)) {
@@ -199,7 +199,11 @@ public class HddsDatanodeService extends GenericCli implements ServicePlugin {
           }
           LOG.info("Hdds Datanode login successful.");
         }
-        datanodeStateMachine = new DatanodeStateMachine(datanodeDetails, conf);
+        if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
+          initializeCertificateClient(conf);
+        }
+        datanodeStateMachine = new DatanodeStateMachine(datanodeDetails, conf,
+            dnCertClient);
         try {
           httpServer = new HddsDatanodeHttpServer(conf);
           httpServer.start();
@@ -209,9 +213,6 @@ public class HddsDatanodeService extends GenericCli implements ServicePlugin {
         startPlugins();
         // Starting HDDS Daemons
         datanodeStateMachine.startDaemon();
-        if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
-          initializeCertificateClient(conf);
-        }
       } catch (IOException e) {
         throw new RuntimeException("Can't start the HDDS datanode plugin", e);
       } catch (AuthenticationException ex) {
@@ -268,10 +269,10 @@ public class HddsDatanodeService extends GenericCli implements ServicePlugin {
 
       String pemEncodedCert = secureScmClient.getDataNodeCertificate(
           datanodeDetails.getProtoBufMessage(), getEncodedString(csr));
-
-      X509Certificate x509Certificate =
-          CertificateCodec.getX509Certificate(pemEncodedCert);
-      dnCertClient.storeCertificate(x509Certificate);
+      dnCertClient.storeCertificate(pemEncodedCert, true);
+      datanodeDetails.setCertSerialId(getX509Certificate(pemEncodedCert).
+          getSerialNumber().toString());
+      persistDatanodeDetails(datanodeDetails);
     } catch (IOException | CertificateException e) {
       LOG.error("Error while storing SCM signed certificate.", e);
       throw new RuntimeException(e);
@@ -332,6 +333,29 @@ public class HddsDatanodeService extends GenericCli implements ServicePlugin {
   }
 
   /**
+   * Persist DatanodeDetails to file system.
+   * @param dnDetails datanode details to write to the datanode id file
+   *
+   * @throws IOException if the details cannot be persisted
+   */
+  private void persistDatanodeDetails(DatanodeDetails dnDetails)
+      throws IOException {
+    String idFilePath = HddsUtils.getDatanodeIdFilePath(conf);
+    if (idFilePath == null || idFilePath.isEmpty()) {
+      LOG.error("A valid file path is needed for config setting {}",
+          ScmConfigKeys.OZONE_SCM_DATANODE_ID);
+      throw new IllegalArgumentException(ScmConfigKeys.OZONE_SCM_DATANODE_ID +
+          " must be defined. See" +
+          " https://wiki.apache.org/hadoop/Ozone#Configuration" +
+          " for details on configuring Ozone.");
+    }
+
+    Preconditions.checkNotNull(idFilePath);
+    File idFile = new File(idFilePath);
+    ContainerUtils.writeDatanodeDetailsTo(dnDetails, idFile);
+  }
+
+  /**
    * Starts all the service plugins which are configured using
    * OzoneConfigKeys.HDDS_DATANODE_PLUGINS_KEY.
    */
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index 7f5233f..a4ea31e 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
 import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos.NodeReportProto;
+import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.ozone.container.common.report.ReportManager;
 import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
     .CloseContainerCommandHandler;
@@ -82,15 +83,17 @@ public class DatanodeStateMachine implements Closeable {
   private final ReplicationSupervisor supervisor;
 
   private JvmPauseMonitor jvmPauseMonitor;
+  private CertificateClient dnCertClient;
 
   /**
    * Constructs a a datanode state machine.
-   *
-   * @param datanodeDetails - DatanodeDetails used to identify a datanode
+   * @param datanodeDetails - DatanodeDetails used to identify a datanode
    * @param conf - Configuration.
+   * @param certClient - Datanode Certificate client, required if security is
+   *                     enabled
    */
   public DatanodeStateMachine(DatanodeDetails datanodeDetails,
-      Configuration conf) throws IOException {
+      Configuration conf, CertificateClient certClient) throws IOException {
     this.conf = conf;
     this.datanodeDetails = datanodeDetails;
     executorService = HadoopExecutors.newCachedThreadPool(
@@ -99,7 +102,8 @@ public class DatanodeStateMachine implements Closeable {
     connectionManager = new SCMConnectionManager(conf);
     context = new StateContext(this.conf, DatanodeStates.getInitState(), this);
     container = new OzoneContainer(this.datanodeDetails,
-        new OzoneConfiguration(conf), context);
+        new OzoneConfiguration(conf), context, certClient);
+    dnCertClient = certClient;
     nextHB = new AtomicLong(Time.monotonicNow());
 
     ContainerReplicator replicator =
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
index ea9f5cd..c6b0d92 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServer.java
@@ -32,6 +32,8 @@ import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
+import static org.apache.hadoop.hdds.security.exception.SCMSecurityException.ErrorCode.MISSING_BLOCK_TOKEN;
+
 /**
  * A server endpoint that acts as the communication layer for Ozone containers.
  */
@@ -39,10 +41,12 @@ public abstract class XceiverServer implements XceiverServerSpi {
 
   private final SecurityConfig secConfig;
   private final TokenVerifier tokenVerifier;
+  private final CertificateClient caClient;
 
-  public XceiverServer(Configuration conf) {
+  public XceiverServer(Configuration conf, CertificateClient client) {
     Preconditions.checkNotNull(conf);
     this.secConfig = new SecurityConfig(conf);
+    this.caClient = client;
     tokenVerifier = new BlockTokenVerifier(secConfig, getCaClient());
   }
 
@@ -59,17 +63,15 @@ public abstract class XceiverServer implements XceiverServerSpi {
       String encodedToken = request.getEncodedToken();
       if (encodedToken == null) {
         throw new SCMSecurityException("Security is enabled but client " +
-            "request is missing block token.",
-            SCMSecurityException.ErrorCode.MISSING_BLOCK_TOKEN);
+            "request is missing block token.", MISSING_BLOCK_TOKEN);
       }
-      tokenVerifier.verify(encodedToken, "");
+      tokenVerifier.verify(encodedToken, encodedToken);
     }
   }
 
   @VisibleForTesting
   protected CertificateClient getCaClient() {
-    // TODO: instantiate CertificateClient
-    return null;
+    return caClient;
   }
 
   protected SecurityConfig getSecurityConfig() {
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
index 74ab722..28addfd 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.hdds.protocol.proto
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.scm.container.common.helpers.
     StorageContainerException;
+import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.tracing.GrpcServerInterceptor;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
@@ -75,8 +76,9 @@ public final class XceiverServerGrpc extends XceiverServer {
    * @param conf - Configuration
    */
   public XceiverServerGrpc(DatanodeDetails datanodeDetails, Configuration conf,
-      ContainerDispatcher dispatcher, BindableService... additionalServices) {
-    super(conf);
+      ContainerDispatcher dispatcher, CertificateClient caClient,
+      BindableService... additionalServices) {
+    super(conf, caClient);
     Preconditions.checkNotNull(conf);
 
     this.id = datanodeDetails.getUuid();
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
index 19e43b9..d0a56f9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.protocol.proto
 import org.apache.hadoop.hdds.scm.HddsServerUtil;
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.OzoneConsts;
@@ -113,9 +114,9 @@ public final class XceiverServerRatis extends XceiverServer {
 
   private XceiverServerRatis(DatanodeDetails dd, int port,
       ContainerDispatcher dispatcher, Configuration conf, StateContext
-      context, GrpcTlsConfig tlsConfig)
+      context, GrpcTlsConfig tlsConfig, CertificateClient caClient)
       throws IOException {
-    super(conf);
+    super(conf, caClient);
     Objects.requireNonNull(dd, "id == null");
     this.port = port;
     RaftProperties serverProperties = newRaftProperties(conf);
@@ -380,7 +381,8 @@ public final class XceiverServerRatis extends XceiverServer {
 
   public static XceiverServerRatis newXceiverServerRatis(
       DatanodeDetails datanodeDetails, Configuration ozoneConf,
-      ContainerDispatcher dispatcher, StateContext context) throws IOException {
+      ContainerDispatcher dispatcher, StateContext context,
+      CertificateClient caClient) throws IOException {
     int localPort = ozoneConf.getInt(
         OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT,
         OzoneConfigKeys.DFS_CONTAINER_RATIS_IPC_PORT_DEFAULT);
@@ -406,7 +408,7 @@ public final class XceiverServerRatis extends XceiverServer {
     datanodeDetails.setPort(
         DatanodeDetails.newPort(DatanodeDetails.Port.Name.RATIS, localPort));
     return new XceiverServerRatis(datanodeDetails, localPort,
-        dispatcher, ozoneConf, context, tlsConfig);
+        dispatcher, ozoneConf, context, tlsConfig, caClient);
   }
 
   @Override
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
index 3bc060a..87266a9 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hdds.protocol.proto
     .StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.protocol.proto
         .StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
+import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
@@ -76,11 +77,13 @@ public class OzoneContainer {
    * Construct OzoneContainer object.
    * @param datanodeDetails
    * @param conf
+   * @param certClient
    * @throws DiskOutOfSpaceException
    * @throws IOException
    */
   public OzoneContainer(DatanodeDetails datanodeDetails, OzoneConfiguration
-      conf, StateContext context) throws IOException {
+      conf, StateContext context, CertificateClient certClient)
+      throws IOException {
     this.config = conf;
     this.volumeSet = new VolumeSet(datanodeDetails.getUuidString(), conf);
     this.containerSet = new ContainerSet();
@@ -104,9 +107,10 @@ public class OzoneContainer {
      */
     this.controller = new ContainerController(containerSet, handlers);
     this.writeChannel = XceiverServerRatis.newXceiverServerRatis(
-        datanodeDetails, config, hddsDispatcher, context);
+        datanodeDetails, config, hddsDispatcher, context, certClient);
     this.readChannel = new XceiverServerGrpc(
-        datanodeDetails, config, hddsDispatcher, createReplicationService());
+        datanodeDetails, config, hddsDispatcher, certClient,
+        createReplicationService());
 
   }
 
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/TestHddsSecureDatanodeInit.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/TestHddsSecureDatanodeInit.java
index c6f65c7..2897abc 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/TestHddsSecureDatanodeInit.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/TestHddsSecureDatanodeInit.java
@@ -73,6 +73,7 @@ public class TestHddsSecureDatanodeInit {
     conf = new OzoneConfiguration();
     conf.setBoolean(OzoneConfigKeys.OZONE_ENABLED, true);
     conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getPath());
+    //conf.set(ScmConfigKeys.OZONE_SCM_NAMES, "localhost");
     String volumeDir = testDir + "/disk1";
     conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, volumeDir);
 
@@ -113,8 +114,7 @@ public class TestHddsSecureDatanodeInit {
 
   @Before
   public void setUpDNCertClient(){
-    client = new DNCertificateClient(securityConfig);
-    service.setCertificateClient(client);
+
     FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation()
         .toString(), securityConfig.getPrivateKeyFileName()).toFile());
     FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation()
@@ -123,7 +123,9 @@ public class TestHddsSecureDatanodeInit {
         .getCertificateLocation().toString(),
         securityConfig.getCertificateFileName()).toFile());
     dnLogs.clearOutput();
-
+    client = new DNCertificateClient(securityConfig,
+        certHolder.getSerialNumber().toString());
+    service.setCertificateClient(client);
   }
 
   @Test
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
index 8b84b8e..8b93936 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/TestDatanodeStateMachine.java
@@ -161,7 +161,7 @@ public class TestDatanodeStateMachine {
   public void testStartStopDatanodeStateMachine() throws IOException,
       InterruptedException, TimeoutException {
     try (DatanodeStateMachine stateMachine =
-        new DatanodeStateMachine(getNewDatanodeDetails(), conf)) {
+        new DatanodeStateMachine(getNewDatanodeDetails(), conf, null)) {
       stateMachine.startDaemon();
       SCMConnectionManager connectionManager =
           stateMachine.getConnectionManager();
@@ -219,7 +219,7 @@ public class TestDatanodeStateMachine {
     ContainerUtils.writeDatanodeDetailsTo(datanodeDetails, idPath);
 
     try (DatanodeStateMachine stateMachine =
-             new DatanodeStateMachine(datanodeDetails, conf)) {
+             new DatanodeStateMachine(datanodeDetails, conf, null)) {
       DatanodeStateMachine.DatanodeStates currentState =
           stateMachine.getContext().getState();
       Assert.assertEquals(DatanodeStateMachine.DatanodeStates.INIT,
@@ -325,7 +325,7 @@ public class TestDatanodeStateMachine {
     datanodeDetails.setPort(port);
 
     try (DatanodeStateMachine stateMachine =
-             new DatanodeStateMachine(datanodeDetails, conf)) {
+             new DatanodeStateMachine(datanodeDetails, conf, null)) {
       DatanodeStateMachine.DatanodeStates currentState =
           stateMachine.getContext().getState();
       Assert.assertEquals(DatanodeStateMachine.DatanodeStates.INIT,
@@ -388,7 +388,7 @@ public class TestDatanodeStateMachine {
       perTestConf.setStrings(entry.getKey(), entry.getValue());
       LOG.info("Test with {} = {}", entry.getKey(), entry.getValue());
       try (DatanodeStateMachine stateMachine = new DatanodeStateMachine(
-          getNewDatanodeDetails(), perTestConf)) {
+          getNewDatanodeDetails(), perTestConf, null)) {
         DatanodeStateMachine.DatanodeStates currentState =
             stateMachine.getContext().getState();
         Assert.assertEquals(DatanodeStateMachine.DatanodeStates.INIT,
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
index 16e0e9d..731e74c 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
@@ -276,7 +276,7 @@ public class TestCloseContainerCommandHandler {
         .thenReturn(datanodeDetails);
     Mockito.when(context.getParent()).thenReturn(datanodeStateMachine);
     final OzoneContainer ozoneContainer = new  OzoneContainer(
-        datanodeDetails, conf, context);
+        datanodeDetails, conf, context, null);
     ozoneContainer.getDispatcher().setScmId(UUID.randomUUID().toString());
     return ozoneContainer;
   }
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
index 388d0c1..003f26e 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
@@ -98,7 +98,7 @@ public class TestOzoneContainer {
     // When OzoneContainer is started, the containers from disk should be
     // loaded into the containerSet.
     OzoneContainer ozoneContainer = new
-        OzoneContainer(datanodeDetails, conf, context);
+        OzoneContainer(datanodeDetails, conf, context, null);
     ContainerSet containerset = ozoneContainer.getContainerSet();
     assertEquals(10, containerset.containerCount());
   }
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
index 600ee5a..bceec92 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java
@@ -156,7 +156,7 @@ public class TestEndPoint {
         serverAddress, 1000)) {
       DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
       OzoneContainer ozoneContainer = new OzoneContainer(
-          datanodeDetails, conf, getContext(datanodeDetails));
+          datanodeDetails, conf, getContext(datanodeDetails), null);
       rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
       VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
           conf, ozoneContainer);
@@ -181,7 +181,7 @@ public class TestEndPoint {
           .captureLogs(VersionEndpointTask.LOG);
       DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
       OzoneContainer ozoneContainer = new OzoneContainer(
-          datanodeDetails, conf, getContext(datanodeDetails));
+          datanodeDetails, conf, getContext(datanodeDetails), null);
       rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
       VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
           conf, ozoneContainer);
@@ -235,7 +235,7 @@ public class TestEndPoint {
       rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
       DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
       OzoneContainer ozoneContainer = new OzoneContainer(
-          datanodeDetails, conf, getContext(datanodeDetails));
+          datanodeDetails, conf, getContext(datanodeDetails), null);
       VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
           conf, ozoneContainer);
       EndpointStateMachine.EndPointStates newState = versionTask.call();
@@ -263,7 +263,7 @@ public class TestEndPoint {
       rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
       DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
       OzoneContainer ozoneContainer = new OzoneContainer(
-          datanodeDetails, conf, getContext(datanodeDetails));
+          datanodeDetails, conf, getContext(datanodeDetails), null);
       VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
           conf, ozoneContainer);
 
@@ -483,7 +483,7 @@ public class TestEndPoint {
 
     // Create a datanode state machine for stateConext used by endpoint task
     try (DatanodeStateMachine stateMachine = new DatanodeStateMachine(
-        TestUtils.randomDatanodeDetails(), conf);
+        TestUtils.randomDatanodeDetails(), conf, null);
          EndpointStateMachine rpcEndPoint =
             createEndpoint(conf, scmAddress, rpcTimeout)) {
       HddsProtos.DatanodeDetailsProto datanodeDetailsProto =
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
index ec230cd..9654419 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMMetadataManager.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeList;
+import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
 import org.apache.hadoop.utils.db.DBStore;
 import org.apache.hadoop.utils.db.Table;
 
@@ -246,6 +247,13 @@ public interface OMMetadataManager {
   Table<String, OmKeyInfo> getOpenKeyTable();
 
   /**
+   * Gets the DelegationTokenTable.
+   *
+   * @return Table.
+   */
+  Table<OzoneTokenIdentifier, Long> getDelegationTokenTable();
+
+  /**
    * Gets the S3Bucket to Ozone Volume/bucket mapping table.
    *
    * @return Table.
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/S3SecretManagerImpl.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/S3SecretManagerImpl.java
index 0d50134..6febcaf 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/S3SecretManagerImpl.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/S3SecretManagerImpl.java
@@ -40,7 +40,6 @@ import static org.apache.hadoop.ozone.security.OzoneSecurityException.ResultCode
 public class S3SecretManagerImpl implements S3SecretManager {
   private static final Logger LOG =
       LoggerFactory.getLogger(S3SecretManagerImpl.class);
-
   /**
    * OMMetadataManager is used for accessing OM MetadataDB and ReadWriteLock.
    */
@@ -110,4 +109,8 @@ public class S3SecretManagerImpl implements S3SecretManager {
     return OzoneManagerProtocolProtos.S3Secret.parseFrom(s3Secret)
         .getAwsSecret();
   }
+
+  public OMMetadataManager getOmMetadataManager() {
+    return omMetadataManager;
+  }
 }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/codec/TokenIdentifierCodec.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/codec/TokenIdentifierCodec.java
new file mode 100644
index 0000000..53f3a86
--- /dev/null
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/codec/TokenIdentifierCodec.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.om.codec;
+
+import com.google.common.base.Preconditions;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
+import org.apache.hadoop.utils.db.Codec;
+
+import java.io.IOException;
+
+/**
+ * Codec to encode OzoneTokenIdentifier as byte array.
+ */
+public class TokenIdentifierCodec implements Codec<OzoneTokenIdentifier> {
+
+  @Override
+  public byte[] toPersistedFormat(OzoneTokenIdentifier object) {
+    Preconditions
+        .checkNotNull(object, "Null object can't be converted to byte array.");
+    return object.getBytes();
+  }
+
+  @Override
+  public OzoneTokenIdentifier fromPersistedFormat(byte[] rawData)
+      throws IOException {
+    Preconditions.checkNotNull(rawData,
+        "Null byte array can't converted to real object.");
+    try {
+      return OzoneTokenIdentifier.readProtoBuf(rawData);
+    } catch (InvalidProtocolBufferException e) {
+      throw new IllegalArgumentException(
+          "Can't encode the the raw data from the byte array", e);
+    }
+  }
+
+}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneBlockTokenSecretManager.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneBlockTokenSecretManager.java
index fc40117..b3f607a 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneBlockTokenSecretManager.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneBlockTokenSecretManager.java
@@ -32,8 +32,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.EnumSet;
-import java.util.Iterator;
-import java.util.Map;
+
 /**
  * SecretManager for Ozone Master block tokens.
  */
@@ -172,7 +171,6 @@ public class OzoneBlockTokenSecretManager extends
   @Override
   public synchronized void start(CertificateClient client) throws IOException {
     super.start(client);
-    removeExpiredKeys();
   }
 
   /**
@@ -191,17 +189,4 @@ public class OzoneBlockTokenSecretManager extends
   public synchronized void stop() throws IOException {
     super.stop();
   }
-
-  private synchronized void removeExpiredKeys() {
-    // TODO: handle roll private key/certificate
-    long now = Time.now();
-    for (Iterator<Map.Entry<Integer, OzoneSecretKey>> it = allKeys.entrySet()
-        .iterator(); it.hasNext();) {
-      Map.Entry<Integer, OzoneSecretKey> e = it.next();
-      OzoneSecretKey key = e.getValue();
-      if (key.getExpiryDate() < now) {
-        it.remove();
-      }
-    }
-  }
 }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneDelegationTokenSecretManager.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneDelegationTokenSecretManager.java
index e38d9b7..ba84973 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneDelegationTokenSecretManager.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneDelegationTokenSecretManager.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient
 import org.apache.hadoop.hdds.security.x509.exceptions.CertificateException;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.ozone.om.S3SecretManager;
+import org.apache.hadoop.ozone.om.S3SecretManagerImpl;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.security.OzoneSecretStore.OzoneManagerSecretState;
 import org.apache.hadoop.ozone.security.OzoneTokenIdentifier.TokenInfo;
@@ -39,7 +40,6 @@ import org.slf4j.LoggerFactory;
 import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
-import java.security.PrivateKey;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -61,7 +61,7 @@ public class OzoneDelegationTokenSecretManager
       .getLogger(OzoneDelegationTokenSecretManager.class);
   private final Map<OzoneTokenIdentifier, TokenInfo> currentTokens;
   private final OzoneSecretStore store;
-  private final S3SecretManager s3SecretManager;
+  private final S3SecretManagerImpl s3SecretManager;
   private Thread tokenRemoverThread;
   private final long tokenRemoverScanInterval;
   private String omCertificateSerialId;
@@ -90,8 +90,9 @@ public class OzoneDelegationTokenSecretManager
         service, LOG);
     currentTokens = new ConcurrentHashMap();
     this.tokenRemoverScanInterval = dtRemoverScanInterval;
-    this.store = new OzoneSecretStore(conf);
-    this.s3SecretManager = s3SecretManager;
+    this.s3SecretManager = (S3SecretManagerImpl) s3SecretManager;
+    this.store = new OzoneSecretStore(conf,
+        this.s3SecretManager.getOmMetadataManager());
     loadTokenSecretState(store.loadState());
   }
 
@@ -129,12 +130,11 @@ public class OzoneDelegationTokenSecretManager
 
     byte[] password = createPassword(identifier.getBytes(),
         getCurrentKey().getPrivateKey());
-    addToTokenStore(identifier, password);
+    long expiryTime = identifier.getIssueDate() + getTokenRenewInterval();
+    addToTokenStore(identifier, password, expiryTime);
     Token<OzoneTokenIdentifier> token = new Token<>(identifier.getBytes(),
-        password,
-        identifier.getKind(), getService());
+        password, identifier.getKind(), getService());
     if (LOG.isTraceEnabled()) {
-      long expiryTime = identifier.getIssueDate() + getTokenRenewInterval();
       String tokenId = identifier.toStringStable();
       LOG.trace("Issued delegation token -> expiryTime:{},tokenId:{}",
           expiryTime, tokenId);
@@ -149,10 +149,11 @@ public class OzoneDelegationTokenSecretManager
    * @param password
    * @throws IOException
    */
-  private void addToTokenStore(OzoneTokenIdentifier identifier, byte[] password)
+  private void addToTokenStore(OzoneTokenIdentifier identifier,
+      byte[] password, long renewTime)
       throws IOException {
-    TokenInfo tokenInfo = new TokenInfo(identifier.getIssueDate()
-        + getTokenRenewInterval(), password, identifier.getTrackingId());
+    TokenInfo tokenInfo = new TokenInfo(renewTime, password,
+        identifier.getTrackingId());
     currentTokens.put(identifier, tokenInfo);
     store.storeToken(identifier, tokenInfo.getRenewDate());
   }
@@ -222,20 +223,10 @@ public class OzoneDelegationTokenSecretManager
           + " tries to renew a token " + formatTokenId(id)
           + " with non-matching renewer " + id.getRenewer());
     }
-    OzoneSecretKey key = allKeys.get(id.getMasterKeyId());
-    if (key == null) {
-      throw new InvalidToken("Unable to find master key for keyId="
-          + id.getMasterKeyId()
-          + " from cache. Failed to renew an unexpired token "
-          + formatTokenId(id) + " with sequenceNumber="
-          + id.getSequenceNumber());
-    }
-    byte[] password = createPassword(token.getIdentifier(),
-        key.getPrivateKey());
 
     long renewTime = Math.min(id.getMaxDate(), now + getTokenRenewInterval());
     try {
-      addToTokenStore(id, password);
+      addToTokenStore(id, token.getPassword(),  renewTime);
     } catch (IOException e) {
       LOG.error("Unable to update token " + id.getSequenceNumber(), e);
     }
@@ -323,14 +314,8 @@ public class OzoneDelegationTokenSecretManager
   public boolean verifySignature(OzoneTokenIdentifier identifier,
       byte[] password) {
     try {
-      if (identifier.getOmCertSerialId().equals(getOmCertificateSerialId())) {
-        return getCertClient().verifySignature(identifier.getBytes(), password,
-            getCertClient().getCertificate());
-      } else {
-        // TODO: This delegation token was issued by other OM instance. Fetch
-        // certificate from SCM using certificate serial.
-        return false;
-      }
+      return getCertClient().verifySignature(identifier.getBytes(), password,
+          getCertClient().getCertificate(identifier.getOmCertSerialId()));
     } catch (CertificateException e) {
       return false;
     }
@@ -367,57 +352,25 @@ public class OzoneDelegationTokenSecretManager
 
   }
 
-  // TODO: handle roll private key/certificate
-  private synchronized void removeExpiredKeys() {
-    long now = Time.now();
-    for (Iterator<Map.Entry<Integer, OzoneSecretKey>> it = allKeys.entrySet()
-        .iterator(); it.hasNext();) {
-      Map.Entry<Integer, OzoneSecretKey> e = it.next();
-      OzoneSecretKey key = e.getValue();
-      if (key.getExpiryDate() < now && key.getExpiryDate() != -1) {
-        if (!key.equals(getCurrentKey())) {
-          it.remove();
-          try {
-            store.removeTokenMasterKey(key);
-          } catch (IOException ex) {
-            LOG.error("Unable to remove master key " + key.getKeyId(), ex);
-          }
-        }
-      }
-    }
-  }
-
   private void loadTokenSecretState(
       OzoneManagerSecretState<OzoneTokenIdentifier> state) throws IOException {
     LOG.info("Loading token state into token manager.");
-    for (OzoneSecretKey key : state.ozoneManagerSecretState()) {
-      allKeys.putIfAbsent(key.getKeyId(), key);
-      incrementCurrentKeyId();
-    }
     for (Map.Entry<OzoneTokenIdentifier, Long> entry :
         state.getTokenState().entrySet()) {
       addPersistedDelegationToken(entry.getKey(), entry.getValue());
     }
   }
 
-  private void addPersistedDelegationToken(
-      OzoneTokenIdentifier identifier, long renewDate)
-      throws IOException {
+  private void addPersistedDelegationToken(OzoneTokenIdentifier identifier,
+      long renewDate) throws IOException {
     if (isRunning()) {
       // a safety check
       throw new IOException(
           "Can't add persisted delegation token to a running SecretManager.");
     }
-    int keyId = identifier.getMasterKeyId();
-    OzoneSecretKey dKey = allKeys.get(keyId);
-    if (dKey == null) {
-      LOG.warn("No KEY found for persisted identifier "
-          + formatTokenId(identifier));
-      return;
-    }
 
-    PrivateKey privateKey = dKey.getPrivateKey();
-    byte[] password = createPassword(identifier.getBytes(), privateKey);
+    byte[] password = createPassword(identifier.getBytes(),
+        getCertClient().getPrivateKey());
     if (identifier.getSequenceNumber() > getDelegationTokenSeqNum()) {
       setDelegationTokenSeqNum(identifier.getSequenceNumber());
     }
@@ -437,19 +390,10 @@ public class OzoneDelegationTokenSecretManager
   public synchronized void start(CertificateClient certClient)
       throws IOException {
     super.start(certClient);
-    storeKey(getCurrentKey());
-    removeExpiredKeys();
     tokenRemoverThread = new Daemon(new ExpiredTokenRemover());
     tokenRemoverThread.start();
   }
 
-  private void storeKey(OzoneSecretKey key) throws IOException {
-    store.storeTokenMasterKey(key);
-    if (!allKeys.containsKey(key.getKeyId())) {
-      allKeys.put(key.getKeyId(), key);
-    }
-  }
-
   public void stopThreads() {
     if (LOG.isDebugEnabled()) {
       LOG.debug("Stopping expired delegation token remover thread");
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneSecretManager.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneSecretManager.java
index 194f526..45d6e66 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneSecretManager.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneSecretManager.java
@@ -36,8 +36,6 @@ import java.security.NoSuchAlgorithmException;
 import java.security.PrivateKey;
 import java.security.Signature;
 import java.security.SignatureException;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 /**
@@ -62,8 +60,6 @@ public abstract class OzoneSecretManager<T extends TokenIdentifier>
   private OzoneSecretKey currentKey;
   private AtomicInteger currentKeyId;
   private AtomicInteger tokenSequenceNumber;
-  @SuppressWarnings("visibilitymodifier")
-  protected final Map<Integer, OzoneSecretKey> allKeys;
 
   /**
    * Create a secret manager.
@@ -82,7 +78,6 @@ public abstract class OzoneSecretManager<T extends TokenIdentifier>
     this.tokenRenewInterval = tokenRenewInterval;
     currentKeyId = new AtomicInteger();
     tokenSequenceNumber = new AtomicInteger();
-    allKeys = new ConcurrentHashMap<>();
     this.service = service;
     this.logger = logger;
   }
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneSecretStore.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneSecretStore.java
index 0b748df..e81a69f 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneSecretStore.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/security/OzoneSecretStore.java
@@ -17,31 +17,16 @@
 package org.apache.hadoop.ozone.security;
 
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.utils.MetadataKeyFilters;
-import org.apache.hadoop.utils.MetadataStore;
-import org.apache.hadoop.utils.MetadataStoreBuilder;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.utils.db.Table.KeyValue;
+import org.apache.hadoop.utils.db.TableIterator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
+
 import java.io.Closeable;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.File;
 import java.io.IOException;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
-import java.util.Set;
-
-import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
-import static org.apache.hadoop.ozone.OzoneConsts.OZONE_MANAGER_TOKEN_DB_NAME;
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_CACHE_SIZE_DEFAULT;
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_DB_CACHE_SIZE_MB;
 
 /**
  * SecretStore for Ozone Master.
@@ -50,13 +35,15 @@ public class OzoneSecretStore implements Closeable {
 
   private static final Logger LOG = LoggerFactory
       .getLogger(OzoneSecretStore.class);
-  private static final String TOKEN_MASTER_KEY_KEY_PREFIX = "tokens/key_";
-  private static final String TOKEN_STATE_KEY_PREFIX = "tokens/token_";
-
+  private OMMetadataManager omMetadataManager;
   @Override
   public void close() throws IOException {
-    if (store != null) {
-      store.close();
+    if (omMetadataManager != null) {
+      try {
+        omMetadataManager.getDelegationTokenTable().close();
+      } catch (Exception e) {
+        throw new IOException("Error while closing OzoneSecretStore.", e);
+      }
     }
   }
 
@@ -65,185 +52,64 @@ public class OzoneSecretStore implements Closeable {
    * Support class to maintain state of OzoneSecretStore.
    */
   public static class OzoneManagerSecretState<T> {
-
     private Map<T, Long> tokenState = new HashMap<>();
-    private Set<OzoneSecretKey> tokenMasterKeyState = new HashSet<>();
-
     public Map<T, Long> getTokenState() {
       return tokenState;
     }
-
-    public Set<OzoneSecretKey> ozoneManagerSecretState() {
-      return tokenMasterKeyState;
-    }
   }
 
-  private MetadataStore store;
-
-  public OzoneSecretStore(OzoneConfiguration conf)
-      throws IOException {
-    File metaDir = getOzoneMetaDirPath(conf);
-    final int cacheSize = conf.getInt(OZONE_OM_DB_CACHE_SIZE_MB,
-        OZONE_OM_DB_CACHE_SIZE_DEFAULT);
-    File omTokenDBFile = new File(metaDir.getPath(),
-        OZONE_MANAGER_TOKEN_DB_NAME);
-    this.store = MetadataStoreBuilder.newBuilder()
-        .setConf(conf)
-        .setDbFile(omTokenDBFile)
-        .setCacheSize(cacheSize * OzoneConsts.MB)
-        .build();
+  public OzoneSecretStore(OzoneConfiguration conf,
+      OMMetadataManager omMetadataManager) {
+    this.omMetadataManager = omMetadataManager;
   }
 
   public OzoneManagerSecretState loadState() throws IOException {
-    OzoneManagerSecretState state = new OzoneManagerSecretState();
-    int numKeys = loadMasterKeys(state);
-    LOG.info("Loaded " + numKeys + " token master keys");
+    OzoneManagerSecretState<Integer> state = new OzoneManagerSecretState();
     int numTokens = loadTokens(state);
     LOG.info("Loaded " + numTokens + " tokens");
     return state;
   }
 
-  public void storeTokenMasterKey(OzoneSecretKey key) throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Storing master key " + key.getKeyId());
-    }
-    ByteArrayOutputStream memStream = new ByteArrayOutputStream();
-    DataOutputStream dataStream = new DataOutputStream(memStream);
-    try {
-      key.write(dataStream);
-      dataStream.close();
-      dataStream = null;
-    } finally {
-      IOUtils.cleanupWithLogger(LOG, dataStream);
-    }
-    try {
-      byte[] dbKey = getMasterKeyDBKey(key);
-      store.put(dbKey, memStream.toByteArray());
-    } catch (IOException e) {
-      LOG.error("Unable to store master key " + key.getKeyId(), e);
-      throw e;
-    }
-  }
-
-
-  public void removeTokenMasterKey(OzoneSecretKey key)
-      throws IOException {
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Removing master key " + key.getKeyId());
-    }
-
-    byte[] dbKey = getMasterKeyDBKey(key);
-    try {
-      store.delete(dbKey);
-    } catch (IOException e) {
-      LOG.error("Unable to delete master key " + key.getKeyId(), e);
-      throw e;
-    }
-  }
-
-  public void storeToken(OzoneTokenIdentifier tokenId, Long renewDate)
+  public void storeToken(OzoneTokenIdentifier tokenId, long renewDate)
       throws IOException {
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Storing token " + tokenId.getSequenceNumber());
+      LOG.debug("Storing token {}", tokenId.getSequenceNumber());
     }
 
-    ByteArrayOutputStream memStream = new ByteArrayOutputStream();
-    DataOutputStream dataStream = new DataOutputStream(memStream);
     try {
-      tokenId.write(dataStream);
-      dataStream.writeLong(renewDate);
-      dataStream.close();
-      dataStream = null;
-    } finally {
-      IOUtils.cleanupWithLogger(LOG, dataStream);
-    }
-
-    byte[] dbKey = getTokenDBKey(tokenId);
-    try {
-      store.put(dbKey, memStream.toByteArray());
+      omMetadataManager.getDelegationTokenTable().put(tokenId, renewDate);
     } catch (IOException e) {
       LOG.error("Unable to store token " + tokenId.toString(), e);
       throw e;
     }
   }
 
-  public void updateToken(OzoneTokenIdentifier tokenId, Long renewDate)
+  public void updateToken(OzoneTokenIdentifier tokenId, long renewDate)
       throws IOException {
     storeToken(tokenId, renewDate);
   }
 
-  public void removeToken(OzoneTokenIdentifier tokenId)
-      throws IOException {
-    byte[] dbKey = getTokenDBKey(tokenId);
+  public void removeToken(OzoneTokenIdentifier tokenId) throws IOException {
     try {
-      store.delete(dbKey);
+      omMetadataManager.getDelegationTokenTable().delete(tokenId);
     } catch (IOException e) {
-      LOG.error("Unable to remove token " + tokenId.toString(), e);
+      LOG.error("Unable to remove token {}", tokenId.toString(), e);
       throw e;
     }
   }
 
-  public int loadMasterKeys(OzoneManagerSecretState state) throws IOException {
-    MetadataKeyFilters.MetadataKeyFilter filter =
-        (preKey, currentKey, nextKey) -> DFSUtil.bytes2String(currentKey)
-            .startsWith(TOKEN_MASTER_KEY_KEY_PREFIX);
-    List<Map.Entry<byte[], byte[]>> kvs = store
-        .getRangeKVs(null, Integer.MAX_VALUE, filter);
-    kvs.forEach(entry -> {
-      try {
-        loadTokenMasterKey(state, entry.getValue());
-      } catch (IOException e) {
-        LOG.warn("Failed to load master key ",
-            DFSUtil.bytes2String(entry.getKey()), e);
-      }
-    });
-    return kvs.size();
-  }
-
-  private void loadTokenMasterKey(OzoneManagerSecretState state, byte[] data)
-      throws IOException {
-    OzoneSecretKey key = OzoneSecretKey.readProtoBuf(data);
-    state.tokenMasterKeyState.add(key);
-  }
-
   public int loadTokens(OzoneManagerSecretState state) throws IOException {
-    MetadataKeyFilters.MetadataKeyFilter filter =
-        (preKey, currentKey, nextKey) -> DFSUtil.bytes2String(currentKey)
-            .startsWith(TOKEN_STATE_KEY_PREFIX);
-    List<Map.Entry<byte[], byte[]>> kvs =
-        store.getRangeKVs(null, Integer.MAX_VALUE, filter);
-    kvs.forEach(entry -> {
-      try {
-        loadToken(state, entry.getValue());
-      } catch (IOException e) {
-        LOG.warn("Failed to load token ",
-            DFSUtil.bytes2String(entry.getKey()), e);
+    int loadedToken = 0;
+    try (TableIterator<OzoneTokenIdentifier, ? extends
+        KeyValue<OzoneTokenIdentifier, Long>> iterator =
+             omMetadataManager.getDelegationTokenTable().iterator()){
+      iterator.seekToFirst();
+      while(iterator.hasNext()) {
+        KeyValue<OzoneTokenIdentifier, Long> kv = iterator.next();
+        state.tokenState.put(kv.getKey(), kv.getValue());
+        loadedToken++;
       }
-    });
-    return kvs.size();
-  }
-
-  private void loadToken(OzoneManagerSecretState state, byte[] data)
-      throws IOException {
-    long renewDate;
-    DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
-    OzoneTokenIdentifier tokenId = OzoneTokenIdentifier.readProtoBuf(in);
-    try {
-      tokenId.readFields(in);
-      renewDate = in.readLong();
-    } finally {
-      IOUtils.cleanupWithLogger(LOG, in);
     }
-    state.tokenState.put(tokenId, renewDate);
-  }
-
-  private byte[] getMasterKeyDBKey(OzoneSecretKey masterKey) {
-    return DFSUtil.string2Bytes(
-        TOKEN_MASTER_KEY_KEY_PREFIX + masterKey.getKeyId());
-  }
-
-  private byte[] getTokenDBKey(OzoneTokenIdentifier tokenId) {
-    return DFSUtil.string2Bytes(
-        TOKEN_STATE_KEY_PREFIX + tokenId.getSequenceNumber());
+    return loadedToken;
   }
 }
diff --git a/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-compose.yaml b/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-compose.yaml
index bded322..e866c01 100644
--- a/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-compose.yaml
+++ b/hadoop-ozone/dist/src/main/compose/ozonesecure/docker-compose.yaml
@@ -51,8 +51,8 @@ services:
     ports:
       - 9874:9874
     environment:
-      ENSURE_OM_INITIALIZED: /data/metadata/om/current/VERSION
       WAITFOR: scm:9876
+      ENSURE_OM_INITIALIZED: /data/metadata/om/current/VERSION
     env_file:
       - docker-config
     command: ["/opt/hadoop/bin/ozone","om"]
diff --git a/hadoop-ozone/dist/src/main/smoketest/security/ozone-secure.robot b/hadoop-ozone/dist/src/main/smoketest/security/ozone-secure.robot
index 9c481e6..e4cf246 100644
--- a/hadoop-ozone/dist/src/main/smoketest/security/ozone-secure.robot
+++ b/hadoop-ozone/dist/src/main/smoketest/security/ozone-secure.robot
@@ -149,5 +149,6 @@ Secure S3 test Failure
 Secure S3 test Success
     Run Keyword         Setup credentials
     ${output} =         Execute          aws s3api --endpoint-url ${ENDPOINT_URL} create-bucket --bucket bucket-test123
-                        Should contain    ${result}         Volume pqrs is not found
+    ${output} =         Execute          aws s3api --endpoint-url ${ENDPOINT_URL} list-buckets
+                        Should contain    ${output}         bucket-test123
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
index dbeb0b5..521a4f1 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneClusterImpl.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.StorageUnit;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
+import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ipc.Client;
@@ -95,6 +96,7 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
 
   // Timeout for the cluster to be ready
   private int waitForClusterToBeReadyTimeout = 60000; // 1 min
+  private CertificateClient caClient;
 
   /**
    * Creates a new MiniOzoneCluster.
@@ -364,7 +366,18 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
    */
   @Override
   public void startHddsDatanodes() {
-    hddsDatanodes.forEach((datanode) -> datanode.start(null));
+    hddsDatanodes.forEach((datanode) -> {
+      datanode.setCertificateClient(getCAClient());
+      datanode.start(null);
+    });
+  }
+
+  private CertificateClient getCAClient() {
+    return this.caClient;
+  }
+
+  private void setCAClient(CertificateClient client) {
+    this.caClient = client;
   }
 
 
@@ -403,6 +416,7 @@ public class MiniOzoneClusterImpl implements MiniOzoneCluster {
       final List<HddsDatanodeService> hddsDatanodes = createHddsDatanodes(scm);
       MiniOzoneClusterImpl cluster = new MiniOzoneClusterImpl(conf, om, scm,
           hddsDatanodes);
+      cluster.setCAClient(certClient);
       if (startDataNodes) {
         cluster.startHddsDatanodes();
       }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
index 6427dae..0cbea52 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestMiniOzoneCluster.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone;
 
+import org.apache.commons.lang3.RandomUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -123,6 +124,10 @@ public class TestMiniOzoneCluster {
     id2.setPort(DatanodeDetails.newPort(Port.Name.STANDALONE, 2));
     id3.setPort(DatanodeDetails.newPort(Port.Name.STANDALONE, 3));
 
+    // Add certificate serial id.
+    String certSerialId = "" + RandomUtils.nextLong();
+    id1.setCertSerialId(certSerialId);
+
     // Write a single ID to the file and read it out
     File validIdsFile = new File(WRITE_TMP, "valid-values.id");
     validIdsFile.delete();
@@ -130,6 +135,7 @@ public class TestMiniOzoneCluster {
     DatanodeDetails validId = ContainerUtils.readDatanodeDetailsFrom(
         validIdsFile);
 
+    assertEquals(validId.getCertSerialId(), certSerialId);
     assertEquals(id1, validId);
     assertEquals(id1.getProtoBufMessage(), validId.getProtoBufMessage());
 
@@ -169,11 +175,11 @@ public class TestMiniOzoneCluster {
         true);
     try (
         DatanodeStateMachine sm1 = new DatanodeStateMachine(
-            TestUtils.randomDatanodeDetails(), ozoneConf);
+            TestUtils.randomDatanodeDetails(), ozoneConf,  null);
         DatanodeStateMachine sm2 = new DatanodeStateMachine(
-            TestUtils.randomDatanodeDetails(), ozoneConf);
+            TestUtils.randomDatanodeDetails(), ozoneConf,  null);
         DatanodeStateMachine sm3 = new DatanodeStateMachine(
-            TestUtils.randomDatanodeDetails(), ozoneConf)
+            TestUtils.randomDatanodeDetails(), ozoneConf,  null)
     ) {
       HashSet<Integer> ports = new HashSet<Integer>();
       assertTrue(ports.add(sm1.getContainer().getReadChannel().getIPCPort()));
@@ -192,11 +198,11 @@ public class TestMiniOzoneCluster {
     ozoneConf.setBoolean(OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, false);
     try (
         DatanodeStateMachine sm1 = new DatanodeStateMachine(
-            TestUtils.randomDatanodeDetails(), ozoneConf);
+            TestUtils.randomDatanodeDetails(), ozoneConf,  null);
         DatanodeStateMachine sm2 = new DatanodeStateMachine(
-            TestUtils.randomDatanodeDetails(), ozoneConf);
+            TestUtils.randomDatanodeDetails(), ozoneConf,  null);
         DatanodeStateMachine sm3 = new DatanodeStateMachine(
-            TestUtils.randomDatanodeDetails(), ozoneConf)
+            TestUtils.randomDatanodeDetails(), ozoneConf,  null)
     ) {
       HashSet<Integer> ports = new HashSet<Integer>();
       assertTrue(ports.add(sm1.getContainer().getReadChannel().getIPCPort()));
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
index 8281c26..d7cfd37 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestSecureOzoneCluster.java
@@ -17,18 +17,6 @@
  */
 package org.apache.hadoop.ozone;
 
-import static junit.framework.TestCase.assertNotNull;
-import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
-import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_AUTH_METHOD;
-import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_ERROR_OTHER;
-import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_EXPIRED;
-import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_FOUND;
-import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
-import static org.slf4j.event.Level.INFO;
-
 import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
@@ -100,6 +88,20 @@ import java.time.LocalDateTime;
 import java.time.temporal.ChronoUnit;
 import java.util.Date;
 
+import static junit.framework.TestCase.assertNotNull;
+import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ADMINISTRATORS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ENABLED;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.INVALID_AUTH_METHOD;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_ERROR_OTHER;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_EXPIRED;
+import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.VOLUME_NOT_FOUND;
+import static org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod.KERBEROS;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.slf4j.event.Level.INFO;
 
 /**
  * Test class to for security enabled Ozone cluster.
@@ -138,6 +140,7 @@ public final class TestSecureOzoneCluster {
   private Path metaDirPath;
   @Rule
   public TemporaryFolder folder= new TemporaryFolder();
+  private String omCertSerialId = "9879877970576";
 
   @Before
   public void init() {
@@ -375,7 +378,6 @@ public final class TestSecureOzoneCluster {
     initSCM();
     // Create a secure SCM instance as om client will connect to it
     scm = StorageContainerManager.createSCM(null, conf);
-
     setupOm(conf);
     conf.set(OMConfigKeys.OZONE_OM_KERBEROS_PRINCIPAL_KEY,
         "non-existent-user@EXAMPLE.com");
@@ -401,7 +403,7 @@ public final class TestSecureOzoneCluster {
     } catch (Exception ex) {
       // Expects timeout failure from scmClient in om but om user login via
       // kerberos should succeed.
-      Assert.assertTrue(logs.getOutput().contains("Ozone Manager login"
+      assertTrue(logs.getOutput().contains("Ozone Manager login"
           + " successful"));
     }
   }
@@ -445,7 +447,7 @@ public final class TestSecureOzoneCluster {
               CLIENT_TIMEOUT), RandomStringUtils.randomAscii(5));
 
       // Assert if auth was successful via Kerberos
-      Assert.assertFalse(logs.getOutput().contains(
+      assertFalse(logs.getOutput().contains(
           "Auth successful for " + username + " (auth:KERBEROS)"));
 
       // Case 1: Test successful delegation token.
@@ -454,7 +456,7 @@ public final class TestSecureOzoneCluster {
 
       // Case 2: Test successful token renewal.
       long renewalTime = omClient.renewDelegationToken(token);
-      Assert.assertTrue(renewalTime > 0);
+      assertTrue(renewalTime > 0);
 
       // Check if token is of right kind and renewer is running om instance
       Assert.assertEquals(token.getKind().toString(), "OzoneToken");
@@ -483,11 +485,11 @@ public final class TestSecureOzoneCluster {
       });
 
       // Case 3: Test Client can authenticate using token.
-      Assert.assertFalse(logs.getOutput().contains(
+      assertFalse(logs.getOutput().contains(
           "Auth successful for " + username + " (auth:TOKEN)"));
       OzoneTestUtils.expectOmException(VOLUME_NOT_FOUND,
           () -> omClient.deleteVolume("vol1"));
-      Assert.assertTrue(logs.getOutput().contains("Auth successful for "
+      assertTrue(logs.getOutput().contains("Auth successful for "
           + username + " (auth:TOKEN)"));
 
       // Case 4: Test failure of token renewal.
@@ -500,11 +502,11 @@ public final class TestSecureOzoneCluster {
             try {
               omClient.renewDelegationToken(token);
             } catch (OMException ex) {
-              Assert.assertTrue(ex.getResult().equals(INVALID_AUTH_METHOD));
+              assertTrue(ex.getResult().equals(INVALID_AUTH_METHOD));
               throw ex;
             }
           });
-      Assert.assertTrue(logs.getOutput().contains(
+      assertTrue(logs.getOutput().contains(
           "Auth successful for " + username + " (auth:TOKEN)"));
       omLogs.clearOutput();
       //testUser.setAuthenticationMethod(AuthMethod.KERBEROS);
@@ -522,7 +524,7 @@ public final class TestSecureOzoneCluster {
       // Wait for client to timeout
       Thread.sleep(CLIENT_TIMEOUT);
 
-      Assert.assertFalse(logs.getOutput().contains("Auth failed for"));
+      assertFalse(logs.getOutput().contains("Auth failed for"));
 
       // Case 6: Test failure of token cancellation.
       // Get Om client, this time authentication using Token will fail as
@@ -538,12 +540,12 @@ public final class TestSecureOzoneCluster {
             try {
               omClient.cancelDelegationToken(token);
             } catch (OMException ex) {
-              Assert.assertTrue(ex.getResult().equals(TOKEN_ERROR_OTHER));
+              assertTrue(ex.getResult().equals(TOKEN_ERROR_OTHER));
               throw ex;
             }
           });
 
-      Assert.assertTrue(logs.getOutput().contains("Auth failed for"));
+      assertTrue(logs.getOutput().contains("Auth failed for"));
     } finally {
       om.stop();
       om.join();
@@ -600,7 +602,7 @@ public final class TestSecureOzoneCluster {
 
       // Renew delegation token
       long expiryTime = omClient.renewDelegationToken(token);
-      Assert.assertTrue(expiryTime > 0);
+      assertTrue(expiryTime > 0);
       omLogs.clearOutput();
 
       // Test failure of delegation renewal
@@ -612,7 +614,7 @@ public final class TestSecureOzoneCluster {
             try {
               omClient.renewDelegationToken(token);
             } catch (OMException ex) {
-              Assert.assertTrue(ex.getResult().equals(TOKEN_EXPIRED));
+              assertTrue(ex.getResult().equals(TOKEN_EXPIRED));
               throw ex;
             }
           });
@@ -625,7 +627,7 @@ public final class TestSecureOzoneCluster {
       LambdaTestUtils.intercept(OMException.class,
           "Delegation token renewal failed",
           () -> omClient.renewDelegationToken(token2));
-      Assert.assertTrue(omLogs.getOutput().contains(" with non-matching " +
+      assertTrue(omLogs.getOutput().contains(" with non-matching " +
           "renewer randomService"));
       omLogs.clearOutput();
 
@@ -640,7 +642,7 @@ public final class TestSecureOzoneCluster {
       LambdaTestUtils.intercept(OMException.class,
           "Delegation token renewal failed",
           () -> omClient.renewDelegationToken(tamperedToken));
-      Assert.assertTrue(omLogs.getOutput().contains("can't be found in " +
+      assertTrue(omLogs.getOutput().contains("can't be found in " +
           "cache"));
       omLogs.clearOutput();
 
@@ -654,6 +656,7 @@ public final class TestSecureOzoneCluster {
     OMStorage omStore = new OMStorage(config);
     omStore.setClusterId("testClusterId");
     omStore.setScmId("testScmId");
+    omStore.setOmCertSerialId(omCertSerialId);
     // writes the version file properties
     omStore.initialize();
     OzoneManager.setTestSecureOmFlag(true);
@@ -690,11 +693,11 @@ public final class TestSecureOzoneCluster {
           .getS3Secret("HADOOP/JOHNDOE");
 
       //secret fetched on both attempts must be same
-      Assert.assertTrue(firstAttempt.getAwsSecret()
+      assertTrue(firstAttempt.getAwsSecret()
           .equals(secondAttempt.getAwsSecret()));
 
       //access key fetched on both attempts must be same
-      Assert.assertTrue(firstAttempt.getAwsAccessKey()
+      assertTrue(firstAttempt.getAwsAccessKey()
           .equals(secondAttempt.getAwsAccessKey()));
 
     } finally {
@@ -705,6 +708,52 @@ public final class TestSecureOzoneCluster {
   }
 
   /**
+   * Tests functionality to init secure OM when it is already initialized.
+   */
+  @Test
+  public void testSecureOmReInit() throws Exception {
+    LogCapturer omLogs =
+        LogCapturer.captureLogs(OzoneManager.getLogger());
+    omLogs.clearOutput();
+    initSCM();
+    try {
+      scm = StorageContainerManager.createSCM(null, conf);
+      scm.start();
+      conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, false);
+      OMStorage omStore = new OMStorage(conf);
+      initializeOmStorage(omStore);
+      OzoneManager.setTestSecureOmFlag(true);
+      om = OzoneManager.createOm(null, conf);
+
+      assertNull(om.getCertificateClient());
+      assertFalse(omLogs.getOutput().contains("Init response: GETCERT"));
+      assertFalse(omLogs.getOutput().contains("Successfully stored " +
+          "SCM signed certificate"));
+
+      conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
+      OzoneManager.omInit(conf);
+      om.stop();
+      om = OzoneManager.createOm(null, conf);
+
+      Assert.assertNotNull(om.getCertificateClient());
+      Assert.assertNotNull(om.getCertificateClient().getPublicKey());
+      Assert.assertNotNull(om.getCertificateClient().getPrivateKey());
+      Assert.assertNotNull(om.getCertificateClient().getCertificate());
+      assertTrue(omLogs.getOutput().contains("Init response: GETCERT"));
+      assertTrue(omLogs.getOutput().contains("Successfully stored " +
+          "SCM signed certificate"));
+      X509Certificate certificate = om.getCertificateClient().getCertificate();
+      validateCertificate(certificate);
+
+    } finally {
+      if (scm != null) {
+        scm.stop();
+      }
+    }
+
+  }
+
+  /**
    * Test functionality to get SCM signed certificate for OM.
    */
   @Test
@@ -726,8 +775,8 @@ public final class TestSecureOzoneCluster {
       Assert.assertNotNull(om.getCertificateClient().getPublicKey());
       Assert.assertNotNull(om.getCertificateClient().getPrivateKey());
       Assert.assertNotNull(om.getCertificateClient().getCertificate());
-      Assert.assertTrue(omLogs.getOutput().contains("Init response: GETCERT"));
-      Assert.assertTrue(omLogs.getOutput().contains("Successfully stored " +
+      assertTrue(omLogs.getOutput().contains("Init response: GETCERT"));
+      assertTrue(omLogs.getOutput().contains("Successfully stored " +
           "SCM signed certificate"));
       X509Certificate certificate = om.getCertificateClient().getCertificate();
       validateCertificate(certificate);
@@ -761,17 +810,17 @@ public final class TestSecureOzoneCluster {
 
     // Make sure the end date is honored.
     invalidDate = java.sql.Date.valueOf(today.plus(1, ChronoUnit.DAYS));
-    Assert.assertTrue(cert.getNotAfter().after(invalidDate));
+    assertTrue(cert.getNotAfter().after(invalidDate));
 
     invalidDate = java.sql.Date.valueOf(today.plus(400, ChronoUnit.DAYS));
-    Assert.assertTrue(cert.getNotAfter().before(invalidDate));
+    assertTrue(cert.getNotAfter().before(invalidDate));
 
-    Assert.assertTrue(cert.getSubjectDN().toString().contains(scmId));
-    Assert.assertTrue(cert.getSubjectDN().toString().contains(clusterId));
+    assertTrue(cert.getSubjectDN().toString().contains(scmId));
+    assertTrue(cert.getSubjectDN().toString().contains(clusterId));
 
-    Assert.assertTrue(cert.getIssuerDN().toString().contains(scmUser));
-    Assert.assertTrue(cert.getIssuerDN().toString().contains(scmId));
-    Assert.assertTrue(cert.getIssuerDN().toString().contains(clusterId));
+    assertTrue(cert.getIssuerDN().toString().contains(scmUser));
+    assertTrue(cert.getIssuerDN().toString().contains(scmId));
+    assertTrue(cert.getIssuerDN().toString().contains(clusterId));
 
     // Verify that certificate matches the public key.
     String encodedKey1 = cert.getPublicKey().toString();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/CertificateClientTestImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/CertificateClientTestImpl.java
index 7640f40..87fe706 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/CertificateClientTestImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/CertificateClientTestImpl.java
@@ -27,7 +27,6 @@ import org.apache.hadoop.hdds.security.x509.keys.HDDSKeyGenerator;
 import org.bouncycastle.cert.X509CertificateHolder;
 import org.bouncycastle.cert.jcajce.JcaX509CertificateConverter;
 
-import java.io.IOException;
 import java.io.InputStream;
 import java.security.KeyPair;
 import java.security.PrivateKey;
@@ -48,13 +47,28 @@ public class CertificateClientTestImpl implements CertificateClient {
   private final SecurityConfig securityConfig;
   private final KeyPair keyPair;
   private final Configuration config;
+  private final X509Certificate x509Certificate;
 
-  public CertificateClientTestImpl(OzoneConfiguration conf) throws Exception{
+  public CertificateClientTestImpl(OzoneConfiguration conf) throws Exception {
     securityConfig = new SecurityConfig(conf);
     HDDSKeyGenerator keyGen =
         new HDDSKeyGenerator(securityConfig.getConfiguration());
     keyPair = keyGen.generateKey();
     config = conf;
+    SelfSignedCertificate.Builder builder =
+        SelfSignedCertificate.newBuilder()
+            .setBeginDate(LocalDate.now())
+            .setEndDate(LocalDate.now().plus(365, ChronoUnit.DAYS))
+            .setClusterID("cluster1")
+            .setKey(keyPair)
+            .setSubject("TestCertSub")
+            .setConfiguration(config)
+            .setScmID("TestScmId1")
+            .makeCA();
+    X509CertificateHolder certificateHolder = null;
+    certificateHolder = builder.build();
+    x509Certificate = new JcaX509CertificateConverter().getCertificate(
+        certificateHolder);
   }
 
   @Override
@@ -67,26 +81,21 @@ public class CertificateClientTestImpl implements CertificateClient {
     return keyPair.getPublic();
   }
 
+  /**
+   * Returns the certificate of the specified component if it exists on the
+   * local system.
+   *
+   * @return certificate or null if there is no data.
+   */
+  @Override
+  public X509Certificate getCertificate(String certSerialId)
+      throws CertificateException {
+    return x509Certificate;
+  }
+
   @Override
   public X509Certificate getCertificate() {
-    SelfSignedCertificate.Builder builder =
-        SelfSignedCertificate.newBuilder()
-            .setBeginDate(LocalDate.now())
-            .setEndDate(LocalDate.now().plus(365, ChronoUnit.DAYS))
-            .setClusterID("cluster1")
-            .setKey(keyPair)
-            .setSubject("TestCertSub")
-            .setConfiguration(config)
-            .setScmID("TestScmId1")
-            .makeCA();
-    X509CertificateHolder certificateHolder = null;
-    try {
-      certificateHolder = builder.build();
-      return new JcaX509CertificateConverter().getCertificate(
-          certificateHolder);
-    } catch (IOException | java.security.cert.CertificateException e) {
-    }
-    return null;
+    return x509Certificate;
   }
 
   @Override
@@ -107,13 +116,13 @@ public class CertificateClientTestImpl implements CertificateClient {
 
   @Override
   public boolean verifySignature(InputStream stream, byte[] signature,
-      X509Certificate x509Certificate) throws CertificateException {
+      X509Certificate cert) throws CertificateException {
     return true;
   }
 
   @Override
   public boolean verifySignature(byte[] data, byte[] signature,
-      X509Certificate x509Certificate) throws CertificateException {
+      X509Certificate cert) throws CertificateException {
     return true;
   }
 
@@ -128,7 +137,7 @@ public class CertificateClientTestImpl implements CertificateClient {
   }
 
   @Override
-  public void storeCertificate(X509Certificate certificate)
+  public void storeCertificate(String cert, boolean force)
       throws CertificateException {
 
   }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java
index 147bee2..d0163b8 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestSecureOzoneRpcClient.java
@@ -23,10 +23,13 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto;
 import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdds.security.token.BlockTokenVerifier;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.client.CertificateClientTestImpl;
 import org.apache.hadoop.ozone.client.ObjectStore;
@@ -41,16 +44,21 @@ import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
+import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.util.Time;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.UUID;
 
@@ -71,6 +79,7 @@ public class TestSecureOzoneRpcClient extends TestOzoneRpcClient {
   private static final String SCM_ID = UUID.randomUUID().toString();
   private static File testDir;
   private static OzoneConfiguration conf;
+  private static OzoneBlockTokenSecretManager secretManager;
 
   /**
    * Create a MiniOzoneCluster for testing.
@@ -96,6 +105,14 @@ public class TestSecureOzoneRpcClient extends TestOzoneRpcClient {
         .setScmId(SCM_ID)
         .setCertificateClient(certificateClientTest)
         .build();
+    String user = UserGroupInformation.getCurrentUser().getShortUserName();
+    secretManager = new OzoneBlockTokenSecretManager(new SecurityConfig(conf),
+        60 *60, certificateClientTest.getCertificate().
+        getSerialNumber().toString());
+    secretManager.start(certificateClientTest);
+    Token<OzoneBlockTokenIdentifier> token = secretManager.generateToken(
+        user, EnumSet.allOf(AccessModeProto.class), 60*60);
+    UserGroupInformation.getCurrentUser().addToken(token);
     cluster.getOzoneManager().startSecretManager();
     cluster.waitForClusterToBeReady();
     ozClient = OzoneClientFactory.getRpcClient(conf);
@@ -163,6 +180,7 @@ public class TestSecureOzoneRpcClient extends TestOzoneRpcClient {
    * 2. writeChunk
    * */
   @Test
+  @Ignore("Needs to be moved out of this class as  client setup is static")
   public void testKeyOpFailureWithoutBlockToken() throws Exception {
     String volumeName = UUID.randomUUID().toString();
     String bucketName = UUID.randomUUID().toString();
@@ -176,7 +194,7 @@ public class TestSecureOzoneRpcClient extends TestOzoneRpcClient {
     for (int i = 0; i < 10; i++) {
       String keyName = UUID.randomUUID().toString();
 
-      try(OzoneOutputStream out = bucket.createKey(keyName,
+      try (OzoneOutputStream out = bucket.createKey(keyName,
           value.getBytes().length, ReplicationType.STAND_ALONE,
           ReplicationFactor.ONE, new HashMap<>())) {
         LambdaTestUtils.intercept(IOException.class, "UNAUTHENTICATED: Fail " +
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
index d06cee7..0b97a00 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ContainerTestHelper.java
@@ -376,6 +376,20 @@ public final class ContainerTestHelper {
     return getContainerCommandRequestBuilder(containerID, pipeline).build();
   }
 
+  /**
+   * Returns a create container command with token. There are a bunch of
+   * tests where we need to just send a request and get a reply.
+   *
+   * @return ContainerCommandRequestProto.
+   */
+  public static ContainerCommandRequestProto getCreateContainerRequest(
+      long containerID, Pipeline pipeline, Token token) throws IOException {
+    LOG.trace("addContainer: {}", containerID);
+    return getContainerCommandRequestBuilder(containerID, pipeline)
+        .setEncodedToken(token.encodeToUrlString())
+        .build();
+  }
+
   private static Builder getContainerCommandRequestBuilder(long containerID,
       Pipeline pipeline) throws IOException {
     Builder request =
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java
index d2f2c91..dc9e133 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/TestCSMMetrics.java
@@ -158,7 +158,7 @@ public class TestCSMMetrics {
 
     final ContainerDispatcher dispatcher = new TestContainerDispatcher();
     return XceiverServerRatis.newXceiverServerRatis(dn, conf, dispatcher,
-        null);
+        null, null);
   }
 
   private static class TestContainerDispatcher implements ContainerDispatcher {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
index 44c1172..e06a9e9 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/metrics/TestContainerMetrics.java
@@ -111,7 +111,7 @@ public class TestContainerMetrics {
           volumeSet, handlers, context, metrics);
       dispatcher.setScmId(UUID.randomUUID().toString());
 
-      server = new XceiverServerGrpc(datanodeDetails, conf, dispatcher,
+      server = new XceiverServerGrpc(datanodeDetails, conf, dispatcher, null,
           createReplicationService(new ContainerController(
               containerSet, handlers)));
       client = new XceiverClientGrpc(pipeline, conf);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
index 4b6eb4c..27777de 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
@@ -81,7 +81,7 @@ public class TestOzoneContainer {
       DatanodeStateMachine dsm = Mockito.mock(DatanodeStateMachine.class);
       Mockito.when(dsm.getDatanodeDetails()).thenReturn(datanodeDetails);
       Mockito.when(context.getParent()).thenReturn(dsm);
-      container = new OzoneContainer(datanodeDetails, conf, context);
+      container = new OzoneContainer(datanodeDetails, conf, context, null);
       //Setting scmId, as we start manually ozone container.
       container.getDispatcher().setScmId(UUID.randomUUID().toString());
       container.start();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
index c2475e5..fcfc762 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainerWithTLS.java
@@ -154,7 +154,7 @@ public class TestOzoneContainerWithTLS {
       conf.setBoolean(
           OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, false);
 
-      container = new OzoneContainer(dn, conf, getContext(dn));
+      container = new OzoneContainer(dn, conf, getContext(dn), null);
       //Setting scmId, as we start manually ozone container.
       container.getDispatcher().setScmId(UUID.randomUUID().toString());
       container.start();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestSecureOzoneContainer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestSecureOzoneContainer.java
index b43570a..6cb1ebe 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestSecureOzoneContainer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestSecureOzoneContainer.java
@@ -18,15 +18,17 @@
 
 package org.apache.hadoop.ozone.container.ozoneimpl;
 
+import org.apache.commons.lang3.RandomUtils;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto;
 import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
 import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.client.CertificateClientTestImpl;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.XceiverClientGrpc;
@@ -34,7 +36,7 @@ import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
-import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.test.GenericTestUtils;
@@ -52,7 +54,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
 import java.util.Arrays;
 import java.util.Collection;
@@ -61,6 +62,7 @@ import java.util.UUID;
 
 import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -85,6 +87,8 @@ public class TestSecureOzoneContainer {
   private Boolean requireBlockToken;
   private Boolean hasBlockToken;
   private Boolean blockTokeExpired;
+  private CertificateClientTestImpl caClient;
+  private OzoneBlockTokenSecretManager secretManager;
 
 
   public TestSecureOzoneContainer(Boolean requireBlockToken,
@@ -105,14 +109,16 @@ public class TestSecureOzoneContainer {
   }
 
   @Before
-  public void setup() throws IOException{
+  public void setup() throws Exception {
     conf = new OzoneConfiguration();
     String ozoneMetaPath =
         GenericTestUtils.getTempPath("ozoneMeta");
     conf.set(OZONE_METADATA_DIRS, ozoneMetaPath);
-
     secConfig = new SecurityConfig(conf);
-
+    caClient = new CertificateClientTestImpl(conf);
+    secretManager = new OzoneBlockTokenSecretManager(new SecurityConfig(conf),
+        60 * 60 * 24, caClient.getCertificate().
+        getSerialNumber().toString());
   }
 
   @Test
@@ -136,7 +142,7 @@ public class TestSecureOzoneContainer {
           OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, false);
 
       DatanodeDetails dn = TestUtils.randomDatanodeDetails();
-      container = new OzoneContainer(dn, conf, getContext(dn));
+      container = new OzoneContainer(dn, conf, getContext(dn), caClient);
       //Setting scmId, as we start manually ozone container.
       container.getDispatcher().setScmId(UUID.randomUUID().toString());
       container.start();
@@ -148,54 +154,47 @@ public class TestSecureOzoneContainer {
 
       OzoneBlockTokenIdentifier tokenId = new OzoneBlockTokenIdentifier(
           "testUser", "cid:lud:bcsid",
-          EnumSet.allOf(HddsProtos.BlockTokenSecretProto.AccessModeProto.class),
+          EnumSet.allOf(AccessModeProto.class),
           expiryDate, "1234", 128L);
 
       int port = dn.getPort(DatanodeDetails.Port.Name.STANDALONE).getValue();
       if (port == 0) {
         port = secConfig.getConfiguration().getInt(OzoneConfigKeys
-                .DFS_CONTAINER_IPC_PORT,
-            OzoneConfigKeys.DFS_CONTAINER_IPC_PORT_DEFAULT);
+                .DFS_CONTAINER_IPC_PORT, DFS_CONTAINER_IPC_PORT_DEFAULT);
       }
-      InetSocketAddress addr =
-          new InetSocketAddress(dn.getIpAddress(), port);
-
-      Token<OzoneBlockTokenIdentifier> token =
-          new Token(tokenId.getBytes(), new byte[50], tokenId.getKind(),
-              SecurityUtil.buildTokenService(addr));
+      secretManager.start(caClient);
+      Token<OzoneBlockTokenIdentifier> token = secretManager.generateToken(
+          "123", EnumSet.allOf(AccessModeProto.class), RandomUtils.nextLong());
       if (hasBlockToken) {
         ugi.addToken(token);
       }
 
-      ugi.doAs(new PrivilegedAction<Void>() {
-        @Override
-        public Void run() {
-          try {
-            XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf);
-            client.connect(token.encodeToUrlString());
-            if (hasBlockToken) {
-              createContainerForTesting(client, containerID, token);
-            } else {
-              createContainerForTesting(client, containerID, null);
-            }
-
-          } catch (Exception e) {
-            if (requireBlockToken && hasBlockToken && !blockTokeExpired) {
-              LOG.error("Unexpected error. ", e);
-              fail("Client with BlockToken should succeed when block token is" +
-                  " required.");
-            }
-            if (requireBlockToken && hasBlockToken && blockTokeExpired) {
-              assertTrue("Receive expected exception",
-                  e instanceof SCMSecurityException);
-            }
-            if (requireBlockToken && !hasBlockToken) {
-              assertTrue("Receive expected exception", e instanceof
-                  IOException);
-            }
+      ugi.doAs((PrivilegedAction<Void>) () -> {
+        try {
+          XceiverClientGrpc client = new XceiverClientGrpc(pipeline, conf);
+          client.connect(token.encodeToUrlString());
+          if (hasBlockToken) {
+            createContainerForTesting(client, containerID, token);
+          } else {
+            createContainerForTesting(client, containerID, null);
+          }
+
+        } catch (Exception e) {
+          if (requireBlockToken && hasBlockToken && !blockTokeExpired) {
+            LOG.error("Unexpected error. ", e);
+            fail("Client with BlockToken should succeed when block token is" +
+                " required.");
+          }
+          if (requireBlockToken && hasBlockToken && blockTokeExpired) {
+            assertTrue("Receive expected exception",
+                e instanceof SCMSecurityException);
+          }
+          if (requireBlockToken && !hasBlockToken) {
+            assertTrue("Receive expected exception", e instanceof
+                IOException);
           }
-          return null;
         }
+        return null;
       });
     } finally {
       if (container != null) {
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
index 63abd36..fac7b50 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestContainerServer.java
@@ -21,6 +21,9 @@ package org.apache.hadoop.ozone.container.server;
 import com.google.common.collect.Maps;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
+import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
+import org.apache.hadoop.hdds.security.x509.certificate.client.DNCertificateClient;
 import org.apache.hadoop.ozone.container.common.helpers.ContainerMetrics;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.impl.HddsDispatcher;
@@ -82,6 +85,7 @@ public class TestContainerServer {
   static final String TEST_DIR = GenericTestUtils.getTestDir("dfs")
       .getAbsolutePath() + File.separator;
   private static final OzoneConfiguration CONF = new OzoneConfiguration();
+  private static CertificateClient caClient;
 
   private GrpcReplicationService createReplicationService(
       ContainerController containerController) {
@@ -92,6 +96,7 @@ public class TestContainerServer {
   @BeforeClass
   static public void setup() {
     CONF.set(HddsConfigKeys.HDDS_METADATA_DIR_NAME, TEST_DIR);
+    caClient = new DNCertificateClient(new SecurityConfig(CONF));
   }
 
   @Test
@@ -106,7 +111,7 @@ public class TestContainerServer {
                     .getPort(DatanodeDetails.Port.Name.STANDALONE).getValue()),
         XceiverClientGrpc::new,
         (dn, conf) -> new XceiverServerGrpc(datanodeDetails, conf,
-            new TestContainerDispatcher(),
+            new TestContainerDispatcher(), caClient,
             createReplicationService(controller)), (dn, p) -> {
         });
   }
@@ -137,7 +142,7 @@ public class TestContainerServer {
 
     final ContainerDispatcher dispatcher = new TestContainerDispatcher();
     return XceiverServerRatis
-        .newXceiverServerRatis(dn, conf, dispatcher, null);
+        .newXceiverServerRatis(dn, conf, dispatcher, null, caClient);
   }
 
   static void runTestClientServerRatis(RpcType rpc, int numNodes)
@@ -229,7 +234,7 @@ public class TestContainerServer {
       dispatcher.init();
 
       server = new XceiverServerGrpc(datanodeDetails, conf, dispatcher,
-          createReplicationService(
+          caClient, createReplicationService(
               new ContainerController(containerSet, null)));
       client = new XceiverClientGrpc(pipeline, conf);
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
index 3d631cc..11201b5 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/server/TestSecureContainerServer.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.ozone.container.server;
 
+import org.apache.commons.lang3.RandomUtils;
 import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -30,9 +31,12 @@ import org.apache.hadoop.hdds.scm.XceiverClientRatis;
 import org.apache.hadoop.hdds.scm.XceiverClientSpi;
 import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.storage.ContainerProtocolCalls;
+import org.apache.hadoop.hdds.security.exception.SCMSecurityException;
+import org.apache.hadoop.hdds.security.token.OzoneBlockTokenIdentifier;
+import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.RatisTestHelper;
+import org.apache.hadoop.ozone.client.CertificateClientTestImpl;
 import org.apache.hadoop.ozone.container.ContainerTestHelper;
 import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
 import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher;
@@ -44,10 +48,14 @@ import org.apache.hadoop.ozone.container.common.transport.server.ratis.XceiverSe
 import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
 import org.apache.hadoop.ozone.container.replication.GrpcReplicationService;
 import org.apache.hadoop.ozone.container.replication.OnDemandContainerReplicationSource;
+import org.apache.hadoop.ozone.security.OzoneBlockTokenSecretManager;
 import org.apache.hadoop.ozone.web.utils.OzoneUtils;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.LambdaTestUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.BlockTokenSecretProto.AccessModeProto;
+
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.util.function.CheckedBiConsumer;
 import org.junit.Assert;
@@ -58,13 +66,19 @@ import org.junit.Test;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.EnumSet;
 import java.util.List;
 import java.util.Set;
 
 import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_BLOCK_TOKEN_ENABLED;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.BLOCK_TOKEN_VERIFICATION_FAILED;
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.SUCCESS;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SECURITY_ENABLED_KEY;
+import static org.apache.hadoop.ozone.container.ContainerTestHelper.getCreateContainerRequest;
+import static org.apache.hadoop.ozone.container.ContainerTestHelper.getTestContainerID;
 import static org.apache.ratis.rpc.SupportedRpcType.GRPC;
 import static org.apache.ratis.rpc.SupportedRpcType.NETTY;
+import static org.junit.Assert.assertEquals;
 
 /**
  * Test Container servers when security is enabled.
@@ -73,6 +87,7 @@ public class TestSecureContainerServer {
   static final String TEST_DIR
       = GenericTestUtils.getTestDir("dfs").getAbsolutePath() + File.separator;
   private static final OzoneConfiguration CONF = new OzoneConfiguration();
+  private static CertificateClientTestImpl caClient;
 
   private GrpcReplicationService createReplicationService(
       ContainerController containerController) {
@@ -81,10 +96,11 @@ public class TestSecureContainerServer {
   }
 
   @BeforeClass
-  static public void setup() {
+  static public void setup() throws Exception {
     CONF.set(HddsConfigKeys.HDDS_METADATA_DIR_NAME, TEST_DIR);
     CONF.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
     CONF.setBoolean(HDDS_BLOCK_TOKEN_ENABLED, true);
+    caClient = new CertificateClientTestImpl(CONF);
   }
 
   @Test
@@ -99,7 +115,7 @@ public class TestSecureContainerServer {
                     .getPort(DatanodeDetails.Port.Name.STANDALONE).getValue()),
         XceiverClientGrpc::new,
         (dn, conf) -> new XceiverServerGrpc(datanodeDetails, conf,
-            new TestContainerDispatcher(),
+            new TestContainerDispatcher(), caClient,
             createReplicationService(controller)), (dn, p) -> {
         });
   }
@@ -131,7 +147,7 @@ public class TestSecureContainerServer {
 
     final ContainerDispatcher dispatcher = new TestContainerDispatcher();
     return XceiverServerRatis
-        .newXceiverServerRatis(dn, conf, dispatcher, null);
+        .newXceiverServerRatis(dn, conf, dispatcher, null, caClient);
   }
 
   static void runTestClientServerRatis(RpcType rpc, int numNodes)
@@ -173,28 +189,50 @@ public class TestSecureContainerServer {
 
       // Test 1: Test failure in request without block token.
       final ContainerCommandRequestProto request =
-          ContainerTestHelper
-              .getCreateContainerRequest(
-                  ContainerTestHelper.getTestContainerID(), pipeline);
+          getCreateContainerRequest(
+                  getTestContainerID(), pipeline);
       Assert.assertNotNull(request.getTraceID());
 
       XceiverClientSpi finalClient = client;
-      LambdaTestUtils.intercept(IOException.class,
-          () -> ContainerProtocolCalls
-              .validateContainerResponse(finalClient.sendCommand(request)));
+      // Validation is different for grpc and ratis client.
+      if(client instanceof XceiverClientGrpc) {
+        LambdaTestUtils.intercept(SCMSecurityException.class, "Failed to" +
+                " authenticate with GRPC XceiverServer with Ozone block token",
+            () -> finalClient.sendCommand(request));
+      } else {
+        ContainerCommandResponseProto response = finalClient.
+            sendCommand(request);
+        assertEquals(BLOCK_TOKEN_VERIFICATION_FAILED, response.getResult());
+      }
 
       // Test 2: Test success in request with valid block token.
+      long expiryTime = Time.monotonicNow() + 60 * 60 * 24;
+
+      String omCertSerialId =
+          caClient.getCertificate().getSerialNumber().toString();
+      OzoneBlockTokenSecretManager secretManager =
+          new OzoneBlockTokenSecretManager(new SecurityConfig(CONF),
+          expiryTime, omCertSerialId);
+      secretManager.start(caClient);
+      Token<OzoneBlockTokenIdentifier> token = secretManager.generateToken("1",
+          EnumSet.allOf(AccessModeProto.class), RandomUtils.nextLong());
       final ContainerCommandRequestProto request2 =
           ContainerTestHelper
               .getCreateContainerSecureRequest(
-                  ContainerTestHelper.getTestContainerID(), pipeline,
-                  new Token<>());
+                  getTestContainerID(), pipeline,
+                  token);
       Assert.assertNotNull(request2.getTraceID());
+      XceiverClientSpi finalClient2 = createClient.apply(pipeline, CONF);
+      if(finalClient2 instanceof XceiverClientGrpc) {
+        finalClient2.connect(token.encodeToUrlString());
+      } else {
+        finalClient2.connect();
+      }
 
-      XceiverClientSpi finalClient2 = client;
-      LambdaTestUtils.intercept(IOException.class, "",
-          () -> ContainerProtocolCalls
-              .validateContainerResponse(finalClient2.sendCommand(request)));
+      ContainerCommandRequestProto request3 = getCreateContainerRequest(
+          getTestContainerID(), pipeline, token);
+      ContainerCommandResponseProto resp = finalClient2.sendCommand(request3);
+      assertEquals(SUCCESS, resp.getResult());
     } finally {
       if (client != null) {
         client.close();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSecureOzoneManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSecureOzoneManager.java
index 0213934..57e697e 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSecureOzoneManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestSecureOzoneManager.java
@@ -46,6 +46,7 @@ import java.security.cert.X509Certificate;
 import java.util.UUID;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY;
 import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NAMES;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
@@ -83,6 +84,7 @@ public class TestSecureOzoneManager {
     conf.setBoolean(OZONE_SECURITY_ENABLED_KEY, true);
     conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2);
     conf.set(HADOOP_SECURITY_AUTHENTICATION, KERBEROS.toString());
+    conf.setInt(IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 2);
     conf.set(OZONE_SCM_NAMES, "localhost");
     final String path = getTempPath(UUID.randomUUID().toString());
     metaDir = Paths.get(path, "om-meta");
@@ -175,7 +177,6 @@ public class TestSecureOzoneManager {
     omLogs.clearOutput();
 
     // Case 5: When only certificate is present.
-    client = new OMCertificateClient(securityConfig);
     FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation()
         .toString(), securityConfig.getPublicKeyFileName()).toFile());
     CertificateCodec certCodec = new CertificateCodec(securityConfig);
@@ -184,6 +185,9 @@ public class TestSecureOzoneManager {
         securityConfig.getSignatureAlgo());
     certCodec.writeCertificate(new X509CertificateHolder(
         x509Certificate.getEncoded()));
+    client = new OMCertificateClient(securityConfig,
+        x509Certificate.getSerialNumber().toString());
+    omStorage.setOmCertSerialId(x509Certificate.getSerialNumber().toString());
     LambdaTestUtils.intercept(RuntimeException.class, " OM security" +
             " initialization failed",
         () -> OzoneManager.initializeSecurity(conf, omStorage));
@@ -194,7 +198,8 @@ public class TestSecureOzoneManager {
     omLogs.clearOutput();
 
     // Case 6: When private key and certificate is present.
-    client = new OMCertificateClient(securityConfig);
+    client = new OMCertificateClient(securityConfig,
+        x509Certificate.getSerialNumber().toString());
     FileUtils.deleteQuietly(Paths.get(securityConfig.getKeyLocation()
         .toString(), securityConfig.getPublicKeyFileName()).toFile());
     keyCodec.writePrivateKey(privateKey);
@@ -206,7 +211,8 @@ public class TestSecureOzoneManager {
     omLogs.clearOutput();
 
     // Case 7 When keypair and certificate is present.
-    client = new OMCertificateClient(securityConfig);
+    client = new OMCertificateClient(securityConfig,
+        x509Certificate.getSerialNumber().toString());
     OzoneManager.initializeSecurity(conf, omStorage);
     Assert.assertNotNull(client.getPrivateKey());
     Assert.assertNotNull(client.getPublicKey());
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMStorage.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMStorage.java
index 3ed04f1..b84cc5d 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMStorage.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OMStorage.java
@@ -36,6 +36,7 @@ public class OMStorage extends Storage {
 
   public static final String STORAGE_DIR = "om";
   public static final String OM_ID = "omUuid";
+  public static final String OM_CERT_SERIAL_ID = "omCertSerialId";
 
   /**
    * Construct OMStorage.
@@ -53,6 +54,10 @@ public class OMStorage extends Storage {
     }
   }
 
+  public void setOmCertSerialId(String certSerialId) throws IOException {
+    getStorageInfo().setProperty(OM_CERT_SERIAL_ID, certSerialId);
+  }
+
   public void setOmId(String omId) throws IOException {
     if (getState() == StorageState.INITIALIZED) {
       throw new IOException("OM is already initialized.");
@@ -77,6 +82,14 @@ public class OMStorage extends Storage {
     return getStorageInfo().getProperty(OM_ID);
   }
 
+  /**
+   * Retrieves the serial id of certificate issued by SCM.
+   * @return serial id of the OM certificate (OM_CERT_SERIAL_ID property)
+   */
+  public String getOmCertSerialId() {
+    return getStorageInfo().getProperty(OM_CERT_SERIAL_ID);
+  }
+
   @Override
   protected Properties getNodeProperties() {
     String omId = getOmId();
@@ -85,6 +98,10 @@ public class OMStorage extends Storage {
     }
     Properties omProperties = new Properties();
     omProperties.setProperty(OM_ID, omId);
+
+    if (getOmCertSerialId() != null) {
+      omProperties.setProperty(OM_CERT_SERIAL_ID, getOmCertSerialId());
+    }
     return omProperties;
   }
 }
\ No newline at end of file
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
index 829f35e..51d51b8 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OmMetadataManagerImpl.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.ozone.om.codec.OmBucketInfoCodec;
 import org.apache.hadoop.ozone.om.codec.OmKeyInfoCodec;
 import org.apache.hadoop.ozone.om.codec.OmMultipartKeyInfoCodec;
 import org.apache.hadoop.ozone.om.codec.OmVolumeArgsCodec;
+import org.apache.hadoop.ozone.om.codec.TokenIdentifierCodec;
 import org.apache.hadoop.ozone.om.codec.VolumeListCodec;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
@@ -41,6 +42,7 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfoGroup;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeList;
+import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
 import org.apache.hadoop.utils.db.DBStore;
 import org.apache.hadoop.utils.db.DBStoreBuilder;
 import org.apache.hadoop.utils.db.Table;
@@ -91,6 +93,8 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
    * |-------------------------------------------------------------------|
    * | s3SecretTable      | s3g_access_key_id -> s3Secret                |
    * |-------------------------------------------------------------------|
+   * | dTokenTable        | OzoneTokenIdentifier -> Long                 |
+   * |-------------------------------------------------------------------|
    */
 
   private static final String USER_TABLE = "userTable";
@@ -102,6 +106,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
   private static final String S3_TABLE = "s3Table";
   private static final String MULTIPARTINFO_TABLE = "multipartInfoTable";
   private static final String S3_SECRET_TABLE = "s3SecretTable";
+  private static final String DELEGATION_TOKEN_TABLE = "dTokenTable";
 
   private DBStore store;
 
@@ -117,6 +122,7 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
   private Table s3Table;
   private Table<String, OmMultipartKeyInfo> multipartInfoTable;
   private Table s3SecretTable;
+  private Table dTokenTable;
 
   public OmMetadataManagerImpl(OzoneConfiguration conf) throws IOException {
     this.lock = new OzoneManagerLock(conf);
@@ -131,6 +137,10 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
     return userTable;
   }
 
+  public Table<OzoneTokenIdentifier, Long> getDelegationTokenTable() {
+    return dTokenTable;
+  }
+
   @Override
   public Table<String, OmVolumeArgs> getVolumeTable() {
     return volumeTable;
@@ -200,6 +210,8 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
           .addTable(S3_TABLE)
           .addTable(MULTIPARTINFO_TABLE)
           .addTable(S3_SECRET_TABLE)
+          .addTable(DELEGATION_TOKEN_TABLE)
+          .addCodec(OzoneTokenIdentifier.class, new TokenIdentifierCodec())
           .addCodec(OmKeyInfo.class, new OmKeyInfoCodec())
           .addCodec(OmBucketInfo.class, new OmBucketInfoCodec())
           .addCodec(OmVolumeArgs.class, new OmVolumeArgsCodec())
@@ -234,6 +246,10 @@ public class OmMetadataManagerImpl implements OMMetadataManager {
       s3Table = this.store.getTable(S3_TABLE);
       checkTableStatus(s3Table, S3_TABLE);
 
+      dTokenTable = this.store.getTable(DELEGATION_TOKEN_TABLE,
+          OzoneTokenIdentifier.class, Long.class);
+      checkTableStatus(dTokenTable, DELEGATION_TOKEN_TABLE);
+
       multipartInfoTable = this.store.getTable(MULTIPARTINFO_TABLE,
           String.class, OmMultipartKeyInfo.class);
       checkTableStatus(multipartInfoTable, MULTIPARTINFO_TABLE);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index 0559762..2571b3f 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -26,9 +26,8 @@ import com.google.protobuf.BlockingService;
 
 import java.security.PrivateKey;
 import java.security.PublicKey;
-import java.security.cert.CertificateException;
-import java.security.cert.X509Certificate;
 import java.security.KeyPair;
+import java.security.cert.CertificateException;
 import java.util.Collection;
 import java.util.Objects;
 
@@ -221,7 +220,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
   private static boolean securityEnabled = false;
   private OzoneDelegationTokenSecretManager delegationTokenMgr;
   private OzoneBlockTokenSecretManager blockTokenMgr;
-  private KeyPair keyPair;
   private CertificateClient certClient;
   private static boolean testSecureOmFlag = false;
   private final Text omRpcAddressTxt;
@@ -325,13 +323,19 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
         volumeManager, bucketManager);
     if (secConfig.isSecurityEnabled()) {
       omComponent = OM_DAEMON + "-" + omId;
-      certClient = new OMCertificateClient(new SecurityConfig(conf));
+      if(omStorage.getOmCertSerialId() == null) {
+        throw new RuntimeException("OzoneManager started in secure mode but " +
+            "doesn't have SCM signed certificate.");
+      }
+      certClient = new OMCertificateClient(new SecurityConfig(conf),
+          omStorage.getOmCertSerialId());
       s3SecretManager = new S3SecretManagerImpl(configuration, metadataManager);
       delegationTokenMgr = createDelegationTokenSecretManager(configuration);
     }
     if (secConfig.isBlockTokenEnabled()) {
       blockTokenMgr = createBlockTokenSecretManager(configuration);
     }
+
     omRpcServer = getRpcServer(conf);
     omRpcAddress = updateRPCListenAddress(configuration,
         OZONE_OM_ADDRESS_KEY, omNodeRpcAddr, omRpcServer);
@@ -693,8 +697,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       Objects.requireNonNull(pubKey);
       Objects.requireNonNull(pvtKey);
       Objects.requireNonNull(certClient.getCertificate());
-
-      keyPair = new KeyPair(pubKey, pvtKey);
     } catch (Exception e) {
       throw new OzoneSecurityException("Error reading keypair & certificate "
           + "OzoneManager.", e, OzoneSecurityException
@@ -950,7 +952,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
    *                     accessible
    */
   @VisibleForTesting
-  static boolean omInit(OzoneConfiguration conf) throws IOException {
+  public static boolean omInit(OzoneConfiguration conf) throws IOException {
     OMStorage omStorage = new OMStorage(conf);
     StorageState state = omStorage.getState();
     if (state != StorageState.INITIALIZED) {
@@ -966,22 +968,27 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
         }
         omStorage.setClusterId(clusterId);
         omStorage.setScmId(scmId);
+        if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
+          initializeSecurity(conf, omStorage);
+        }
         omStorage.initialize();
         System.out.println(
             "OM initialization succeeded.Current cluster id for sd="
                 + omStorage.getStorageDir() + ";cid=" + omStorage
                 .getClusterID());
 
-        if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
-          initializeSecurity(conf, omStorage);
-        }
-
         return true;
       } catch (IOException ioe) {
         LOG.error("Could not initialize OM version file", ioe);
         return false;
       }
     } else {
+      if(OzoneSecurityUtil.isSecurityEnabled(conf) &&
+          omStorage.getOmCertSerialId() == null) {
+        LOG.info("OM storage is already initialized. Initializing security");
+        initializeSecurity(conf, omStorage);
+        omStorage.persistCurrentState();
+      }
       System.out.println(
           "OM already initialized.Reusing existing cluster id for sd="
               + omStorage.getStorageDir() + ";cid=" + omStorage
@@ -1000,7 +1007,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     LOG.info("Initializing secure OzoneManager.");
 
     CertificateClient certClient =
-        new OMCertificateClient(new SecurityConfig(conf));
+        new OMCertificateClient(new SecurityConfig(conf),
+            omStore.getOmCertSerialId());
     CertificateClient.InitResponse response = certClient.init();
     LOG.info("Init response: {}", response);
     switch (response) {
@@ -1313,7 +1321,9 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
       isOmRpcServerRunning = false;
       keyManager.stop();
       stopSecretManager();
-      httpServer.stop();
+      if (httpServer != null) {
+        httpServer.stop();
+      }
       metadataManager.stop();
       metrics.unRegister();
       unregisterMXBean();
@@ -1397,9 +1407,10 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
         getEncodedString(csr));
 
     try {
-      X509Certificate x509Certificate =
-          CertificateCodec.getX509Cert(pemEncodedCert);
-      client.storeCertificate(x509Certificate);
+      client.storeCertificate(pemEncodedCert, true);
+      // Persist om cert serial id.
+      omStore.setOmCertSerialId(CertificateCodec.
+          getX509Certificate(pemEncodedCert).getSerialNumber().toString());
     } catch (IOException | CertificateException e) {
       LOG.error("Error while storing SCM signed certificate.", e);
       throw new RuntimeException(e);
diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/security/TestOzoneBlockTokenSecretManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/TestOzoneBlockTokenSecretManager.java
similarity index 100%
rename from hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/security/TestOzoneBlockTokenSecretManager.java
rename to hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/TestOzoneBlockTokenSecretManager.java
diff --git a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/security/TestOzoneDelegationTokenSecretManager.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/TestOzoneDelegationTokenSecretManager.java
similarity index 92%
rename from hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/security/TestOzoneDelegationTokenSecretManager.java
rename to hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/TestOzoneDelegationTokenSecretManager.java
index 81d9952..f05a1e8 100644
--- a/hadoop-ozone/common/src/test/java/org/apache/hadoop/ozone/security/TestOzoneDelegationTokenSecretManager.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/security/TestOzoneDelegationTokenSecretManager.java
@@ -18,26 +18,29 @@
 
 package org.apache.hadoop.ozone.security;
 
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.hdds.HddsConfigKeys;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.security.x509.SecurityConfig;
 import org.apache.hadoop.hdds.security.x509.certificate.client.CertificateClient;
 import org.apache.hadoop.hdds.security.x509.certificate.client.OMCertificateClient;
+import org.apache.hadoop.hdds.server.ServerUtils;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ozone.om.OMMetadataManager;
+import org.apache.hadoop.ozone.om.OmMetadataManagerImpl;
 import org.apache.hadoop.ozone.om.S3SecretManager;
+import org.apache.hadoop.ozone.om.S3SecretManagerImpl;
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.ssl.KeyStoreTestUtil;
 import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.test.LambdaTestUtils;
 import org.apache.hadoop.util.Time;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
@@ -63,18 +66,18 @@ public class TestOzoneDelegationTokenSecretManager {
   private long expiryTime;
   private Text serviceRpcAdd;
   private OzoneConfiguration conf;
-  private static final String BASEDIR = GenericTestUtils.getTempPath(
-      TestOzoneDelegationTokenSecretManager.class.getSimpleName());
   private final static Text TEST_USER = new Text("testUser");
   private long tokenMaxLifetime = 1000 * 20;
   private long tokenRemoverScanInterval = 1000 * 20;
   private S3SecretManager s3SecretManager;
   private String s3Secret = "dbaksbzljandlkandlsd";
 
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
   @Before
   public void setUp() throws Exception {
-    conf = new OzoneConfiguration();
-    conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, BASEDIR);
+    conf = createNewTestPath();
     securityConfig = new SecurityConfig(conf);
     certificateClient = setupCertificateClient();
     certificateClient.init();
@@ -83,7 +86,8 @@ public class TestOzoneDelegationTokenSecretManager {
     final Map<String, String> s3Secrets = new HashMap<>();
     s3Secrets.put("testuser1", s3Secret);
     s3Secrets.put("abc", "djakjahkd");
-    s3SecretManager = new S3SecretManager() {
+    OMMetadataManager metadataManager = new OmMetadataManagerImpl(conf);
+    s3SecretManager = new S3SecretManagerImpl(conf, metadataManager) {
       @Override
       public S3SecretValue getS3Secret(String kerberosID) {
         if(s3Secrets.containsKey(kerberosID)) {
@@ -102,6 +106,16 @@ public class TestOzoneDelegationTokenSecretManager {
     };
   }
 
+  private OzoneConfiguration createNewTestPath() throws IOException {
+    OzoneConfiguration config = new OzoneConfiguration();
+    File newFolder = folder.newFolder();
+    if (!newFolder.exists()) {
+      Assert.assertTrue(newFolder.mkdirs());
+    }
+    ServerUtils.setOzoneMetaDirPath(config, newFolder.toString());
+    return config;
+  }
+
   /**
    * Helper function to create certificate client.
    * */
@@ -125,13 +139,17 @@ public class TestOzoneDelegationTokenSecretManager {
       public PublicKey getPublicKey() {
         return keyPair.getPublic();
       }
+
+      @Override
+      public X509Certificate getCertificate(String serialId) {
+        return cert;
+      }
     };
   }
 
   @After
   public void tearDown() throws IOException {
     secretManager.stop();
-    FileUtils.deleteQuietly(new File(BASEDIR));
   }
 
   @Test
@@ -140,8 +158,7 @@ public class TestOzoneDelegationTokenSecretManager {
         expiryTime, tokenRemoverScanInterval);
     secretManager.start(certificateClient);
     Token<OzoneTokenIdentifier> token = secretManager.createToken(TEST_USER,
-        TEST_USER,
-        TEST_USER);
+        TEST_USER, TEST_USER);
     OzoneTokenIdentifier identifier =
         OzoneTokenIdentifier.readProtoBuf(token.getIdentifier());
     // Check basic details.
@@ -276,8 +293,7 @@ public class TestOzoneDelegationTokenSecretManager {
     id.setOmCertSerialId("1927393");
     id.setMaxDate(Time.now() + 60*60*24);
     id.setOwner(new Text("test"));
-    Assert.assertFalse(secretManager.verifySignature(id,
-        certificateClient.signData(id.getBytes())));
+    Assert.assertFalse(secretManager.verifySignature(id, id.getBytes()));
   }
 
   @Test


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message