activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chir...@apache.org
Subject svn commit: r1222970 - /activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
Date Sat, 24 Dec 2011 13:32:26 GMT
Author: chirino
Date: Sat Dec 24 13:32:26 2011
New Revision: 1222970

URL: http://svn.apache.org/viewvc?rev=1222970&view=rev
Log:
DRY up log ref accounting; also fix a bug where log refs were not being decremented when a
destination was deleted while still holding messages.

Modified:
    activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala

Modified: activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
URL: http://svn.apache.org/viewvc/activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala?rev=1222970&r1=1222969&r2=1222970&view=diff
==============================================================================
--- activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
(original)
+++ activemq/activemq-apollo/trunk/apollo-leveldb/src/main/scala/org/apache/activemq/apollo/broker/store/leveldb/leveldb/LevelDBClient.scala
Sat Dec 24 13:32:26 2011
@@ -307,38 +307,27 @@ class LevelDBClient(store: LevelDBStore)
                     index.put(encode_key(queue_entry_prefix, record.queue_key, record.entry_seq),
data)
                     
                     // Figure out which log file this message reference is pointing at..
-                    val log_key = (if(record.message_locator!=null) {
+                    val pos = (if(record.message_locator!=null) {
                       Some(decode_locator(record.message_locator)._1)
                     } else {
                       index.get(encode_key(message_prefix, record.message_key)).map(decode_locator(_)._1)
-                    }).flatMap(log.log_info(_)).map(_.position)
+                    })
                     
                     // Increment it.
-                    log_key.foreach { log_key=>
-                      log_refs.getOrElseUpdate(log_key, new LongCounter()).incrementAndGet()
-                    }
-                    
+                    pos.foreach(log_ref_increment(_))
                   case LOG_REMOVE_QUEUE_ENTRY =>
 
                     index.get(data, new ReadOptions).foreach { value=>
                       val record: QueueEntryRecord = value
   
                       // Figure out which log file this message reference is pointing at..
-                      val log_key = (if(record.message_locator!=null) {
+                      val pos = (if(record.message_locator!=null) {
                         Some(decode_locator(record.message_locator)._1)
                       } else {
                         index.get(encode_key(message_prefix, record.message_key)).map(decode_locator(_)._1)
-                      }).flatMap(log.log_info(_)).map(_.position)
-                      
-                      // Decrement it.
-                      log_key.foreach { log_key=>
-                        log_refs.get(log_key).foreach{ counter=>
-                          if( counter.decrementAndGet() == 0 ) {
-                            log_refs.remove(log_key)
-                          }
-                        }
-                      }
-                      
+                      })
+                      pos.foreach(log_ref_decrement(_))
+
                       index.delete(data)
                     }
                     
@@ -552,6 +541,22 @@ class LevelDBClient(store: LevelDBStore)
     callback.run
   }
 
+  def log_ref_decrement(pos: Long) {
+    log.log_info(pos).foreach { log_info =>
+      log_refs.get(log_info.position).foreach { counter =>
+        if (counter.decrementAndGet() == 0) {
+          log_refs.remove(log_info.position)
+        }
+      }
+    }
+  }
+
+  def log_ref_increment(pos: Long) {
+    log.log_info(pos).foreach { log_info =>
+      log_refs.getOrElseUpdate(log_info.position, new LongCounter()).incrementAndGet()
+    }
+  }
+
   def remove_queue(queue_key: Long, callback:Runnable) = {
     retry_using_index {
       log.appender { appender =>
@@ -560,8 +565,14 @@ class LevelDBClient(store: LevelDBStore)
         ro.verifyChecksums(verify_checksums)
         appender.append(LOG_REMOVE_QUEUE, encode_vlong(queue_key))
         index.delete(encode_key(queue_prefix, queue_key))
-        index.cursor_keys_prefixed(encode_key(queue_entry_prefix, queue_key), ro) { key=>
+        index.cursor_prefixed(encode_key(queue_entry_prefix, queue_key), ro) { (key, value)=>
           index.delete(key)
+
+          // Figure out which log file that message entry was in so we can
+          // decrement the log file reference.
+          val entry_record:QueueEntryRecord = value
+          val pos = decode_locator(entry_record.getMessageLocator)._1
+          log_ref_decrement(pos)
           true
         }
       }
@@ -617,14 +628,7 @@ class LevelDBClient(store: LevelDBStore)
                 val key = encode_key(queue_entry_prefix, entry.queue_key, entry.entry_seq)
                 appender.append(LOG_REMOVE_QUEUE_ENTRY, key)
                 batch.delete(key)
-
-                log.log_info(pos).foreach { log_info=>
-                  log_refs.get(log_info.position).foreach{ counter=>
-                    if( counter.decrementAndGet() == 0 ) {
-                      log_refs.remove(log_info.position)
-                    }
-                  }
-                }
+                log_ref_decrement(pos)
               }
 
               action.enqueues.foreach { entry =>
@@ -634,9 +638,7 @@ class LevelDBClient(store: LevelDBStore)
                 batch.put(encode_key(queue_entry_prefix, entry.queue_key, entry.entry_seq),
encoded)
                 
                 // Increment it.
-                log.log_info(pos).foreach { log_info=>
-                  log_refs.getOrElseUpdate(log_info.position, new LongCounter()).incrementAndGet()
-                }
+                log_ref_increment(pos)
                 
               }
             }



Mime
View raw message