activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject git commit: https://issues.apache.org/jira/browse/AMQ-5125
Date Mon, 07 Jul 2014 21:53:57 GMT
Repository: activemq
Updated Branches:
  refs/heads/trunk c6d0aaa81 -> e62e90aba


https://issues.apache.org/jira/browse/AMQ-5125

Fix for potential deadlock when external classes synchronize on the
LevelDBStore instance which can deadlock the hawtDispatch runner thread
if a task also attempts to take the lock to protect some mutable state
values.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e62e90ab
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e62e90ab
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e62e90ab

Branch: refs/heads/trunk
Commit: e62e90abafdfb42a2c74f07a333ed26365936b54
Parents: c6d0aaa
Author: Timothy Bish <tabish121@gmail.com>
Authored: Mon Jul 7 17:53:46 2014 -0400
Committer: Timothy Bish <tabish121@gmail.com>
Committed: Mon Jul 7 17:53:46 2014 -0400

----------------------------------------------------------------------
 .../apache/activemq/leveldb/LevelDBStore.scala  | 20 +++++++++++---------
 1 file changed, 11 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/e62e90ab/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
----------------------------------------------------------------------
diff --git a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
index 4d33a2c..01d5170 100644
--- a/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
+++ b/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
@@ -184,6 +184,8 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware
with P
   val topicsById = collection.mutable.HashMap[Long, LevelDBStore#LevelDBTopicMessageStore]()
   val plists = collection.mutable.HashMap[String, LevelDBStore#LevelDBPList]()
 
+  private val lock = new Object();
+
   def check_running = {
     if( this.isStopped ) {
       throw new SuppressReplyException("Store has been stopped")
@@ -540,12 +542,12 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware
with P
 
 
   def getPList(name: String): PList = {
-    this.synchronized(plists.get(name)).getOrElse(db.createPList(name))
+    lock.synchronized(plists.get(name)).getOrElse(db.createPList(name))
   }
 
   def createPList(name: String, key: Long):LevelDBStore#LevelDBPList = {
     var rc = new LevelDBPList(name, key)
-    this.synchronized {
+    lock.synchronized {
       plists.put(name, rc)
     }
     rc
@@ -573,30 +575,30 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware
with P
   }
 
   def createQueueMessageStore(destination: ActiveMQQueue):LevelDBStore#LevelDBMessageStore
= {
-    this.synchronized(queues.get(destination)).getOrElse(db.createQueueStore(destination))
+    lock.synchronized(queues.get(destination)).getOrElse(db.createQueueStore(destination))
   }
 
   def createQueueMessageStore(destination: ActiveMQQueue, key: Long):LevelDBStore#LevelDBMessageStore
= {
     var rc = new LevelDBMessageStore(destination, key)
-    this.synchronized {
+    lock.synchronized {
       queues.put(destination, rc)
     }
     rc
   }
 
-  def removeQueueMessageStore(destination: ActiveMQQueue): Unit = this synchronized {
+  def removeQueueMessageStore(destination: ActiveMQQueue): Unit = lock synchronized {
     queues.remove(destination).foreach { store=>
       db.destroyQueueStore(store.key)
     }
   }
 
   def createTopicMessageStore(destination: ActiveMQTopic):LevelDBStore#LevelDBTopicMessageStore
= {
-    this.synchronized(topics.get(destination)).getOrElse(db.createTopicStore(destination))
+    lock.synchronized(topics.get(destination)).getOrElse(db.createTopicStore(destination))
   }
 
   def createTopicMessageStore(destination: ActiveMQTopic, key: Long):LevelDBStore#LevelDBTopicMessageStore
= {
     var rc = new LevelDBTopicMessageStore(destination, key)
-    this synchronized {
+    lock synchronized {
       topics.put(destination, rc)
       topicsById.put(key, rc)
     }
@@ -777,7 +779,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware
with P
   // This gts called when the store is first loading up, it restores
   // the existing durable subs..
   def createSubscription(sub:DurableSubscription) = {
-    this.synchronized(topicsById.get(sub.topicKey)) match {
+    lock.synchronized(topicsById.get(sub.topicKey)) match {
       case Some(topic) =>
         topic.synchronized {
           topic.subscriptions.put((sub.info.getClientId, sub.info.getSubcriptionName), sub)
@@ -790,7 +792,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware
with P
 
   def getTopicGCPositions = {
     import collection.JavaConversions._
-    val topics = this.synchronized {
+    val topics = lock.synchronized {
       new ArrayList(topicsById.values())
     }
     topics.flatMap(_.gcPosition).toSeq


Mime
View raw message