hadoop-common-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aajis...@apache.org
Subject [hadoop] 02/05: Revert "HDDS-1072. Implement RetryProxy and FailoverProxy for OM client."
Date Mon, 04 Mar 2019 08:00:17 GMT
This is an automated email from the ASF dual-hosted git repository.

aajisaka pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit b18c1c22ea238c4b783031402496164f0351b531
Author: Hanisha Koneru <hanishakoneru@apache.org>
AuthorDate: Fri Mar 1 20:05:12 2019 -0800

    Revert "HDDS-1072. Implement RetryProxy and FailoverProxy for OM client."
    
    This reverts commit 8e1225991d8da7d6801fc3753319139873f23bc9.
---
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   |  17 --
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |   3 -
 .../common/src/main/resources/ozone-default.xml    |  43 +---
 .../ozone/client/protocol/ClientProtocol.java      |   4 +-
 .../hadoop/ozone/client/rest/RestClient.java       |   4 +-
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |  18 +-
 .../hadoop/ozone/client/rpc/ha/OMProxyInfo.java}   |  32 ++-
 .../ozone/client/rpc/ha/OMProxyProvider.java       | 177 ++++++++++++++
 .../hadoop/ozone/client/rpc}/ha/package-info.java  |   2 +-
 .../ozone/om/ha/OMFailoverProxyProvider.java       | 266 ---------------------
 .../ozone/om/protocol/OzoneManagerProtocol.java    |   7 -
 ...OzoneManagerProtocolClientSideTranslatorPB.java | 105 +-------
 .../src/main/proto/OzoneManagerProtocol.proto      |   2 -
 .../hadoop/ozone/MiniOzoneHAClusterImpl.java       |  38 ++-
 .../client/rpc/TestOzoneRpcClientAbstract.java     |   8 +-
 .../apache/hadoop/ozone/om/TestOzoneManagerHA.java | 184 +++-----------
 .../org/apache/hadoop/ozone/om/OzoneManager.java   |   6 -
 .../hadoop/ozone/om/ratis/OMRatisHelper.java       |   9 +-
 .../ozone/om/ratis/OzoneManagerRatisClient.java    |  27 +--
 ...OzoneManagerProtocolServerSideTranslatorPB.java |   3 +-
 .../om/ratis/TestOzoneManagerRatisServer.java      |  23 ++
 21 files changed, 322 insertions(+), 656 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index 0d73905..cd40f7c 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -379,23 +379,6 @@ public final class OzoneConfigKeys {
   public static final String OZONE_FS_ISOLATED_CLASSLOADER =
       "ozone.fs.isolated-classloader";
 
-  // Ozone Client Retry and Failover configurations
-  public static final String OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY =
-      "ozone.client.retry.max.attempts";
-  public static final int OZONE_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT =
-      10;
-  public static final String OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY =
-      "ozone.client.failover.max.attempts";
-  public static final int OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT =
-      15;
-  public static final String OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_KEY =
-      "ozone.client.failover.sleep.base.millis";
-  public static final int OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT =
-      500;
-  public static final String OZONE_CLIENT_FAILOVER_SLEEP_MAX_MILLIS_KEY =
-      "ozone.client.failover.sleep.max.millis";
-  public static final int OZONE_CLIENT_FAILOVER_SLEEP_MAX_MILLIS_DEFAULT =
-      15000;
 
   public static final String OZONE_FREON_HTTP_ENABLED_KEY =
       "ozone.freon.http.enabled";
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index 8e3b02a..45b46b8 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -276,7 +276,4 @@ public final class OzoneConsts {
 
   // Default OMServiceID for OM Ratis servers to use as RaftGroupId
   public static final String OM_SERVICE_ID_DEFAULT = "omServiceIdDefault";
-
-  // Dummy OMNodeID for OM Clients to use for a non-HA OM setup
-  public static final String OM_NODE_ID_DUMMY = "omNodeIdDummy";
 }
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index f7fecb7..8469fdc 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -2029,45 +2029,4 @@
     </description>
   </property>
 
-  <property>
-    <name>ozone.client.retry.max.attempts</name>
-    <value>10</value>
-    <description>
-      Max retry attempts for Ozone RpcClient talking to OzoneManagers.
-    </description>
-  </property>
-  <property>
-    <name>ozone.client.failover.max.attempts</name>
-    <value>15</value>
-    <description>
-      Expert only. The number of client failover attempts that should be
-      made before the failover is considered failed.
-    </description>
-  </property>
-  <property>
-    <name>ozone.client.failover.sleep.base.millis</name>
-    <value>500</value>
-    <description>
-      Expert only. The time to wait, in milliseconds, between failover
-      attempts increases exponentially as a function of the number of
-      attempts made so far, with a random factor of +/- 50%. This option
-      specifies the base value used in the failover calculation. The
-      first failover will retry immediately. The 2nd failover attempt
-      will delay at least ozone.client.failover.sleep.base.millis
-      milliseconds. And so on.
-    </description>
-  </property>
-  <property>
-    <name>ozone.client.failover.sleep.max.millis</name>
-    <value>15000</value>
-    <description>
-      Expert only. The time to wait, in milliseconds, between failover
-      attempts increases exponentially as a function of the number of
-      attempts made so far, with a random factor of +/- 50%. This option
-      specifies the maximum value to wait between failovers.
-      Specifically, the time between two failover attempts will not
-      exceed +/- 50% of ozone.client.failover.sleep.max.millis
-      milliseconds.
-    </description>
-  </property>
-</configuration>
\ No newline at end of file
+</configuration>
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
index 2bf9089..494afae 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/protocol/ClientProtocol.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
-import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
+import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
 
@@ -510,5 +510,5 @@ public interface ClientProtocol {
   S3SecretValue getS3Secret(String kerberosID) throws IOException;
 
   @VisibleForTesting
-  OMFailoverProxyProvider getOMProxyProvider();
+  OMProxyProvider getOMProxyProvider();
 }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java
index eea2809..b69d972 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rest/RestClient.java
@@ -42,8 +42,8 @@ import org.apache.hadoop.ozone.client.rest.headers.Header;
 import org.apache.hadoop.ozone.client.rest.response.BucketInfo;
 import org.apache.hadoop.ozone.client.rest.response.KeyInfoDetails;
 import org.apache.hadoop.ozone.client.rest.response.VolumeInfo;
+import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
-import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
@@ -725,7 +725,7 @@ public class RestClient implements ClientProtocol {
   }
 
   @Override
-  public OMFailoverProxyProvider getOMProxyProvider() {
+  public OMProxyProvider getOMProxyProvider() {
     return null;
   }
 
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 4b44770..0875046 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -50,8 +50,8 @@ import org.apache.hadoop.ozone.client.io.LengthInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
-import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
 import org.apache.hadoop.ozone.om.helpers.BucketEncryptionKeyInfo;
+import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider;
 import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
@@ -66,8 +66,6 @@ import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
 import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
 import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
-import org.apache.hadoop.ozone.om.protocolPB
-    .OzoneManagerProtocolClientSideTranslatorPB;
 import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.ozone.OzoneAcl;
@@ -87,7 +85,6 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.io.Text;
 import org.apache.logging.log4j.util.Strings;
-import org.apache.ratis.protocol.ClientId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -110,6 +107,7 @@ public class RpcClient implements ClientProtocol {
   private final OzoneConfiguration conf;
   private final StorageContainerLocationProtocol
       storageContainerLocationClient;
+  private final OMProxyProvider omProxyProvider;
   private final OzoneManagerProtocol ozoneManagerClient;
   private final XceiverClientManager xceiverClientManager;
   private final int chunkSize;
@@ -123,7 +121,6 @@ public class RpcClient implements ClientProtocol {
   private final long streamBufferMaxSize;
   private final long blockSize;
   private final long watchTimeout;
-  private final ClientId clientId = ClientId.randomId();
 
    /**
     * Creates RpcClient instance with the given configuration.
@@ -140,8 +137,11 @@ public class RpcClient implements ClientProtocol {
         OMConfigKeys.OZONE_OM_GROUP_RIGHTS_DEFAULT);
     RPC.setProtocolEngine(conf, OzoneManagerProtocolPB.class,
         ProtobufRpcEngine.class);
-    this.ozoneManagerClient = new OzoneManagerProtocolClientSideTranslatorPB(
-        this.conf, clientId.toString(), ugi);
+    this.omProxyProvider = new OMProxyProvider(conf, ugi);
+    this.ozoneManagerClient =
+        TracingUtil.createProxy(
+            this.omProxyProvider.getProxy(),
+            OzoneManagerProtocol.class);
 
     long scmVersion =
         RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class);
@@ -492,8 +492,8 @@ public class RpcClient implements ClientProtocol {
 
   @Override
   @VisibleForTesting
-  public OMFailoverProxyProvider getOMProxyProvider() {
-    return ozoneManagerClient.getOMFailoverProxyProvider();
+  public OMProxyProvider getOMProxyProvider() {
+    return omProxyProvider;
   }
 
   @Override
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/package-info.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/OMProxyInfo.java
similarity index 53%
copy from hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/package-info.java
copy to hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/OMProxyInfo.java
index a95f09f..01e5562 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/package-info.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/OMProxyInfo.java
@@ -16,8 +16,34 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.ozone.om.ha;
+package org.apache.hadoop.ozone.client.rpc.ha;
+
+import org.apache.hadoop.ozone.om.protocolPB
+    .OzoneManagerProtocolClientSideTranslatorPB;
+
+import java.net.InetSocketAddress;
 
 /**
- * This package contains Ozone Client's OM Proxy classes.
- */
\ No newline at end of file
+ * Proxy information of OM.
+ */
+public final class OMProxyInfo {
+  private InetSocketAddress address;
+  private OzoneManagerProtocolClientSideTranslatorPB omClient;
+
+  public OMProxyInfo(InetSocketAddress addr) {
+    this.address = addr;
+  }
+
+  public InetSocketAddress getAddress() {
+    return address;
+  }
+
+  public OzoneManagerProtocolClientSideTranslatorPB getOMProxy() {
+    return omClient;
+  }
+
+  public void setOMProxy(
+      OzoneManagerProtocolClientSideTranslatorPB clientProxy) {
+    this.omClient = clientProxy;
+  }
+}
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/OMProxyProvider.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/OMProxyProvider.java
new file mode 100644
index 0000000..574cb5f
--- /dev/null
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/OMProxyProvider.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.client.rpc.ha;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.om.protocolPB
+    .OzoneManagerProtocolClientSideTranslatorPB;
+import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.ratis.protocol.ClientId;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
+import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
+
+/**
+ * A failover proxy provider implementation which allows clients to configure
+ * multiple OMs to connect to. In case of OM failover, client can try
+ * connecting to another OM node from the list of proxies.
+ */
+public class OMProxyProvider implements Closeable {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(OMProxyProvider.class);
+
+  private List<OMProxyInfo> omProxies;
+
+  private int currentProxyIndex = 0;
+
+  private final Configuration conf;
+  private final long omVersion;
+  private final UserGroupInformation ugi;
+  private ClientId clientId = ClientId.randomId();
+
+  public OMProxyProvider(Configuration configuration,
+      UserGroupInformation ugi) {
+    this.conf = configuration;
+    this.omVersion = RPC.getProtocolVersion(OzoneManagerProtocolPB.class);
+    this.ugi = ugi;
+    loadOMClientConfigs(conf);
+  }
+
+  private void loadOMClientConfigs(Configuration config) {
+    this.omProxies = new ArrayList<>();
+
+    Collection<String> omServiceIds = config.getTrimmedStringCollection(
+        OZONE_OM_SERVICE_IDS_KEY);
+
+    if (omServiceIds.size() > 1) {
+      throw new IllegalArgumentException("Multi-OM Services is not supported." +
+          " Please configure only one OM Service ID in " +
+          OZONE_OM_SERVICE_IDS_KEY);
+    }
+
+    for (String serviceId : OmUtils.emptyAsSingletonNull(omServiceIds)) {
+      Collection<String> omNodeIds = OmUtils.getOMNodeIds(config, serviceId);
+
+      for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
+
+        String rpcAddrKey = OmUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
+            serviceId, nodeId);
+        String rpcAddrStr = OmUtils.getOmRpcAddress(config, rpcAddrKey);
+        if (rpcAddrStr == null) {
+          continue;
+        }
+
+        InetSocketAddress addr = NetUtils.createSocketAddr(rpcAddrStr);
+
+        // Add the OM client proxy info to list of proxies
+        if (addr != null) {
+          OMProxyInfo omProxyInfo = new OMProxyInfo(addr);
+          omProxies.add(omProxyInfo);
+        } else {
+          LOG.error("Failed to create OM proxy at address {}", rpcAddrStr);
+        }
+      }
+    }
+
+    if (omProxies.isEmpty()) {
+      throw new IllegalArgumentException("Could not find any configured " +
+          "addresses for OM. Please configure the system with "
+          + OZONE_OM_ADDRESS_KEY);
+    }
+  }
+
+  private OzoneManagerProtocolClientSideTranslatorPB getOMClient(
+      InetSocketAddress omAddress) throws IOException {
+    return new OzoneManagerProtocolClientSideTranslatorPB(
+        RPC.getProxy(OzoneManagerProtocolPB.class, omVersion, omAddress, ugi,
+            conf, NetUtils.getDefaultSocketFactory(conf),
+            Client.getRpcTimeout(conf)), clientId.toString());
+  }
+
+  /**
+   * Get the proxy object which should be used until the next failover event
+   * occurs. RPC proxy object is intialized lazily.
+   * @return the OM proxy object to invoke methods upon
+   */
+  public synchronized OzoneManagerProtocolClientSideTranslatorPB getProxy() {
+    OMProxyInfo currentOMProxyInfo = omProxies.get(currentProxyIndex);
+    return createOMClientIfNeeded(currentOMProxyInfo);
+  }
+
+  private OzoneManagerProtocolClientSideTranslatorPB createOMClientIfNeeded(
+      OMProxyInfo proxyInfo) {
+    if (proxyInfo.getOMProxy() == null) {
+      try {
+        proxyInfo.setOMProxy(getOMClient(proxyInfo.getAddress()));
+      } catch (IOException ioe) {
+        LOG.error("{} Failed to create RPC proxy to OM at {}",
+            this.getClass().getSimpleName(), proxyInfo.getAddress(), ioe);
+        throw new RuntimeException(ioe);
+      }
+    }
+    return proxyInfo.getOMProxy();
+  }
+
+  /**
+   * Called whenever an error warrants failing over. It is determined by the
+   * retry policy.
+   */
+  public void performFailover() {
+    incrementProxyIndex();
+  }
+
+  synchronized void incrementProxyIndex() {
+    currentProxyIndex = (currentProxyIndex + 1) % omProxies.size();
+  }
+
+  /**
+   * Close all the proxy objects which have been opened over the lifetime of
+   * the proxy provider.
+   */
+  @Override
+  public synchronized void close() throws IOException {
+    for (OMProxyInfo proxy : omProxies) {
+      OzoneManagerProtocolClientSideTranslatorPB omProxy = proxy.getOMProxy();
+      if (omProxy != null) {
+        RPC.stopProxy(omProxy);
+      }
+    }
+  }
+
+  @VisibleForTesting
+  public List<OMProxyInfo> getOMProxies() {
+    return omProxies;
+  }
+}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/package-info.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/package-info.java
similarity index 94%
rename from hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/package-info.java
rename to hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/package-info.java
index a95f09f..df0e69c 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/package-info.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/ha/package-info.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.hadoop.ozone.om.ha;
+package org.apache.hadoop.ozone.client.rpc.ha;
 
 /**
  * This package contains Ozone Client's OM Proxy classes.
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
deleted file mode 100644
index f5fdf6f..0000000
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/ha/OMFailoverProxyProvider.java
+++ /dev/null
@@ -1,266 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.ozone.om.ha;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.io.retry.FailoverProxyProvider;
-import org.apache.hadoop.ipc.Client;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.ozone.OmUtils;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_ADDRESS_KEY;
-import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_SERVICE_IDS_KEY;
-
-/**
- * A failover proxy provider implementation which allows clients to configure
- * multiple OMs to connect to. In case of OM failover, client can try
- * connecting to another OM node from the list of proxies.
- */
-public class OMFailoverProxyProvider implements
-    FailoverProxyProvider<OzoneManagerProtocolPB>, Closeable {
-
-  public static final Logger LOG =
-      LoggerFactory.getLogger(OMFailoverProxyProvider.class);
-
-  // Map of OMNodeID to its proxy
-  private Map<String, OMProxyInfo> omProxies;
-  private List<String> omNodeIDList;
-
-  private String currentProxyOMNodeId;
-  private int currentProxyIndex;
-
-  private final Configuration conf;
-  private final long omVersion;
-  private final UserGroupInformation ugi;
-
-  public OMFailoverProxyProvider(OzoneConfiguration configuration,
-      UserGroupInformation ugi) throws IOException {
-    this.conf = configuration;
-    this.omVersion = RPC.getProtocolVersion(OzoneManagerProtocolPB.class);
-    this.ugi = ugi;
-    loadOMClientConfigs(conf);
-
-    currentProxyIndex = 0;
-    currentProxyOMNodeId = omNodeIDList.get(currentProxyIndex);
-  }
-
-  /**
-   * Class to store proxy information.
-   */
-  public final class OMProxyInfo
-      extends FailoverProxyProvider.ProxyInfo<OzoneManagerProtocolPB> {
-    private InetSocketAddress address;
-
-    OMProxyInfo(OzoneManagerProtocolPB proxy, String proxyInfoStr,
-        InetSocketAddress addr) {
-      super(proxy, proxyInfoStr);
-      this.address = addr;
-    }
-
-    public InetSocketAddress getAddress() {
-      return address;
-    }
-  }
-
-  private void loadOMClientConfigs(Configuration config) throws IOException {
-    this.omProxies = new HashMap<>();
-    this.omNodeIDList = new ArrayList<>();
-
-    Collection<String> omServiceIds = config.getTrimmedStringCollection(
-        OZONE_OM_SERVICE_IDS_KEY);
-
-    if (omServiceIds.size() > 1) {
-      throw new IllegalArgumentException("Multi-OM Services is not supported." +
-          " Please configure only one OM Service ID in " +
-          OZONE_OM_SERVICE_IDS_KEY);
-    }
-
-    for (String serviceId : OmUtils.emptyAsSingletonNull(omServiceIds)) {
-      Collection<String> omNodeIds = OmUtils.getOMNodeIds(config, serviceId);
-
-      for (String nodeId : OmUtils.emptyAsSingletonNull(omNodeIds)) {
-
-        String rpcAddrKey = OmUtils.addKeySuffixes(OZONE_OM_ADDRESS_KEY,
-            serviceId, nodeId);
-        String rpcAddrStr = OmUtils.getOmRpcAddress(config, rpcAddrKey);
-        if (rpcAddrStr == null) {
-          continue;
-        }
-
-        InetSocketAddress addr = NetUtils.createSocketAddr(rpcAddrStr);
-
-        // Add the OM client proxy info to list of proxies
-        if (addr != null) {
-          StringBuilder proxyInfo = new StringBuilder()
-              .append(nodeId).append("(")
-              .append(NetUtils.getHostPortString(addr)).append(")");
-          OMProxyInfo omProxyInfo = new OMProxyInfo(null,
-              proxyInfo.toString(), addr);
-
-          // For a non-HA OM setup, nodeId might be null. If so, we assign it
-          // a dummy value
-          if (nodeId == null) {
-            nodeId = OzoneConsts.OM_NODE_ID_DUMMY;
-          }
-          omProxies.put(nodeId, omProxyInfo);
-          omNodeIDList.add(nodeId);
-
-        } else {
-          LOG.error("Failed to create OM proxy for {} at address {}",
-              nodeId, rpcAddrStr);
-        }
-      }
-    }
-
-    if (omProxies.isEmpty()) {
-      throw new IllegalArgumentException("Could not find any configured " +
-          "addresses for OM. Please configure the system with "
-          + OZONE_OM_ADDRESS_KEY);
-    }
-  }
-
-  @VisibleForTesting
-  public synchronized String getCurrentProxyOMNodeId() {
-    return currentProxyOMNodeId;
-  }
-
-  private OzoneManagerProtocolPB createOMProxy(InetSocketAddress omAddress)
-      throws IOException {
-    return RPC.getProxy(OzoneManagerProtocolPB.class, omVersion, omAddress, ugi,
-        conf, NetUtils.getDefaultSocketFactory(conf),
-        Client.getRpcTimeout(conf));
-  }
-
-  /**
-   * Get the proxy object which should be used until the next failover event
-   * occurs. RPC proxy object is intialized lazily.
-   * @return the OM proxy object to invoke methods upon
-   */
-  @Override
-  public synchronized OMProxyInfo getProxy() {
-    OMProxyInfo currentOMProxyInfo = omProxies.get(currentProxyOMNodeId);
-    createOMProxyIfNeeded(currentOMProxyInfo);
-    return currentOMProxyInfo;
-  }
-
-  /**
-   * Creates OM proxy object if it does not already exist.
-   */
-  private OMProxyInfo createOMProxyIfNeeded(OMProxyInfo proxyInfo) {
-    if (proxyInfo.proxy == null) {
-      try {
-        proxyInfo.proxy = createOMProxy(proxyInfo.address);
-      } catch (IOException ioe) {
-        LOG.error("{} Failed to create RPC proxy to OM at {}",
-            this.getClass().getSimpleName(), proxyInfo.address, ioe);
-        throw new RuntimeException(ioe);
-      }
-    }
-    return proxyInfo;
-  }
-
-  /**
-   * Called whenever an error warrants failing over. It is determined by the
-   * retry policy.
-   */
-  @Override
-  public void performFailover(OzoneManagerProtocolPB currentProxy) {
-    int newProxyIndex = incrementProxyIndex();
-    LOG.debug("Failing over OM proxy to index: {}, nodeId: {}",
-        newProxyIndex, omNodeIDList.get(newProxyIndex));
-  }
-
-  /**
-   * Update the proxy index to the next proxy in the list.
-   * @return the new proxy index
-   */
-  private synchronized int incrementProxyIndex() {
-    currentProxyIndex = (currentProxyIndex + 1) % omProxies.size();
-    currentProxyOMNodeId = omNodeIDList.get(currentProxyIndex);
-    return currentProxyIndex;
-  }
-
-  @Override
-  public Class<OzoneManagerProtocolPB> getInterface() {
-    return OzoneManagerProtocolPB.class;
-  }
-
-  /**
-   * Performs failover if the leaderOMNodeId returned through OMReponse does
-   * not match the current leaderOMNodeId cached by the proxy provider.
-   */
-  public void performFailoverIfRequired(String newLeaderOMNodeId) {
-    if (updateLeaderOMNodeId(newLeaderOMNodeId)) {
-      LOG.debug("Failing over OM proxy to nodeId: {}", newLeaderOMNodeId);
-    }
-  }
-
-  /**
-   * Failover to the OM proxy specified by the new leader OMNodeId.
-   * @param newLeaderOMNodeId OMNodeId to failover to.
-   * @return true if failover is successful, false otherwise.
-   */
-  synchronized boolean updateLeaderOMNodeId(String newLeaderOMNodeId) {
-    if (!currentProxyOMNodeId.equals(newLeaderOMNodeId)) {
-      if (omProxies.containsKey(newLeaderOMNodeId)) {
-        currentProxyOMNodeId = newLeaderOMNodeId;
-        currentProxyIndex = omNodeIDList.indexOf(currentProxyOMNodeId);
-        return true;
-      }
-    }
-    return false;
-  }
-
-  /**
-   * Close all the proxy objects which have been opened over the lifetime of
-   * the proxy provider.
-   */
-  @Override
-  public synchronized void close() throws IOException {
-    for (OMProxyInfo proxy : omProxies.values()) {
-      OzoneManagerProtocolPB omProxy = proxy.proxy;
-      if (omProxy != null) {
-        RPC.stopProxy(omProxy);
-      }
-    }
-  }
-
-  @VisibleForTesting
-  public List<OMProxyInfo> getOMProxies() {
-    return new ArrayList<>(omProxies.values());
-  }
-}
-
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
index 54f4e82..1573682 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocol/OzoneManagerProtocol.java
@@ -16,7 +16,6 @@
  * limitations under the License.
  */
 package org.apache.hadoop.ozone.om.protocol;
-import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
 import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
@@ -385,11 +384,5 @@ public interface OzoneManagerProtocol
    * @throws IOException
    */
   S3SecretValue getS3Secret(String kerberosID) throws IOException;
-
-  /**
-   * Get the OM Client's Retry and Failover Proxy provider.
-   * @return OMFailoverProxyProvider
-   */
-  OMFailoverProxyProvider getOMFailoverProxyProvider();
 }
 
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
index ff7a1d8..51ce94f 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/protocolPB/OzoneManagerProtocolClientSideTranslatorPB.java
@@ -17,26 +17,18 @@
  */
 package org.apache.hadoop.ozone.om.protocolPB;
 
-import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.stream.Collectors;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
 import org.apache.hadoop.hdds.tracing.TracingUtil;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.io.retry.RetryProxy;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolTranslator;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
-import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
 import org.apache.hadoop.ozone.om.helpers.KeyValueUtil;
 import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
 import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
@@ -111,7 +103,6 @@ import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.VolumeInfo;
 import org.apache.hadoop.ozone.protocolPB.OMPBHelper;
 import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.proto.SecurityProtos.CancelDelegationTokenRequestProto;
 import org.apache.hadoop.security.proto.SecurityProtos.GetDelegationTokenRequestProto;
 import org.apache.hadoop.security.proto.SecurityProtos.RenewDelegationTokenRequestProto;
@@ -121,9 +112,6 @@ import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
 import static org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes.TOKEN_ERROR_OTHER;
 import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status.ACCESS_DENIED;
@@ -144,89 +132,20 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
    */
   private static final RpcController NULL_RPC_CONTROLLER = null;
 
-  private final OMFailoverProxyProvider omFailoverProxyProvider;
   private final OzoneManagerProtocolPB rpcProxy;
   private final String clientID;
-  private static final Logger FAILOVER_PROXY_PROVIDER_LOG =
-      LoggerFactory.getLogger(OMFailoverProxyProvider.class);
-
-  public OzoneManagerProtocolClientSideTranslatorPB(
-      OzoneManagerProtocolPB proxy, String clientId) {
-    this.rpcProxy = proxy;
-    this.clientID = clientId;
-    this.omFailoverProxyProvider = null;
-  }
 
   /**
-   * Constructor for OM Protocol Client. This creates a {@link RetryProxy}
-   * over {@link OMFailoverProxyProvider} proxy. OMFailoverProxyProvider has
-   * one {@link OzoneManagerProtocolPB} proxy pointing to each OM node in the
-   * cluster.
+   * Constructor for KeySpaceManger Client.
+   * @param rpcProxy
    */
-  public OzoneManagerProtocolClientSideTranslatorPB(OzoneConfiguration conf,
-      String clientId, UserGroupInformation ugi) throws IOException {
-    this.omFailoverProxyProvider = new OMFailoverProxyProvider(conf, ugi);
-
-    int maxRetries = conf.getInt(
-        OzoneConfigKeys.OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY,
-        OzoneConfigKeys.OZONE_CLIENT_RETRY_MAX_ATTEMPTS_DEFAULT);
-    int maxFailovers = conf.getInt(
-        OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
-        OzoneConfigKeys.OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
-    int sleepBase = conf.getInt(
-        OzoneConfigKeys.OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_KEY,
-        OzoneConfigKeys.OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT);
-    int sleepMax = conf.getInt(
-        OzoneConfigKeys.OZONE_CLIENT_FAILOVER_SLEEP_MAX_MILLIS_KEY,
-        OzoneConfigKeys.OZONE_CLIENT_FAILOVER_SLEEP_MAX_MILLIS_DEFAULT);
-
-    this.rpcProxy = createRetryProxy(omFailoverProxyProvider,
-        maxRetries, maxFailovers, sleepBase, sleepMax);
+  public OzoneManagerProtocolClientSideTranslatorPB(
+      OzoneManagerProtocolPB rpcProxy, String clientId) {
+    this.rpcProxy = rpcProxy;
     this.clientID = clientId;
   }
 
   /**
-   * Creates a {@link RetryProxy} encapsulating the
-   * {@link OMFailoverProxyProvider}. The retry proxy fails over on network
-   * exception or if the current proxy is not the leader OM.
-   */
-  private OzoneManagerProtocolPB createRetryProxy(
-      OMFailoverProxyProvider failoverProxyProvider,
-      int maxRetries, int maxFailovers, int delayMillis, int maxDelayBase) {
-    RetryPolicy retryPolicyOnNetworkException = RetryPolicies
-        .failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
-            maxFailovers, maxRetries, delayMillis, maxDelayBase);
-    RetryPolicy retryPolicy = new RetryPolicy() {
-      @Override
-      public RetryAction shouldRetry(Exception exception, int retries,
-          int failovers, boolean isIdempotentOrAtMostOnce)
-          throws Exception {
-        if (exception instanceof EOFException ||
-            exception instanceof  ServiceException) {
-          if (retries < maxRetries && failovers < maxFailovers) {
-            return RetryAction.FAILOVER_AND_RETRY;
-          } else {
-            FAILOVER_PROXY_PROVIDER_LOG.error("Failed to connect to OM. " +
-                "Attempted {} retries and {} failovers", retries, failovers);
-            return RetryAction.FAIL;
-          }
-        } else {
-          return retryPolicyOnNetworkException.shouldRetry(
-                  exception, retries, failovers, isIdempotentOrAtMostOnce);
-        }
-      }
-    };
-    OzoneManagerProtocolPB proxy = (OzoneManagerProtocolPB) RetryProxy.create(
-        OzoneManagerProtocolPB.class, failoverProxyProvider, retryPolicy);
-    return proxy;
-  }
-
-  @VisibleForTesting
-  public OMFailoverProxyProvider getOMFailoverProxyProvider() {
-    return omFailoverProxyProvider;
-  }
-
-  /**
    * Closes this stream and releases any system resources associated
    * with it. If the stream is already closed then invoking this
    * method has no effect.
@@ -277,19 +196,7 @@ public final class OzoneManagerProtocolClientSideTranslatorPB
       OMRequest payload = OMRequest.newBuilder(omRequest)
           .setTraceID(TracingUtil.exportCurrentSpan())
           .build();
-
-      OMResponse omResponse =
-          rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload);
-
-      if (omResponse.hasLeaderOMNodeId() && omFailoverProxyProvider != null) {
-        String leaderOmId = omResponse.getLeaderOMNodeId();
-
-        // Failover to the OM node returned by OMReponse leaderOMNodeId if
-        // current proxy is not pointing to that node.
-        omFailoverProxyProvider.performFailoverIfRequired(leaderOmId);
-      }
-
-      return omResponse;
+      return rpcProxy.submitRequest(NULL_RPC_CONTROLLER, payload);
     } catch (ServiceException e) {
       throw ProtobufHelper.getRemoteException(e);
     }
diff --git a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
index b116826..aaf3c85 100644
--- a/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
+++ b/hadoop-ozone/common/src/main/proto/OzoneManagerProtocol.proto
@@ -140,8 +140,6 @@ message OMResponse {
 
   required Status status = 5;
 
-  optional string leaderOMNodeId = 6;
-
   optional CreateVolumeResponse              createVolumeResponse          = 11;
   optional SetVolumePropertyResponse         setVolumePropertyResponse     = 12;
   optional CheckVolumeAccessResponse         checkVolumeAccessResponse     = 13;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
index f84f95e..a1ef1f6 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java
@@ -32,9 +32,7 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.net.BindException;
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
@@ -50,7 +48,6 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
   private static final Logger LOG =
       LoggerFactory.getLogger(MiniOzoneHAClusterImpl.class);
 
-  private Map<String, OzoneManager> ozoneManagerMap;
   private List<OzoneManager> ozoneManagers;
 
   private static final Random RANDOM = new Random();
@@ -66,12 +63,11 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
 
   private MiniOzoneHAClusterImpl(
       OzoneConfiguration conf,
-      Map<String, OzoneManager> omMap,
+      List<OzoneManager> omList,
       StorageContainerManager scm,
       List<HddsDatanodeService> hddsDatanodes) {
     super(conf, scm, hddsDatanodes);
-    this.ozoneManagerMap = omMap;
-    this.ozoneManagers = new ArrayList<>(omMap.values());
+    this.ozoneManagers = omList;
   }
 
   /**
@@ -111,10 +107,6 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
     ozoneManagers.get(index).stop();
   }
 
-  public void stopOzoneManager(String omNodeId) {
-    ozoneManagerMap.get(omNodeId).stop();
-  }
-
   /**
    * Builder for configuring the MiniOzoneCluster to run.
    */
@@ -136,17 +128,17 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
       DefaultMetricsSystem.setMiniClusterMode(true);
       initializeConfiguration();
       StorageContainerManager scm;
-      Map<String, OzoneManager> omMap;
+      List<OzoneManager> omList;
       try {
         scm = createSCM();
         scm.start();
-        omMap = createOMService();
+        omList = createOMService();
       } catch (AuthenticationException ex) {
         throw new IOException("Unable to build MiniOzoneCluster. ", ex);
       }
 
       final List<HddsDatanodeService> hddsDatanodes = createHddsDatanodes(scm);
-      MiniOzoneHAClusterImpl cluster = new MiniOzoneHAClusterImpl(conf, omMap,
+      MiniOzoneHAClusterImpl cluster = new MiniOzoneHAClusterImpl(conf, omList,
           scm, hddsDatanodes);
       if (startDataNodes) {
         cluster.startHddsDatanodes();
@@ -179,10 +171,10 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
      * @throws IOException
      * @throws AuthenticationException
      */
-    private Map<String, OzoneManager> createOMService() throws IOException,
+    private List<OzoneManager> createOMService() throws IOException,
         AuthenticationException {
 
-      Map<String, OzoneManager> omMap = new HashMap<>();
+      List<OzoneManager> omList = new ArrayList<>(numOfOMs);
 
       int retryCount = 0;
       int basePort = 10000;
@@ -194,11 +186,10 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
 
           for (int i = 1; i<= numOfOMs; i++) {
             // Set nodeId
-            String nodeId = nodeIdBaseStr + i;
-            conf.set(OMConfigKeys.OZONE_OM_NODE_ID_KEY, nodeId);
+            conf.set(OMConfigKeys.OZONE_OM_NODE_ID_KEY, nodeIdBaseStr + i);
 
             // Set metadata/DB dir base path
-            String metaDirPath = path + "/" + nodeId;
+            String metaDirPath = path + "/" + nodeIdBaseStr + i;
             conf.set(OZONE_METADATA_DIRS, metaDirPath);
             OMStorage omStore = new OMStorage(conf);
             initializeOmStorage(omStore);
@@ -210,7 +201,7 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
 
             OzoneManager om = OzoneManager.createOm(null, conf);
             om.setCertClient(certClient);
-            omMap.put(nodeId, om);
+            omList.add(om);
 
             om.start();
             LOG.info("Started OzoneManager RPC server at " +
@@ -220,24 +211,23 @@ public final class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl {
           // Set default OM address to point to the first OM. Clients would
           // try connecting to this address by default
           conf.set(OMConfigKeys.OZONE_OM_ADDRESS_KEY,
-              NetUtils.getHostPortString(omMap.get(nodeIdBaseStr + 1)
-                  .getOmRpcServerAddr()));
+              NetUtils.getHostPortString(omList.get(0).getOmRpcServerAddr()));
 
           break;
         } catch (BindException e) {
-          for (OzoneManager om : omMap.values()) {
+          for (OzoneManager om : omList) {
             om.stop();
             om.join();
             LOG.info("Stopping OzoneManager server at " +
                 om.getOmRpcServerAddr());
           }
-          omMap.clear();
+          omList.clear();
           ++retryCount;
           LOG.info("MiniOzoneHACluster port conflicts, retried " +
               retryCount + " times");
         }
       }
-      return omMap;
+      return omList;
     }
 
     /**
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
index 0828fe8..32792ae 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneRpcClientAbstract.java
@@ -65,6 +65,8 @@ import org.apache.hadoop.ozone.client.VolumeArgs;
 import org.apache.hadoop.ozone.client.io.OzoneInputStream;
 import org.apache.hadoop.ozone.client.io.OzoneOutputStream;
 import org.apache.hadoop.ozone.client.rest.OzoneException;
+import org.apache.hadoop.ozone.client.rpc.ha.OMProxyInfo;
+import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider;
 import org.apache.hadoop.ozone.common.OzoneChecksumException;
 import org.apache.hadoop.ozone.container.common.helpers.BlockData;
 import org.apache.hadoop.ozone.container.common.interfaces.Container;
@@ -74,7 +76,6 @@ import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocat
 import org.apache.hadoop.ozone.om.OzoneManager;
 import org.apache.hadoop.ozone.om.exceptions.OMException;
 import org.apache.hadoop.ozone.om.exceptions.OMException.ResultCodes;
-import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
 import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
 import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
 import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
@@ -188,10 +189,9 @@ public abstract class TestOzoneRpcClientAbstract {
    */
   @Test
   public void testOMClientProxyProvider() {
-    OMFailoverProxyProvider omFailoverProxyProvider = store.getClientProxy()
+    OMProxyProvider omProxyProvider = store.getClientProxy()
         .getOMProxyProvider();
-    List<OMFailoverProxyProvider.OMProxyInfo> omProxies =
-        omFailoverProxyProvider.getOMProxies();
+    List<OMProxyInfo> omProxies = omProxyProvider.getOMProxies();
 
     // For a non-HA OM service, there should be only one OM proxy.
     Assert.assertEquals(1, omProxies.size());
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
index da8f870..62cda91 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/om/TestOzoneManagerHA.java
@@ -18,29 +18,30 @@ package org.apache.hadoop.ozone.om;
 
 
 import org.apache.commons.lang3.RandomStringUtils;
+import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdfs.LogVerificationAppender;
-import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
 import org.apache.hadoop.ozone.MiniOzoneHAClusterImpl;
-import org.apache.hadoop.ozone.client.ObjectStore;
 import org.apache.hadoop.ozone.client.OzoneClient;
-import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
+import org.apache.hadoop.ozone.client.rpc.ha.OMProxyInfo;
+import org.apache.hadoop.ozone.client.rpc.ha.OMProxyProvider;
+import org.apache.hadoop.ozone.om.exceptions.OMException;
+import org.apache.hadoop.ozone.web.handlers.UserArgs;
+import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
+import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
+import org.apache.hadoop.ozone.web.response.VolumeInfo;
+import org.apache.hadoop.ozone.web.utils.OzoneUtils;
 import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.hadoop.ozone.client.OzoneClientFactory;
-import org.apache.hadoop.ozone.client.OzoneVolume;
-import org.apache.hadoop.ozone.client.VolumeArgs;
-import org.apache.log4j.Logger;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ExpectedException;
 import org.junit.rules.Timeout;
 
 import java.io.IOException;
-import java.net.ConnectException;
 import java.net.InetSocketAddress;
 import java.util.List;
 import java.util.UUID;
@@ -49,14 +50,6 @@ import static org.apache.hadoop.ozone.MiniOzoneHAClusterImpl
     .NODE_FAILURE_TIMEOUT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_ACL_ENABLED;
 import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_KEY;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
-    .OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
     .OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS;
 
 /**
@@ -65,7 +58,8 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys
 public class TestOzoneManagerHA {
 
   private MiniOzoneHAClusterImpl cluster = null;
-  private ObjectStore objectStore;
+  private StorageHandler storageHandler;
+  private UserArgs userArgs;
   private OzoneConfiguration conf;
   private String clusterId;
   private String scmId;
@@ -75,7 +69,7 @@ public class TestOzoneManagerHA {
   public ExpectedException exception = ExpectedException.none();
 
   @Rule
-  public Timeout timeout = new Timeout(120_000);
+  public Timeout timeout = new Timeout(60_000);
 
   /**
    * Create a MiniDFSCluster for testing.
@@ -91,9 +85,6 @@ public class TestOzoneManagerHA {
     scmId = UUID.randomUUID().toString();
     conf.setBoolean(OZONE_ACL_ENABLED, true);
     conf.setInt(OZONE_OPEN_KEY_EXPIRE_THRESHOLD_SECONDS, 2);
-    conf.setInt(OZONE_CLIENT_RETRY_MAX_ATTEMPTS_KEY, 3);
-    conf.setInt(OZONE_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY, 3);
-    conf.setInt(OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_KEY, 50);
 
     cluster = (MiniOzoneHAClusterImpl) MiniOzoneCluster.newHABuilder(conf)
         .setClusterId(clusterId)
@@ -102,7 +93,9 @@ public class TestOzoneManagerHA {
         .setNumOfOzoneManagers(numOfOMs)
         .build();
     cluster.waitForClusterToBeReady();
-    objectStore = OzoneClientFactory.getRpcClient(conf).getObjectStore();
+    storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
+    userArgs = new UserArgs(null, OzoneUtils.getRequestID(),
+        null, null, null, null);
   }
 
   /**
@@ -122,7 +115,7 @@ public class TestOzoneManagerHA {
    */
   @Test
   public void testAllOMNodesRunning() throws Exception {
-    createVolumeTest(true);
+    testCreateVolume(true);
   }
 
   /**
@@ -133,56 +126,52 @@ public class TestOzoneManagerHA {
     cluster.stopOzoneManager(1);
     Thread.sleep(NODE_FAILURE_TIMEOUT * 2);
 
-    createVolumeTest(true);
+    testCreateVolume(true);
   }
 
   /**
    * Test client request fails when 2 OMs are down.
    */
   @Test
+  @Ignore("TODO:HDDS-1158")
   public void testTwoOMNodesDown() throws Exception {
     cluster.stopOzoneManager(1);
     cluster.stopOzoneManager(2);
     Thread.sleep(NODE_FAILURE_TIMEOUT * 2);
 
-    createVolumeTest(false);
+    testCreateVolume(false);
   }
 
   /**
    * Create a volume and test its attribute.
    */
-  private void createVolumeTest(boolean checkSuccess) throws Exception {
+  private void testCreateVolume(boolean checkSuccess) throws Exception {
     String userName = "user" + RandomStringUtils.randomNumeric(5);
     String adminName = "admin" + RandomStringUtils.randomNumeric(5);
     String volumeName = "volume" + RandomStringUtils.randomNumeric(5);
 
-    VolumeArgs createVolumeArgs = VolumeArgs.newBuilder()
-        .setOwner(userName)
-        .setAdmin(adminName)
-        .build();
+    VolumeArgs createVolumeArgs = new VolumeArgs(volumeName, userArgs);
+    createVolumeArgs.setUserName(userName);
+    createVolumeArgs.setAdminName(adminName);
 
     try {
-      objectStore.createVolume(volumeName, createVolumeArgs);
+      storageHandler.createVolume(createVolumeArgs);
 
-      OzoneVolume retVolumeinfo = objectStore.getVolume(volumeName);
+      VolumeArgs getVolumeArgs = new VolumeArgs(volumeName, userArgs);
+      VolumeInfo retVolumeinfo = storageHandler.getVolumeInfo(getVolumeArgs);
 
       if (checkSuccess) {
-        Assert.assertTrue(retVolumeinfo.getName().equals(volumeName));
-        Assert.assertTrue(retVolumeinfo.getOwner().equals(userName));
-        Assert.assertTrue(retVolumeinfo.getAdmin().equals(adminName));
+        Assert.assertTrue(retVolumeinfo.getVolumeName().equals(volumeName));
+        Assert.assertTrue(retVolumeinfo.getOwner().getName().equals(userName));
       } else {
         // Verify that the request failed
+        Assert.assertTrue(retVolumeinfo.getVolumeName().isEmpty());
         Assert.fail("There is no quorum. Request should have failed");
       }
-    } catch (ConnectException | RemoteException e) {
+    } catch (OMException e) {
       if (!checkSuccess) {
-        // If the last OM to be tried by the RetryProxy is down, we would get
-        // ConnectException. Otherwise, we would get a RemoteException from the
-        // last running OM as it would fail to get a quorum.
-        if (e instanceof RemoteException) {
-          GenericTestUtils.assertExceptionContains(
-              "RaftRetryFailureException", e);
-        }
+        GenericTestUtils.assertExceptionContains(
+            "RaftRetryFailureException", e);
       } else {
         throw e;
       }
@@ -190,16 +179,14 @@ public class TestOzoneManagerHA {
   }
 
   /**
-   * Test that OMFailoverProxyProvider creates an OM proxy for each OM in the
-   * cluster.
+   * Test that OMProxyProvider creates an OM proxy for each OM in the cluster.
    */
   @Test
-  public void testOMProxyProviderInitialization() throws Exception {
+  public void testOMClientProxyProvide() throws Exception {
     OzoneClient rpcClient = cluster.getRpcClient();
-    OMFailoverProxyProvider omFailoverProxyProvider =
+    OMProxyProvider omProxyProvider =
         rpcClient.getObjectStore().getClientProxy().getOMProxyProvider();
-    List<OMFailoverProxyProvider.OMProxyInfo> omProxies =
-        omFailoverProxyProvider.getOMProxies();
+    List<OMProxyInfo> omProxies = omProxyProvider.getOMProxies();
 
     Assert.assertEquals(numOfOMs, omProxies.size());
 
@@ -207,7 +194,7 @@ public class TestOzoneManagerHA {
       InetSocketAddress omRpcServerAddr =
           cluster.getOzoneManager(i).getOmRpcServerAddr();
       boolean omClientProxyExists = false;
-      for (OMFailoverProxyProvider.OMProxyInfo omProxyInfo : omProxies) {
+      for (OMProxyInfo omProxyInfo : omProxies) {
         if (omProxyInfo.getAddress().equals(omRpcServerAddr)) {
           omClientProxyExists = true;
           break;
@@ -218,99 +205,4 @@ public class TestOzoneManagerHA {
           omClientProxyExists);
     }
   }
-
-  /**
-   * Test OMFailoverProxyProvider failover on connection exception to OM client.
-   */
-  @Test
-  public void testOMProxyProviderFailoverOnConnectionFailure()
-      throws Exception {
-    OMFailoverProxyProvider omFailoverProxyProvider =
-        objectStore.getClientProxy().getOMProxyProvider();
-    String firstProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
-
-    createVolumeTest(true);
-
-    // On stopping the current OM Proxy, the next connection attempt should
-    // failover to a another OM proxy.
-    cluster.stopOzoneManager(firstProxyNodeId);
-    Thread.sleep(OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT * 4);
-
-    // Next request to the proxy provider should result in a failover
-    createVolumeTest(true);
-    Thread.sleep(OZONE_CLIENT_FAILOVER_SLEEP_BASE_MILLIS_DEFAULT);
-
-    // Get the new OM Proxy NodeId
-    String newProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
-
-    // Verify that a failover occured. the new proxy nodeId should be
-    // different from the old proxy nodeId.
-    Assert.assertNotEquals("Failover did not occur as expected",
-        firstProxyNodeId, newProxyNodeId);
-  }
-
-  /**
-   * Test OMFailoverProxyProvider failover when current OM proxy is not
-   * the current OM Leader.
-   */
-  @Test
-  public void testOMProxyProviderFailoverToCurrentLeader() throws Exception {
-    OMFailoverProxyProvider omFailoverProxyProvider =
-        objectStore.getClientProxy().getOMProxyProvider();
-
-    // Run couple of createVolume tests to discover the current Leader OM
-    createVolumeTest(true);
-    createVolumeTest(true);
-
-    // The OMFailoverProxyProvider will point to the current leader OM node.
-    String leaderOMNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
-
-    // Perform a manual failover of the proxy provider to move the
-    // currentProxyIndex to a node other than the leader OM.
-    omFailoverProxyProvider.performFailover(
-        omFailoverProxyProvider.getProxy().proxy);
-
-    String newProxyNodeId = omFailoverProxyProvider.getCurrentProxyOMNodeId();
-    Assert.assertNotEquals(leaderOMNodeId, newProxyNodeId);
-
-    // Once another request is sent to this new proxy node, the leader
-    // information must be returned via the response and a failover must
-    // happen to the leader proxy node.
-    createVolumeTest(true);
-    Thread.sleep(2000);
-
-    String newLeaderOMNodeId =
-        omFailoverProxyProvider.getCurrentProxyOMNodeId();
-
-    // The old and new Leader OM NodeId must match since there was no new
-    // election in the Ratis ring.
-    Assert.assertEquals(leaderOMNodeId, newLeaderOMNodeId);
-  }
-
-  @Test
-  public void testOMRetryProxy() throws Exception {
-    // Stop all the OMs. After making 5 (set maxRetries value) attempts at
-    // connection, the RpcClient should give up.
-    for (int i = 0; i < numOfOMs; i++) {
-      cluster.stopOzoneManager(i);
-    }
-
-    final LogVerificationAppender appender = new LogVerificationAppender();
-    final org.apache.log4j.Logger logger = Logger.getRootLogger();
-    logger.addAppender(appender);
-
-    try {
-      createVolumeTest(true);
-      Assert.fail("TestOMRetryProxy should fail when there are no OMs running");
-    } catch (ConnectException e) {
-      // Each retry attempt tries upto 10 times to connect. So there should be
-      // 3*10 "Retrying connect to server" messages
-      Assert.assertEquals(30,
-          appender.countLinesWithMessage("Retrying connect to server:"));
-
-      Assert.assertEquals(1,
-          appender.countLinesWithMessage("Failed to connect to OM. Attempted " +
-              "3 retries and 3 failovers"));
-    }
-  }
 }
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index cacdca8..ff94935 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -70,7 +70,6 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ozone.OzoneIllegalArgumentException;
 import org.apache.hadoop.ozone.OzoneSecurityUtil;
-import org.apache.hadoop.ozone.om.ha.OMFailoverProxyProvider;
 import org.apache.hadoop.ozone.om.helpers.S3SecretValue;
 import org.apache.hadoop.ozone.security.OzoneSecurityException;
 import org.apache.hadoop.ozone.security.OzoneTokenIdentifier;
@@ -2604,11 +2603,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
     return peerNodes;
   }
 
-  @Override
-  public OMFailoverProxyProvider getOMFailoverProxyProvider() {
-    return null;
-  }
-
   @VisibleForTesting
   public CertificateClient getCertificateClient() {
     return certClient;
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMRatisHelper.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMRatisHelper.java
index 8e4582d..9115421 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMRatisHelper.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMRatisHelper.java
@@ -32,7 +32,6 @@ import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.grpc.GrpcConfigKeys;
 import org.apache.ratis.protocol.Message;
-import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.retry.RetryPolicy;
@@ -101,12 +100,10 @@ public final class OMRatisHelper {
     return Message.valueOf(ByteString.copyFrom(requestBytes));
   }
 
-  static OMResponse getOMResponseFromRaftClientReply(RaftClientReply reply)
+  static OMResponse convertByteStringToOMResponse(ByteString byteString)
       throws InvalidProtocolBufferException {
-    byte[] bytes = reply.getMessage().getContent().toByteArray();
-    return OMResponse.newBuilder(OMResponse.parseFrom(bytes))
-        .setLeaderOMNodeId(reply.getReplierId())
-        .build();
+    byte[] bytes = byteString.toByteArray();
+    return OMResponse.parseFrom(bytes);
   }
 
   static OMResponse getErrorResponse(Type cmdType, Exception e) {
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java
index 1b4c634..9e1cafc 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java
@@ -23,10 +23,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-
 import com.google.protobuf.InvalidProtocolBufferException;
-import com.google.protobuf.ServiceException;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ozone.OmUtils;
 import org.apache.hadoop.ozone.om.OMConfigKeys;
@@ -56,24 +53,24 @@ public final class OzoneManagerRatisClient implements Closeable {
       OzoneManagerRatisClient.class);
 
   private final RaftGroup raftGroup;
-  private final String omNodeID;
+  private final String omID;
   private final RpcType rpcType;
   private RaftClient raftClient;
   private final RetryPolicy retryPolicy;
   private final Configuration conf;
 
-  private OzoneManagerRatisClient(String omNodeId, RaftGroup raftGroup,
+  private OzoneManagerRatisClient(String omId, RaftGroup raftGroup,
       RpcType rpcType, RetryPolicy retryPolicy,
       Configuration config) {
     this.raftGroup = raftGroup;
-    this.omNodeID = omNodeId;
+    this.omID = omId;
     this.rpcType = rpcType;
     this.retryPolicy = retryPolicy;
     this.conf = config;
   }
 
   public static OzoneManagerRatisClient newOzoneManagerRatisClient(
-      String omNodeId, RaftGroup raftGroup, Configuration conf) {
+      String omId, RaftGroup raftGroup, Configuration conf) {
     final String rpcType = conf.get(
         OMConfigKeys.OZONE_OM_RATIS_RPC_TYPE_KEY,
         OMConfigKeys.OZONE_OM_RATIS_RPC_TYPE_DEFAULT);
@@ -90,19 +87,19 @@ public final class OzoneManagerRatisClient implements Closeable {
     final RetryPolicy retryPolicy = RetryPolicies
         .retryUpToMaximumCountWithFixedSleep(maxRetryCount, sleepDuration);
 
-    return new OzoneManagerRatisClient(omNodeId, raftGroup,
+    return new OzoneManagerRatisClient(omId, raftGroup,
         SupportedRpcType.valueOfIgnoreCase(rpcType), retryPolicy, conf);
   }
 
   public void connect() {
     LOG.debug("Connecting to OM Ratis Server GroupId:{} OM:{}",
-        raftGroup.getGroupId().getUuid().toString(), omNodeID);
+        raftGroup.getGroupId().getUuid().toString(), omID);
 
     // TODO : XceiverClient ratis should pass the config value of
     // maxOutstandingRequests so as to set the upper bound on max no of async
     // requests to be handled by raft client
 
-    raftClient = OMRatisHelper.newRaftClient(rpcType, omNodeID, raftGroup,
+    raftClient = OMRatisHelper.newRaftClient(rpcType, omID, raftGroup,
         retryPolicy, conf);
   }
 
@@ -122,12 +119,13 @@ public final class OzoneManagerRatisClient implements Closeable {
    * @param request Request
    * @return Response to the command
    */
-  public OMResponse sendCommand(OMRequest request) throws ServiceException {
+  public OMResponse sendCommand(OMRequest request) {
     try {
       CompletableFuture<OMResponse> reply = sendCommandAsync(request);
       return reply.get();
     } catch (ExecutionException | InterruptedException e) {
-      throw new ServiceException(e);
+      LOG.error("Failed to execute command: " + request, e);
+      return OMRatisHelper.getErrorResponse(request.getCmdType(), e);
     }
   }
 
@@ -154,10 +152,9 @@ public final class OzoneManagerRatisClient implements Closeable {
                 if (raftRetryFailureException != null) {
                   throw new CompletionException(raftRetryFailureException);
                 }
-
                 OMResponse response = OMRatisHelper
-                    .getOMResponseFromRaftClientReply(reply);
-
+                    .convertByteStringToOMResponse(reply.getMessage()
+                        .getContent());
                 return response;
               } catch (InvalidProtocolBufferException e) {
                 throw new CompletionException(e);
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
index 2f1d64d8..5684fa5 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
@@ -80,8 +80,7 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
   /**
    * Submits request to OM's Ratis server.
    */
-  private OMResponse submitRequestToRatis(OMRequest request)
-      throws ServiceException {
+  private OMResponse submitRequestToRatis(OMRequest request) {
     return omRatisClient.sendCommand(request);
   }
 
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java
index 8a8be35..83d2245 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java
@@ -34,6 +34,8 @@ import org.apache.hadoop.ozone.om.OMConfigKeys;
 import org.apache.hadoop.ozone.om.OMNodeDetails;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
     .OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+    .OMResponse;
 import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.ratis.protocol.RaftGroupId;
@@ -108,6 +110,27 @@ public class TestOzoneManagerRatisServer {
   }
 
   /**
+   * Submit any request to OM Ratis server and check that the dummy response
+   * message is received.
+   */
+  @Test
+  public void testSubmitRatisRequest() throws Exception {
+    // Wait for leader election
+    Thread.sleep(LEADER_ELECTION_TIMEOUT * 2);
+    OMRequest request = OMRequest.newBuilder()
+        .setCmdType(OzoneManagerProtocolProtos.Type.CreateVolume)
+        .setClientId(clientId)
+        .build();
+
+    OMResponse response = omRatisClient.sendCommand(request);
+
+    Assert.assertEquals(OzoneManagerProtocolProtos.Type.CreateVolume,
+        response.getCmdType());
+    Assert.assertEquals(false, response.getSuccess());
+    Assert.assertEquals(false, response.hasCreateVolumeResponse());
+  }
+
+  /**
    * Test that all of {@link OzoneManagerProtocolProtos.Type} enum values are
    * categorized in {@link OmUtils#isReadOnly(OMRequest)}.
    */


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


Mime
View raw message