hadoop-ozone-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From shashik...@apache.org
Subject [hadoop-ozone] 01/01: Revert "HDDS-2920. Remove ozone ratis client specific config keys. (#472)"
Date Thu, 23 Jan 2020 09:14:40 GMT
This is an automated email from the ASF dual-hosted git repository.

shashikant pushed a commit to branch revert-472-HDDS-2920
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git

commit 145ae53b651405d8f7d8a6fef43309ed47878c3e
Author: bshashikant <sbanerjee@hortonworks.com>
AuthorDate: Thu Jan 23 14:44:32 2020 +0530

    Revert "HDDS-2920. Remove ozone ratis client specific config keys. (#472)"
    
    This reverts commit ab557dba89e4666e8c2679fa9b5fbd95dfd45e0b.
---
 .../apache/hadoop/hdds/scm/XceiverClientGrpc.java  |  2 +-
 .../hadoop/hdds/scm/XceiverClientManager.java      | 42 +--------------
 .../apache/hadoop/hdds/scm/XceiverClientRatis.java | 29 ++++++++---
 .../hadoop/hdds/scm/storage/BlockOutputStream.java |  6 ++-
 .../hadoop/hdds/scm/storage/CommitWatcher.java     |  8 ++-
 .../org/apache/hadoop/hdds/ratis/RatisHelper.java  | 46 +++++++++++++----
 .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java  |  7 ++-
 .../apache/hadoop/hdds/scm/XceiverClientSpi.java   |  3 +-
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   | 11 ++++
 .../common/src/main/resources/ozone-default.xml    | 16 ++++++
 .../ozone/client/io/BlockOutputStreamEntry.java    |  2 +-
 .../hadoop/ozone/client/io/KeyOutputStream.java    |  5 ++
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |  8 +++
 .../org/apache/hadoop/ozone/RatisTestHelper.java   |  9 +++-
 .../ozone/client/rpc/Test2WayCommitInRatis.java    |  4 +-
 .../ozone/client/rpc/TestBlockOutputStream.java    |  1 +
 .../rpc/TestBlockOutputStreamWithFailures.java     |  1 +
 .../rpc/TestCloseContainerHandlingByClient.java    |  1 +
 .../hadoop/ozone/client/rpc/TestCommitWatcher.java |  5 +-
 .../client/rpc/TestFailureHandlingByClient.java    |  2 +
 .../ozone/client/rpc/TestKeyInputStream.java       |  1 +
 .../rpc/TestMultiBlockWritesWithDnFailures.java    |  2 +
 .../rpc/TestOzoneClientRetriesOnException.java     |  1 +
 .../hadoop/ozone/client/rpc/TestReadRetries.java   |  2 +-
 .../ozone/client/rpc/TestWatchForCommit.java       | 60 ++++++++++++++++++++--
 .../hadoop/ozone/freon/TestDataValidate.java       |  2 +
 .../ozone/freon/TestOzoneClientKeyGenerator.java   |  2 +
 .../hadoop/ozone/freon/TestRandomKeyGenerator.java |  2 +
 .../hadoop/ozone/freon/DatanodeChunkGenerator.java |  2 +-
 29 files changed, 206 insertions(+), 76 deletions(-)

diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
index 0ded84a..9a4da38 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientGrpc.java
@@ -473,7 +473,7 @@ public class XceiverClientGrpc extends XceiverClientSpi {
   }
 
   @Override
-  public XceiverClientReply watchForCommit(long index)
+  public XceiverClientReply watchForCommit(long index, long timeout)
       throws InterruptedException, ExecutionException, TimeoutException,
       IOException {
     // there is no notion of watch for commit index in standalone pipeline
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
index dc3b215..d46456a 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientManager.java
@@ -342,12 +342,11 @@ public class XceiverClientManager implements Closeable {
   /**
    * Configuration for ratis client.
    */
-  @ConfigGroup(prefix = "raft.client")
+  @ConfigGroup(prefix = "dfs.ratis.client")
   public static class DFSRatisClientConfig {
 
-    @Config(key = "async.outstanding-requests.max",
+    @Config(key = "async.max.outstanding.requests",
         defaultValue = "64",
-        type = ConfigType.INT,
         tags = {OZONE, CLIENT, PERFORMANCE},
         description =
             "Controls the maximum number of outstanding async requests that can"
@@ -362,43 +361,6 @@ public class XceiverClientManager implements Closeable {
     public void setMaxOutstandingRequests(int maxOutstandingRequests) {
       this.maxOutstandingRequests = maxOutstandingRequests;
     }
-
-    @Config(key = "rpc.request.timeout",
-        defaultValue = "60s",
-        type = ConfigType.TIME,
-        tags = {OZONE, CLIENT, PERFORMANCE},
-        description = "The timeout duration for ratis client request (except " +
-            "for watch request). It should be set greater than leader " +
-            "election timeout in Ratis."
-    )
-    private long requestTimeOut = 60 * 1000;
-
-    public long getRequestTimeOut() {
-      return requestTimeOut;
-    }
-
-    public void setRequestTimeOut(long requestTimeOut) {
-      this.requestTimeOut = requestTimeOut;
-    }
-
-    @Config(key = "watch.request.timeout",
-        defaultValue = "180s",
-        type = ConfigType.TIME,
-        tags = {OZONE, CLIENT, PERFORMANCE},
-        description = "The timeout duration for ratis client watch request. " +
-            "Timeout for the watch API in Ratis client to acknowledgea " +
-            "particular request getting replayed to all servers."
-    )
-    private long watchRequestTimeOut = 180 * 1000;
-
-    public long getWatchRequestTimeOut() {
-      return watchRequestTimeOut;
-    }
-
-    public void setWatchRequestTimeOut(long watchRequestTimeOut) {
-      this.watchRequestTimeOut = watchRequestTimeOut;
-    }
   }
 
-
 }
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
index 0d12355..6f102d8 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientRatis.java
@@ -30,6 +30,7 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
@@ -58,6 +59,7 @@ import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -87,19 +89,25 @@ public final class XceiverClientRatis extends XceiverClientSpi {
     final String rpcType = ozoneConf
         .get(ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_KEY,
             ScmConfigKeys.DFS_CONTAINER_RATIS_RPC_TYPE_DEFAULT);
+    final TimeDuration clientRequestTimeout =
+        RatisHelper.getClientRequestTimeout(ozoneConf);
+    final int maxOutstandingRequests =
+        HddsClientUtils.getMaxOutstandingRequests(ozoneConf);
     final RetryPolicy retryPolicy = RatisHelper.createRetryPolicy(ozoneConf);
     final GrpcTlsConfig tlsConfig = RatisHelper.createTlsClientConfig(new
         SecurityConfig(ozoneConf), caCert);
     return new XceiverClientRatis(pipeline,
-        SupportedRpcType.valueOfIgnoreCase(rpcType),
-        retryPolicy, tlsConfig, ozoneConf);
+        SupportedRpcType.valueOfIgnoreCase(rpcType), maxOutstandingRequests,
+        retryPolicy, tlsConfig, clientRequestTimeout, ozoneConf);
   }
 
   private final Pipeline pipeline;
   private final RpcType rpcType;
   private final AtomicReference<RaftClient> client = new AtomicReference<>();
+  private final int maxOutstandingRequests;
   private final RetryPolicy retryPolicy;
   private final GrpcTlsConfig tlsConfig;
+  private final TimeDuration clientRequestTimeout;
   private final Configuration ozoneConfiguration;
 
   // Map to track commit index at every server
@@ -111,14 +119,17 @@ public final class XceiverClientRatis extends XceiverClientSpi {
    * Constructs a client.
    */
   private XceiverClientRatis(Pipeline pipeline, RpcType rpcType,
-      RetryPolicy retryPolicy, GrpcTlsConfig tlsConfig,
+      int maxOutStandingChunks, RetryPolicy retryPolicy,
+      GrpcTlsConfig tlsConfig, TimeDuration timeout,
       Configuration configuration) {
     super();
     this.pipeline = pipeline;
     this.rpcType = rpcType;
+    this.maxOutstandingRequests = maxOutStandingChunks;
     this.retryPolicy = retryPolicy;
     commitInfoMap = new ConcurrentHashMap<>();
     this.tlsConfig = tlsConfig;
+    this.clientRequestTimeout = timeout;
     metrics = XceiverClientManager.getXceiverClientMetrics();
     this.ozoneConfiguration = configuration;
   }
@@ -170,7 +181,8 @@ public final class XceiverClientRatis extends XceiverClientSpi {
 
     if (!client.compareAndSet(null,
         RatisHelper.newRaftClient(rpcType, getPipeline(), retryPolicy,
-            tlsConfig, ozoneConfiguration))) {
+            maxOutstandingRequests, tlsConfig, clientRequestTimeout,
+            ozoneConfiguration))) {
       throw new IllegalStateException("Client is already connected.");
     }
   }
@@ -244,7 +256,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
   }
 
   @Override
-  public XceiverClientReply watchForCommit(long index)
+  public XceiverClientReply watchForCommit(long index, long timeout)
       throws InterruptedException, ExecutionException, TimeoutException,
       IOException {
     long commitIndex = getReplicatedMinCommitIndex();
@@ -255,11 +267,14 @@ public final class XceiverClientRatis extends XceiverClientSpi {
       clientReply.setLogIndex(commitIndex);
       return clientReply;
     }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("commit index : {} watch timeout : {}", index, timeout);
+    }
     RaftClientReply reply;
     try {
       CompletableFuture<RaftClientReply> replyFuture = getClient()
           .sendWatchAsync(index, RaftProtos.ReplicationLevel.ALL_COMMITTED);
-      replyFuture.get();
+      replyFuture.get(timeout, TimeUnit.MILLISECONDS);
     } catch (Exception e) {
       Throwable t = HddsClientUtils.checkForException(e);
       LOG.warn("3 way commit failed on pipeline {}", pipeline, e);
@@ -268,7 +283,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
       }
       reply = getClient()
           .sendWatchAsync(index, RaftProtos.ReplicationLevel.MAJORITY_COMMITTED)
-          .get();
+          .get(timeout, TimeUnit.MILLISECONDS);
       List<RaftProtos.CommitInfoProto> commitInfoProtoList =
           reply.getCommitInfos().stream()
               .filter(i -> i.getCommitIndex() < index)
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
index 9131f5c..15aebe1 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockOutputStream.java
@@ -125,6 +125,7 @@ public class BlockOutputStream extends OutputStream {
    * @param bufferPool           pool of buffers
    * @param streamBufferFlushSize flush size
    * @param streamBufferMaxSize   max size of the currentBuffer
+   * @param watchTimeout          watch timeout
    * @param checksumType          checksum type
    * @param bytesPerChecksum      Bytes per checksum
    */
@@ -132,7 +133,8 @@ public class BlockOutputStream extends OutputStream {
   public BlockOutputStream(BlockID blockID,
       XceiverClientManager xceiverClientManager, Pipeline pipeline,
       int chunkSize, long streamBufferFlushSize, long streamBufferMaxSize,
-      BufferPool bufferPool, ChecksumType checksumType, int bytesPerChecksum)
+      long watchTimeout, BufferPool bufferPool, ChecksumType checksumType,
+      int bytesPerChecksum)
       throws IOException {
     this.blockID = new AtomicReference<>(blockID);
     this.chunkSize = chunkSize;
@@ -152,7 +154,7 @@ public class BlockOutputStream extends OutputStream {
 
     // A single thread executor handle the responses of async requests
     responseExecutor = Executors.newSingleThreadExecutor();
-    commitWatcher = new CommitWatcher(bufferPool, xceiverClient);
+    commitWatcher = new CommitWatcher(bufferPool, xceiverClient, watchTimeout);
     bufferList = null;
     totalDataFlushedLength = 0;
     writtenDataLength = 0;
diff --git a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
index 34d0d7c..ebcc6dc 100644
--- a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
+++ b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/CommitWatcher.java
@@ -71,13 +71,17 @@ public class CommitWatcher {
 
   private XceiverClientSpi xceiverClient;
 
+  private final long watchTimeout;
+
   // total data which has been successfully flushed and acknowledged
   // by all servers
   private long totalAckDataLength;
 
-  public CommitWatcher(BufferPool bufferPool, XceiverClientSpi xceiverClient) {
+  public CommitWatcher(BufferPool bufferPool, XceiverClientSpi xceiverClient,
+      long watchTimeout) {
     this.bufferPool = bufferPool;
     this.xceiverClient = xceiverClient;
+    this.watchTimeout = watchTimeout;
     commitIndex2flushedDataMap = new ConcurrentSkipListMap<>();
     totalAckDataLength = 0;
     futureMap = new ConcurrentHashMap<>();
@@ -187,7 +191,7 @@ public class CommitWatcher {
     long index;
     try {
       XceiverClientReply reply =
-          xceiverClient.watchForCommit(commitIndex);
+          xceiverClient.watchForCommit(commitIndex, watchTimeout);
       if (reply == null) {
         index = 0;
       } else {
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
index 75e07c3..98c36b6 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/ratis/RatisHelper.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.ozone.OzoneConsts;
 
 import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.grpc.GrpcConfigKeys;
 import org.apache.ratis.grpc.GrpcFactory;
@@ -148,12 +149,26 @@ public interface RatisHelper {
   }
 
   static RaftClient newRaftClient(RpcType rpcType, Pipeline pipeline,
-      RetryPolicy retryPolicy, GrpcTlsConfig tlsConfig,
+      RetryPolicy retryPolicy, int maxOutStandingRequest,
+      GrpcTlsConfig tlsConfig, TimeDuration timeout,
       Configuration ozoneConfiguration) throws IOException {
     return newRaftClient(rpcType,
         toRaftPeerId(pipeline.getLeaderNode()),
         newRaftGroup(RaftGroupId.valueOf(pipeline.getId().getId()),
-            pipeline.getNodes()), retryPolicy, tlsConfig, ozoneConfiguration);
+            pipeline.getNodes()), retryPolicy, maxOutStandingRequest, tlsConfig,
+        timeout, ozoneConfiguration);
+  }
+
+  static TimeDuration getClientRequestTimeout(Configuration conf) {
+    // Set the client requestTimeout
+    final TimeUnit timeUnit =
+        OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT
+            .getUnit();
+    final long duration = conf.getTimeDuration(
+        OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY,
+        OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT
+            .getDuration(), timeUnit);
+    return TimeDuration.valueOf(duration, timeUnit);
   }
 
   static RpcType getRpcType(Configuration conf) {
@@ -163,30 +178,35 @@ public interface RatisHelper {
   }
 
   static RaftClient newRaftClient(RaftPeer leader, Configuration conf) {
-    return newRaftClient(getRpcType(conf), leader,
-        RatisHelper.createRetryPolicy(conf), conf);
+    return newRaftClient(getRpcType(conf), leader, RetryPolicies.noRetry(),
+        GrpcConfigKeys.OutputStream.OUTSTANDING_APPENDS_MAX_DEFAULT,
+        getClientRequestTimeout(conf), conf);
   }
 
   static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader,
-      RetryPolicy retryPolicy, GrpcTlsConfig tlsConfig,
+      RetryPolicy retryPolicy, int maxOutstandingRequests,
+      GrpcTlsConfig tlsConfig, TimeDuration clientRequestTimeout,
       Configuration configuration) {
     return newRaftClient(rpcType, leader.getId(),
         newRaftGroup(Collections.singletonList(leader)), retryPolicy,
-        tlsConfig, configuration);
+        maxOutstandingRequests, tlsConfig, clientRequestTimeout, configuration);
   }
 
   static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader,
-      RetryPolicy retryPolicy,
+      RetryPolicy retryPolicy, int maxOutstandingRequests,
+      TimeDuration clientRequestTimeout,
       Configuration ozoneConfiguration) {
     return newRaftClient(rpcType, leader.getId(),
-        newRaftGroup(Collections.singletonList(leader)), retryPolicy, null,
+        newRaftGroup(Collections.singletonList(leader)), retryPolicy,
+        maxOutstandingRequests, null, clientRequestTimeout,
         ozoneConfiguration);
   }
 
   @SuppressWarnings("checkstyle:ParameterNumber")
   static RaftClient newRaftClient(RpcType rpcType, RaftPeerId leader,
-      RaftGroup group, RetryPolicy retryPolicy,
-      GrpcTlsConfig tlsConfig, Configuration ozoneConfiguration) {
+      RaftGroup group, RetryPolicy retryPolicy, int maxOutStandingRequest,
+      GrpcTlsConfig tlsConfig, TimeDuration clientRequestTimeout,
+      Configuration ozoneConfiguration) {
     if (LOG.isTraceEnabled()) {
       LOG.trace("newRaftClient: {}, leader={}, group={}",
           rpcType, leader, group);
@@ -200,10 +220,16 @@ public interface RatisHelper {
     createRaftGrpcProperties(ozoneConfiguration, properties);
 
     RaftConfigKeys.Rpc.setType(properties, rpcType);
+    RaftClientConfigKeys.Rpc
+        .setRequestTimeout(properties, clientRequestTimeout);
 
     GrpcConfigKeys.setMessageSizeMax(properties,
         SizeInBytes.valueOf(OzoneConsts.OZONE_SCM_CHUNK_MAX_SIZE));
 
+    // set async max outstanding requests.
+    RaftClientConfigKeys.Async.setMaxOutstandingRequests(properties,
+        maxOutStandingRequest);
+
     RaftClient.Builder builder =  RaftClient.newBuilder()
         .setRaftGroup(group)
         .setLeaderId(leader)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index ab17a52..737add0 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -114,7 +114,12 @@ public final class ScmConfigKeys {
       "dfs.container.ratis.leader.pending.bytes.limit";
   public static final String
       DFS_CONTAINER_RATIS_LEADER_PENDING_BYTES_LIMIT_DEFAULT = "1GB";
-  
+
+  public static final String DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY =
+      "dfs.ratis.client.request.timeout.duration";
+  public static final TimeDuration
+      DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT =
+      TimeDuration.valueOf(3000, TimeUnit.MILLISECONDS);
   public static final String DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY =
       "dfs.ratis.client.request.max.retries";
   public static final int DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_DEFAULT = 180;
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
index 3287777..f938448 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/XceiverClientSpi.java
@@ -170,6 +170,7 @@ public abstract class XceiverClientSpi implements Closeable {
   /**
    * Check if an specfic commitIndex is replicated to majority/all servers.
    * @param index index to watch for
+   * @param timeout timeout provided for the watch operation to complete
    * @return reply containing the min commit index replicated to all or majority
    *         servers in case of a failure
    * @throws InterruptedException
@@ -177,7 +178,7 @@ public abstract class XceiverClientSpi implements Closeable {
    * @throws TimeoutException
    * @throws IOException
    */
-  public abstract XceiverClientReply watchForCommit(long index)
+  public abstract XceiverClientReply watchForCommit(long index, long timeout)
       throws InterruptedException, ExecutionException, TimeoutException,
       IOException;
 
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index f2f4a6a..857f1de 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -134,6 +134,12 @@ public final class OzoneConfigKeys {
   public static final String OZONE_CLIENT_STREAM_BUFFER_MAX_SIZE_DEFAULT =
       "128MB";
 
+  public static final String OZONE_CLIENT_WATCH_REQUEST_TIMEOUT =
+      "ozone.client.watch.request.timeout";
+
+  public static final String OZONE_CLIENT_WATCH_REQUEST_TIMEOUT_DEFAULT =
+      "30s";
+
   public static final String OZONE_CLIENT_MAX_RETRIES =
       "ozone.client.max.retries";
   public static final int OZONE_CLIENT_MAX_RETRIES_DEFAULT = 100;
@@ -263,6 +269,11 @@ public final class OzoneConfigKeys {
 
   public static final String DFS_CONTAINER_RATIS_DATANODE_STORAGE_DIR =
       "dfs.container.ratis.datanode.storage.dir";
+  public static final String DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY =
+      ScmConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY;
+  public static final TimeDuration
+      DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT =
+      ScmConfigKeys.DFS_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT;
   public static final String DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY =
       ScmConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY;
   public static final int DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_DEFAULT =
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index a49c198..c8682bd 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -253,6 +253,14 @@
     </description>
   </property>
   <property>
+    <name>dfs.ratis.client.request.timeout.duration</name>
+    <value>3s</value>
+    <tag>OZONE, RATIS, MANAGEMENT</tag>
+    <description>The timeout duration for ratis client request.It should be
+        set greater than leader election timeout in Ratis.
+    </description>
+  </property>
+  <property>
     <name>dfs.ratis.client.request.max.retries</name>
     <value>180</value>
     <tag>OZONE, RATIS, MANAGEMENT</tag>
@@ -428,6 +436,14 @@
     </description>
   </property>
   <property>
+    <name>ozone.client.watch.request.timeout</name>
+    <value>30s</value>
+    <tag>OZONE, CLIENT</tag>
+    <description>Timeout for the watch API in Ratis client to acknowledge
+      a particular request getting replayed to all servers.
+    </description>
+  </property>
+  <property>
     <name>ozone.client.max.retries</name>
     <value>100</value>
     <tag>OZONE, CLIENT</tag>
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
index 4af792a..1aa10d8 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/BlockOutputStreamEntry.java
@@ -111,7 +111,7 @@ public final class BlockOutputStreamEntry extends OutputStream {
       this.outputStream =
           new BlockOutputStream(blockID, xceiverClientManager,
               pipeline, chunkSize, streamBufferFlushSize,
-              streamBufferMaxSize, bufferPool, checksumType,
+              streamBufferMaxSize, watchTimeout, bufferPool, checksumType,
               bytesPerChecksum);
     }
   }
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
index 7cf55d6..28916f9 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyOutputStream.java
@@ -586,6 +586,11 @@ public class KeyOutputStream extends OutputStream {
       return this;
     }
 
+    public Builder setWatchTimeout(long timeout) {
+      this.watchTimeout = timeout;
+      return this;
+    }
+
     public Builder setChecksumType(ChecksumType cType) {
       this.checksumType = cType;
       return this;
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
index 2982e38..66b789f 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/rpc/RpcClient.java
@@ -99,6 +99,7 @@ import java.net.URI;
 import java.security.InvalidKeyException;
 import java.security.SecureRandom;
 import java.util.*;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
@@ -127,6 +128,7 @@ public class RpcClient implements ClientProtocol {
   private final long streamBufferFlushSize;
   private final long streamBufferMaxSize;
   private final long blockSize;
+  private final long watchTimeout;
   private final ClientId clientId = ClientId.randomId();
   private final int maxRetryCount;
   private final long retryInterval;
@@ -186,6 +188,10 @@ public class RpcClient implements ClientProtocol {
             StorageUnit.BYTES);
     blockSize = (long) conf.getStorageSize(OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE,
         OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES);
+    watchTimeout =
+        conf.getTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT,
+            OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT_DEFAULT,
+            TimeUnit.MILLISECONDS);
 
     int configuredChecksumSize = (int) conf.getStorageSize(
         OzoneConfigKeys.OZONE_CLIENT_BYTES_PER_CHECKSUM,
@@ -884,6 +890,7 @@ public class RpcClient implements ClientProtocol {
             .setFactor(openKey.getKeyInfo().getFactor())
             .setStreamBufferFlushSize(streamBufferFlushSize)
             .setStreamBufferMaxSize(streamBufferMaxSize)
+            .setWatchTimeout(watchTimeout)
             .setBlockSize(blockSize)
             .setBytesPerChecksum(bytesPerChecksum)
             .setChecksumType(checksumType)
@@ -1184,6 +1191,7 @@ public class RpcClient implements ClientProtocol {
             .setFactor(HddsProtos.ReplicationFactor.valueOf(factor.getValue()))
             .setStreamBufferFlushSize(streamBufferFlushSize)
             .setStreamBufferMaxSize(streamBufferMaxSize)
+            .setWatchTimeout(watchTimeout)
             .setBlockSize(blockSize)
             .setChecksumType(checksumType)
             .setBytesPerChecksum(bytesPerChecksum)
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
index 3f5d33e..f862fd2 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/RatisTestHelper.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 import org.apache.hadoop.hdds.ratis.RatisHelper;
+import org.apache.hadoop.hdds.scm.client.HddsClientUtils;
 import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
 import org.apache.hadoop.ozone.client.protocol.ClientProtocol;
 import org.apache.hadoop.ozone.client.rpc.RpcClient;
@@ -38,6 +39,7 @@ import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.util.TimeDuration;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -121,8 +123,13 @@ public interface RatisTestHelper {
       RpcType rpc, DatanodeDetails dd, Pipeline pipeline) throws IOException {
     final RaftPeer p = RatisHelper.toRaftPeer(dd);
     final OzoneConfiguration conf = new OzoneConfiguration();
+    final int maxOutstandingRequests =
+        HddsClientUtils.getMaxOutstandingRequests(conf);
+    final TimeDuration requestTimeout =
+        RatisHelper.getClientRequestTimeout(conf);
     final RaftClient client =
-        newRaftClient(rpc, p, RatisHelper.createRetryPolicy(conf), conf);
+        newRaftClient(rpc, p, RatisHelper.createRetryPolicy(conf),
+            maxOutstandingRequests, requestTimeout, conf);
     client.groupAdd(RatisHelper.newRaftGroup(pipeline), p.getId());
   }
 }
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java
index fda6228..fd2cea3 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/Test2WayCommitInRatis.java
@@ -113,6 +113,8 @@ public class Test2WayCommitInRatis {
   @Test
   public void test2WayCommitForRetryfailure() throws Exception {
     OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20,
+        TimeUnit.SECONDS);
     conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20);
     startCluster(conf);
     GenericTestUtils.LogCapturer logCapturer =
@@ -140,7 +142,7 @@ public class Test2WayCommitInRatis {
         .getCloseContainer(pipeline,
             container1.getContainerInfo().getContainerID()));
     reply.getResponse().get();
-    xceiverClient.watchForCommit(reply.getLogIndex());
+    xceiverClient.watchForCommit(reply.getLogIndex(), 20000);
 
     // commitInfo Map will be reduced to 2 here
     Assert.assertEquals(2, ratisClient.getCommitInfoMap().size());
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
index 96226d8..2b41012 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStream.java
@@ -76,6 +76,7 @@ public class TestBlockOutputStream {
     flushSize = 2 * chunkSize;
     maxFlushSize = 2 * flushSize;
     blockSize = 2 * maxFlushSize;
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
     conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
     conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
index 3f1d9ff..3cbe06d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestBlockOutputStreamWithFailures.java
@@ -83,6 +83,7 @@ public class TestBlockOutputStreamWithFailures {
     flushSize = 2 * chunkSize;
     maxFlushSize = 2 * flushSize;
     blockSize = 2 * maxFlushSize;
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "1s");
     conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 5, TimeUnit.SECONDS);
     conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
index c2444c1..e18b222 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCloseContainerHandlingByClient.java
@@ -85,6 +85,7 @@ public class TestCloseContainerHandlingByClient {
   public static void init() throws Exception {
     chunkSize = (int) OzoneConsts.MB;
     blockSize = 4 * chunkSize;
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
     conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
     conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
index eaceb04..8089ac3 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestCommitWatcher.java
@@ -87,6 +87,7 @@ public class TestCommitWatcher {
     flushSize = 2 * chunkSize;
     maxFlushSize = 2 * flushSize;
     blockSize = 2 * maxFlushSize;
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
     conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
     conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
@@ -138,7 +139,7 @@ public class TestCommitWatcher {
     Assert.assertEquals(1, xceiverClient.getRefcount());
     Assert.assertTrue(xceiverClient instanceof XceiverClientRatis);
     XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient;
-    CommitWatcher watcher = new CommitWatcher(bufferPool, ratisClient);
+    CommitWatcher watcher = new CommitWatcher(bufferPool, ratisClient, 10000);
     BlockID blockID = ContainerTestHelper.getTestBlockID(containerId);
     List<XceiverClientReply> replies = new ArrayList<>();
     long length = 0;
@@ -212,7 +213,7 @@ public class TestCommitWatcher {
     Assert.assertEquals(1, xceiverClient.getRefcount());
     Assert.assertTrue(xceiverClient instanceof XceiverClientRatis);
     XceiverClientRatis ratisClient = (XceiverClientRatis) xceiverClient;
-    CommitWatcher watcher = new CommitWatcher(bufferPool, ratisClient);
+    CommitWatcher watcher = new CommitWatcher(bufferPool, ratisClient, 10000);
     BlockID blockID = ContainerTestHelper.getTestBlockID(containerId);
     List<XceiverClientReply> replies = new ArrayList<>();
     long length = 0;
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
index d1f2016..a7f5960 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestFailureHandlingByClient.java
@@ -85,6 +85,8 @@ public class TestFailureHandlingByClient {
     conf = new OzoneConfiguration();
     chunkSize = (int) OzoneConsts.MB;
     blockSize = 4 * chunkSize;
+    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 5,
+        TimeUnit.SECONDS);
     conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 100, TimeUnit.SECONDS);
     conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
index bb7b6f0..d834350 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
@@ -75,6 +75,7 @@ public class TestKeyInputStream {
     flushSize = 4 * chunkSize;
     maxFlushSize = 2 * flushSize;
     blockSize = 2 * maxFlushSize;
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
     conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 3, TimeUnit.SECONDS);
     conf.setQuietMode(false);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java
index f532f4d..5717f58 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestMultiBlockWritesWithDnFailures.java
@@ -77,6 +77,8 @@ public class TestMultiBlockWritesWithDnFailures {
     conf = new OzoneConfiguration();
     chunkSize = (int) OzoneConsts.MB;
     blockSize = 4 * chunkSize;
+    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 5,
+        TimeUnit.SECONDS);
     conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
     conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 100, TimeUnit.SECONDS);
     conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
index 0151c6e..6758f4f 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestOzoneClientRetriesOnException.java
@@ -84,6 +84,7 @@ public class TestOzoneClientRetriesOnException {
     flushSize = 2 * chunkSize;
     maxFlushSize = 2 * flushSize;
     blockSize = 2 * maxFlushSize;
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
     conf.setTimeDuration(HDDS_SCM_WATCHER_TIMEOUT, 1000, TimeUnit.MILLISECONDS);
    // conf.setTimeDuration(OZONE_SCM_STALENODE_INTERVAL, 30, TimeUnit.SECONDS);
     conf.set(OzoneConfigKeys.OZONE_CLIENT_CHECKSUM_TYPE, "NONE");
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java
index 9f9d5af..1343a03 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestReadRetries.java
@@ -174,7 +174,7 @@ public class TestReadRetries {
     Assert.assertTrue(clientSpi instanceof XceiverClientRatis);
     XceiverClientRatis ratisClient = (XceiverClientRatis)clientSpi;
 
-    ratisClient.watchForCommit(keyInfo.getBlockCommitSequenceId());
+    ratisClient.watchForCommit(keyInfo.getBlockCommitSequenceId(), 5000);
     // shutdown the datanode
     cluster.shutdownHddsDatanode(datanodeDetails);
 
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
index 645db6a..5808655 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestWatchForCommit.java
@@ -136,6 +136,8 @@ public class TestWatchForCommit {
     // and will be captured in keyOutputStream and the failover will happen
     // to a different block
     OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20,
+        TimeUnit.SECONDS);
     conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20);
     conf.setTimeDuration(
         OzoneConfigKeys.DFS_RATIS_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
@@ -269,9 +271,53 @@ public class TestWatchForCommit {
   }
 
   @Test
+  public void testWatchForCommitWithSmallerTimeoutValue() throws Exception {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 3,
+        TimeUnit.SECONDS);
+    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20);
+    startCluster(conf);
+    XceiverClientManager clientManager = new XceiverClientManager(conf);
+    ContainerWithPipeline container1 = storageContainerLocationClient
+        .allocateContainer(HddsProtos.ReplicationType.RATIS,
+            HddsProtos.ReplicationFactor.THREE, OzoneConsts.OZONE);
+    XceiverClientSpi xceiverClient = clientManager
+        .acquireClient(container1.getPipeline());
+    Assert.assertEquals(1, xceiverClient.getRefcount());
+    Assert.assertEquals(container1.getPipeline(),
+        xceiverClient.getPipeline());
+    Pipeline pipeline = xceiverClient.getPipeline();
+    XceiverClientReply reply = xceiverClient.sendCommandAsync(
+        ContainerTestHelper.getCreateContainerRequest(
+            container1.getContainerInfo().getContainerID(),
+            xceiverClient.getPipeline()));
+    reply.getResponse().get();
+    long index = reply.getLogIndex();
+    cluster.shutdownHddsDatanode(pipeline.getNodes().get(0));
+    cluster.shutdownHddsDatanode(pipeline.getNodes().get(1));
+    try {
+      // just watch for a log index which in not updated in the commitInfo Map
+      // as well as there is no logIndex generate in Ratis.
+      // The basic idea here is just to test if its throws an exception.
+      xceiverClient
+          .watchForCommit(index + new Random().nextInt(100) + 10, 3000);
+      Assert.fail("expected exception not thrown");
+    } catch (Exception e) {
+      Assert.assertTrue(
+          HddsClientUtils.checkForException(e) instanceof TimeoutException);
+    }
+    // After releasing the xceiverClient, this connection should be closed
+    // and any container operations should fail
+    clientManager.releaseClient(xceiverClient, false);
+    shutdown();
+  }
+
+  @Test
   public void testWatchForCommitForRetryfailure() throws Exception {
     OzoneConfiguration conf = new OzoneConfiguration();
-    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 10);
+    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT,
+        100, TimeUnit.SECONDS);
+    conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20);
     startCluster(conf);
     XceiverClientManager clientManager = new XceiverClientManager(conf);
     ContainerWithPipeline container1 = storageContainerLocationClient
@@ -297,7 +343,7 @@ public class TestWatchForCommit {
       // as well as there is no logIndex generate in Ratis.
       // The basic idea here is just to test if its throws an exception.
       xceiverClient
-          .watchForCommit(index + new Random().nextInt(100) + 10);
+          .watchForCommit(index + new Random().nextInt(100) + 10, 20000);
       Assert.fail("expected exception not thrown");
     } catch (Exception e) {
       Assert.assertTrue(e instanceof ExecutionException);
@@ -314,8 +360,9 @@ public class TestWatchForCommit {
   @Test
   public void test2WayCommitForTimeoutException() throws Exception {
     OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 3,
+        TimeUnit.SECONDS);
     conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20);
-    conf.set("raft.client.watch.request.timeout", "3s");
     startCluster(conf);
     GenericTestUtils.LogCapturer logCapturer =
         GenericTestUtils.LogCapturer.captureLogs(XceiverClientRatis.LOG);
@@ -342,7 +389,7 @@ public class TestWatchForCommit {
         .getCloseContainer(pipeline,
             container1.getContainerInfo().getContainerID()));
     reply.getResponse().get();
-    xceiverClient.watchForCommit(reply.getLogIndex());
+    xceiverClient.watchForCommit(reply.getLogIndex(), 3000);
 
     // commitInfo Map will be reduced to 2 here
     Assert.assertEquals(2, ratisClient.getCommitInfoMap().size());
@@ -358,6 +405,8 @@ public class TestWatchForCommit {
   @Test
   public void testWatchForCommitForGroupMismatchException() throws Exception {
     OzoneConfiguration conf = new OzoneConfiguration();
+    conf.setTimeDuration(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, 20,
+        TimeUnit.SECONDS);
     conf.setInt(OzoneConfigKeys.DFS_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY, 20);
 
     // mark the node stale early so that pipleline gets destroyed quickly
@@ -391,7 +440,8 @@ public class TestWatchForCommit {
       // as well as there is no logIndex generate in Ratis.
       // The basic idea here is just to test if its throws an exception.
       xceiverClient
-          .watchForCommit(reply.getLogIndex() + new Random().nextInt(100) + 10);
+          .watchForCommit(reply.getLogIndex() + new Random().nextInt(100) + 10,
+              20000);
       Assert.fail("Expected exception not thrown");
     } catch(Exception e) {
       Assert.assertTrue(HddsClientUtils
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
index 7857e1f..fdcb822 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestDataValidate.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -39,6 +40,7 @@ public abstract class TestDataValidate {
    *
    */
   static void startCluster(OzoneConfiguration conf) throws Exception {
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
     cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(5).build();
     cluster.waitForClusterToBeReady();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOzoneClientKeyGenerator.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOzoneClientKeyGenerator.java
index bef3330..315d1ee 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOzoneClientKeyGenerator.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestOzoneClientKeyGenerator.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.freon;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.raftlog.RaftLog;
@@ -67,6 +68,7 @@ public class TestOzoneClientKeyGenerator {
     if (conf == null) {
       conf = new OzoneConfiguration();
     }
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
     MiniOzoneCluster cluster = MiniOzoneCluster.newBuilder(conf)
         .setNumDatanodes(5)
         .build();
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java
index 218c570..45ea23d 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/freon/TestRandomKeyGenerator.java
@@ -22,6 +22,7 @@ import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -44,6 +45,7 @@ public class TestRandomKeyGenerator {
   @BeforeClass
   public static void init() throws Exception {
     conf = new OzoneConfiguration();
+    conf.set(OzoneConfigKeys.OZONE_CLIENT_WATCH_REQUEST_TIMEOUT, "5000ms");
     cluster = MiniOzoneCluster.newBuilder(conf).setNumDatanodes(5).build();
     cluster.waitForClusterToBeReady();
   }
diff --git a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java
index 16973ac..c4c84cb 100644
--- a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java
+++ b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/freon/DatanodeChunkGenerator.java
@@ -181,7 +181,7 @@ public class DatanodeChunkGenerator extends BaseFreonGenerator implements
         XceiverClientReply xceiverClientReply =
             xceiverClientSpi.sendCommandAsync(request);
         xceiverClientSpi
-            .watchForCommit(xceiverClientReply.getLogIndex());
+            .watchForCommit(xceiverClientReply.getLogIndex(), 1000L);
 
       } else {
         xceiverClientSpi.sendCommand(request);


---------------------------------------------------------------------
To unsubscribe, e-mail: ozone-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: ozone-commits-help@hadoop.apache.org


Mime
View raw message