hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From d...@apache.org
Subject hbase git commit: HBASE-13036. Meta scanner should use its own threadpool
Date Wed, 18 Feb 2015 19:00:28 GMT
Repository: hbase
Updated Branches:
  refs/heads/branch-1 eb1f46f2b -> e8a34fb38


HBASE-13036. Meta scanner should use its own threadpool


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e8a34fb3
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e8a34fb3
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e8a34fb3

Branch: refs/heads/branch-1
Commit: e8a34fb3845ddce36dc3aa2afc0b5f7fce62a77d
Parents: eb1f46f
Author: Devaraj Das <ddas@apache.org>
Authored: Wed Feb 18 10:23:02 2015 -0800
Committer: Devaraj Das <ddas@apache.org>
Committed: Wed Feb 18 10:54:31 2015 -0800

----------------------------------------------------------------------
 .../hadoop/hbase/client/ConnectionManager.java  | 103 +++++++++++++------
 .../org/apache/hadoop/hbase/client/HTable.java  |   1 +
 .../hbase/client/TestMetaWithReplicas.java      |  20 ++++
 3 files changed, 90 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e8a34fb3/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
index 4b267c0..17d1378 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ConnectionManager.java
@@ -564,6 +564,9 @@ class ConnectionManager {
     // thread executor shared by all HTableInterface instances created
     // by this connection
     private volatile ExecutorService batchPool = null;
+    // meta thread executor shared by all HTableInterface instances created
+    // by this connection
+    private volatile ExecutorService metaLookupPool = null;
     private volatile boolean cleanupPool = false;
 
     private final Configuration conf;
@@ -750,52 +753,84 @@ class ConnectionManager {
 
     private ExecutorService getBatchPool() {
       if (batchPool == null) {
-        // shared HTable thread executor not yet initialized
         synchronized (this) {
           if (batchPool == null) {
-            int maxThreads = conf.getInt("hbase.hconnection.threads.max", 256);
-            int coreThreads = conf.getInt("hbase.hconnection.threads.core", 256);
-            if (maxThreads == 0) {
-              maxThreads = Runtime.getRuntime().availableProcessors() * 8;
-            }
-            if (coreThreads == 0) {
-              coreThreads = Runtime.getRuntime().availableProcessors() * 8;
-            }
-            long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
-            LinkedBlockingQueue<Runnable> workQueue =
-              new LinkedBlockingQueue<Runnable>(maxThreads *
-                conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
-                  HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
-            ThreadPoolExecutor tpe = new ThreadPoolExecutor(
-                coreThreads,
-                maxThreads,
-                keepAliveTime,
-                TimeUnit.SECONDS,
-                workQueue,
-                Threads.newDaemonThreadFactory(toString() + "-shared-"));
-            tpe.allowCoreThreadTimeOut(true);
-            this.batchPool = tpe;
+            this.batchPool = getThreadPool(conf.getInt("hbase.hconnection.threads.max", 256),
+                conf.getInt("hbase.hconnection.threads.core", 256), "-shared-");
+            this.cleanupPool = true;
           }
-          this.cleanupPool = true;
         }
       }
       return this.batchPool;
     }
 
+    private ExecutorService getThreadPool(int maxThreads, int coreThreads, String nameHint) {
+      // shared HTable thread executor not yet initialized
+      if (maxThreads == 0) {
+        maxThreads = Runtime.getRuntime().availableProcessors() * 8;
+      }
+      if (coreThreads == 0) {
+        coreThreads = Runtime.getRuntime().availableProcessors() * 8;
+      }
+      long keepAliveTime = conf.getLong("hbase.hconnection.threads.keepalivetime", 60);
+      LinkedBlockingQueue<Runnable> workQueue =
+          new LinkedBlockingQueue<Runnable>(maxThreads *
+              conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
+                  HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
+      ThreadPoolExecutor tpe = new ThreadPoolExecutor(
+          coreThreads,
+          maxThreads,
+          keepAliveTime,
+          TimeUnit.SECONDS,
+          workQueue,
+          Threads.newDaemonThreadFactory(toString() + "-shared-"));
+      tpe.allowCoreThreadTimeOut(true);
+      return tpe;
+    }
+
+    private ExecutorService getMetaLookupPool() {
+      if (this.metaLookupPool == null) {
+        synchronized (this) {
+          if (this.metaLookupPool == null) {
+            //The meta lookup can happen on replicas of the meta (if the appropriate configs
+            //are enabled).In a replicated-meta setup, the number '3' is assumed as the max
+            //number of replicas by default (unless it is configured to be of a higher value).
+            //In a non-replicated-meta setup, only one thread would be active.
+            this.metaLookupPool = getThreadPool(
+               conf.getInt("hbase.hconnection.meta.lookup.threads.max", 3),
+               conf.getInt("hbase.hconnection.meta.lookup.threads.max.core", 3),
+               "-metaLookup-shared-");
+          }
+        }
+      }
+      return this.metaLookupPool;
+    }
+
+    protected ExecutorService getCurrentMetaLookupPool() {
+      return metaLookupPool;
+    }
+
     protected ExecutorService getCurrentBatchPool() {
       return batchPool;
     }
 
-    private void shutdownBatchPool() {
+    private void shutdownPools() {
      if (this.cleanupPool && this.batchPool != null && !this.batchPool.isShutdown()) {
-        this.batchPool.shutdown();
-        try {
-          if (!this.batchPool.awaitTermination(10, TimeUnit.SECONDS)) {
-            this.batchPool.shutdownNow();
-          }
-        } catch (InterruptedException e) {
-          this.batchPool.shutdownNow();
+        shutdownBatchPool(this.batchPool);
+      }
+      if (this.metaLookupPool != null && !this.metaLookupPool.isShutdown()) {
+        shutdownBatchPool(this.metaLookupPool);
+      }
+    }
+
+    private void shutdownBatchPool(ExecutorService pool) {
+      pool.shutdown();
+      try {
+        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
+          pool.shutdownNow();
         }
+      } catch (InterruptedException e) {
+        pool.shutdownNow();
       }
     }
 
@@ -1191,7 +1226,7 @@ class ConnectionManager {
           ReversedClientScanner rcs = null;
           try {
             rcs = new ClientSmallReversedScanner(conf, s, TableName.META_TABLE_NAME, this,
-              rpcCallerFactory, rpcControllerFactory, getBatchPool(), 0);
+              rpcCallerFactory, rpcControllerFactory, getMetaLookupPool(), 0);
             regionInfoRow = rcs.next();
           } finally {
             if (rcs != null) {
@@ -2277,7 +2312,7 @@ class ConnectionManager {
         return;
       }
       closeMaster();
-      shutdownBatchPool();
+      shutdownPools();
       this.closed = true;
       closeZooKeeperWatcher();
       this.stubs.clear();

http://git-wip-us.apache.org/repos/asf/hbase/blob/e8a34fb3/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
index 4afaf22..d36cd0f 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/HTable.java
@@ -1447,6 +1447,7 @@ public class HTable implements HTableInterface {
           terminated = this.pool.awaitTermination(60, TimeUnit.SECONDS);
         } while (!terminated);
       } catch (InterruptedException e) {
+        this.pool.shutdownNow();
         LOG.warn("waitForTermination interrupted");
       }
     }

http://git-wip-us.apache.org/repos/asf/hbase/blob/e8a34fb3/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicas.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicas.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicas.java
index dcd4a5e..3118821 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicas.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestMetaWithReplicas.java
@@ -19,10 +19,12 @@
 package org.apache.hadoop.hbase.client;
 
 import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -38,6 +40,7 @@ import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation;
 import org.apache.hadoop.hbase.regionserver.StorefileRefresherChore;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -222,6 +225,23 @@ public class TestMetaWithReplicas {
   }
 
   @Test
+  public void testMetaLookupThreadPoolCreated() throws Exception {
+    byte[] TABLE = Bytes.toBytes("testMetaLookupThreadPoolCreated");
+    byte[][] FAMILIES = new byte[][] { Bytes.toBytes("foo") };
+    if (TEST_UTIL.getHBaseAdmin().tableExists(TABLE)) {
+      TEST_UTIL.getHBaseAdmin().disableTable(TABLE);
+      TEST_UTIL.getHBaseAdmin().deleteTable(TABLE);
+    }
+    Table htable = TEST_UTIL.createTable(TABLE, FAMILIES, TEST_UTIL.getConfiguration());
+    byte[] row = "test".getBytes();
+    HConnectionImplementation c = ((HConnectionImplementation)((HTable)htable).connection);
+    // check that metalookup pool would get created
+    c.relocateRegion(TABLE, row);
+    ExecutorService ex = c.getCurrentMetaLookupPool();
+    assert(ex != null);
+  }
+
+  @Test
   public void testChangingReplicaCount() throws Exception {
     // tests changing the replica count across master restarts
     // reduce the replica count from 3 to 2


Mime
View raw message