hbase-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From te...@apache.org
Subject hbase git commit: HBASE-16638 Reduce the number of Connection's created in classes of hbase-spark module - Addendum (Weiqing Yang)
Date Tue, 11 Oct 2016 17:22:58 GMT
Repository: hbase
Updated Branches:
  refs/heads/master 9d304d3b2 -> ee6f0ddef


HBASE-16638 Reduce the number of Connection's created in classes of hbase-spark module - Addendum
(Weiqing Yang)


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/ee6f0dde
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/ee6f0dde
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/ee6f0dde

Branch: refs/heads/master
Commit: ee6f0ddef67e7840302640f7237d8308fc3eeadb
Parents: 9d304d3
Author: tedyu <yuzhihong@gmail.com>
Authored: Tue Oct 11 10:22:50 2016 -0700
Committer: tedyu <yuzhihong@gmail.com>
Committed: Tue Oct 11 10:22:50 2016 -0700

----------------------------------------------------------------------
 .../hbase/spark/HBaseConnectionCache.scala      |   4 +-
 .../hbase/spark/HBaseConnectionCacheSuit.scala  | 196 -------------------
 .../hbase/spark/HBaseConnectionCacheSuite.scala | 196 +++++++++++++++++++
 3 files changed, 198 insertions(+), 198 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/ee6f0dde/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala
b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala
index aa2625e..678e769 100644
--- a/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala
+++ b/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCache.scala
@@ -79,14 +79,14 @@ private[spark] object HBaseConnectionCache extends Logging {
       connectionMap.foreach {
         x => {
           if(x._2.refCount < 0) {
-            logError("Bug to be fixed: negative refCount")
+            logError(s"Bug to be fixed: negative refCount of connection ${x._2}")
           }
 
           if(forceClean || ((x._2.refCount <= 0) && (tsNow - x._2.timestamp >
timeout))) {
             try{
               x._2.connection.close()
             } catch {
-              case e: IOException => logWarning("Fail to close connection", e)
+              case e: IOException => logWarning(s"Fail to close connection ${x._2}", e)
             }
             connectionMap.remove(x._1)
           }

http://git-wip-us.apache.org/repos/asf/hbase/blob/ee6f0dde/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuit.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuit.scala
b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuit.scala
deleted file mode 100644
index 83a434c..0000000
--- a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuit.scala
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.spark
-
-import java.util.concurrent.ExecutorService
-import scala.util.Random
-
-import org.apache.hadoop.hbase.client.{BufferedMutator, Table, RegionLocator,
-  Connection, BufferedMutatorParams, Admin}
-import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.hbase.TableName
-import org.apache.spark.Logging
-import org.scalatest.FunSuite
-
-case class HBaseConnectionKeyMocker (confId: Int) extends HBaseConnectionKey (null) {
-  override def hashCode: Int = {
-    confId
-  }
-
-  override def equals(obj: Any): Boolean = {
-    if(!obj.isInstanceOf[HBaseConnectionKeyMocker])
-      false
-    else
-      confId == obj.asInstanceOf[HBaseConnectionKeyMocker].confId
-  }
-}
-
-class ConnectionMocker extends Connection {
-  var isClosed: Boolean = false
-
-  def getRegionLocator (tableName: TableName): RegionLocator = null
-  def getConfiguration: Configuration = null
-  def getTable (tableName: TableName): Table = null
-  def getTable(tableName: TableName, pool: ExecutorService): Table = null
-  def getBufferedMutator (params: BufferedMutatorParams): BufferedMutator = null
-  def getBufferedMutator (tableName: TableName): BufferedMutator = null
-  def getAdmin: Admin = null
-
-  def close(): Unit = {
-    if (isClosed)
-      throw new IllegalStateException()
-    isClosed = true
-  }
-
-  def isAborted: Boolean = true
-  def abort(why: String, e: Throwable) = {}
-}
-
-class HBaseConnectionCacheSuit extends FunSuite with Logging {
-  /*
-   * These tests must be performed sequentially as they operate with an
-   * unique running thread and resource.
-   *
-   * It looks there's no way to tell FunSuite to do so, so making those
-   * test cases normal functions which are called sequentially in a single
-   * test case.
-   */
-  test("all test cases") {
-    testBasic()
-    testWithPressureWithoutClose()
-    testWithPressureWithClose()
-  }
-
-  def testBasic() {
-    HBaseConnectionCache.setTimeout(1 * 1000)
-
-    val connKeyMocker1 = new HBaseConnectionKeyMocker(1)
-    val connKeyMocker1a = new HBaseConnectionKeyMocker(1)
-    val connKeyMocker2 = new HBaseConnectionKeyMocker(2)
-
-    val c1 = HBaseConnectionCache
-      .getConnection(connKeyMocker1, new ConnectionMocker)
-    val c1a = HBaseConnectionCache
-      .getConnection(connKeyMocker1a, new ConnectionMocker)
-
-    HBaseConnectionCache.connectionMap.synchronized {
-      assert(HBaseConnectionCache.connectionMap.size === 1)
-    }
-
-    val c2 = HBaseConnectionCache
-      .getConnection(connKeyMocker2, new ConnectionMocker)
-
-    HBaseConnectionCache.connectionMap.synchronized {
-      assert(HBaseConnectionCache.connectionMap.size === 2)
-    }
-
-    c1.close()
-    HBaseConnectionCache.connectionMap.synchronized {
-      assert(HBaseConnectionCache.connectionMap.size === 2)
-    }
-
-    c1a.close()
-    HBaseConnectionCache.connectionMap.synchronized {
-      assert(HBaseConnectionCache.connectionMap.size === 2)
-    }
-
-    Thread.sleep(3 * 1000) // Leave housekeeping thread enough time
-    HBaseConnectionCache.connectionMap.synchronized {
-      assert(HBaseConnectionCache.connectionMap.size === 1)
-      assert(HBaseConnectionCache.connectionMap.iterator.next()._1
-        .asInstanceOf[HBaseConnectionKeyMocker].confId === 2)
-    }
-
-    c2.close()
-  }
-
-  def testWithPressureWithoutClose() {
-    class TestThread extends Runnable {
-      override def run() {
-        for (i <- 0 to 999) {
-          val c = HBaseConnectionCache.getConnection(
-            new HBaseConnectionKeyMocker(Random.nextInt(10)), new ConnectionMocker)
-        }
-      }
-    }
-
-    HBaseConnectionCache.setTimeout(500)
-    val threads: Array[Thread] = new Array[Thread](100)
-    for (i <- 0 to 99) {
-      threads.update(i, new Thread(new TestThread()))
-      threads(i).run()
-    }
-    try {
-      threads.foreach { x => x.join() }
-    } catch {
-      case e: InterruptedException => println(e.getMessage)
-    }
-
-    Thread.sleep(1000)
-    HBaseConnectionCache.connectionMap.synchronized {
-      assert(HBaseConnectionCache.connectionMap.size === 10)
-      var totalRc : Int = 0
-      HBaseConnectionCache.connectionMap.foreach {
-        x => totalRc += x._2.refCount
-      }
-      assert(totalRc === 100 * 1000)
-      HBaseConnectionCache.connectionMap.foreach {
-        x => {
-          x._2.refCount = 0
-          x._2.timestamp = System.currentTimeMillis() - 1000
-        }
-      }
-    }
-    Thread.sleep(1000)
-    assert(HBaseConnectionCache.connectionMap.size === 0)
-  }
-
-  def testWithPressureWithClose() {
-    class TestThread extends Runnable {
-      override def run() {
-        for (i <- 0 to 999) {
-          val c = HBaseConnectionCache.getConnection(
-            new HBaseConnectionKeyMocker(Random.nextInt(10)), new ConnectionMocker)
-          Thread.`yield`()
-          c.close()
-        }
-      }
-    }
-
-    HBaseConnectionCache.setTimeout(3 * 1000)
-    val threads: Array[Thread] = new Array[Thread](100)
-    for (i <- threads.indices) {
-      threads.update(i, new Thread(new TestThread()))
-      threads(i).run()
-    }
-    try {
-      threads.foreach { x => x.join() }
-    } catch {
-      case e: InterruptedException => println(e.getMessage)
-    }
-
-    HBaseConnectionCache.connectionMap.synchronized {
-      assert(HBaseConnectionCache.connectionMap.size === 10)
-    }
-
-    Thread.sleep(6 * 1000)
-    HBaseConnectionCache.connectionMap.synchronized {
-      assert(HBaseConnectionCache.connectionMap.size === 0)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/hbase/blob/ee6f0dde/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala
----------------------------------------------------------------------
diff --git a/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala
b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala
new file mode 100644
index 0000000..066d534
--- /dev/null
+++ b/hbase-spark/src/test/scala/org/apache/hadoop/hbase/spark/HBaseConnectionCacheSuite.scala
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.spark
+
+import java.util.concurrent.ExecutorService
+import scala.util.Random
+
+import org.apache.hadoop.hbase.client.{BufferedMutator, Table, RegionLocator,
+  Connection, BufferedMutatorParams, Admin}
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.hbase.TableName
+import org.apache.spark.Logging
+import org.scalatest.FunSuite
+
+case class HBaseConnectionKeyMocker (confId: Int) extends HBaseConnectionKey (null) {
+  override def hashCode: Int = {
+    confId
+  }
+
+  override def equals(obj: Any): Boolean = {
+    if(!obj.isInstanceOf[HBaseConnectionKeyMocker])
+      false
+    else
+      confId == obj.asInstanceOf[HBaseConnectionKeyMocker].confId
+  }
+}
+
+class ConnectionMocker extends Connection {
+  var isClosed: Boolean = false
+
+  def getRegionLocator (tableName: TableName): RegionLocator = null
+  def getConfiguration: Configuration = null
+  def getTable (tableName: TableName): Table = null
+  def getTable(tableName: TableName, pool: ExecutorService): Table = null
+  def getBufferedMutator (params: BufferedMutatorParams): BufferedMutator = null
+  def getBufferedMutator (tableName: TableName): BufferedMutator = null
+  def getAdmin: Admin = null
+
+  def close(): Unit = {
+    if (isClosed)
+      throw new IllegalStateException()
+    isClosed = true
+  }
+
+  def isAborted: Boolean = true
+  def abort(why: String, e: Throwable) = {}
+}
+
+class HBaseConnectionCacheSuite extends FunSuite with Logging {
+  /*
+   * These tests must be performed sequentially as they operate with an
+   * unique running thread and resource.
+   *
+   * It looks there's no way to tell FunSuite to do so, so making those
+   * test cases normal functions which are called sequentially in a single
+   * test case.
+   */
+  test("all test cases") {
+    testBasic()
+    testWithPressureWithoutClose()
+    testWithPressureWithClose()
+  }
+
+  def testBasic() {
+    HBaseConnectionCache.setTimeout(1 * 1000)
+
+    val connKeyMocker1 = new HBaseConnectionKeyMocker(1)
+    val connKeyMocker1a = new HBaseConnectionKeyMocker(1)
+    val connKeyMocker2 = new HBaseConnectionKeyMocker(2)
+
+    val c1 = HBaseConnectionCache
+      .getConnection(connKeyMocker1, new ConnectionMocker)
+    val c1a = HBaseConnectionCache
+      .getConnection(connKeyMocker1a, new ConnectionMocker)
+
+    HBaseConnectionCache.connectionMap.synchronized {
+      assert(HBaseConnectionCache.connectionMap.size === 1)
+    }
+
+    val c2 = HBaseConnectionCache
+      .getConnection(connKeyMocker2, new ConnectionMocker)
+
+    HBaseConnectionCache.connectionMap.synchronized {
+      assert(HBaseConnectionCache.connectionMap.size === 2)
+    }
+
+    c1.close()
+    HBaseConnectionCache.connectionMap.synchronized {
+      assert(HBaseConnectionCache.connectionMap.size === 2)
+    }
+
+    c1a.close()
+    HBaseConnectionCache.connectionMap.synchronized {
+      assert(HBaseConnectionCache.connectionMap.size === 2)
+    }
+
+    Thread.sleep(3 * 1000) // Leave housekeeping thread enough time
+    HBaseConnectionCache.connectionMap.synchronized {
+      assert(HBaseConnectionCache.connectionMap.size === 1)
+      assert(HBaseConnectionCache.connectionMap.iterator.next()._1
+        .asInstanceOf[HBaseConnectionKeyMocker].confId === 2)
+    }
+
+    c2.close()
+  }
+
+  def testWithPressureWithoutClose() {
+    class TestThread extends Runnable {
+      override def run() {
+        for (i <- 0 to 999) {
+          val c = HBaseConnectionCache.getConnection(
+            new HBaseConnectionKeyMocker(Random.nextInt(10)), new ConnectionMocker)
+        }
+      }
+    }
+
+    HBaseConnectionCache.setTimeout(500)
+    val threads: Array[Thread] = new Array[Thread](100)
+    for (i <- 0 to 99) {
+      threads.update(i, new Thread(new TestThread()))
+      threads(i).run()
+    }
+    try {
+      threads.foreach { x => x.join() }
+    } catch {
+      case e: InterruptedException => println(e.getMessage)
+    }
+
+    Thread.sleep(1000)
+    HBaseConnectionCache.connectionMap.synchronized {
+      assert(HBaseConnectionCache.connectionMap.size === 10)
+      var totalRc : Int = 0
+      HBaseConnectionCache.connectionMap.foreach {
+        x => totalRc += x._2.refCount
+      }
+      assert(totalRc === 100 * 1000)
+      HBaseConnectionCache.connectionMap.foreach {
+        x => {
+          x._2.refCount = 0
+          x._2.timestamp = System.currentTimeMillis() - 1000
+        }
+      }
+    }
+    Thread.sleep(1000)
+    assert(HBaseConnectionCache.connectionMap.size === 0)
+  }
+
+  def testWithPressureWithClose() {
+    class TestThread extends Runnable {
+      override def run() {
+        for (i <- 0 to 999) {
+          val c = HBaseConnectionCache.getConnection(
+            new HBaseConnectionKeyMocker(Random.nextInt(10)), new ConnectionMocker)
+          Thread.`yield`()
+          c.close()
+        }
+      }
+    }
+
+    HBaseConnectionCache.setTimeout(3 * 1000)
+    val threads: Array[Thread] = new Array[Thread](100)
+    for (i <- threads.indices) {
+      threads.update(i, new Thread(new TestThread()))
+      threads(i).run()
+    }
+    try {
+      threads.foreach { x => x.join() }
+    } catch {
+      case e: InterruptedException => println(e.getMessage)
+    }
+
+    HBaseConnectionCache.connectionMap.synchronized {
+      assert(HBaseConnectionCache.connectionMap.size === 10)
+    }
+
+    Thread.sleep(6 * 1000)
+    HBaseConnectionCache.connectionMap.synchronized {
+      assert(HBaseConnectionCache.connectionMap.size === 0)
+    }
+  }
+}


Mime
View raw message