hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From te...@apache.org
Subject hbase git commit: HBASE-17047 Add an API to get HBase connection cache statistics (Weiqing Yang)
Date Fri, 11 Nov 2016 14:50:08 GMT
Repository: hbase
Updated Branches:
  refs/heads/master 8a6d6aa23 -> 81623a353


HBASE-17047 Add an API to get HBase connection cache statistics (Weiqing Yang)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/81623a35
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/81623a35
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/81623a35

Branch: refs/heads/master
Commit: 81623a353cdfbeef231f36d60aaa3419267f22d4
Parents: 8a6d6aa
Author: tedyu <yuzhihong@gmail.com>
Authored: Fri Nov 11 06:50:01 2016 -0800
Committer: tedyu <yuzhihong@gmail.com>
Committed: Fri Nov 11 06:50:01 2016 -0800

----------------------------------------------------------------------
 .../hbase/spark/HBaseConnectionCache.scala      | 38 ++++++++++++++----
 .../hbase/spark/HBaseConnectionCacheSuite.scala | 41 +++++++++++++++++++-
 2 files changed, 69 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/81623a35/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala
index 678e769..fb5833e 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala
@@ -34,6 +34,8 @@ private[spark] object HBaseConnectionCache extends Logging {
   // A hashmap of Spark-HBase connections. Key is HBaseConnectionKey.
   val connectionMap = new mutable.HashMap[HBaseConnectionKey, SmartConnection]()
 
+  val cacheStat = HBaseConnectionCacheStat(0, 0, 0)
+
   // in milliseconds
   private final val DEFAULT_TIME_OUT: Long = HBaseSparkConf.connectionCloseDelay
   private var timeout = DEFAULT_TIME_OUT
@@ -58,6 +60,13 @@ private[spark] object HBaseConnectionCache extends Logging {
   housekeepingThread.setDaemon(true)
   housekeepingThread.start()
 
+  def getStat: HBaseConnectionCacheStat = {
+    connectionMap.synchronized {
+      cacheStat.numActiveConnections = connectionMap.size
+      cacheStat.copy()
+    }
+  }
+
   def close(): Unit = {
     try {
       connectionMap.synchronized {
@@ -100,7 +109,9 @@ private[spark] object HBaseConnectionCache extends Logging {
     connectionMap.synchronized {
       if (closed)
         return null
-      val sc = connectionMap.getOrElseUpdate(key, new SmartConnection(conn))
+      cacheStat.numTotalRequests += 1
+      val sc = connectionMap.getOrElseUpdate(key, {cacheStat.numActualConnectionsCreated += 1
+        new SmartConnection(conn)})
       sc.refCount += 1
       sc
     }
@@ -136,13 +147,13 @@ private[hbase] case class SmartConnection (
 }
 
 /**
-  * Denotes a unique key to an HBase Connection instance.
-  * Please refer to 'org.apache.hadoop.hbase.client.HConnectionKey'.
-  *
-  * In essence, this class captures the properties in Configuration
-  * that may be used in the process of establishing a connection.
-  *
-  */
+ * Denotes a unique key to an HBase Connection instance.
+ * Please refer to 'org.apache.hadoop.hbase.client.HConnectionKey'.
+ *
+ * In essence, this class captures the properties in Configuration
+ * that may be used in the process of establishing a connection.
+ *
+ */
 class HBaseConnectionKey(c: Configuration) extends Logging {
   val conf: Configuration = c
   val CONNECTION_PROPERTIES: Array[String] = Array[String](
@@ -240,4 +251,15 @@ class HBaseConnectionKey(c: Configuration) extends Logging {
   }
 }
 
+/**
+ * To log the state of 'HBaseConnectionCache'
+ *
+ * @param numTotalRequests number of total connection requests to the cache
+ * @param numActualConnectionsCreated number of actual HBase connections the cache ever created
+ * @param numActiveConnections number of current alive HBase connections the cache is holding
+ */
+case class HBaseConnectionCacheStat(var numTotalRequests: Long,
+                                    var numActualConnectionsCreated: Long,
+                                    var numActiveConnections: Long)
+
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/81623a35/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala
index c9edcc4..6ebf044 100644
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala
@@ -76,11 +76,18 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging {
     testWithPressureWithClose()
   }
 
-  def testBasic() {
-    HBaseConnectionCache.setTimeout(1 * 1000)
+  def cleanEnv() {
     HBaseConnectionCache.connectionMap.synchronized {
       HBaseConnectionCache.connectionMap.clear()
+      HBaseConnectionCache.cacheStat.numActiveConnections = 0
+      HBaseConnectionCache.cacheStat.numActualConnectionsCreated = 0
+      HBaseConnectionCache.cacheStat.numTotalRequests = 0
     }
+  }
+
+  def testBasic() {
+    cleanEnv()
+    HBaseConnectionCache.setTimeout(1 * 1000)
 
     val connKeyMocker1 = new HBaseConnectionKeyMocker(1)
     val connKeyMocker1a = new HBaseConnectionKeyMocker(1)
@@ -88,11 +95,20 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging {
 
     val c1 = HBaseConnectionCache
       .getConnection(connKeyMocker1, new ConnectionMocker)
+
+    assert(HBaseConnectionCache.connectionMap.size === 1)
+    assert(HBaseConnectionCache.getStat.numTotalRequests === 1)
+    assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 1)
+    assert(HBaseConnectionCache.getStat.numActiveConnections === 1)
+
     val c1a = HBaseConnectionCache
       .getConnection(connKeyMocker1a, new ConnectionMocker)
 
     HBaseConnectionCache.connectionMap.synchronized {
       assert(HBaseConnectionCache.connectionMap.size === 1)
+      assert(HBaseConnectionCache.getStat.numTotalRequests === 2)
+      assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 1)
+      assert(HBaseConnectionCache.getStat.numActiveConnections === 1)
     }
 
     val c2 = HBaseConnectionCache
@@ -100,16 +116,21 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging {
 
     HBaseConnectionCache.connectionMap.synchronized {
       assert(HBaseConnectionCache.connectionMap.size === 2)
+      assert(HBaseConnectionCache.getStat.numTotalRequests === 3)
+      assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 2)
+      assert(HBaseConnectionCache.getStat.numActiveConnections === 2)
     }
 
     c1.close()
     HBaseConnectionCache.connectionMap.synchronized {
       assert(HBaseConnectionCache.connectionMap.size === 2)
+      assert(HBaseConnectionCache.getStat.numActiveConnections === 2)
     }
 
     c1a.close()
     HBaseConnectionCache.connectionMap.synchronized {
       assert(HBaseConnectionCache.connectionMap.size === 2)
+      assert(HBaseConnectionCache.getStat.numActiveConnections === 2)
     }
 
     Thread.sleep(3 * 1000) // Leave housekeeping thread enough time
@@ -117,12 +138,15 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging {
       assert(HBaseConnectionCache.connectionMap.size === 1)
       assert(HBaseConnectionCache.connectionMap.iterator.next()._1
         .asInstanceOf[HBaseConnectionKeyMocker].confId === 2)
+      assert(HBaseConnectionCache.getStat.numActiveConnections === 1)
     }
 
     c2.close()
   }
 
   def testWithPressureWithoutClose() {
+    cleanEnv()
+
     class TestThread extends Runnable {
       override def run() {
         for (i <- 0 to 999) {
@@ -147,6 +171,10 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging {
     Thread.sleep(1000)
     HBaseConnectionCache.connectionMap.synchronized {
       assert(HBaseConnectionCache.connectionMap.size === 10)
+      assert(HBaseConnectionCache.getStat.numTotalRequests === 100 * 1000)
+      assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 10)
+      assert(HBaseConnectionCache.getStat.numActiveConnections === 10)
+
       var totalRc : Int = 0
       HBaseConnectionCache.connectionMap.foreach {
         x => totalRc += x._2.refCount
@@ -161,9 +189,13 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging {
     }
     Thread.sleep(1000)
     assert(HBaseConnectionCache.connectionMap.size === 0)
+    assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 10)
+    assert(HBaseConnectionCache.getStat.numActiveConnections === 0)
   }
 
   def testWithPressureWithClose() {
+    cleanEnv()
+
     class TestThread extends Runnable {
       override def run() {
         for (i <- 0 to 999) {
@@ -189,11 +221,16 @@ class HBaseConnectionCacheSuite extends FunSuite with Logging {
 
     HBaseConnectionCache.connectionMap.synchronized {
       assert(HBaseConnectionCache.connectionMap.size === 10)
+      assert(HBaseConnectionCache.getStat.numTotalRequests === 100 * 1000)
+      assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 10)
+      assert(HBaseConnectionCache.getStat.numActiveConnections === 10)
     }
 
     Thread.sleep(6 * 1000)
     HBaseConnectionCache.connectionMap.synchronized {
       assert(HBaseConnectionCache.connectionMap.size === 0)
+      assert(HBaseConnectionCache.getStat.numActualConnectionsCreated === 10)
+      assert(HBaseConnectionCache.getStat.numActiveConnections === 0)
     }
   }
 }


Mime
View raw message