hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aengin...@apache.org
Subject [hadoop] branch trunk updated: HDDS-2072. Make StorageContainerLocationProtocolService message based Contributed by Elek, Marton.
Date Thu, 03 Oct 2019 00:01:17 GMT
This is an automated email from the ASF dual-hosted git repository.

aengineer pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4c24f24  HDDS-2072. Make StorageContainerLocationProtocolService message based Contributed by Elek, Marton.
4c24f24 is described below

commit 4c24f2434dd8c09bb104ee660975855eca287fe6
Author: Anu Engineer <aengineer@apache.org>
AuthorDate: Wed Oct 2 16:15:31 2019 -0700

    HDDS-2072. Make StorageContainerLocationProtocolService message based
    Contributed by Elek, Marton.
---
 ...inerLocationProtocolClientSideTranslatorPB.java | 411 +++++++++---------
 .../src/main/proto/ScmBlockLocationProtocol.proto  |   2 +-
 .../proto/StorageContainerLocationProtocol.proto   | 185 ++++----
 ...inerLocationProtocolServerSideTranslatorPB.java | 476 +++++++++++----------
 .../hdds/scm/server/SCMClientProtocolServer.java   |  12 +-
 .../ozone/insight/BaseInsightSubCommand.java       |   3 +
 .../scm/ScmProtocolBlockLocationInsight.java       |   2 +-
 ...va => ScmProtocolContainerLocationInsight.java} |  22 +-
 8 files changed, 570 insertions(+), 543 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
index ab3fcd1..01db597 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/StorageContainerLocationProtocolClientSideTranslatorPB.java
@@ -16,64 +16,57 @@
  */
 package org.apache.hadoop.hdds.scm.protocolPB;
 
-import com.google.common.base.Preconditions;
-import com.google.protobuf.RpcController;
-import com.google.protobuf.ServiceException;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.GetScmInfoResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ActivatePipelineRequestProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineRequestProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ClosePipelineRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ForceExitSafeModeRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ForceExitSafeModeResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineRequestProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.InSafeModeRequestProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.InSafeModeResponseProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartReplicationManagerRequestProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopReplicationManagerRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.PipelineRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ScmContainerLocationRequest;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ScmContainerLocationRequest.Builder;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ScmContainerLocationResponse;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartReplicationManagerRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopReplicationManagerRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.Type;
 import org.apache.hadoop.hdds.scm.ScmInfo;
-import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos.ContainerRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos.ContainerResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos.GetContainerRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos.GetContainerResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos.NodeQueryRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos.NodeQueryResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos.PipelineRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos.PipelineResponseProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos.SCMDeleteContainerRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos.SCMListContainerRequestProto;
-import org.apache.hadoop.hdds.protocol.proto
-    .StorageContainerLocationProtocolProtos.SCMListContainerResponseProto;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.ipc.RPC;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
 
 /**
  * This class is the client-side translator to translate the requests made on
@@ -102,13 +95,34 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
   }
 
   /**
+   * Helper method to wrap the request and send the message.
+   */
+  private ScmContainerLocationResponse submitRequest(
+      StorageContainerLocationProtocolProtos.Type type,
+      Consumer<Builder> builderConsumer) throws IOException {
+    final ScmContainerLocationResponse response;
+    try {
+
+      Builder builder = ScmContainerLocationRequest.newBuilder()
+          .setCmdType(type)
+          .setTraceID(TracingUtil.exportCurrentSpan());
+      builderConsumer.accept(builder);
+      ScmContainerLocationRequest wrapper = builder.build();
+
+      response = rpcProxy.submitRequest(NULL_RPC_CONTROLLER, wrapper);
+    } catch (ServiceException ex) {
+      throw ProtobufHelper.getRemoteException(ex);
+    }
+    return response;
+  }
+
+  /**
    * Asks SCM where a container should be allocated. SCM responds with the set
    * of datanodes that should be used creating this container. Ozone/SCM only
    * supports replication factor of either 1 or 3.
-   * @param type - Replication Type
+   *
+   * @param type   - Replication Type
    * @param factor - Replication Count
-   * @return
-   * @throws IOException
    */
   @Override
   public ContainerWithPipeline allocateContainer(
@@ -122,12 +136,11 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
         .setOwner(owner)
         .build();
 
-    final ContainerResponseProto response;
-    try {
-      response = rpcProxy.allocateContainer(NULL_RPC_CONTROLLER, request);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    ContainerResponseProto response =
+        submitRequest(Type.AllocateContainer,
+            builder -> builder.setContainerRequest(request))
+            .getContainerResponse();
+    //TODO should be migrated to use the top level status structure.
     if (response.getErrorCode() != ContainerResponseProto.Error.success) {
       throw new IOException(response.hasErrorMessage() ?
           response.getErrorMessage() : "Allocate container failed.");
@@ -144,13 +157,12 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
         .setContainerID(containerID)
         .setTraceID(TracingUtil.exportCurrentSpan())
         .build();
-    try {
-      GetContainerResponseProto response =
-          rpcProxy.getContainer(NULL_RPC_CONTROLLER, request);
-      return ContainerInfo.fromProtobuf(response.getContainerInfo());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    ScmContainerLocationResponse response =
+        submitRequest(Type.GetContainer,
+            (builder) -> builder.setGetContainerRequest(request));
+    return ContainerInfo
+        .fromProtobuf(response.getGetContainerResponse().getContainerInfo());
+
   }
 
   /**
@@ -164,14 +176,15 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
         GetContainerWithPipelineRequestProto.newBuilder()
             .setTraceID(TracingUtil.exportCurrentSpan())
             .setContainerID(containerID).build();
-    try {
-      GetContainerWithPipelineResponseProto response =
-          rpcProxy.getContainerWithPipeline(NULL_RPC_CONTROLLER, request);
-      return ContainerWithPipeline.fromProtobuf(
-          response.getContainerWithPipeline());
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+
+    ScmContainerLocationResponse response =
+        submitRequest(Type.GetContainerWithPipeline,
+            (builder) -> builder.setGetContainerWithPipelineRequest(request));
+
+    return ContainerWithPipeline.fromProtobuf(
+        response.getGetContainerWithPipelineResponse()
+            .getContainerWithPipeline());
+
   }
 
   /**
@@ -191,26 +204,22 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
     builder.setTraceID(TracingUtil.exportCurrentSpan());
     SCMListContainerRequestProto request = builder.build();
 
-    try {
-      SCMListContainerResponseProto response =
-          rpcProxy.listContainer(NULL_RPC_CONTROLLER, request);
-      List<ContainerInfo> containerList = new ArrayList<>();
-      for (HddsProtos.ContainerInfoProto containerInfoProto : response
-          .getContainersList()) {
-        containerList.add(ContainerInfo.fromProtobuf(containerInfoProto));
-      }
-      return containerList;
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
+    SCMListContainerResponseProto response =
+        submitRequest(Type.ListContainer,
+            builder1 -> builder1.setScmListContainerRequest(request))
+            .getScmListContainerResponse();
+    List<ContainerInfo> containerList = new ArrayList<>();
+    for (HddsProtos.ContainerInfoProto containerInfoProto : response
+        .getContainersList()) {
+      containerList.add(ContainerInfo.fromProtobuf(containerInfoProto));
     }
+    return containerList;
+
   }
 
   /**
    * Ask SCM to delete a container by name. SCM will remove
    * the container mapping in its database.
-   *
-   * @param containerID
-   * @throws IOException
    */
   @Override
   public void deleteContainer(long containerID)
@@ -222,18 +231,13 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
         .setTraceID(TracingUtil.exportCurrentSpan())
         .setContainerID(containerID)
         .build();
-    try {
-      rpcProxy.deleteContainer(NULL_RPC_CONTROLLER, request);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    submitRequest(Type.DeleteContainer,
+        builder -> builder.setScmDeleteContainerRequest(request));
+
   }
 
   /**
    * Queries a list of Node Statuses.
-   *
-   * @param nodeStatuses
-   * @return List of Datanodes.
    */
   @Override
   public List<HddsProtos.Node> queryNode(HddsProtos.NodeState
@@ -246,21 +250,18 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
         .setState(nodeStatuses)
         .setTraceID(TracingUtil.exportCurrentSpan())
         .setScope(queryScope).setPoolName(poolName).build();
-    try {
-      NodeQueryResponseProto response =
-          rpcProxy.queryNode(NULL_RPC_CONTROLLER, request);
-      return response.getDatanodesList();
-    } catch (ServiceException e) {
-      throw  ProtobufHelper.getRemoteException(e);
-    }
+    NodeQueryResponseProto response = submitRequest(Type.QueryNode,
+        builder -> builder.setNodeQueryRequest(request)).getNodeQueryResponse();
+    return response.getDatanodesList();
 
   }
 
   /**
    * Notify from client that creates object on datanodes.
-   * @param type object type
-   * @param id object id
-   * @param op operation type (e.g., create, close, delete)
+   *
+   * @param type  object type
+   * @param id    object id
+   * @param op    operation type (e.g., create, close, delete)
    * @param stage object creation stage : begin/complete
    */
   @Override
@@ -278,20 +279,17 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
             .setOp(op)
             .setStage(stage)
             .build();
-    try {
-      rpcProxy.notifyObjectStageChange(NULL_RPC_CONTROLLER, request);
-    } catch(ServiceException e){
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    submitRequest(Type.NotifyObjectStageChange,
+        builder -> builder.setObjectStageChangeRequest(request));
+
   }
 
   /**
    * Creates a replication pipeline of a specified type.
    *
    * @param replicationType - replication type
-   * @param factor - factor 1 or 3
-   * @param nodePool - optional machine list to build a pipeline.
-   * @throws IOException
+   * @param factor          - factor 1 or 3
+   * @param nodePool        - optional machine list to build a pipeline.
    */
   @Override
   public Pipeline createReplicationPipeline(HddsProtos.ReplicationType
@@ -303,87 +301,82 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
         .setReplicationFactor(factor)
         .setReplicationType(replicationType)
         .build();
-    try {
-      PipelineResponseProto response =
-          rpcProxy.allocatePipeline(NULL_RPC_CONTROLLER, request);
-      if (response.getErrorCode() ==
-          PipelineResponseProto.Error.success) {
-        Preconditions.checkState(response.hasPipeline(), "With success, " +
-            "must come a pipeline");
-        return Pipeline.getFromProtobuf(response.getPipeline());
-      } else {
-        String errorMessage = String.format("create replication pipeline " +
-                "failed. code : %s Message: %s", response.getErrorCode(),
-            response.hasErrorMessage() ? response.getErrorMessage() : "");
-        throw new IOException(errorMessage);
-      }
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
+
+    PipelineResponseProto response =
+        submitRequest(Type.AllocatePipeline,
+            builder -> builder.setPipelineRequest(request))
+            .getPipelineResponse();
+    if (response.getErrorCode() ==
+        PipelineResponseProto.Error.success) {
+      Preconditions.checkState(response.hasPipeline(), "With success, " +
+          "must come a pipeline");
+      return Pipeline.getFromProtobuf(response.getPipeline());
+    } else {
+      String errorMessage = String.format("create replication pipeline " +
+              "failed. code : %s Message: %s", response.getErrorCode(),
+          response.hasErrorMessage() ? response.getErrorMessage() : "");
+      throw new IOException(errorMessage);
     }
+
   }
 
   @Override
   public List<Pipeline> listPipelines() throws IOException {
-    try {
-      ListPipelineRequestProto request = ListPipelineRequestProto
-          .newBuilder().setTraceID(TracingUtil.exportCurrentSpan())
-          .build();
-      ListPipelineResponseProto response = rpcProxy.listPipelines(
-          NULL_RPC_CONTROLLER, request);
-      List<Pipeline> list = new ArrayList<>();
-      for (HddsProtos.Pipeline pipeline : response.getPipelinesList()) {
-        Pipeline fromProtobuf = Pipeline.getFromProtobuf(pipeline);
-        list.add(fromProtobuf);
-      }
-      return list;
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
+    ListPipelineRequestProto request = ListPipelineRequestProto
+        .newBuilder().setTraceID(TracingUtil.exportCurrentSpan())
+        .build();
+
+    ListPipelineResponseProto response = submitRequest(Type.ListPipelines,
+        builder -> builder.setListPipelineRequest(request))
+        .getListPipelineResponse();
+
+    List<Pipeline> list = new ArrayList<>();
+    for (HddsProtos.Pipeline pipeline : response.getPipelinesList()) {
+      Pipeline fromProtobuf = Pipeline.getFromProtobuf(pipeline);
+      list.add(fromProtobuf);
     }
+    return list;
+
   }
 
   @Override
   public void activatePipeline(HddsProtos.PipelineID pipelineID)
       throws IOException {
-    try {
-      ActivatePipelineRequestProto request =
-          ActivatePipelineRequestProto.newBuilder()
-              .setTraceID(TracingUtil.exportCurrentSpan())
-              .setPipelineID(pipelineID)
-              .build();
-      rpcProxy.activatePipeline(NULL_RPC_CONTROLLER, request);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    ActivatePipelineRequestProto request =
+        ActivatePipelineRequestProto.newBuilder()
+            .setTraceID(TracingUtil.exportCurrentSpan())
+            .setPipelineID(pipelineID)
+            .build();
+    submitRequest(Type.ActivatePipeline,
+        builder -> builder.setActivatePipelineRequest(request));
+
   }
 
   @Override
   public void deactivatePipeline(HddsProtos.PipelineID pipelineID)
       throws IOException {
-    try {
-      DeactivatePipelineRequestProto request =
-          DeactivatePipelineRequestProto.newBuilder()
-              .setTraceID(TracingUtil.exportCurrentSpan())
-              .setPipelineID(pipelineID)
-              .build();
-      rpcProxy.deactivatePipeline(NULL_RPC_CONTROLLER, request);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+
+    DeactivatePipelineRequestProto request =
+        DeactivatePipelineRequestProto.newBuilder()
+            .setTraceID(TracingUtil.exportCurrentSpan())
+            .setPipelineID(pipelineID)
+            .build();
+    submitRequest(Type.DeactivatePipeline,
+        builder -> builder.setDeactivatePipelineRequest(request));
   }
 
   @Override
   public void closePipeline(HddsProtos.PipelineID pipelineID)
       throws IOException {
-    try {
-      ClosePipelineRequestProto request =
-          ClosePipelineRequestProto.newBuilder()
-              .setTraceID(TracingUtil.exportCurrentSpan())
-              .setPipelineID(pipelineID)
-          .build();
-      rpcProxy.closePipeline(NULL_RPC_CONTROLLER, request);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+
+    ClosePipelineRequestProto request =
+        ClosePipelineRequestProto.newBuilder()
+            .setTraceID(TracingUtil.exportCurrentSpan())
+            .setPipelineID(pipelineID)
+            .build();
+    submitRequest(Type.ClosePipeline,
+        builder -> builder.setClosePipelineRequest(request));
+
   }
 
   @Override
@@ -392,16 +385,14 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
         HddsProtos.GetScmInfoRequestProto.newBuilder()
             .setTraceID(TracingUtil.exportCurrentSpan())
             .build();
-    try {
-      HddsProtos.GetScmInfoResponseProto resp = rpcProxy.getScmInfo(
-          NULL_RPC_CONTROLLER, request);
-      ScmInfo.Builder builder = new ScmInfo.Builder()
-          .setClusterId(resp.getClusterId())
-          .setScmId(resp.getScmId());
-      return builder.build();
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+
+    GetScmInfoResponseProto resp = submitRequest(Type.GetScmInfo,
+        builder -> builder.setGetScmInfoRequest(request))
+        .getGetScmInfoResponse();
+    ScmInfo.Builder builder = new ScmInfo.Builder()
+        .setClusterId(resp.getClusterId())
+        .setScmId(resp.getScmId());
+    return builder.build();
 
   }
 
@@ -409,73 +400,67 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
    * Check if SCM is in safe mode.
    *
    * @return Returns true if SCM is in safe mode else returns false.
-   * @throws IOException
    */
   @Override
   public boolean inSafeMode() throws IOException {
     InSafeModeRequestProto request =
         InSafeModeRequestProto.getDefaultInstance();
-    try {
-      InSafeModeResponseProto resp = rpcProxy.inSafeMode(
-          NULL_RPC_CONTROLLER, request);
-      return resp.getInSafeMode();
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+
+    return submitRequest(Type.InSafeMode,
+        builder -> builder.setInSafeModeRequest(request))
+        .getInSafeModeResponse().getInSafeMode();
+
   }
 
   /**
    * Force SCM out of Safe mode.
    *
    * @return returns true if operation is successful.
-   * @throws IOException
    */
   @Override
   public boolean forceExitSafeMode() throws IOException {
     ForceExitSafeModeRequestProto request =
         ForceExitSafeModeRequestProto.getDefaultInstance();
-    try {
-      ForceExitSafeModeResponseProto resp = rpcProxy
-          .forceExitSafeMode(NULL_RPC_CONTROLLER, request);
-      return resp.getExitedSafeMode();
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+    ForceExitSafeModeResponseProto resp =
+        submitRequest(Type.ForceExitSafeMode,
+            builder -> builder.setForceExitSafeModeRequest(request))
+            .getForceExitSafeModeResponse();
+
+    return resp.getExitedSafeMode();
+
   }
 
   @Override
   public void startReplicationManager() throws IOException {
-    try {
-      StartReplicationManagerRequestProto request =
-          StartReplicationManagerRequestProto.getDefaultInstance();
-      rpcProxy.startReplicationManager(NULL_RPC_CONTROLLER, request);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+
+    StartReplicationManagerRequestProto request =
+        StartReplicationManagerRequestProto.getDefaultInstance();
+    submitRequest(Type.StartReplicationManager,
+        builder -> builder.setStartReplicationManagerRequest(request));
+
   }
 
   @Override
   public void stopReplicationManager() throws IOException {
-    try {
-      StopReplicationManagerRequestProto request =
-          StopReplicationManagerRequestProto.getDefaultInstance();
-      rpcProxy.stopReplicationManager(NULL_RPC_CONTROLLER, request);
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+
+    StopReplicationManagerRequestProto request =
+        StopReplicationManagerRequestProto.getDefaultInstance();
+    submitRequest(Type.StopReplicationManager,
+        builder -> builder.setStopReplicationManagerRequest(request));
+
   }
 
   @Override
   public boolean getReplicationManagerStatus() throws IOException {
-    try {
-      ReplicationManagerStatusRequestProto request =
-          ReplicationManagerStatusRequestProto.getDefaultInstance();
-      ReplicationManagerStatusResponseProto response =
-          rpcProxy.getReplicationManagerStatus(NULL_RPC_CONTROLLER, request);
-      return response.getIsRunning();
-    } catch (ServiceException e) {
-      throw ProtobufHelper.getRemoteException(e);
-    }
+
+    ReplicationManagerStatusRequestProto request =
+        ReplicationManagerStatusRequestProto.getDefaultInstance();
+    ReplicationManagerStatusResponseProto response =
+        submitRequest(Type.GetReplicationManagerStatus,
+            builder -> builder.setSeplicationManagerStatusRequest(request))
+            .getReplicationManagerStatusResponse();
+    return response.getIsRunning();
+
   }
 
   @Override
diff --git a/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto b/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto
index ded0d02..fc7a598 100644
--- a/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto
@@ -26,7 +26,7 @@ option java_package = "org.apache.hadoop.hdds.protocol.proto";
 option java_outer_classname = "ScmBlockLocationProtocolProtos";
 option java_generic_services = true;
 option java_generate_equals_and_hash = true;
-package hadoop.hdds;
+package hadoop.hdds.block;
 
 import "hdds.proto";
 
diff --git a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
index 0c35876..8ea72b6 100644
--- a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
+++ b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
@@ -26,11 +26,101 @@ option java_package = "org.apache.hadoop.hdds.protocol.proto";
 option java_outer_classname = "StorageContainerLocationProtocolProtos";
 option java_generic_services = true;
 option java_generate_equals_and_hash = true;
-package hadoop.hdds;
+package hadoop.hdds.container;
 
 import "hdds.proto";
 
 /**
+  All functions are dispatched as Request/Response under Ozone.
+  if you add newe functions, please add them using same pattern.
+*/
+message ScmContainerLocationRequest {
+  required Type cmdType = 1; // Type of the command
+
+  // A string that identifies this command, we generate  Trace ID in Ozone
+  // frontend and this allows us to trace that command all over ozone.
+  optional string traceID = 2;
+
+  optional ContainerRequestProto containerRequest = 6;
+  optional GetContainerRequestProto getContainerRequest = 7;
+  optional GetContainerWithPipelineRequestProto getContainerWithPipelineRequest = 8;
+  optional SCMListContainerRequestProto scmListContainerRequest = 9;
+  optional SCMDeleteContainerRequestProto scmDeleteContainerRequest = 10;
+  optional NodeQueryRequestProto nodeQueryRequest = 11;
+  optional ObjectStageChangeRequestProto objectStageChangeRequest = 12;
+  optional PipelineRequestProto pipelineRequest = 13;
+  optional ListPipelineRequestProto listPipelineRequest = 14;
+  optional ActivatePipelineRequestProto activatePipelineRequest = 15;
+  optional DeactivatePipelineRequestProto deactivatePipelineRequest = 16;
+  optional ClosePipelineRequestProto closePipelineRequest = 17;
+  optional GetScmInfoRequestProto getScmInfoRequest = 18;
+  optional InSafeModeRequestProto inSafeModeRequest = 19;
+  optional ForceExitSafeModeRequestProto forceExitSafeModeRequest = 20;
+  optional StartReplicationManagerRequestProto startReplicationManagerRequest = 21;
+  optional StopReplicationManagerRequestProto stopReplicationManagerRequest = 22;
+  optional ReplicationManagerStatusRequestProto seplicationManagerStatusRequest = 23;
+
+}
+
+message ScmContainerLocationResponse {
+  required Type cmdType = 1; // Type of the command
+
+  optional string traceID = 2;
+
+  optional bool success = 3 [default = true];
+
+  optional string message = 4;
+
+  required Status status = 5;
+
+  optional ContainerResponseProto containerResponse = 6;
+  optional GetContainerResponseProto getContainerResponse = 7;
+  optional GetContainerWithPipelineResponseProto getContainerWithPipelineResponse = 8;
+  optional SCMListContainerResponseProto scmListContainerResponse = 9;
+  optional SCMDeleteContainerResponseProto scmDeleteContainerResponse = 10;
+  optional NodeQueryResponseProto nodeQueryResponse = 11;
+  optional ObjectStageChangeResponseProto objectStageChangeResponse = 12;
+  optional PipelineResponseProto pipelineResponse = 13;
+  optional ListPipelineResponseProto listPipelineResponse = 14;
+  optional ActivatePipelineResponseProto activatePipelineResponse = 15;
+  optional DeactivatePipelineResponseProto deactivatePipelineResponse = 16;
+  optional ClosePipelineResponseProto closePipelineResponse = 17;
+  optional GetScmInfoResponseProto getScmInfoResponse = 18;
+  optional InSafeModeResponseProto inSafeModeResponse = 19;
+  optional ForceExitSafeModeResponseProto forceExitSafeModeResponse = 20;
+  optional StartReplicationManagerResponseProto startReplicationManagerResponse = 21;
+  optional StopReplicationManagerResponseProto stopReplicationManagerResponse = 22;
+  optional ReplicationManagerStatusResponseProto replicationManagerStatusResponse = 23;
+  enum Status {
+    OK = 1;
+    CONTAINER_ALREADY_EXISTS = 2;
+    CONTAINER_IS_MISSING = 3;
+  }
+}
+
+enum Type {
+
+  AllocateContainer = 1;
+  GetContainer = 2;
+  GetContainerWithPipeline = 3;
+  ListContainer = 4;
+  DeleteContainer = 5;
+  QueryNode = 6;
+  NotifyObjectStageChange = 7;
+  AllocatePipeline = 8;
+  ListPipelines = 9;
+  ActivatePipeline = 10;
+  DeactivatePipeline = 11;
+  ClosePipeline = 12;
+  GetScmInfo = 13;
+  InSafeMode = 14;
+  ForceExitSafeMode = 15;
+  StartReplicationManager = 16;
+  StopReplicationManager = 17;
+  GetReplicationManagerStatus = 18;
+}
+
+/**
 * Request send to SCM asking where the container should be created.
 */
 message ContainerRequestProto {
@@ -235,97 +325,6 @@ message ReplicationManagerStatusResponseProto {
  * and response messages for details of the RPC calls.
  */
 service StorageContainerLocationProtocolService {
+  rpc submitRequest (ScmContainerLocationRequest) returns (ScmContainerLocationResponse);
 
-  /**
-   * Creates a container entry in SCM.
-   */
-  rpc allocateContainer(ContainerRequestProto) returns (ContainerResponseProto);
-
-  /**
-   * Returns the pipeline for a given container.
-   */
-  rpc getContainer(GetContainerRequestProto) returns (GetContainerResponseProto);
-
-  /**
-   * Returns the pipeline for a given container.
-   */
-  rpc getContainerWithPipeline(GetContainerWithPipelineRequestProto) returns (GetContainerWithPipelineResponseProto);
-
-  rpc listContainer(SCMListContainerRequestProto) returns (SCMListContainerResponseProto);
-
-  /**
-   * Deletes a container in SCM.
-   */
-  rpc deleteContainer(SCMDeleteContainerRequestProto) returns (SCMDeleteContainerResponseProto);
-
-  /**
-  * Returns a set of Nodes that meet a criteria.
-  */
-  rpc queryNode(NodeQueryRequestProto) returns (NodeQueryResponseProto);
-
-  /**
-  * Notify from client when begin or finish container or pipeline operations on datanodes.
-  */
-  rpc notifyObjectStageChange(ObjectStageChangeRequestProto) returns (ObjectStageChangeResponseProto);
-
-  /*
-  *  Apis that Manage Pipelines.
-  *
-  * Pipelines are abstractions offered by SCM and Datanode that allows users
-  * to create a replication pipeline.
-  *
-  *  These following APIs allow command line programs like SCM CLI to list
-  * and manage pipelines.
-  */
-
-  /**
-  *  Creates a replication pipeline.
-  */
-  rpc allocatePipeline(PipelineRequestProto)
-      returns (PipelineResponseProto);
-
-  /**
-   * Returns the list of Pipelines managed by SCM.
-   */
-  rpc listPipelines(ListPipelineRequestProto)
-      returns (ListPipelineResponseProto);
-
-  rpc activatePipeline(ActivatePipelineRequestProto)
-      returns (ActivatePipelineResponseProto);
-
-  rpc deactivatePipeline(DeactivatePipelineRequestProto)
-      returns (DeactivatePipelineResponseProto);
-
-  /**
-   * Closes a pipeline.
-   */
-  rpc closePipeline(ClosePipelineRequestProto)
-      returns (ClosePipelineResponseProto);
-
-  /**
-  *  Returns information about SCM.
-  */
-  rpc getScmInfo(GetScmInfoRequestProto)
-      returns (GetScmInfoResponseProto);
-
-  /**
-  *  Checks if SCM is in SafeMode.
-  */
-  rpc inSafeMode(InSafeModeRequestProto)
-  returns (InSafeModeResponseProto);
-
-  /**
-  *  Returns information about SCM.
-  */
-  rpc forceExitSafeMode(ForceExitSafeModeRequestProto)
-  returns (ForceExitSafeModeResponseProto);
-
-  rpc startReplicationManager(StartReplicationManagerRequestProto)
-  returns (StartReplicationManagerResponseProto);
-
-  rpc stopReplicationManager(StopReplicationManagerRequestProto)
-  returns (StopReplicationManagerResponseProto);
-
-  rpc getReplicationManagerStatus(ReplicationManagerStatusRequestProto)
-  returns (ReplicationManagerStatusResponseProto);
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
index 9d53dbf..0d2f470 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -1,4 +1,3 @@
-
 /**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
@@ -7,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- *
+ * <p>
  * http://www.apache.org/licenses/LICENSE-2.0
- *
+ * <p>
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -42,16 +41,18 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.InSafeModeResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeResponseProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.PipelineRequestProto;
-import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ScmContainerLocationRequest;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ScmContainerLocationResponse;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ScmContainerLocationResponse.Status;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartReplicationManagerRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartReplicationManagerResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopReplicationManagerRequestProto;
@@ -61,11 +62,13 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
-import org.apache.hadoop.hdds.tracing.TracingUtil;
+import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher;
+import org.apache.hadoop.ozone.protocolPB.ProtocolMessageMetrics;
 
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
-import io.opentracing.Scope;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class is the server-side translator that forwards requests received on
@@ -76,288 +79,315 @@ import io.opentracing.Scope;
 public final class StorageContainerLocationProtocolServerSideTranslatorPB
     implements StorageContainerLocationProtocolPB {
 
+  private static final Logger LOG =
+      LoggerFactory.getLogger(
+          StorageContainerLocationProtocolServerSideTranslatorPB.class);
+
   private final StorageContainerLocationProtocol impl;
 
+  private OzoneProtocolMessageDispatcher<ScmContainerLocationRequest,
+      ScmContainerLocationResponse>
+      dispatcher;
+
   /**
    * Creates a new StorageContainerLocationProtocolServerSideTranslatorPB.
    *
-   * @param impl {@link StorageContainerLocationProtocol} server implementation
+   * @param impl            {@link StorageContainerLocationProtocol} server
+   *                        implementation
+   * @param protocolMetrics
    */
   public StorageContainerLocationProtocolServerSideTranslatorPB(
-      StorageContainerLocationProtocol impl) throws IOException {
+      StorageContainerLocationProtocol impl,
+      ProtocolMessageMetrics protocolMetrics) throws IOException {
     this.impl = impl;
+    this.dispatcher =
+        new OzoneProtocolMessageDispatcher<>("ScmContainerLocation",
+            protocolMetrics, LOG);
   }
 
   @Override
-  public ContainerResponseProto allocateContainer(RpcController unused,
-      ContainerRequestProto request) throws ServiceException {
-    try (Scope scope = TracingUtil
-        .importAndCreateScope("allocateContainer", request.getTraceID())) {
-      ContainerWithPipeline containerWithPipeline = impl
-          .allocateContainer(request.getReplicationType(),
-              request.getReplicationFactor(), request.getOwner());
-      return ContainerResponseProto.newBuilder()
-          .setContainerWithPipeline(containerWithPipeline.getProtobuf())
-          .setErrorCode(ContainerResponseProto.Error.success)
-          .build();
+  public ScmContainerLocationResponse submitRequest(RpcController controller,
+      ScmContainerLocationRequest request) throws ServiceException {
+    return dispatcher
+        .processRequest(request, this::processRequest, request.getCmdType(),
+            request.getTraceID());
+  }
+
+  public ScmContainerLocationResponse processRequest(
+      ScmContainerLocationRequest request) throws ServiceException {
+    try {
+      switch (request.getCmdType()) {
+      case AllocateContainer:
+        return ScmContainerLocationResponse.newBuilder()
+            .setCmdType(request.getCmdType())
+            .setStatus(Status.OK)
+            .setContainerResponse(
+                allocateContainer(request.getContainerRequest()))
+            .build();
+      case GetContainer:
+        return ScmContainerLocationResponse.newBuilder()
+            .setCmdType(request.getCmdType())
+            .setStatus(Status.OK)
+            .setGetContainerResponse(
+                getContainer(request.getGetContainerRequest()))
+            .build();
+      case GetContainerWithPipeline:
+        return ScmContainerLocationResponse.newBuilder()
+            .setCmdType(request.getCmdType())
+            .setStatus(Status.OK)
+            .setGetContainerWithPipelineResponse(getContainerWithPipeline(
+                request.getGetContainerWithPipelineRequest()))
+            .build();
+      case ListContainer:
+        return ScmContainerLocationResponse.newBuilder()
+            .setCmdType(request.getCmdType())
+            .setStatus(Status.OK)
+            .setScmListContainerResponse(listContainer(
+                request.getScmListContainerRequest()))
+            .build();
+      case QueryNode:
+        return ScmContainerLocationResponse.newBuilder()
+            .setCmdType(request.getCmdType())
+            .setStatus(Status.OK)
+            .setNodeQueryResponse(queryNode(request.getNodeQueryRequest()))
+            .build();
+      case NotifyObjectStageChange:
+        return ScmContainerLocationResponse.newBuilder()
+            .setCmdType(request.getCmdType())
+            .setStatus(Status.OK)
+            .setObjectStageChangeResponse(notifyObjectStageChange(
+                request.getObjectStageChangeRequest()))
+            .build();
+      case ListPipelines:
+        return ScmContainerLocationResponse.newBuilder()
+            .setCmdType(request.getCmdType())
+            .setStatus(Status.OK)
+            .setListPipelineResponse(listPipelines(
+                request.getListPipelineRequest()))
+            .build();
+      case ActivatePipeline:
+        return ScmContainerLocationResponse.newBuilder()
+            .setCmdType(request.getCmdType())
+            .setStatus(Status.OK)
+            .setActivatePipelineResponse(activatePipeline(
+                request.getActivatePipelineRequest()))
+            .build();
+      case GetScmInfo:
+        return ScmContainerLocationResponse.newBuilder()
+            .setCmdType(request.getCmdType())
+            .setStatus(Status.OK)
+            .setGetScmInfoResponse(getScmInfo(
+                request.getGetScmInfoRequest()))
+            .build();
+      case InSafeMode:
+        return ScmContainerLocationResponse.newBuilder()
+            .setCmdType(request.getCmdType())
+            .setStatus(Status.OK)
+            .setInSafeModeResponse(inSafeMode(
+                request.getInSafeModeRequest()))
+            .build();
+      case ForceExitSafeMode:
+        return ScmContainerLocationResponse.newBuilder()
+            .setCmdType(request.getCmdType())
+            .setStatus(Status.OK)
+            .setForceExitSafeModeResponse(forceExitSafeMode(
+                request.getForceExitSafeModeRequest()))
+            .build();
+      case StartReplicationManager:
+        return ScmContainerLocationResponse.newBuilder()
+            .setCmdType(request.getCmdType())
+            .setStatus(Status.OK)
+            .setStartReplicationManagerResponse(startReplicationManager(
+                request.getStartReplicationManagerRequest()))
+            .build();
+      case StopReplicationManager:
+        return ScmContainerLocationResponse.newBuilder()
+            .setCmdType(request.getCmdType())
+            .setStatus(Status.OK)
+            .setStopReplicationManagerResponse(stopReplicationManager(
+                request.getStopReplicationManagerRequest()))
+            .build();
+      case GetReplicationManagerStatus:
+        return ScmContainerLocationResponse.newBuilder()
+            .setCmdType(request.getCmdType())
+            .setStatus(Status.OK)
+            .setReplicationManagerStatusResponse(getReplicationManagerStatus(
+                request.getSeplicationManagerStatusRequest()))
+            .build();
+      default:
+        throw new IllegalArgumentException(
+            "Unknown command type: " + request.getCmdType());
+      }
 
     } catch (IOException e) {
       throw new ServiceException(e);
     }
   }
 
-  @Override
+  public ContainerResponseProto allocateContainer(ContainerRequestProto request)
+      throws IOException {
+    ContainerWithPipeline containerWithPipeline = impl
+        .allocateContainer(request.getReplicationType(),
+            request.getReplicationFactor(), request.getOwner());
+    return ContainerResponseProto.newBuilder()
+        .setContainerWithPipeline(containerWithPipeline.getProtobuf())
+        .setErrorCode(ContainerResponseProto.Error.success)
+        .build();
+
+  }
+
   public GetContainerResponseProto getContainer(
-      RpcController controller, GetContainerRequestProto request)
-      throws ServiceException {
-    try (Scope scope = TracingUtil
-        .importAndCreateScope("getContainer", request.getTraceID())) {
-      ContainerInfo container = impl.getContainer(request.getContainerID());
-      return GetContainerResponseProto.newBuilder()
-          .setContainerInfo(container.getProtobuf())
-          .build();
-    } catch (IOException e) {
-      throw new ServiceException(e);
-    }
+      GetContainerRequestProto request) throws IOException {
+    ContainerInfo container = impl.getContainer(request.getContainerID());
+    return GetContainerResponseProto.newBuilder()
+        .setContainerInfo(container.getProtobuf())
+        .build();
   }
 
-  @Override
   public GetContainerWithPipelineResponseProto getContainerWithPipeline(
-      RpcController controller, GetContainerWithPipelineRequestProto request)
-      throws ServiceException {
-    try (Scope scope = TracingUtil
-        .importAndCreateScope("getContainerWithPipeline",
-            request.getTraceID())) {
-      ContainerWithPipeline container = impl
-          .getContainerWithPipeline(request.getContainerID());
-      return GetContainerWithPipelineResponseProto.newBuilder()
-          .setContainerWithPipeline(container.getProtobuf())
-          .build();
-    } catch (IOException e) {
-      throw new ServiceException(e);
-    }
+      GetContainerWithPipelineRequestProto request)
+      throws IOException {
+    ContainerWithPipeline container = impl
+        .getContainerWithPipeline(request.getContainerID());
+    return GetContainerWithPipelineResponseProto.newBuilder()
+        .setContainerWithPipeline(container.getProtobuf())
+        .build();
   }
 
-  @Override
-  public SCMListContainerResponseProto listContainer(RpcController controller,
-      SCMListContainerRequestProto request) throws ServiceException {
-    try (Scope scope = TracingUtil
-        .importAndCreateScope("listContainer", request.getTraceID())) {
-      long startContainerID = 0;
-      int count = -1;
-
-      // Arguments check.
-      if (request.hasStartContainerID()) {
-        // End container name is given.
-        startContainerID = request.getStartContainerID();
-      }
-      count = request.getCount();
-      List<ContainerInfo> containerList =
-          impl.listContainer(startContainerID, count);
-      SCMListContainerResponseProto.Builder builder =
-          SCMListContainerResponseProto.newBuilder();
-      for (ContainerInfo container : containerList) {
-        builder.addContainers(container.getProtobuf());
-      }
-      return builder.build();
-    } catch (IOException e) {
-      throw new ServiceException(e);
+  public SCMListContainerResponseProto listContainer(
+      SCMListContainerRequestProto request) throws IOException {
+
+    long startContainerID = 0;
+    int count = -1;
+
+    // Arguments check.
+    if (request.hasStartContainerID()) {
+      // End container name is given.
+      startContainerID = request.getStartContainerID();
     }
+    count = request.getCount();
+    List<ContainerInfo> containerList =
+        impl.listContainer(startContainerID, count);
+    SCMListContainerResponseProto.Builder builder =
+        SCMListContainerResponseProto.newBuilder();
+    for (ContainerInfo container : containerList) {
+      builder.addContainers(container.getProtobuf());
+    }
+    return builder.build();
   }
 
-  @Override
   public SCMDeleteContainerResponseProto deleteContainer(
-      RpcController controller, SCMDeleteContainerRequestProto request)
-      throws ServiceException {
-    try (Scope scope = TracingUtil
-        .importAndCreateScope("deleteContainer", request.getTraceID())) {
-      impl.deleteContainer(request.getContainerID());
-      return SCMDeleteContainerResponseProto.newBuilder().build();
-    } catch (IOException e) {
-      throw new ServiceException(e);
-    }
+      SCMDeleteContainerRequestProto request)
+      throws IOException {
+    impl.deleteContainer(request.getContainerID());
+    return SCMDeleteContainerResponseProto.newBuilder().build();
+
   }
 
-  @Override
-  public StorageContainerLocationProtocolProtos.NodeQueryResponseProto
-      queryNode(RpcController controller,
+  public NodeQueryResponseProto queryNode(
       StorageContainerLocationProtocolProtos.NodeQueryRequestProto request)
-      throws ServiceException {
-    try (Scope scope = TracingUtil
-        .importAndCreateScope("queryNode", request.getTraceID())) {
-      HddsProtos.NodeState nodeState = request.getState();
-      List<HddsProtos.Node> datanodes = impl.queryNode(nodeState,
-          request.getScope(), request.getPoolName());
-      return StorageContainerLocationProtocolProtos
-          .NodeQueryResponseProto.newBuilder()
-          .addAllDatanodes(datanodes)
-          .build();
-    } catch (Exception e) {
-      throw new ServiceException(e);
-    }
-  }
+      throws IOException {
+
+    HddsProtos.NodeState nodeState = request.getState();
+    List<HddsProtos.Node> datanodes = impl.queryNode(nodeState,
+        request.getScope(), request.getPoolName());
+    return NodeQueryResponseProto.newBuilder()
+        .addAllDatanodes(datanodes)
+        .build();
 
-  @Override
-  public ObjectStageChangeResponseProto notifyObjectStageChange(
-      RpcController controller, ObjectStageChangeRequestProto request)
-      throws ServiceException {
-    try (Scope scope = TracingUtil
-        .importAndCreateScope("notifyObjectStageChange",
-            request.getTraceID())) {
-      impl.notifyObjectStageChange(request.getType(), request.getId(),
-          request.getOp(), request.getStage());
-      return ObjectStageChangeResponseProto.newBuilder().build();
-    } catch (IOException e) {
-      throw new ServiceException(e);
-    }
   }
 
-  @Override
-  public PipelineResponseProto allocatePipeline(
-      RpcController controller, PipelineRequestProto request)
-      throws ServiceException {
-    // TODO : Wiring this up requires one more patch.
-    return null;
+  public ObjectStageChangeResponseProto notifyObjectStageChange(
+      ObjectStageChangeRequestProto request)
+      throws IOException {
+    impl.notifyObjectStageChange(request.getType(), request.getId(),
+        request.getOp(), request.getStage());
+    return ObjectStageChangeResponseProto.newBuilder().build();
   }
 
-  @Override
   public ListPipelineResponseProto listPipelines(
-      RpcController controller, ListPipelineRequestProto request)
-      throws ServiceException {
-    try (Scope scope = TracingUtil
-        .importAndCreateScope("listPipelines", request.getTraceID())) {
-      ListPipelineResponseProto.Builder builder = ListPipelineResponseProto
-          .newBuilder();
-      List<Pipeline> pipelines = impl.listPipelines();
-      for (Pipeline pipeline : pipelines) {
-        HddsProtos.Pipeline protobufMessage = pipeline.getProtobufMessage();
-        builder.addPipelines(protobufMessage);
-      }
-      return builder.build();
-    } catch (IOException e) {
-      throw new ServiceException(e);
+      ListPipelineRequestProto request)
+      throws IOException {
+    ListPipelineResponseProto.Builder builder = ListPipelineResponseProto
+        .newBuilder();
+    List<Pipeline> pipelines = impl.listPipelines();
+    for (Pipeline pipeline : pipelines) {
+      HddsProtos.Pipeline protobufMessage = pipeline.getProtobufMessage();
+      builder.addPipelines(protobufMessage);
     }
+    return builder.build();
   }
 
-  @Override
   public ActivatePipelineResponseProto activatePipeline(
-      RpcController controller, ActivatePipelineRequestProto request)
-      throws ServiceException {
-    try (Scope ignored = TracingUtil
-        .importAndCreateScope("activatePipeline", request.getTraceID())) {
-      impl.activatePipeline(request.getPipelineID());
-      return ActivatePipelineResponseProto.newBuilder().build();
-    } catch (IOException e) {
-      throw new ServiceException(e);
-    }
+      ActivatePipelineRequestProto request)
+      throws IOException {
+    impl.activatePipeline(request.getPipelineID());
+    return ActivatePipelineResponseProto.newBuilder().build();
   }
 
-  @Override
   public DeactivatePipelineResponseProto deactivatePipeline(
-      RpcController controller, DeactivatePipelineRequestProto request)
-      throws ServiceException {
-    try (Scope ignored = TracingUtil
-        .importAndCreateScope("deactivatePipeline", request.getTraceID())) {
-      impl.deactivatePipeline(request.getPipelineID());
-      return DeactivatePipelineResponseProto.newBuilder().build();
-    } catch (IOException e) {
-      throw new ServiceException(e);
-    }
+      DeactivatePipelineRequestProto request)
+      throws IOException {
+    impl.deactivatePipeline(request.getPipelineID());
+    return DeactivatePipelineResponseProto.newBuilder().build();
   }
 
-  @Override
   public ClosePipelineResponseProto closePipeline(
       RpcController controller, ClosePipelineRequestProto request)
-      throws ServiceException {
-    try (Scope scope = TracingUtil
-        .importAndCreateScope("closePipeline", request.getTraceID())) {
-      impl.closePipeline(request.getPipelineID());
-      return ClosePipelineResponseProto.newBuilder().build();
-    } catch (IOException e) {
-      throw new ServiceException(e);
-    }
+      throws IOException {
+
+    impl.closePipeline(request.getPipelineID());
+    return ClosePipelineResponseProto.newBuilder().build();
+
   }
 
-  @Override
   public HddsProtos.GetScmInfoResponseProto getScmInfo(
-      RpcController controller, HddsProtos.GetScmInfoRequestProto req)
-      throws ServiceException {
-    try (Scope scope = TracingUtil
-        .importAndCreateScope("getScmInfo", req.getTraceID())) {
-      ScmInfo scmInfo = impl.getScmInfo();
-      return HddsProtos.GetScmInfoResponseProto.newBuilder()
-          .setClusterId(scmInfo.getClusterId())
-          .setScmId(scmInfo.getScmId())
-          .build();
-    } catch (IOException ex) {
-      throw new ServiceException(ex);
-    }
+      HddsProtos.GetScmInfoRequestProto req)
+      throws IOException {
+    ScmInfo scmInfo = impl.getScmInfo();
+    return HddsProtos.GetScmInfoResponseProto.newBuilder()
+        .setClusterId(scmInfo.getClusterId())
+        .setScmId(scmInfo.getScmId())
+        .build();
 
   }
 
-  @Override
   public InSafeModeResponseProto inSafeMode(
-      RpcController controller,
-      InSafeModeRequestProto request) throws ServiceException {
-    try (Scope scope = TracingUtil
-        .importAndCreateScope("inSafeMode", request.getTraceID())) {
-      return InSafeModeResponseProto.newBuilder()
-          .setInSafeMode(impl.inSafeMode()).build();
-    } catch (IOException ex) {
-      throw new ServiceException(ex);
-    }
+      InSafeModeRequestProto request) throws IOException {
+
+    return InSafeModeResponseProto.newBuilder()
+        .setInSafeMode(impl.inSafeMode()).build();
+
   }
 
-  @Override
   public ForceExitSafeModeResponseProto forceExitSafeMode(
-      RpcController controller, ForceExitSafeModeRequestProto request)
-      throws ServiceException {
-    try (Scope scope = TracingUtil
-        .importAndCreateScope("forceExitSafeMode", request.getTraceID())) {
-      return ForceExitSafeModeResponseProto.newBuilder()
-          .setExitedSafeMode(impl.forceExitSafeMode()).build();
-    } catch (IOException ex) {
-      throw new ServiceException(ex);
-    }
+      ForceExitSafeModeRequestProto request)
+      throws IOException {
+    return ForceExitSafeModeResponseProto.newBuilder()
+        .setExitedSafeMode(impl.forceExitSafeMode()).build();
+
   }
 
-  @Override
   public StartReplicationManagerResponseProto startReplicationManager(
-      RpcController controller, StartReplicationManagerRequestProto request)
-      throws ServiceException {
-    try (Scope ignored = TracingUtil.importAndCreateScope(
-        "startReplicationManager", request.getTraceID())) {
-      impl.startReplicationManager();
-      return StartReplicationManagerResponseProto.newBuilder().build();
-    } catch (IOException ex) {
-      throw new ServiceException(ex);
-    }
+      StartReplicationManagerRequestProto request)
+      throws IOException {
+    impl.startReplicationManager();
+    return StartReplicationManagerResponseProto.newBuilder().build();
   }
 
-  @Override
   public StopReplicationManagerResponseProto stopReplicationManager(
-      RpcController controller, StopReplicationManagerRequestProto request)
-      throws ServiceException {
-    try (Scope ignored = TracingUtil.importAndCreateScope(
-        "stopReplicationManager", request.getTraceID())) {
-      impl.stopReplicationManager();
-      return StopReplicationManagerResponseProto.newBuilder().build();
-    } catch (IOException ex) {
-      throw new ServiceException(ex);
-    }
+      StopReplicationManagerRequestProto request)
+      throws IOException {
+    impl.stopReplicationManager();
+    return StopReplicationManagerResponseProto.newBuilder().build();
+
   }
 
-  @Override
   public ReplicationManagerStatusResponseProto getReplicationManagerStatus(
-      RpcController controller, ReplicationManagerStatusRequestProto request)
-      throws ServiceException {
-    try (Scope ignored = TracingUtil.importAndCreateScope(
-        "getReplicationManagerStatus", request.getTraceID())) {
-      return ReplicationManagerStatusResponseProto.newBuilder()
-          .setIsRunning(impl.getReplicationManagerStatus()).build();
-    } catch (IOException ex) {
-      throw new ServiceException(ex);
-    }
+      ReplicationManagerStatusRequestProto request)
+      throws IOException {
+    return ReplicationManagerStatusResponseProto.newBuilder()
+        .setIsRunning(impl.getReplicationManagerStatus()).build();
   }
 
 }
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
index e0136e8..9c27f6a 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.ozone.audit.AuditMessage;
 import org.apache.hadoop.ozone.audit.Auditor;
 import org.apache.hadoop.ozone.audit.SCMAction;
 import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocolServerSideTranslatorPB;
+import org.apache.hadoop.ozone.protocolPB.ProtocolMessageMetrics;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -103,6 +104,7 @@ public class SCMClientProtocolServer implements
   private final StorageContainerManager scm;
   private final OzoneConfiguration conf;
   private SafeModePrecheck safeModePrecheck;
+  private final ProtocolMessageMetrics protocolMetrics;
 
   public SCMClientProtocolServer(OzoneConfiguration conf,
       StorageContainerManager scm) throws IOException {
@@ -115,10 +117,16 @@ public class SCMClientProtocolServer implements
     RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
         ProtobufRpcEngine.class);
 
+    protocolMetrics = ProtocolMessageMetrics
+        .create("ScmContainerLocationProtocol",
+            "SCM ContainerLocation protocol metrics",
+            StorageContainerLocationProtocolProtos.Type.values());
+
     // SCM Container Service RPC
     BlockingService storageProtoPbService =
         newReflectiveBlockingService(
-            new StorageContainerLocationProtocolServerSideTranslatorPB(this));
+            new StorageContainerLocationProtocolServerSideTranslatorPB(this,
+                protocolMetrics));
 
     final InetSocketAddress scmAddress = HddsServerUtil
         .getScmClientBindAddress(conf);
@@ -147,6 +155,7 @@ public class SCMClientProtocolServer implements
   }
 
   public void start() {
+    protocolMetrics.register();
     LOG.info(
         StorageContainerManager.buildRpcServerStartMessage(
             "RPC server for Client ", getClientRpcAddress()));
@@ -154,6 +163,7 @@ public class SCMClientProtocolServer implements
   }
 
   public void stop() {
+    protocolMetrics.unregister();
     try {
       LOG.info("Stopping the RPC server for Client Protocol");
       getClientRpcServer().stop();
diff --git a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/BaseInsightSubCommand.java b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/BaseInsightSubCommand.java
index a9f4b94..4c3875c 100644
--- a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/BaseInsightSubCommand.java
+++ b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/BaseInsightSubCommand.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.ozone.insight.scm.EventQueueInsight;
 import org.apache.hadoop.ozone.insight.scm.NodeManagerInsight;
 import org.apache.hadoop.ozone.insight.scm.ReplicaManagerInsight;
 import org.apache.hadoop.ozone.insight.scm.ScmProtocolBlockLocationInsight;
+import org.apache.hadoop.ozone.insight.scm.ScmProtocolContainerLocationInsight;
 import org.apache.hadoop.ozone.insight.scm.ScmProtocolSecurityInsight;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
 
@@ -89,6 +90,8 @@ public class BaseInsightSubCommand {
     insights.put("scm.event-queue", new EventQueueInsight());
     insights.put("scm.protocol.block-location",
         new ScmProtocolBlockLocationInsight());
+    insights.put("scm.protocol.container-location",
+        new ScmProtocolContainerLocationInsight());
     insights.put("scm.protocol.security",
              new ScmProtocolSecurityInsight());
     insights.put("om.key-manager", new KeyManagerInsight());
diff --git a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolBlockLocationInsight.java b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolBlockLocationInsight.java
index 31c73c0..f67f641 100644
--- a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolBlockLocationInsight.java
+++ b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolBlockLocationInsight.java
@@ -23,12 +23,12 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
-import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdds.scm.server.SCMBlockProtocolServer;
 import org.apache.hadoop.ozone.insight.BaseInsightPoint;
 import org.apache.hadoop.ozone.insight.Component.Type;
 import org.apache.hadoop.ozone.insight.LoggerSource;
 import org.apache.hadoop.ozone.insight.MetricGroupDisplay;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocolServerSideTranslatorPB;
 
 /**
  * Insight metric to check the SCM block location protocol behaviour.
diff --git a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolBlockLocationInsight.java b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolContainerLocationInsight.java
similarity index 70%
copy from hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolBlockLocationInsight.java
copy to hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolContainerLocationInsight.java
index 31c73c0..d6db589 100644
--- a/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolBlockLocationInsight.java
+++ b/hadoop-ozone/insight/src/main/java/org/apache/hadoop/ozone/insight/scm/ScmProtocolContainerLocationInsight.java
@@ -22,9 +22,9 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
-import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocolServerSideTranslatorPB;
-import org.apache.hadoop.hdds.scm.server.SCMBlockProtocolServer;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StorageContainerLocationProtocolService;
+import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocolServerSideTranslatorPB;
 import org.apache.hadoop.ozone.insight.BaseInsightPoint;
 import org.apache.hadoop.ozone.insight.Component.Type;
 import org.apache.hadoop.ozone.insight.LoggerSource;
@@ -33,18 +33,18 @@ import org.apache.hadoop.ozone.insight.MetricGroupDisplay;
 /**
  * Insight metric to check the SCM block location protocol behaviour.
  */
-public class ScmProtocolBlockLocationInsight extends BaseInsightPoint {
+public class ScmProtocolContainerLocationInsight extends BaseInsightPoint {
 
   @Override
   public List<LoggerSource> getRelatedLoggers(boolean verbose) {
     List<LoggerSource> loggers = new ArrayList<>();
     loggers.add(
         new LoggerSource(Type.SCM,
-            ScmBlockLocationProtocolServerSideTranslatorPB.class,
+            StorageContainerLocationProtocolServerSideTranslatorPB.class,
             defaultLevel(verbose)));
-    loggers.add(new LoggerSource(Type.SCM,
-        SCMBlockProtocolServer.class,
-        defaultLevel(verbose)));
+    new LoggerSource(Type.SCM,
+        StorageContainerLocationProtocolService.class,
+        defaultLevel(verbose));
     return loggers;
   }
 
@@ -57,15 +57,15 @@ public class ScmProtocolBlockLocationInsight extends BaseInsightPoint {
 
     addRpcMetrics(metrics, Type.SCM, filter);
 
-    addProtocolMessageMetrics(metrics, "scm_block_location_protocol",
-        Type.SCM, ScmBlockLocationProtocolProtos.Type.values());
+    addProtocolMessageMetrics(metrics, "scm_container_location_protocol",
+        Type.SCM, StorageContainerLocationProtocolProtos.Type.values());
 
     return metrics;
   }
 
   @Override
   public String getDescription() {
-    return "SCM Block location protocol endpoint";
+    return "SCM Container location protocol endpoint";
   }
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message