activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1505805 - /activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
Date Mon, 22 Jul 2013 20:53:28 GMT
Author: tabish
Date: Mon Jul 22 20:53:27 2013
New Revision: 1505805

URL: http://svn.apache.org/r1505805
Log:
[LevelDB]

https://issues.apache.org/jira/browse/AMQ-4296

Fixes the remainder of the failing unit tests.  The LevelDB store wasn't incrementing or decrementing
reference counts on messages added to the store, which caused the expectations of certain memory-limit
based tests to fail: the memory usage was being updated after the store add instead of during it,
so a message could get placed into the batch list of a cursor when we did not expect that
it would.  This could also cause a browse to return fewer messages than we want, as the in-memory
messages would top out the usage limit so we'd never page in one batch of messages.

Modified:
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala?rev=1505805&r1=1505804&r2=1505805&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
(original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
Mon Jul 22 20:53:27 2013
@@ -624,10 +624,13 @@ class LevelDBStore extends LockableServi
 
     def doAdd(uow: DelayableUOW, message: Message, delay:Boolean): CountDownFuture[AnyRef]
= {
       val seq = lastSeq.incrementAndGet()
+      message.incrementReferenceCount()
+      uow.addCompleteListener({
+        message.decrementReferenceCount()
+      })
       uow.enqueue(key, seq, message, delay)
     }
 
-
     override def asyncAddQueueMessage(context: ConnectionContext, message: Message) = asyncAddQueueMessage(context,
message, false)
     override def asyncAddQueueMessage(context: ConnectionContext, message: Message, delay:
Boolean): Future[AnyRef] = {
       message.getMessageId.setEntryLocator(null)
@@ -718,7 +721,6 @@ class LevelDBStore extends LockableServi
     }
   }
 
-
   def getTopicGCPositions = {
     import collection.JavaConversions._
     val topics = this.synchronized {



Mime
View raw message