hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject hbase git commit: HBASE-16388 Prevent client threads being blocked by only one slow region server
Date Wed, 14 Sep 2016 16:28:29 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 ac1ee77f4 -> 069d1f73f


HBASE-16388 Prevent client threads being blocked by only one slow region server

Signed-off-by: stack <stack@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/069d1f73
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/069d1f73
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/069d1f73

Branch: refs/heads/branch-1
Commit: 069d1f73fa7e1a2b4c21ba95dea867d077a51068
Parents: ac1ee77
Author: Phil Yang <ud1937@gmail.com>
Authored: Wed Sep 14 14:02:34 2016 +0800
Committer: stack <stack@apache.org>
Committed: Wed Sep 14 09:28:20 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hbase/ipc/AbstractRpcClient.java     |  25 +++++
 .../hbase/ipc/ServerTooBusyException.java       |  38 +++++++
 .../org/apache/hadoop/hbase/HConstants.java     |  12 +++
 .../src/main/resources/hbase-default.xml        |  16 ++-
 .../org/apache/hadoop/hbase/client/TestHCM.java | 107 ++++++++++++++++++-
 5 files changed, 194 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/069d1f73/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
index 6e9f61b..3740b7f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/AbstractRpcClient.java
@@ -19,6 +19,9 @@
 package org.apache.hadoop.hbase.ipc;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.protobuf.BlockingRpcChannel;
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.Message;
@@ -31,6 +34,8 @@ import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 import java.net.SocketTimeoutException;
 import java.net.UnknownHostException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -80,6 +85,16 @@ public abstract class AbstractRpcClient implements RpcClient {
   protected final int readTO;
   protected final int writeTO;
 
+  private int maxConcurrentCallsPerServer;
+
+  private static final LoadingCache<InetSocketAddress, AtomicInteger> concurrentCounterCache
=
+      CacheBuilder.newBuilder().expireAfterAccess(1, TimeUnit.HOURS).
+          build(new CacheLoader<InetSocketAddress, AtomicInteger>() {
+            @Override public AtomicInteger load(InetSocketAddress key) throws Exception {
+              return new AtomicInteger(0);
+            }
+          });
+
   /**
    * Construct an IPC client for the cluster <code>clusterId</code>
    *
@@ -110,6 +125,9 @@ public abstract class AbstractRpcClient implements RpcClient {
     this.readTO = conf.getInt(SOCKET_TIMEOUT_READ, DEFAULT_SOCKET_TIMEOUT_READ);
     this.writeTO = conf.getInt(SOCKET_TIMEOUT_WRITE, DEFAULT_SOCKET_TIMEOUT_WRITE);
     this.metrics = metrics;
+    this.maxConcurrentCallsPerServer = conf.getInt(
+        HConstants.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD,
+        HConstants.DEFAULT_HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD);
 
     // login the server principal (if using secure Hadoop)
     if (LOG.isDebugEnabled()) {
@@ -221,7 +239,12 @@ public abstract class AbstractRpcClient implements RpcClient {
     }
 
     Pair<Message, CellScanner> val;
+    AtomicInteger counter = concurrentCounterCache.getUnchecked(isa);
+    int count = counter.incrementAndGet();
     try {
+      if (count > maxConcurrentCallsPerServer) {
+        throw new ServerTooBusyException(isa, count);
+      }
       final MetricsConnection.CallStats cs = MetricsConnection.newCallStats();
       cs.setStartTime(EnvironmentEdgeManager.currentTime());
       val = call(pcrc, md, param, returnType, ticket, isa, cs);
@@ -238,6 +261,8 @@ public abstract class AbstractRpcClient implements RpcClient {
       return val.getFirst();
     } catch (Throwable e) {
       throw new ServiceException(e);
+    } finally {
+      counter.decrementAndGet();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/069d1f73/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerTooBusyException.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerTooBusyException.java
b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerTooBusyException.java
new file mode 100644
index 0000000..7ee2a3b
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/ipc/ServerTooBusyException.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.net.InetSocketAddress;
+
+import org.apache.hadoop.hbase.DoNotRetryIOException;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.classification.InterfaceStability;
+
+/**
+ * Thrown from an rpc call if there are too many pending requests for one region server
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public class ServerTooBusyException extends DoNotRetryIOException {
+
+  public ServerTooBusyException(InetSocketAddress address, long count){
+    super("There are "+count+" concurrent rpc requests for "+address);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hbase/blob/069d1f73/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 61b54ee..ad9241f 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -723,6 +723,18 @@ public final class HConstants {
   public static final int DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS = 1;
 
   /**
+   * The maximum number of concurrent pending RPC requests for one server at process level.
+   */
+  public static final String HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD =
+      "hbase.client.perserver.requests.threshold";
+
+  /**
+   * Default value of {@link #HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD}.
+   */
+  public static final int DEFAULT_HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD = Integer.MAX_VALUE;
+
+
+  /**
    * Parameter name for server pause value, used mostly as value to wait before
    * running a retry of a failed operation.
    */

http://git-wip-us.apache.org/repos/asf/hbase/blob/069d1f73/hbase-common/src/main/resources/hbase-default.xml
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index 0659dec..16c8849 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -493,24 +493,34 @@ possible configurations would overwhelm and obscure the important.
   <property>
     <name>hbase.client.max.total.tasks</name>
     <value>100</value>
-    <description>The maximum number of concurrent tasks a single HTable instance will
+    <description>The maximum number of concurrent mutation tasks a single HTable instance
will
     send to the cluster.</description>
   </property>
   <property>
     <name>hbase.client.max.perserver.tasks</name>
     <value>5</value>
-    <description>The maximum number of concurrent tasks a single HTable instance will
+    <description>The maximum number of concurrent mutation tasks a single HTable instance
will
     send to a single region server.</description>
   </property>
   <property>
     <name>hbase.client.max.perregion.tasks</name>
     <value>1</value>
-    <description>The maximum number of concurrent connections the client will
+    <description>The maximum number of concurrent mutation tasks the client will
     maintain to a single Region. That is, if there is already
     hbase.client.max.perregion.tasks writes in progress for this region, new puts
     won't be sent to this region until some writes finishes.</description>
   </property>
   <property>
+    <name>hbase.client.perserver.requests.threshold</name>
+    <value>2147483647</value>
+    <description>The max number of concurrent pending requests for one server in all client threads
+    (process level). Requests exceeding this limit fail immediately with ServerTooBusyException to prevent
+    user's threads being occupied and blocked by only one slow region server. If you use a fixed
+    number of threads to access HBase in a synchronous way, setting this to a suitable value which is
+    related to the number of threads will help you. See
+    https://issues.apache.org/jira/browse/HBASE-16388 for details.</description>
+  </property>
+  <property>
     <name>hbase.client.scanner.caching</name>
     <value>2147483647</value>
     <description>Number of rows that we try to fetch when calling next

http://git-wip-us.apache.org/repos/asf/hbase/blob/069d1f73/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
index d61ad42..8436563 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestHCM.java
@@ -18,7 +18,6 @@
  */
 package org.apache.hadoop.hbase.client;
 
-import static org.junit.Assert.*;
 import com.google.common.collect.Lists;
 
 import java.io.IOException;
@@ -64,10 +63,12 @@ import org.apache.hadoop.hbase.filter.Filter;
 import org.apache.hadoop.hbase.filter.FilterBase;
 import org.apache.hadoop.hbase.ipc.RpcClient;
 import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
+import org.apache.hadoop.hbase.ipc.ServerTooBusyException;
 import org.apache.hadoop.hbase.master.HMaster;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
@@ -155,6 +156,12 @@ public class TestHCM {
         final Get get, final List<Cell> results) throws IOException {
       Threads.sleep(SLEEP_TIME);
     }
+
+    @Override
+    public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,
+        final Put put, final WALEdit edit, final Durability durability) throws IOException
{
+      Threads.sleep(SLEEP_TIME);
+    }
   }
 
   public static class SleepWriteCoprocessor extends BaseRegionObserver {
@@ -191,8 +198,13 @@ public class TestHCM {
     // Up the handlers; this test needs more than usual.
     TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HIGH_PRIORITY_HANDLER_COUNT,
10);
     TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, RPC_RETRY);
+
     // simulate queue blocking in testDropTimeoutRequest
     TEST_UTIL.getConfiguration().setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 1);
+
+    // Used in testServerBusyException
+    TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_PERSERVER_REQUESTS_THRESHOLD,
3);
+
     TEST_UTIL.startMiniCluster(2);
   }
 
@@ -1500,4 +1512,97 @@ public class TestHCM {
     table.close();
     connection.close();
   }
+
+  private class TestGetThread extends Thread {
+
+    Table table;
+    int getServerBusyException = 0;
+
+    TestGetThread(Table table){
+      this.table = table;
+    }
+
+    @Override
+    public void run() {
+      try {
+        table.get(new Get(ROW));
+      } catch (ServerTooBusyException e) {
+        getServerBusyException = 1;
+      } catch (IOException ignore) {
+      }
+    }
+  }
+
+  private class TestPutThread extends Thread {
+    Table table;
+    int getServerBusyException = 0;
+
+    TestPutThread(Table table){
+      this.table = table;
+    }
+
+    @Override
+    public void run() {
+      try {
+        Put p = new Put(ROW);
+        p.addColumn(FAM_NAM,new byte[]{0}, new byte[]{0});
+        table.put(p);
+      } catch (RetriesExhaustedWithDetailsException e) {
+        // For put we use AsyncProcess and it will wrap all exceptions in this exception type.
+        if (e.exceptions.get(0) instanceof ServerTooBusyException) {
+          getServerBusyException = 1;
+        }
+      } catch (IOException ignore) {
+      }
+    }
+  }
+
+  @Test()
+  public void testServerBusyException() throws Exception {
+    HTableDescriptor hdt = TEST_UTIL.createTableDescriptor("HCM-testServerBusy");
+    hdt.addCoprocessor(SleepCoprocessor.class.getName());
+    Configuration c = new Configuration(TEST_UTIL.getConfiguration());
+    TEST_UTIL.createTable(hdt, new byte[][] { FAM_NAM }, c);
+
+    TestGetThread tg1 = new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
+    TestGetThread tg2 = new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
+    TestGetThread tg3 = new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
+    TestGetThread tg4 = new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
+    TestGetThread tg5 = new TestGetThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
+    tg1.start();
+    tg2.start();
+    tg3.start();
+    tg4.start();
+    tg5.start();
+    tg1.join();
+    tg2.join();
+    tg3.join();
+    tg4.join();
+    tg5.join();
+    assertEquals(2,
+        tg1.getServerBusyException + tg2.getServerBusyException + tg3.getServerBusyException
+            + tg4.getServerBusyException + tg5.getServerBusyException);
+
+    // Put has its own logic in HTable, so test Put alone. We use AsyncProcess for Put (use multi at
+    // RPC level) and it wraps exceptions in RetriesExhaustedWithDetailsException.
+
+    TestPutThread tp1 = new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
+    TestPutThread tp2 = new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
+    TestPutThread tp3 = new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
+    TestPutThread tp4 = new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
+    TestPutThread tp5 = new TestPutThread(TEST_UTIL.getConnection().getTable(hdt.getTableName()));
+    tp1.start();
+    tp2.start();
+    tp3.start();
+    tp4.start();
+    tp5.start();
+    tp1.join();
+    tp2.join();
+    tp3.join();
+    tp4.join();
+    tp5.join();
+    assertEquals(2,
+        tp1.getServerBusyException + tp2.getServerBusyException + tp3.getServerBusyException
+            + tp4.getServerBusyException + tp5.getServerBusyException);
+  }
 }


Mime
View raw message