hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhang...@apache.org
Subject hbase git commit: HBASE-16285 Drop RPC requests if it must be considered as timeout at client
Date Wed, 10 Aug 2016 08:58:27 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 cebba7b4d -> 38044ada3


HBASE-16285 Drop RPC requests if it must be considered as timeout at client

Signed-off-by: zhangduo <zhangduo@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/38044ada
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/38044ada
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/38044ada

Branch: refs/heads/branch-1
Commit: 38044ada3253f2606eee553405f9aeedd0f51e5d
Parents: cebba7b
Author: Phil Yang <ud1937@gmail.com>
Authored: Thu Aug 4 15:55:18 2016 +0800
Committer: zhangduo <zhangduo@apache.org>
Committed: Wed Aug 10 16:16:15 2016 +0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/ipc/CallRunner.java |  7 ++-
 .../org/apache/hadoop/hbase/ipc/RpcServer.java  | 12 +++--
 .../hadoop/hbase/ipc/RpcServerInterface.java    | 11 +++--
 .../org/apache/hadoop/hbase/client/TestHCM.java | 46 +++++++++++++++++++-
 4 files changed, 65 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/38044ada/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
index e91699a..5e91beb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/CallRunner.java
@@ -93,6 +93,11 @@ public class CallRunner {
         }
         return;
       }
+      call.startTime = System.currentTimeMillis();
+      if (call.startTime > call.deadline) {
+        RpcServer.LOG.info("Drop timeout call: " + call);
+        return;
+      }
       this.status.setStatus("Setting up call");
       this.status.setConnection(call.connection.getHostAddress(), call.connection.getRemotePort());
       if (RpcServer.LOG.isTraceEnabled()) {
@@ -116,7 +121,7 @@ public class CallRunner {
         }
         // make the call
         resultPair = this.rpcServer.call(call.service, call.md, call.param, call.cellScanner,
-          call.timestamp, this.status, call.timeout);
+          call.timestamp, this.status, call.startTime, call.timeout);
       } catch (Throwable e) {
         RpcServer.LOG.debug(Thread.currentThread().getName() + ": " + call.toShortString(),
e);
         errorThrowable = e;

http://git-wip-us.apache.org/repos/asf/hbase/blob/38044ada/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 404f2ec..1576c2a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -266,6 +266,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
   private static final String WARN_RESPONSE_TIME = "hbase.ipc.warn.response.time";
   private static final String WARN_RESPONSE_SIZE = "hbase.ipc.warn.response.size";
 
+
   /**
    * Minimum allowable timeout (in milliseconds) in rpc request's header. This
    * configuration exists to prevent the rpc service regarding this request as timeout immediately.
@@ -315,6 +316,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
     protected long timestamp;      // the time received when response is null
                                    // the time served when response is not null
     protected int timeout;
+    protected long startTime;
+    protected long deadline;// the deadline to handle this call, if exceed we can drop it.
+
     /**
      * Chain of buffers to send as response.
      */
@@ -356,6 +360,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       this.retryImmediatelySupported =
           connection == null? null: connection.retryImmediatelySupported;
       this.timeout = timeout;
+      this.deadline = this.timeout > 0 ? this.timestamp + this.timeout : Long.MAX_VALUE;
     }
 
     /**
@@ -1933,7 +1938,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
           ? new TraceInfo(header.getTraceInfo().getTraceId(), header.getTraceInfo().getParentId())
           : null;
       int timeout = 0;
-      if (header.hasTimeout()){
+      if (header.hasTimeout() && header.getTimeout() > 0){
         timeout = Math.max(minClientRequestTimeout, header.getTimeout());
       }
       Call call = new Call(id, this.service, md, header, param, cellScanner, this, responder,
@@ -2239,7 +2244,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
   public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor
md,
       Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
       throws IOException {
-    return call(service, md, param, cellScanner, receiveTime, status, 0);
+    return call(service, md, param, cellScanner, receiveTime, status, System.currentTimeMillis(),0);
   }
 
   /**
@@ -2250,7 +2255,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
   @Override
   public Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor
md,
       Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
-      int timeout)
+      long startTime, int timeout)
   throws IOException {
     try {
       status.setRPC(md.getName(), new Object[]{param}, receiveTime);
@@ -2258,7 +2263,6 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
       status.setRPCPacket(param);
       status.resume("Servicing call");
       //get an instance of the method arg type
-      long startTime = System.currentTimeMillis();
       PayloadCarryingRpcController controller = new PayloadCarryingRpcController(cellScanner);
       controller.setCallTimeout(timeout);
       Message result = service.callBlockingMethod(md, controller, param);

http://git-wip-us.apache.org/repos/asf/hbase/blob/38044ada/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
index 12b158d..5a9092b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServerInterface.java
@@ -48,14 +48,17 @@ public interface RpcServerInterface {
   void setSocketSendBufSize(int size);
   InetSocketAddress getListenerAddress();
 
+  /**
+   * @deprecated As of release 1.3, this will be removed in HBase 3.0
+   */
+  @Deprecated
   Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
     Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status)
   throws IOException, ServiceException;
 
-  Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md,
-      Message param, CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status,
-      int timeout)
-      throws IOException, ServiceException;
+  Pair<Message, CellScanner> call(BlockingService service, MethodDescriptor md, Message
param,
+      CellScanner cellScanner, long receiveTime, MonitoredRPCHandler status, long startTime,
+      int timeout) throws IOException, ServiceException;
 
   void setErrorHandler(HBaseRPCErrorHandler handler);
   HBaseRPCErrorHandler getErrorHandler();

http://git-wip-us.apache.org/repos/asf/hbase/blob/38044ada/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
index 443cd12..d61ad42 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hbase.client;
 
 import static org.junit.Assert.*;
+import com.google.common.collect.Lists;
 
 import java.io.IOException;
 import java.lang.reflect.Field;
@@ -83,7 +84,12 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.rules.TestRule;
 
-import com.google.common.collect.Lists;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 /**
  * This class is for testing HBaseConnectionManager features
@@ -110,6 +116,7 @@ public class TestHCM {
   private static final byte[] ROW = Bytes.toBytes("bbb");
   private static final byte[] ROW_X = Bytes.toBytes("xxx");
   private static Random _randy = new Random();
+  private static final int RPC_RETRY = 5;
 
 /**
 * This copro sleeps 20 second. The first call it fails. The second time, it works.
@@ -160,12 +167,32 @@ public class TestHCM {
     }
   }
 
+  public static class SleepLongerAtFirstCoprocessor extends BaseRegionObserver {
+    public static final int SLEEP_TIME = 2000;
+    static final AtomicLong ct = new AtomicLong(0);
+
+    @Override
+    public void preGetOp(final ObserverContext<RegionCoprocessorEnvironment> e,
+        final Get get, final List<Cell> results) throws IOException {
+      // After first sleep, all requests are timeout except the last retry. If we handle
+      // all the following requests, finally the last request is also timeout. If we drop
all
+      // timeout requests, we can handle the last request immediately and it will not timeout.
+      if (ct.incrementAndGet() <= 1) {
+        Threads.sleep(SLEEP_TIME * (RPC_RETRY-1) * 2);
+      } else {
+        Threads.sleep(SLEEP_TIME);
+      }
+    }
+  }
+
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
     TEST_UTIL.getConfiguration().setBoolean(HConstants.STATUS_PUBLISHED, true);
     // Up the handlers; this test needs more than usual.
     TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT,
10);
-    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5);
+    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
+    // simulate queue blocking in testDropTimeoutRequest
+    TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 1);
     TEST_UTIL.startMiniCluster(2);
   }
 
@@ -438,6 +465,21 @@ public class TestHCM {
     }
   }
 
+  @Test
+  public void testDropTimeoutRequest() throws Exception {
+    // Simulate the situation that the server is slow and client retries for several times
because
+    // of timeout. When a request can be handled after waiting in the queue, we will drop
it if
+    // it has been considered as timeout at client. If we don't drop it, the server will
waste time
+    // on handling timeout requests and finally all requests timeout and client throws exception.
+    HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testDropTimeputRequest");
+    hdt.addCoprocessor(SleepLongerAtFirstCoprocessor.class.getName());
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+    try (Table t = TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c)) {
+      t.setRpcTimeout(SleepLongerAtFirstCoprocessor.SLEEP_TIME * 2);
+      t.get(new Get(FAM_NAM));
+    }
+  }
+
   /**
    * Test starting from 0 index when RpcRetryingCaller calculate the backoff time.
    */


Mime
View raw message