Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D675D200BBE for ; Fri, 11 Nov 2016 15:50:10 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id D50F8160AF5; Fri, 11 Nov 2016 14:50:10 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 01599160AE4 for ; Fri, 11 Nov 2016 15:50:09 +0100 (CET) Received: (qmail 29164 invoked by uid 500); 11 Nov 2016 14:50:09 -0000 Mailing-List: contact commits-help@hbase.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@hbase.apache.org Delivered-To: mailing list commits@hbase.apache.org Received: (qmail 29155 invoked by uid 99); 11 Nov 2016 14:50:09 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 11 Nov 2016 14:50:09 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 13F6DE01F4; Fri, 11 Nov 2016 14:50:08 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: tedyu@apache.org To: commits@hbase.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: hbase git commit: HBASE-17047 Add an API to get HBase connection cache statistics (Weiqing Yang) Date: Fri, 11 Nov 2016 14:50:08 +0000 (UTC) archived-at: Fri, 11 Nov 2016 14:50:11 -0000 Repository: hbase Updated Branches: refs/heads/master 8a6d6aa23 -> 81623a353 HBASE-17047 Add an API to get HBase connection cache statistics (Weiqing Yang) Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/81623a35 Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/81623a35 Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/81623a35 Branch: refs/heads/master Commit: 81623a353cdfbeef231f36d60aaa3419267f22d4 Parents: 8a6d6aa Author: tedyu Authored: Fri Nov 11 06:50:01 2016 -0800 Committer: tedyu Committed: Fri Nov 11 06:50:01 2016 -0800 ---------------------------------------------------------------------- .../hbase/spark/HBaseConnectionCache.scala | 38 ++++++++++++++---- .../hbase/spark/HBaseConnectionCacheSuite.scala | 41 +++++++++++++++++++- 2 files changed, 69 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/81623a35/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala ---------------------------------------------------------------------- diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala index 678e769..fb5833e 100644 --- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala +++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala @@ -34,6 +34,8 @@ private[spark] object HBaseConnectionCache extends Logging { // A hashmap of Spark-HBase connections. Key is HBaseConnectionKey. val connectionMap = new mutable.HashMap[HBaseConnectionKey, SmartConnection]() + val cacheStat = HBaseConnectionCacheStat(0, 0, 0) + // in milliseconds private final val DEFAULT_TIME_OUT: Long = HBaseSparkConf.connectionCloseDelay private var timeout = DEFAULT_TIME_OUT @@ -58,6 +60,13 @@ private[spark] object HBaseConnectionCache extends Logging { housekeepingThread.setDaemon(true) housekeepingThread.start() + def getStat: HBaseConnectionCacheStat = { + connectionMap.synchronized { + cacheStat.numActiveConnections = connectionMap.size + cacheStat.copy() + } + } + def close(): Unit = { try { connectionMap.synchronized { @@ -100,7 +109,9 @@ private[spark] object HBaseConnectionCache extends Logging { connectionMap.synchronized { if (closed) return null - val sc = connectionMap.getOrElseUpdate(key, new SmartConnection(conn)) + cacheStat.numTotalRequests += 1 + val sc = connectionMap.getOrElseUpdate(key, {cacheStat.numActualConnectionsCreated += 1 + new SmartConnection(conn)}) sc.refCount += 1 sc } @@ -136,13 +147,13 @@ private[hbase] case class SmartConnection ( } /** - * Denotes a unique key to an HBase Connection instance. - * Please refer to 'org.apache.hadoop.hbase.client.HConnectionKey'. - * - * In essence, this class captures the properties in Configuration - * that may be used in the process of establishing a connection. - * - */ + * Denotes a unique key to an HBase Connection instance. + * Please refer to 'org.apache.hadoop.hbase.client.HConnectionKey'. + * + * In essence, this class captures the properties in Configuration + * that may be used in the process of establishing a connection. + * + */ class HBaseConnectionKey(c: Configuration) extends Logging { val conf: Configuration = c val CONNECTION_PROPERTIES: Array[String] = Array[String]( @@ -240,4 +251,15 @@ class HBaseConnectionKey(c: Configuration) extends Logging { } } +/** + * To log the state of 'HBaseConnectionCache' + * + * @param numTotalRequests number of total connection requests to the cache + * @param numActualConnectionsCreated number of actual HBase connections the cache ever created + * @param numActiveConnections number of current alive HBase connections the cache is holding + */ +case class HBaseConnectionCacheStat(var numTotalRequests: Long, + var numActualConnectionsCreated: Long, + var numActiveConnections: Long) + http://git-wip-us.apache.org/repos/asf/hbase/blob/81623a35/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala ---------------------------------------------------------------------- diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala index c9edcc4..6ebf044 100644 --- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala +++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala @@ -76,11 +76,18 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging { testWithPressureWithClose() } - def testBasic() { - HBaseConnectionCache.setTimeout(1 * 1000) + def cleanEnv() { HBaseConnectionCache.connectionMap.synchronized { HBaseConnectionCache.connectionMap.clear() + HBaseConnectionCache.cacheStat.numActiveConnections = 0 + HBaseConnectionCache.cacheStat.numActualConnectionsCreated = 0 + HBaseConnectionCache.cacheStat.numTotalRequests = 0 } + } + + def testBasic() { + cleanEnv() + HBaseConnectionCache.setTimeout(1 * 1000) val connKeyMocker1 = new HBaseConnectionKeyMocker(1) val connKeyMocker1a = new HBaseConnectionKeyMocker(1) @@ -88,11 +95,20 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging { val c1 = HBaseConnectionCache .getConnection(connKeyMocker1, new ConnectionMocker) + + assert(HBaseConnectionCache.connectionMap.size === 1) + assert(HBaseConnectionCache.getStat.numTotalRequests === 1) + assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 1) + assert(HBaseConnectionCache.getStat.numActiveConnections === 1) + val c1a = HBaseConnectionCache .getConnection(connKeyMocker1a, new ConnectionMocker) HBaseConnectionCache.connectionMap.synchronized { assert(HBaseConnectionCache.connectionMap.size === 1) + assert(HBaseConnectionCache.getStat.numTotalRequests === 2) + assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 1) + assert(HBaseConnectionCache.getStat.numActiveConnections === 1) } val c2 = HBaseConnectionCache @@ -100,16 +116,21 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging { HBaseConnectionCache.connectionMap.synchronized { assert(HBaseConnectionCache.connectionMap.size === 2) + assert(HBaseConnectionCache.getStat.numTotalRequests === 3) + assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 2) + assert(HBaseConnectionCache.getStat.numActiveConnections === 2) } c1.close() HBaseConnectionCache.connectionMap.synchronized { assert(HBaseConnectionCache.connectionMap.size === 2) + assert(HBaseConnectionCache.getStat.numActiveConnections === 2) } c1a.close() HBaseConnectionCache.connectionMap.synchronized { assert(HBaseConnectionCache.connectionMap.size === 2) + assert(HBaseConnectionCache.getStat.numActiveConnections === 2) } Thread.sleep(3 * 1000) // Leave housekeeping thread enough time @@ -117,12 +138,15 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging { assert(HBaseConnectionCache.connectionMap.size === 1) assert(HBaseConnectionCache.connectionMap.iterator.next()._1 .asInstanceOf[HBaseConnectionKeyMocker].confId === 2) + assert(HBaseConnectionCache.getStat.numActiveConnections === 1) } c2.close() } def testWithPressureWithoutClose() { + cleanEnv() + class TestThread extends Runnable { override def run() { for (i <- 0 to 999) { @@ -147,6 +171,10 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging { Thread.sleep(1000) HBaseConnectionCache.connectionMap.synchronized { assert(HBaseConnectionCache.connectionMap.size === 10) + assert(HBaseConnectionCache.getStat.numTotalRequests === 100 * 1000) + assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 10) + assert(HBaseConnectionCache.getStat.numActiveConnections === 10) + var totalRc : Int = 0 HBaseConnectionCache.connectionMap.foreach { x => totalRc += x._2.refCount @@ -161,9 +189,13 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging { } Thread.sleep(1000) assert(HBaseConnectionCache.connectionMap.size === 0) + assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 10) + assert(HBaseConnectionCache.getStat.numActiveConnections === 0) } def testWithPressureWithClose() { + cleanEnv() + class TestThread extends Runnable { override def run() { for (i <- 0 to 999) { @@ -189,11 +221,16 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging { HBaseConnectionCache.connectionMap.synchronized { assert(HBaseConnectionCache.connectionMap.size === 10) + assert(HBaseConnectionCache.getStat.numTotalRequests === 100 * 1000) + assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 10) + assert(HBaseConnectionCache.getStat.numActiveConnections === 10) } Thread.sleep(6 * 1000) HBaseConnectionCache.connectionMap.synchronized { assert(HBaseConnectionCache.connectionMap.size === 0) + assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 10) + assert(HBaseConnectionCache.getStat.numActiveConnections === 0) } } }