hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From st...@apache.org
Subject [hbase] branch branch-1.4 updated: HBASE-23185 Fix high cpu usage because getTable()#put() gets config value every time (#743)
Date Fri, 01 Nov 2019 19:16:22 GMT
This is an automated email from the ASF dual-hosted git repository.

stack pushed a commit to branch branch-1.4
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-1.4 by this push:
     new abeb598  HBASE-23185 Fix high cpu usage because getTable()#put() gets config value every time (#743)
abeb598 is described below

commit abeb5980a3b119434f1824cdc304eb2a917beceb
Author: bitterfox <bitterfoxc@gmail.com>
AuthorDate: Sat Nov 2 04:16:10 2019 +0900

    HBASE-23185 Fix high cpu usage because getTable()#put() gets config value every time (#743)
    
    * HBASE-23185 Fix high cpu usage because getTable()#put() gets config value every time
    
    * Revert fix import order
---
 .../apache/hadoop/hbase/client/AsyncProcess.java   | 90 ++++++++++++++--------
 .../hadoop/hbase/client/BufferedMutatorImpl.java   | 21 +++--
 .../hbase/client/ConnectionConfiguration.java      | 48 +++++++++++-
 .../hadoop/hbase/client/ConnectionManager.java     | 13 +---
 .../org/apache/hadoop/hbase/client/HTable.java     |  8 +-
 5 files changed, 121 insertions(+), 59 deletions(-)

diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
index 4c571e4..0c22473 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncProcess.java
@@ -323,35 +323,60 @@ class AsyncProcess {
 
     this.id = COUNTER.incrementAndGet();
 
-    this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
-        HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
-    long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause);
-    if (configuredPauseForCQTBE < pause) {
-      LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: "
-          + configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE
-          + ", will use " + pause + " instead.");
-      this.pauseForCQTBE = pause;
-    } else {
-      this.pauseForCQTBE = configuredPauseForCQTBE;
-    }
-    this.numTries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
-        HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+    ConnectionConfiguration connConf =
+      hc.getConfiguration() == conf
+        ? hc.getConnectionConfiguration()
+        // Slow: parse conf in ConnectionConfiguration constructor
+        : new ConnectionConfiguration(conf);
+    if (connConf == null) {
+      // Slow: parse conf in ConnectionConfiguration constructor
+      connConf = new ConnectionConfiguration(conf);
+    }
+
+    this.pause = connConf.getPause();
+    this.pauseForCQTBE = connConf.getPauseForCQTBE();
+
+    this.numTries = connConf.getRetriesNumber();
     this.rpcTimeout = rpcTimeout;
-    this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
-        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
-    this.primaryCallTimeoutMicroseconds = conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000);
-
-    this.maxTotalConcurrentTasks = conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
-      HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS);
-    this.maxConcurrentTasksPerServer = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
-          HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS);
-    this.maxConcurrentTasksPerRegion = conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
-          HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS);
-    this.maxHeapSizePerRequest = conf.getLong(HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
-          DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE);
-    this.maxHeapSizeSubmit = conf.getLong(HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE);
+    this.operationTimeout = connConf.getOperationTimeout();
+
+    // Parse config once and reuse config values of hc's AsyncProcess in AsyncProcess for put
+    // Can be null when constructing hc's AsyncProcess or it's not reusable
+    AsyncProcess globalAsyncProcess = hc.getConfiguration() == conf ? hc.getAsyncProcess() : null;
+
+    this.primaryCallTimeoutMicroseconds =
+      globalAsyncProcess == null
+        ? conf.getInt(PRIMARY_CALL_TIMEOUT_KEY, 10000)
+        : globalAsyncProcess.primaryCallTimeoutMicroseconds;
+
+    this.maxTotalConcurrentTasks =
+      globalAsyncProcess == null
+        ? conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
+              HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS)
+        : globalAsyncProcess.maxTotalConcurrentTasks;
+    this.maxConcurrentTasksPerServer =
+      globalAsyncProcess == null
+        ? conf.getInt(HConstants.HBASE_CLIENT_MAX_PERSERVER_TASKS,
+              HConstants.DEFAULT_HBASE_CLIENT_MAX_PERSERVER_TASKS)
+        : globalAsyncProcess.maxConcurrentTasksPerServer;
+    this.maxConcurrentTasksPerRegion =
+      globalAsyncProcess == null
+        ? conf.getInt(HConstants.HBASE_CLIENT_MAX_PERREGION_TASKS,
+              HConstants.DEFAULT_HBASE_CLIENT_MAX_PERREGION_TASKS)
+        : globalAsyncProcess.maxConcurrentTasksPerRegion;
+    this.maxHeapSizePerRequest =
+      globalAsyncProcess == null
+        ? conf.getLong(HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE,
+              DEFAULT_HBASE_CLIENT_MAX_PERREQUEST_HEAPSIZE)
+        : globalAsyncProcess.maxHeapSizePerRequest;
+    this.maxHeapSizeSubmit =
+      globalAsyncProcess == null
+        ? conf.getLong(HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE, DEFAULT_HBASE_CLIENT_MAX_SUBMIT_HEAPSIZE)
+        : globalAsyncProcess.maxHeapSizeSubmit;
     this.startLogErrorsCnt =
-        conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT);
+      globalAsyncProcess == null
+        ? conf.getInt(START_LOG_ERRORS_AFTER_COUNT_KEY, DEFAULT_START_LOG_ERRORS_AFTER_COUNT)
+        : globalAsyncProcess.startLogErrorsCnt;
 
     if (this.maxTotalConcurrentTasks <= 0) {
       throw new IllegalArgumentException("maxTotalConcurrentTasks=" + maxTotalConcurrentTasks);
@@ -387,11 +412,16 @@ class AsyncProcess {
 
     this.rpcCallerFactory = rpcCaller;
     this.rpcFactory = rpcFactory;
-    this.logBatchErrorDetails = conf.getBoolean(LOG_DETAILS_FOR_BATCH_ERROR, false);
+    this.logBatchErrorDetails =
+      globalAsyncProcess == null
+        ? conf.getBoolean(LOG_DETAILS_FOR_BATCH_ERROR, false)
+        : globalAsyncProcess.logBatchErrorDetails;
 
     this.thresholdToLogUndoneTaskDetails =
-        conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
-          DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS);
+      globalAsyncProcess == null
+        ? conf.getInt(THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS,
+              DEFAULT_THRESHOLD_TO_LOG_UNDONE_TASK_DETAILS)
+        : globalAsyncProcess.thresholdToLogUndoneTaskDetails;
   }
 
   public void setRpcTimeout(int rpcTimeout) {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
index d207a82..41a1be7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/BufferedMutatorImpl.java
@@ -20,7 +20,6 @@ import java.io.Closeable;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants; // Needed for write rpc timeout
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.classification.InterfaceStability;
@@ -99,18 +98,18 @@ public class BufferedMutatorImpl implements BufferedMutator {
     this.pool = params.getPool();
     this.listener = params.getListener();
 
-    ConnectionConfiguration tableConf = new ConnectionConfiguration(conf);
+    ConnectionConfiguration connConf = conn.getConnectionConfiguration();
+    if (connConf == null) {
+      // Slow: parse conf in ConnectionConfiguration constructor
+      connConf = new ConnectionConfiguration(conf);
+    }
     this.writeBufferSize = params.getWriteBufferSize() != BufferedMutatorParams.UNSET ?
-        params.getWriteBufferSize() : tableConf.getWriteBufferSize();
+        params.getWriteBufferSize() : connConf.getWriteBufferSize();
     this.maxKeyValueSize = params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET ?
-        params.getMaxKeyValueSize() : tableConf.getMaxKeyValueSize();
-
-    this.writeRpcTimeout = conn.getConfiguration().getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
-        conn.getConfiguration().getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
-            HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
-    this.operationTimeout = conn.getConfiguration().getInt(
-        HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
-        HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
+        params.getMaxKeyValueSize() : connConf.getMaxKeyValueSize();
+
+    this.writeRpcTimeout = connConf.getWriteRpcTimeout();
+    this.operationTimeout = connConf.getOperationTimeout();
     // puts need to track errors globally due to how the APIs currently work.
     ap = new AsyncProcess(connection, conf, pool, rpcCallerFactory, true, rpcFactory, writeRpcTimeout);
   }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
index 65ddc78..2002cea 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionConfiguration.java
@@ -11,6 +11,8 @@
 
 package org.apache.hadoop.hbase.client;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.classification.InterfaceAudience;
@@ -27,6 +29,7 @@ import com.google.common.annotations.VisibleForTesting;
  */
 @InterfaceAudience.Private
 public class ConnectionConfiguration {
+  static final Log LOG = LogFactory.getLog(ConnectionConfiguration.class);
 
   public static final String WRITE_BUFFER_SIZE_KEY = "hbase.client.write.buffer";
   public static final long WRITE_BUFFER_SIZE_DEFAULT = 2097152;
@@ -43,6 +46,10 @@ public class ConnectionConfiguration {
   private final int metaReplicaCallTimeoutMicroSecondScan;
   private final int retries;
   private final int maxKeyValueSize;
+  private final int readRpcTimeout;
+  private final int writeRpcTimeout;
+  private final long pause;
+  private final long pauseForCQTBE;
 
   /**
    * Constructor
@@ -75,9 +82,28 @@ public class ConnectionConfiguration {
             HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT);
 
     this.retries = conf.getInt(
-       HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
+        HConstants.HBASE_CLIENT_RETRIES_NUMBER, HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
 
     this.maxKeyValueSize = conf.getInt(MAX_KEYVALUE_SIZE_KEY, MAX_KEYVALUE_SIZE_DEFAULT);
+
+    this.readRpcTimeout = conf.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
+        conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+            HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
+
+    this.writeRpcTimeout = conf.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
+        conf.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
+            HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
+
+    this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE, HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
+    long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause);
+    if (configuredPauseForCQTBE < pause) {
+      LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: "
+          + configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE
+          + ", will use " + pause + " instead.");
+      this.pauseForCQTBE = pause;
+    } else {
+      this.pauseForCQTBE = configuredPauseForCQTBE;
+    }
   }
 
   /**
@@ -98,6 +124,10 @@ public class ConnectionConfiguration {
         HConstants.HBASE_CLIENT_META_REPLICA_SCAN_TIMEOUT_DEFAULT;
     this.retries = HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER;
     this.maxKeyValueSize = MAX_KEYVALUE_SIZE_DEFAULT;
+    this.readRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
+    this.writeRpcTimeout = HConstants.DEFAULT_HBASE_RPC_TIMEOUT;
+    this.pause = HConstants.DEFAULT_HBASE_CLIENT_PAUSE;
+    this.pauseForCQTBE = HConstants.DEFAULT_HBASE_CLIENT_PAUSE;
   }
 
   public long getWriteBufferSize() {
@@ -139,4 +169,20 @@ public class ConnectionConfiguration {
   public long getScannerMaxResultSize() {
     return scannerMaxResultSize;
   }
+
+  public int getReadRpcTimeout() {
+    return readRpcTimeout;
+  }
+
+  public int getWriteRpcTimeout() {
+    return writeRpcTimeout;
+  }
+
+  public long getPause() {
+    return pause;
+  }
+
+  public long getPauseForCQTBE() {
+    return pauseForCQTBE;
+  }
 }
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
index 35ffa3e..11bad47 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
@@ -667,17 +667,8 @@ class ConnectionManager {
       this.managed = managed;
       this.connectionConfig = new ConnectionConfiguration(conf);
       this.closed = false;
-      this.pause = conf.getLong(HConstants.HBASE_CLIENT_PAUSE,
-          HConstants.DEFAULT_HBASE_CLIENT_PAUSE);
-      long configuredPauseForCQTBE = conf.getLong(HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE, pause);
-      if (configuredPauseForCQTBE < pause) {
-        LOG.warn("The " + HConstants.HBASE_CLIENT_PAUSE_FOR_CQTBE + " setting: "
-            + configuredPauseForCQTBE + " is smaller than " + HConstants.HBASE_CLIENT_PAUSE
-            + ", will use " + pause + " instead.");
-        this.pauseForCQTBE = pause;
-      } else {
-        this.pauseForCQTBE = configuredPauseForCQTBE;
-      }
+      this.pause = connectionConfig.getPause();
+      this.pauseForCQTBE = connectionConfig.getPauseForCQTBE();
       this.useMetaReplicas = conf.getBoolean(HConstants.USE_META_REPLICAS,
           HConstants.DEFAULT_USE_META_REPLICAS);
       this.metaReplicaCallTimeoutScanInMicroSecond =
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 91a8f92..cbb7b01 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -366,12 +366,8 @@ public class HTable implements HTableInterface, RegionLocator {
     }
     this.operationTimeout = tableName.isSystemTable() ?
         connConfiguration.getMetaOperationTimeout() : connConfiguration.getOperationTimeout();
-    this.readRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_READ_TIMEOUT_KEY,
-        configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
-            HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
-    this.writeRpcTimeout = configuration.getInt(HConstants.HBASE_RPC_WRITE_TIMEOUT_KEY,
-        configuration.getInt(HConstants.HBASE_RPC_TIMEOUT_KEY,
-            HConstants.DEFAULT_HBASE_RPC_TIMEOUT));
+    this.readRpcTimeout = connConfiguration.getReadRpcTimeout();
+    this.writeRpcTimeout = connConfiguration.getWriteRpcTimeout();
     this.scannerCaching = connConfiguration.getScannerCaching();
     this.scannerMaxResultSize = connConfiguration.getScannerMaxResultSize();
     if (this.rpcCallerFactory == null) {


Mime
View raw message