activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1483530 - in /activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb: DBManager.scala LevelDBClient.scala LevelDBStore.scala
Date Thu, 16 May 2013 19:34:48 GMT
Author: chirino
Date: Thu May 16 19:34:48 2013
New Revision: 1483530

URL: http://svn.apache.org/r1483530
Log:
Fix for AMQ-4296: Don't GC past LevelDB records pointed to by prepared XA transactions.

Also avoid double logging a LOG_ADD_ENTRY.

Modified:
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
    activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala?rev=1483530&r1=1483529&r2=1483530&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
(original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/DBManager.scala
Thu May 16 19:34:48 2013
@@ -792,6 +792,7 @@ class DBManager(val parent:LevelDBStore)
 
           var sub = DurableSubscription(key, sr.getTopicKey, info)
           sub.lastAckPosition = client.getAckPosition(key);
+          sub.gcPosition = sub.lastAckPosition
           parent.createSubscription(sub)
         case TRANSACTION_COLLECTION_TYPE =>
           val meta = record.getMeta

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala?rev=1483530&r1=1483529&r2=1483530&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
(original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBClient.scala
Thu May 16 19:34:48 2013
@@ -1297,10 +1297,7 @@ class LevelDBClient(store: LevelDBStore)
           index_record.setValueLength(dataLocator.len)
           batch.put(key, encodeEntryRecord(index_record.freeze()).toByteArray)
 
-          val log_data = encodeEntryRecord(log_record.freeze())
           val index_data = encodeEntryRecord(index_record.freeze()).toByteArray
-
-          appender.append(LOG_ADD_ENTRY, log_data)
           batch.put(key, index_data)
 
           for (key <- logRefKey(dataLocator.pos, log_info)) {
@@ -1322,7 +1319,7 @@ class LevelDBClient(store: LevelDBStore)
                 throw new RuntimeException("Unexpected locator type")
             }
           }
-          println(dataLocator)
+//          println(dataLocator)
 
           val el = ack.getLastMessageId.getEntryLocator.asInstanceOf[EntryLocator];
           val os = new DataByteArrayOutputStream()

Modified: activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala?rev=1483530&r1=1483529&r2=1483530&view=diff
==============================================================================
--- activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
(original)
+++ activemq/trunk/activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb/LevelDBStore.scala
Thu May 16 19:34:48 2013
@@ -76,6 +76,7 @@ object LevelDBStore extends Log {
 }
 
 case class DurableSubscription(subKey:Long, topicKey:Long, info: SubscriptionInfo) {
+  var gcPosition = 0L
   var lastAckPosition = 0L
   var cursorPosition = 0L
 }
@@ -381,6 +382,7 @@ class LevelDBStore extends LockableServi
 
         def commit(uow:DelayableUOW) = {
           store.doUpdateAckPosition(uow, sub, position)
+          sub.gcPosition = position
         }
         def prepare(uow:DelayableUOW) = {
           prev_position = sub.lastAckPosition
@@ -756,8 +758,8 @@ class LevelDBStore extends LockableServi
       var pos = lastSeq.get()
       subscriptions.synchronized {
         subscriptions.values.foreach { sub =>
-          if( sub.lastAckPosition < pos ) {
-            pos = sub.lastAckPosition
+          if( sub.gcPosition < pos ) {
+            pos = sub.gcPosition
           }
         }
         if( firstSeq != pos+1) {
@@ -775,6 +777,7 @@ class LevelDBStore extends LockableServi
         subscriptions.put((info.getClientId, info.getSubcriptionName), sub)
       }
       sub.lastAckPosition = if (retroactive) 0 else lastSeq.get()
+      sub.gcPosition = sub.lastAckPosition
       waitOn(withUow{ uow=>
         uow.updateAckPosition(sub.subKey, sub.lastAckPosition)
         uow.countDownFuture
@@ -801,6 +804,7 @@ class LevelDBStore extends LockableServi
 
     def doUpdateAckPosition(uow: DelayableUOW, sub: DurableSubscription, position: Long)
= {
       sub.lastAckPosition = position
+      sub.gcPosition = position
       uow.updateAckPosition(sub.subKey, sub.lastAckPosition)
     }
 



Mime
View raw message