activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1485810 - in /activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb: DBManager.scala LevelDBStore.scala
Date Thu, 23 May 2013 18:33:06 GMT
Author: chirino
Date: Thu May 23 18:33:06 2013
New Revision: 1485810

URL: http://svn.apache.org/r1485810
Log:
Related to AMQ-4296: fixes LevelDB store cursoring. The cursor was recovering too many
messages, and sometimes the wrong ones.

Modified:
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala?rev=1485810&r1=1485809&r2=1485810&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala Thu May 23 18:33:06 2013
@@ -667,17 +667,24 @@ class DBManager(val parent:LevelDBStore)
   }
   
   def cursorMessages(key:Long, listener:MessageRecoveryListener, startPos:Long) = {
-    var nextPos = startPos;
-    client.queueCursor(key, nextPos) { msg =>
+    var lastmsgid:MessageId = null
+    client.queueCursor(key, startPos) { msg =>
       if( listener.hasSpace ) {
-        listener.recoverMessage(msg)
-        nextPos += 1
-        true
+        if( listener.recoverMessage(msg) ) {
+          lastmsgid = msg.getMessageId
+          true
+        } else {
+          false
+        }
       } else {
         false
       }
     }
-    nextPos
+    if( lastmsgid==null ) {
+      startPos
+    } else {
+      lastmsgid.getEntryLocator.asInstanceOf[EntryLocator].seq+1
+    }
   }
 
   def getXAActions(key:Long) = {

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala?rev=1485810&r1=1485809&r2=1485810&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala (original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala Thu May 23 18:33:06 2013
@@ -678,24 +678,26 @@ class LevelDBStore extends LockableServi
     }
 
     def recover(listener: MessageRecoveryListener): Unit = {
-      cursorPosition = db.cursorMessages(key, preparedExcluding(listener), 0)
+      cursorPosition = db.cursorMessages(key, PreparedExcluding(listener), 0)
     }
 
-    def preparedExcluding(listener: MessageRecoveryListener) = new MessageRecoveryListener {
+    case class PreparedExcluding(listener: MessageRecoveryListener) extends MessageRecoveryListener {
       def isDuplicate(ref: MessageId) = listener.isDuplicate(ref)
       def hasSpace = listener.hasSpace
       def recoverMessageReference(ref: MessageId) = {
         if (!preparedAcks.contains(ref)) {
           listener.recoverMessageReference(ref)
+        } else {
+          true
         }
-        true
       }
 
       def recoverMessage(message: Message) = {
         if (!preparedAcks.contains(message.getMessageId)) {
           listener.recoverMessage(message)
+        } else {
+          true
         }
-        true
       }
     }
 
@@ -704,7 +706,8 @@ class LevelDBStore extends LockableServi
     }
 
     def recoverNextMessages(maxReturned: Int, listener: MessageRecoveryListener): Unit = {
-      cursorPosition = db.cursorMessages(key, preparedExcluding(LimitingRecoveryListener(maxReturned, listener)), cursorPosition)
+      val excluding = PreparedExcluding(LimitingRecoveryListener(maxReturned, listener))
+      cursorPosition = db.cursorMessages(key, excluding, cursorPosition)
     }
 
     override def setBatch(id: MessageId): Unit = {
@@ -714,7 +717,7 @@ class LevelDBStore extends LockableServi
   }
 
   case class LimitingRecoveryListener(max: Int, listener: MessageRecoveryListener) extends MessageRecoveryListener {
-    private var recovered: Int = 0
+    var recovered: Int = 0
     def hasSpace = recovered < max
     def recoverMessage(message: Message) = {
       recovered += 1;
@@ -849,7 +852,7 @@ class LevelDBStore extends LockableServi
     
     def recoverNextMessages(clientId: String, subscriptionName: String, maxReturned: Int, listener: MessageRecoveryListener): Unit = {
       lookup(clientId, subscriptionName).foreach { sub =>
-        sub.cursorPosition = db.cursorMessages(key,  preparedExcluding(LimitingRecoveryListener(maxReturned, listener)), sub.cursorPosition.max(sub.lastAckPosition+1))
+        sub.cursorPosition = db.cursorMessages(key,  PreparedExcluding(LimitingRecoveryListener(maxReturned, listener)), sub.cursorPosition.max(sub.lastAckPosition+1))
       }
     }
     



Mime
View raw message